Skip to content

Commit

Permalink
[FLINK-10866] Only load queryable state when explicitly configured
Browse files Browse the repository at this point in the history
Change-Id: Id79c8bf97002a387a80be563a43dce3210143dc2
  • Loading branch information
eaglewatcherwb authored and aljoscha committed Jan 10, 2019
1 parent 85dff90 commit e8182dc
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 10 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/queryable_state_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,10 @@
<td style="word-wrap: break-word;">0</td>
<td>Number of query Threads for queryable state server. Uses the number of slots if set to 0.</td>
</tr>
<tr>
<td><h5>queryable-state.enable</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Option whether the queryable state proxy and server should be enabled where possible and configurable.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ public class QueryableStateOptions {
.defaultValue(0)
.withDescription("Number of query Threads for queryable state server. Uses the number of slots if set to 0.");

/** Option whether the queryable state proxy and server should be enabled where possible and configurable.
*
* <p>Queryable state proxy and server are still more experimental features, hence disabled unless they are enable
* in user configuration. */
public static final ConfigOption<Boolean> ENABLE_QUERYABLE_STATE_PROXY_SERVER =
key("queryable-state.enable")
.defaultValue(false)
.withDescription("Option whether the queryable state proxy and server should be enabled where possible" +
" and configurable.");

// ------------------------------------------------------------------------
// Client Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public static void tearDown() throws Exception {
private static Configuration getConfig() throws Exception {

Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public static void tearDown() throws Exception {
private static Configuration getConfig() throws Exception {

Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static void tearDown() {

private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static void tearDown() {

private static Configuration getConfig() {
Configuration config = new Configuration();
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void start() throws IOException {
} catch (Throwable ie) {
kvStateServer.shutdown();
kvStateServer = null;
throw new IOException("Failed to start the Queryable State Data Server.", ie);
LOG.error("Failed to start the Queryable State Data Server.", ie);
}
}

Expand All @@ -355,7 +355,7 @@ public void start() throws IOException {
} catch (Throwable ie) {
kvStateProxy.shutdown();
kvStateProxy = null;
throw new IOException("Failed to start the Queryable State Client Proxy.", ie);
LOG.error("Failed to start the Queryable State Client Proxy.", ie);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,32 +419,38 @@ private static NetworkEnvironment createNetworkEnvironment(

QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
KvStateClientProxy kvClientProxy = null;
KvStateServer kvStateServer = null;

if (qsConfig != null) {
int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(

kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getProxyPortRange(),
numProxyServerNetworkThreads,
numProxyServerQueryThreads,
new DisabledKvStateRequestStats());

int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
kvStateServer = QueryableStateUtils.createKvStateServer(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getStateServerPortRange(),
numStateServerNetworkThreads,
numStateServerQueryThreads,
kvStateRegistry,
new DisabledKvStateRequestStats());
}

// we start the network first, to make sure it can allocate its buffers first
return new NetworkEnvironment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class TaskManagerServicesConfiguration {

private final NetworkEnvironmentConfiguration networkConfig;

@Nullable
private final QueryableStateConfiguration queryableStateConfig;

/**
Expand Down Expand Up @@ -93,7 +95,7 @@ public TaskManagerServicesConfiguration(
String[] localRecoveryStateRootDirectories,
boolean localRecoveryEnabled,
NetworkEnvironmentConfiguration networkConfig,
QueryableStateConfiguration queryableStateConfig,
@Nullable QueryableStateConfiguration queryableStateConfig,
int numberOfSlots,
long configuredMemory,
MemoryType memoryType,
Expand All @@ -107,7 +109,7 @@ public TaskManagerServicesConfiguration(
this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
this.networkConfig = checkNotNull(networkConfig);
this.queryableStateConfig = checkNotNull(queryableStateConfig);
this.queryableStateConfig = queryableStateConfig;
this.numberOfSlots = checkNotNull(numberOfSlots);

this.configuredMemory = configuredMemory;
Expand Down Expand Up @@ -466,6 +468,9 @@ public static boolean hasNewNetworkBufConf(final Configuration config) {
* Creates the {@link QueryableStateConfiguration} from the given Configuration.
*/
private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
if (!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) {
return null;
}

final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
config.getString(QueryableStateOptions.PROXY_PORT_RANGE));
Expand Down

0 comments on commit e8182dc

Please sign in to comment.