Skip to content

Commit

Permalink
[FLINK-8778] Port queryable state ITCases to use MiniClusterResource
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Mar 11, 2018
1 parent 6732669 commit 8365c90
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 335 deletions.

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,102 @@

package org.apache.flink.queryablestate.itcases;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

/**
* Several integration tests for queryable state using the {@link FsStateBackend}.
*/
public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase {
public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final int NUM_JMS = 2;
// NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that
// we always use all TaskManagers so that the JM oracle is always properly re-registered
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;

@BeforeClass
public static void setup() {
setup(9064, 9069);
}
private static final int QS_PROXY_PORT_RANGE_START = 9064;
private static final int QS_SERVER_PORT_RANGE_START = 9069;

@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();

private static TestingServer zkServer;

private static MiniClusterResource miniClusterResource;

@Override
protected AbstractStateBackend createStateBackend() throws Exception {
return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
}

@BeforeClass
public static void setup() throws Exception {
zkServer = new TestingServer();

// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
true);

miniClusterResource.before();

client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START);

clusterClient = miniClusterResource.getClusterClient();
}

@AfterClass
public static void tearDown() throws Exception {
miniClusterResource.after();

client.shutdownAndWait();

zkServer.stop();
zkServer.close();
}

private static Configuration getConfig() throws Exception {

Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setString(
QueryableStateOptions.PROXY_PORT_RANGE,
QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS));
config.setString(
QueryableStateOptions.SERVER_PORT_RANGE,
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);

config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());

config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,103 @@

package org.apache.flink.queryablestate.itcases;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final int NUM_JMS = 2;
// NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that
// we always use all TaskManagers so that the JM oracle is always properly re-registered
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;

@BeforeClass
public static void setup() {
setup(9074, 9079);
}
private static final int QS_PROXY_PORT_RANGE_START = 9074;
private static final int QS_SERVER_PORT_RANGE_START = 9079;

@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();

private static TestingServer zkServer;

private static MiniClusterResource miniClusterResource;

@Override
protected AbstractStateBackend createStateBackend() throws Exception {
return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
}

@BeforeClass
public static void setup() throws Exception {
zkServer = new TestingServer();

// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
true);

miniClusterResource.before();

client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START);

clusterClient = miniClusterResource.getClusterClient();
}

@AfterClass
public static void tearDown() throws Exception {
miniClusterResource.after();

client.shutdownAndWait();

zkServer.stop();
zkServer.close();
}

private static Configuration getConfig() throws Exception {

Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setString(
QueryableStateOptions.PROXY_PORT_RANGE,
QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS));
config.setString(
QueryableStateOptions.SERVER_PORT_RANGE,
QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS));
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);

config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());

config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

return config;
}

}

This file was deleted.

Loading

0 comments on commit 8365c90

Please sign in to comment.