Skip to content

Commit

Permalink
[FLINK-7823][QS] Update Queryable State configuration parameters.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Nov 7, 2017
1 parent e8931bd commit 84746a8
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ public class QueryableStateOptions {
// Server Options
// ------------------------------------------------------------------------

/** Flag to indicate whether to start the queryable state server. */
public static final ConfigOption<Boolean> SERVER_ENABLE =
key("query.server.enable")
.defaultValue(true);

/**
* The config parameter defining the server port range of the queryable state proxy.
*
Expand All @@ -59,6 +54,16 @@ public class QueryableStateOptions {
key("query.proxy.ports")
.defaultValue("9069");

/** Number of network (event loop) threads for the client proxy (0 => #slots). */
public static final ConfigOption<Integer> PROXY_NETWORK_THREADS =
key("query.proxy.network-threads")
.defaultValue(0);

/** Number of async query threads for the client proxy (0 => #slots). */
public static final ConfigOption<Integer> PROXY_ASYNC_QUERY_THREADS =
key("query.proxy.query-threads")
.defaultValue(0);

/**
* The config parameter defining the server port range of the queryable state server.
*
Expand Down Expand Up @@ -100,16 +105,6 @@ public class QueryableStateOptions {
key("query.client.network-threads")
.defaultValue(0);

/** Number of retries on location lookup failures. */
public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRIES =
key("query.client.lookup.num-retries")
.defaultValue(3);

/** Retry delay on location lookup failures (millis). */
public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRY_DELAY =
key("query.client.lookup.retry-delay")
.defaultValue(1000);

// ------------------------------------------------------------------------

/** Not intended to be instantiated. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,16 @@ public InetSocketAddress getServerAddress() {
*/
public void start() throws Throwable {
Preconditions.checkState(serverAddress == null,
"Server " + serverName + " already running @ " + serverAddress + '.');
"The " + serverName + " already running @ " + serverAddress + '.');

Iterator<Integer> portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {}

if (serverAddress != null) {
LOG.info("Started server {} @ {}.", serverName, serverAddress);
LOG.info("Started the {} @ {}.", serverName, serverAddress);
} else {
LOG.info("Unable to start server {}. All ports in provided range are occupied.", serverName);
throw new FlinkRuntimeException("Unable to start server " + serverName + ". All ports in provided range are occupied.");
LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName);
throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void setup(int proxyPortRangeStart, int serverPortRangeStart) {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS));
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testServerInitializationFailure() throws Throwable {

// the expected exception along with the adequate message
expectedEx.expect(FlinkRuntimeException.class);
expectedEx.expectMessage("Unable to start server Test Server 2. All ports in provided range are occupied.");
expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied.");

TestServer server1 = null;
TestServer server2 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public void start() throws IOException {
if (kvStateServer != null) {
try {
kvStateServer.start();
LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress());
LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress());
} catch (Throwable ie) {
kvStateServer.shutdown();
kvStateServer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.util.NetUtils;

import java.util.Iterator;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -27,44 +30,43 @@
*/
public class QueryableStateConfiguration {

private final boolean enabled;

private final Iterator<Integer> proxyPortRange;

private final Iterator<Integer> qserverPortRange;

private final int numProxyThreads;

private final int numPQueryThreads;

private final int numServerThreads;

private final int numQueryThreads;
private final int numSQueryThreads;

public QueryableStateConfiguration(
boolean enabled,
Iterator<Integer> proxyPortRange,
Iterator<Integer> qserverPortRange,
int numProxyThreads,
int numPQueryThreads,
int numServerThreads,
int numQueryThreads) {
int numSQueryThreads) {

checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext()));
checkArgument(!enabled || (qserverPortRange != null && qserverPortRange.hasNext()));
checkArgument(proxyPortRange != null && proxyPortRange.hasNext());
checkArgument(qserverPortRange != null && qserverPortRange.hasNext());
checkArgument(numProxyThreads >= 0, "queryable state number of server threads must be zero or larger");
checkArgument(numPQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger");
checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger");
checkArgument(numSQueryThreads >= 0, "queryable state number of query threads must be zero or larger");

this.enabled = enabled;
this.proxyPortRange = proxyPortRange;
this.qserverPortRange = qserverPortRange;
this.numProxyThreads = numProxyThreads;
this.numPQueryThreads = numPQueryThreads;
this.numServerThreads = numServerThreads;
this.numQueryThreads = numQueryThreads;
this.numSQueryThreads = numSQueryThreads;
}

// ------------------------------------------------------------------------

/**
* Returns whether queryable state is enabled.
*/
public boolean isEnabled() {
return enabled;
}

/**
* Returns the port range where the queryable state client proxy can listen.
* See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}.
Expand All @@ -85,26 +87,43 @@ public Iterator<Integer> getStateServerPortRange() {
* Returns the number of threads for the query server NIO event loop.
* These threads only process network events and dispatch query requests to the query threads.
*/
public int numServerThreads() {
public int numProxyServerThreads() {
return numProxyThreads;
}

/**
* Returns the number of threads for the thread pool that performs the actual state lookup.
* These threads perform the actual state lookup.
*/
public int numProxyQueryThreads() {
return numPQueryThreads;
}

/**
* Returns the number of threads for the query server NIO event loop.
* These threads only process network events and dispatch query requests to the query threads.
*/
public int numStateServerThreads() {
return numServerThreads;
}

/**
* Returns the number of threads for the thread pool that performs the actual state lookup.
* These threads perform the actual state lookup.
*/
public int numQueryThreads() {
return numQueryThreads;
public int numStateQueryThreads() {
return numSQueryThreads;
}

// ------------------------------------------------------------------------

@Override
public String toString() {
return "QueryableStateConfiguration {" +
"enabled=" + enabled +
", numServerThreads=" + numServerThreads +
", numQueryThreads=" + numQueryThreads +
return "QueryableStateConfiguration{" +
"numProxyServerThreads=" + numProxyThreads +
", numProxyQueryThreads=" + numPQueryThreads +
", numStateServerThreads=" + numServerThreads +
", numStateQueryThreads=" + numSQueryThreads +
'}';
}

Expand All @@ -114,6 +133,8 @@ public String toString() {
* Gets the configuration describing the queryable state as deactivated.
*/
public static QueryableStateConfiguration disabled() {
return new QueryableStateConfiguration(false, null, null, 0, 0);
final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue());
final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue());
return new QueryableStateConfiguration(proxyPorts, serverPorts, 0, 0, 0, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,33 +327,35 @@ private static NetworkEnvironment createNetworkEnvironment(
TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

KvStateRegistry kvStateRegistry = new KvStateRegistry();
KvStateClientProxy kvClientProxy = null;
KvStateServer kvStateServer = null;

if (taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) {
QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

int numNetworkThreads = qsConfig.numServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numServerThreads();

int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads();

kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getProxyPortRange(),
numNetworkThreads,
numQueryThreads,
new DisabledKvStateRequestStats());

kvStateServer = QueryableStateUtils.createKvStateServer(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getStateServerPortRange(),
numNetworkThreads,
numQueryThreads,
kvStateRegistry,
new DisabledKvStateRequestStats());
}

QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getProxyPortRange(),
numProxyServerNetworkThreads,
numProxyServerQueryThreads,
new DisabledKvStateRequestStats());

int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getStateServerPortRange(),
numStateServerNetworkThreads,
numStateServerQueryThreads,
kvStateRegistry,
new DisabledKvStateRequestStats());

// we start the network first, to make sure it can allocate its buffers first
return new NetworkEnvironment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,23 +400,27 @@ public static boolean hasNewNetworkBufConf(final Configuration config) {
* Creates the {@link QueryableStateConfiguration} from the given Configuration.
*/
private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) {
final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE);

if (enabled) {
final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));

final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
return new QueryableStateConfiguration(true, proxyPorts, serverPorts, numNetworkThreads, numQueryThreads);
}
else {
return QueryableStateConfiguration.disabled();
}

final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(
config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(
config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));

final int numProxyServerNetworkThreads = config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS);
final int numProxyServerQueryThreads = config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS);

final int numStateServerNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
final int numStateServerQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);

return new QueryableStateConfiguration(
proxyPorts,
serverPorts,
numProxyServerNetworkThreads,
numProxyServerQueryThreads,
numStateServerNetworkThreads,
numStateServerQueryThreads);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ class LocalFlinkMiniCluster(
config.addAll(userConfiguration)
setMemory(config)
initializeIOFormatClasses(config)

// Disable queryable state server if nothing else is configured explicitly
if (!config.containsKey(QueryableStateOptions.SERVER_ENABLE.key())) {
LOG.info("Disabled queryable state server")
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, false)
}

config
}

Expand Down

0 comments on commit 84746a8

Please sign in to comment.