From e8182dcd0987a7326a8c73df902c156e44a7cebf Mon Sep 17 00:00:00 2001 From: eaglewatcherwb Date: Fri, 4 Jan 2019 16:35:41 +0800 Subject: [PATCH] [FLINK-10866] Only load queryable state when explicitly configured Change-Id: Id79c8bf97002a387a80be563a43dce3210143dc2 --- .../queryable_state_configuration.html | 5 +++++ .../configuration/QueryableStateOptions.java | 10 ++++++++++ .../HAQueryableStateFsBackendITCase.java | 1 + .../HAQueryableStateRocksDBBackendITCase.java | 1 + .../NonHAQueryableStateFsBackendITCase.java | 1 + ...onHAQueryableStateRocksDBBackendITCase.java | 1 + .../runtime/io/network/NetworkEnvironment.java | 4 ++-- .../taskexecutor/TaskManagerServices.java | 18 ++++++++++++------ .../TaskManagerServicesConfiguration.java | 9 +++++++-- 9 files changed, 40 insertions(+), 10 deletions(-) diff --git a/docs/_includes/generated/queryable_state_configuration.html b/docs/_includes/generated/queryable_state_configuration.html index c457c40e14715..5940fdc822991 100644 --- a/docs/_includes/generated/queryable_state_configuration.html +++ b/docs/_includes/generated/queryable_state_configuration.html @@ -42,5 +42,10 @@ 0 Number of query Threads for queryable state server. Uses the number of slots if set to 0. + +
queryable-state.enable
+ false + Option whether the queryable state proxy and server should be enabled where possible and configurable. + diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java index 6a8041926b7dc..20c6b53a2090e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java @@ -106,6 +106,16 @@ public class QueryableStateOptions { .defaultValue(0) .withDescription("Number of query Threads for queryable state server. Uses the number of slots if set to 0."); + /** Option whether the queryable state proxy and server should be enabled where possible and configurable. + * + *

Queryable state proxy and server are still more experimental features, hence disabled unless they are enable + * in user configuration. */ + public static final ConfigOption ENABLE_QUERYABLE_STATE_PROXY_SERVER = + key("queryable-state.enable") + .defaultValue(false) + .withDescription("Option whether the queryable state proxy and server should be enabled where possible" + + " and configurable."); + // ------------------------------------------------------------------------ // Client Options // ------------------------------------------------------------------------ diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java index ef6a267df0fbd..43fdf13a4bb21 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java @@ -95,6 +95,7 @@ public static void tearDown() throws Exception { private static Configuration getConfig() throws Exception { Configuration config = new Configuration(); + config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index 46202c322cb5c..8a622436977fb 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -95,6 +95,7 @@ public static void tearDown() throws Exception { private static Configuration getConfig() throws Exception { Configuration config = new Configuration(); + config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java index f1382556eb7c7..183ad9e59b5c1 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java @@ -79,6 +79,7 @@ public static void tearDown() { private static Configuration getConfig() { Configuration config = new Configuration(); + config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index 3ad0dda52e2be..34713f38f0209 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -78,6 +78,7 @@ public static void tearDown() { private static Configuration getConfig() { Configuration config = new Configuration(); + config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index d124456c213a1..89e23dac1b23e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -345,7 +345,7 @@ public void start() throws IOException { } catch (Throwable ie) { kvStateServer.shutdown(); kvStateServer = null; - throw new IOException("Failed to start the Queryable State Data Server.", ie); + LOG.error("Failed to start the Queryable State Data Server.", ie); } } @@ -355,7 +355,7 @@ public void start() throws IOException { } catch (Throwable ie) { kvStateProxy.shutdown(); kvStateProxy = null; - throw new IOException("Failed to start the Queryable State Client Proxy.", ie); + LOG.error("Failed to start the Queryable State Client Proxy.", ie); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index c46d800bc9534..f3feb2dab7199 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -419,32 +419,38 @@ private static NetworkEnvironment createNetworkEnvironment( QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); - int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ? + KvStateClientProxy kvClientProxy = null; + KvStateServer kvStateServer = null; + + if (qsConfig != null) { + int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads(); - int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ? + int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads(); - final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy( + + kvClientProxy = QueryableStateUtils.createKvStateClientProxy( taskManagerServicesConfiguration.getTaskManagerAddress(), qsConfig.getProxyPortRange(), numProxyServerNetworkThreads, numProxyServerQueryThreads, new DisabledKvStateRequestStats()); - int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ? + int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads(); - int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ? + int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads(); - final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer( + kvStateServer = QueryableStateUtils.createKvStateServer( taskManagerServicesConfiguration.getTaskManagerAddress(), qsConfig.getStateServerPortRange(), numStateServerNetworkThreads, numStateServerQueryThreads, kvStateRegistry, new DisabledKvStateRequestStats()); + } // we start the network first, to make sure it can allocate its buffers first return new NetworkEnvironment( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index c27e54d764972..5acbddebfeda7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Iterator; @@ -66,6 +67,7 @@ public class TaskManagerServicesConfiguration { private final NetworkEnvironmentConfiguration networkConfig; + @Nullable private final QueryableStateConfiguration queryableStateConfig; /** @@ -93,7 +95,7 @@ public TaskManagerServicesConfiguration( String[] localRecoveryStateRootDirectories, boolean localRecoveryEnabled, NetworkEnvironmentConfiguration networkConfig, - QueryableStateConfiguration queryableStateConfig, + @Nullable QueryableStateConfiguration queryableStateConfig, int numberOfSlots, long configuredMemory, MemoryType memoryType, @@ -107,7 +109,7 @@ public TaskManagerServicesConfiguration( this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories); this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled); this.networkConfig = checkNotNull(networkConfig); - this.queryableStateConfig = checkNotNull(queryableStateConfig); + this.queryableStateConfig = queryableStateConfig; this.numberOfSlots = checkNotNull(numberOfSlots); this.configuredMemory = configuredMemory; @@ -466,6 +468,9 @@ public static boolean hasNewNetworkBufConf(final Configuration config) { * Creates the {@link QueryableStateConfiguration} from the given Configuration. */ private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) { + if (!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) { + return null; + } final Iterator proxyPorts = NetUtils.getPortRangeFromString( config.getString(QueryableStateOptions.PROXY_PORT_RANGE));