Skip to content

Commit

Permalink
[FLINK-27931][sql-gateway] Introduce the SqlGateway to assemble all c…
Browse files Browse the repository at this point in the history
…omponents

This closes apache#19894
  • Loading branch information
fsk119 committed Jul 22, 2022
1 parent 61a9d83 commit 81379a5
Show file tree
Hide file tree
Showing 9 changed files with 482 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,30 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

/** Test {@link SqlGatewayEndpointFactoryUtils}. */
public class SqlGatewayEndpointFactoryUtilsTest {

@Test
public void testCreateEndpoints() {
Map<String, String> config = getDefaultConfig();
String id = UUID.randomUUID().toString();
Map<String, String> config = getDefaultConfig(id);
config.put("sql-gateway.endpoint.type", "mocked;fake");
List<SqlGatewayEndpoint> actual =
createSqlGatewayEndpoint(
new MockedSqlGatewayService(), Configuration.fromMap(config));
MockedSqlGatewayEndpoint expectedMocked =
new MockedSqlGatewayEndpoint("localhost", 9999, "Hello World.");
assertEquals(
new HashSet<>(Arrays.asList(expectedMocked, FakeSqlGatewayEndpoint.INSTANCE)),
new HashSet<>(actual));
new MockedSqlGatewayEndpoint(id, "localhost", 9999, "Hello World.");
assertThat(actual)
.isEqualTo(Arrays.asList(expectedMocked, FakeSqlGatewayEndpoint.INSTANCE));
}

@Test
Expand Down Expand Up @@ -110,6 +110,7 @@ public void testCreateEndpointWithUnconsumedOptions() {
+ "Supported options:\n\n"
+ "description\n"
+ "host\n"
+ "id\n"
+ "port");
}

Expand All @@ -125,7 +126,12 @@ private void validateException(Map<String, String> config, String errorMessage)
}

private Map<String, String> getDefaultConfig() {
return getDefaultConfig(UUID.randomUUID().toString());
}

private Map<String, String> getDefaultConfig(String id) {
Map<String, String> config = new HashMap<>();
config.put("sql-gateway.endpoint.mocked.id", id);
config.put("sql-gateway.endpoint.type", "mocked");
config.put("sql-gateway.endpoint.mocked.host", "localhost");
config.put("sql-gateway.endpoint.mocked.port", "9999");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,43 @@
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Mocked {@link SqlGatewayEndpoint}. */
public class MockedSqlGatewayEndpoint implements SqlGatewayEndpoint {

private static final Set<String> RUNNING_ENDPOINTS = ConcurrentHashMap.newKeySet();

private final String id;
private final String host;
private final int port;
private final String description;

public MockedSqlGatewayEndpoint(String host, int port, String description) {
public static boolean isRunning(String id) {
return RUNNING_ENDPOINTS.contains(id);
}

public MockedSqlGatewayEndpoint(String id, String host, int port, String description) {
this.id = id;
this.host = host;
this.port = port;
this.description = description;

if (RUNNING_ENDPOINTS.contains(id)) {
throw new IllegalArgumentException(
"There are endpoints with the same id is running. Please specify an unique identifier.");
}
}

@Override
public void start() throws Exception {
// do nothing
RUNNING_ENDPOINTS.add(id);
}

@Override
public void stop() throws Exception {
// do nothing
RUNNING_ENDPOINTS.remove(id);
}

@Override
Expand All @@ -54,13 +69,14 @@ public boolean equals(Object o) {
return false;
}
MockedSqlGatewayEndpoint that = (MockedSqlGatewayEndpoint) o;
return port == that.port
return Objects.equals(id, that.id)
&& port == that.port
&& Objects.equals(host, that.host)
&& Objects.equals(description, that.description);
}

@Override
public int hashCode() {
return Objects.hash(host, port, description);
return Objects.hash(id, host, port, description);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
/** Factory to create the {@link SqlGatewayEndpoint}. */
public class MockedSqlGatewayEndpointFactory implements SqlGatewayEndpointFactory {

public static final ConfigOption<String> ID =
ConfigOptions.key("id").stringType().noDefaultValue();
public static final ConfigOption<String> HOST =
ConfigOptions.key("host").stringType().noDefaultValue();
public static final ConfigOption<Integer> PORT =
Expand All @@ -45,6 +47,7 @@ public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context) {
helper.validate();

return new MockedSqlGatewayEndpoint(
helper.getOptions().get(ID),
helper.getOptions().get(HOST),
helper.getOptions().get(PORT),
helper.getOptions().get(DESCRIPTION));
Expand All @@ -58,6 +61,7 @@ public String factoryIdentifier() {
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ID);
options.add(HOST);
options.add(PORT);
return options;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.cli.SqlGatewayOptions;
import org.apache.flink.table.gateway.cli.SqlGatewayOptionsParser;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.session.SessionManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/** Main entry point for the SQL Gateway. */
public class SqlGateway {

private static final Logger LOG = LoggerFactory.getLogger(SqlGateway.class);

private final List<SqlGatewayEndpoint> endpoints;
private final Properties dynamicConfig;
private final CountDownLatch latch;

private SessionManager sessionManager;

public SqlGateway(Properties dynamicConfig) {
this.endpoints = new ArrayList<>();
this.dynamicConfig = dynamicConfig;
this.latch = new CountDownLatch(1);
}

public void start() throws Exception {
DefaultContext context =
DefaultContext.load(ConfigurationUtils.createConfiguration(dynamicConfig));
sessionManager = new SessionManager(context);

sessionManager.start();
SqlGatewayService sqlGatewayService = new SqlGatewayServiceImpl(sessionManager);

endpoints.addAll(
SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(
sqlGatewayService, context.getFlinkConfig()));

for (SqlGatewayEndpoint endpoint : endpoints) {
try {
endpoint.start();
} catch (Throwable t) {
LOG.error("Failed to start the endpoint.", t);
stop();
throw new SqlGatewayException("Failed to start the endpoint.", t);
}
}
}

public void stop() {
for (SqlGatewayEndpoint endpoint : endpoints) {
stopEndpointSilently(endpoint);
}
if (sessionManager != null) {
sessionManager.stop();
}
latch.countDown();
}

public void waitUntilStop() throws Exception {
latch.await();
}

public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, "SqlGateway", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);

startSqlGateway(System.out, args);
}

@VisibleForTesting
static void startSqlGateway(PrintStream stream, String[] args) {
SqlGatewayOptions cliOptions = SqlGatewayOptionsParser.parseSqlGatewayOptions(args);

if (cliOptions.isPrintHelp()) {
SqlGatewayOptionsParser.printHelpSqlGateway(stream);
return;
}

SqlGateway gateway = new SqlGateway(cliOptions.getDynamicConfigs());
try {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(gateway));
gateway.start();
gateway.waitUntilStop();
} catch (Throwable t) {
// User uses ctrl + c to cancel the Gateway manually
if (t instanceof InterruptedException) {
LOG.info("Caught " + t.getClass().getSimpleName() + ". Shutting down.");
return;
}
// make space in terminal
stream.println();
stream.println();
LOG.error(
"SqlGateway must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
t);
throw new SqlGatewayException(
"Unexpected exception. This is a bug. Please consider filing an issue.", t);
}
}

private void stopEndpointSilently(SqlGatewayEndpoint endpoint) {
try {
endpoint.stop();
} catch (Exception e) {
LOG.error("Failed to stop the endpoint. Ignore.", e);
}
}

// --------------------------------------------------------------------------------------------

private static class ShutdownThread extends Thread {

private final SqlGateway gateway;

public ShutdownThread(SqlGateway gateway) {
this.gateway = gateway;
}

@Override
public void run() {
// Shutdown the gateway
System.out.println("\nShutting down the Flink SqlGateway...");
LOG.info("Shutting down the Flink SqlGateway...");

try {
gateway.stop();
} catch (Exception e) {
LOG.error("Failed to shut down the Flink SqlGateway: " + e.getMessage(), e);
System.out.println("Failed to shut down the Flink SqlGateway: " + e.getMessage());
}

LOG.info("Flink SqlGateway has been shutdown.");
System.out.println("Flink SqlGateway has been shutdown.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.cli;

import java.util.Properties;

/** Options to configure the {@code SqlGateway}. */
public class SqlGatewayOptions {

private final boolean isPrintHelp;
private final Properties dynamicConfigs;

public SqlGatewayOptions(boolean isPrintHelp, Properties dynamicConfigs) {
this.isPrintHelp = isPrintHelp;
this.dynamicConfigs = dynamicConfigs;
}

public boolean isPrintHelp() {
return isPrintHelp;
}

public Properties getDynamicConfigs() {
return dynamicConfigs;
}
}
Loading

0 comments on commit 81379a5

Please sign in to comment.