Skip to content

Commit

Permalink
[hotfix] Add setupFlinkConfig to KubernetesTestBase to make sure flin…
Browse files Browse the repository at this point in the history
…kConfig is always setup before used
  • Loading branch information
KarmaGYZ authored and tillrohrmann committed May 17, 2020
1 parent f631b49 commit 83cd281
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,17 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
@Rule
public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
}

@Before
public void setup() throws Exception {
super.setup();

final Deployment mockDeployment = new DeploymentBuilder()
.editOrNewMetadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ public class KubernetesTestBase extends TestLogger {

protected FlinkKubeClient flinkKubeClient;

@Before
public void setup() throws Exception {
protected void setupFlinkConfig() {
flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, NAMESPACE);
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE, CONTAINER_IMAGE);
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY);
flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
}

@Before
public void setup() throws Exception {
setupFlinkConfig();

flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
hadoopConfDir = temporaryFolder.newFolder().getAbsoluteFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,9 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {

private KubernetesJobManagerSpecification kubernetesJobManagerSpecification;

@Before
public void setup() throws Exception {
super.setup();

KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "logback.xml");
KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "log4j.properties");
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY);
flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS);
Expand All @@ -84,6 +81,14 @@ public void setup() throws Exception {
flinkConfig.set(BlobServerOptions.PORT, Integer.toString(BLOB_SERVER_PORT));
flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, JOB_MANAGER_CPU);
flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
}

@Before
public void setup() throws Exception {
super.setup();

KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "logback.xml");
KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "log4j.properties");

final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(JOB_MANAGER_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public class KubernetesJobManagerTestBase extends KubernetesTestBase {

protected FlinkPod baseFlinkPod;

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

this.flinkConfig.set(RestOptions.PORT, REST_PORT);
this.flinkConfig.set(RestOptions.BIND_PORT, REST_BIND_PORT);
Expand All @@ -85,6 +85,11 @@ public void setup() throws Exception {
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector);
this.flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
}

@Before
public void setup() throws Exception {
super.setup();

final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(JOB_MANAGER_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,22 @@ public class KubernetesTaskManagerTestBase extends KubernetesTestBase {

protected FlinkPod baseFlinkPod = new FlinkPod.Builder().build();

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.set(TaskManagerOptions.RPC_PORT, String.valueOf(RPC_PORT));
flinkConfig.set(TaskManagerOptions.CPU_CORES, TASK_MANAGER_CPU);
flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(TOTAL_PROCESS_MEMORY + "m"));
customizedEnvs.forEach((k, v) ->
flinkConfig.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + k, v));
flinkConfig.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + k, v));
this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_LABELS, userLabels);
this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_NODE_SELECTOR, nodeSelector);
}

@Before
public void setup() throws Exception {
super.setup();

taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(flinkConfig);
containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {

private FlinkConfMountDecorator flinkConfMountDecorator;

@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

this.flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD);
}

@Before
public void setup() throws Exception {
super.setup();

this.flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD);

this.flinkConfMountDecorator = new FlinkConfMountDecorator(kubernetesJobManagerParameters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,19 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
private Pod resultPod;
private Container resultMainContainer;

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
}

@Before
public void setup() throws Exception {
super.setup();

final InitJobManagerDecorator initJobManagerDecorator =
new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
private Pod resultPod;
private Container resultMainContainer;

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
}

@Before
public void setup() throws Exception {
super.setup();

final InitTaskManagerDecorator initTaskManagerDecorator =
new InitTaskManagerDecorator(kubernetesTaskManagerParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,19 @@ public class JavaCmdJobManagerDecoratorTest extends KubernetesJobManagerTestBase

private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator;

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD);
flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, FLINK_LOG_DIR_IN_POD);
flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS);
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, KUBERNETES_ENTRY_PATH);
}

@Before
public void setup() throws Exception {
super.setup();

this.javaCmdJobManagerDecorator = new JavaCmdJobManagerDecorator(kubernetesJobManagerParameters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,18 @@ public class JavaCmdTaskManagerDecoratorTest extends KubernetesTaskManagerTestBa

private JavaCmdTaskManagerDecorator javaCmdTaskManagerDecorator;

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.setString(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, KUBERNETES_ENTRY_PATH);
flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD);
flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, FLINK_LOG_DIR_IN_POD);
}

@Before
public void setup() throws Exception {
super.setup();

this.mainClassArgs = String.format(
"%s--configDir %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,21 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas

protected KubernetesJobManagerSpecification kubernetesJobManagerSpecification;

@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS);
flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
}

@Before
public void setup() throws Exception {
super.setup();

KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "logback.xml");
KubernetesTestUtils.createTemporyFile("some data", flinkConfDir, "log4j.properties");

flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, ENTRY_POINT_CLASS);
flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);

this.kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,21 @@ public class KubernetesTaskManagerParametersTest extends KubernetesTestBase {

private KubernetesTaskManagerParameters kubernetesTaskManagerParameters;

@Before
public void setup() throws Exception {
super.setup();
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();

flinkConfig.set(TaskManagerOptions.CPU_CORES, TASK_MANAGER_CPU);
flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(TASK_MANAGER_MEMORY + "m"));
flinkConfig.set(TaskManagerOptions.RPC_PORT, String.valueOf(RPC_PORT));

customizedEnvs.forEach((k, v) ->
flinkConfig.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + k, v));
}

@Before
public void setup() throws Exception {
super.setup();

final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(flinkConfig);
Expand Down

0 comments on commit 83cd281

Please sign in to comment.