Skip to content

Commit

Permalink
[FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 16, 2018
1 parent 8eb4604 commit b1f3ca3
Showing 1 changed file with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.minicluster.JobExecutorService;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
Expand Down Expand Up @@ -67,6 +69,8 @@ public class MiniClusterResource extends ExternalResource {

private ClusterClient<?> clusterClient;

private Configuration restClusterClientConfig;

private int numberSlots = -1;

private TestEnvironment executionEnvironment;
Expand Down Expand Up @@ -117,6 +121,10 @@ public ClusterClient<?> getClusterClient() {
return clusterClient;
}

public Configuration getClientConfiguration() {
return restClusterClientConfig;
}

public TestEnvironment getTestEnvironment() {
return executionEnvironment;
}
Expand Down Expand Up @@ -194,6 +202,9 @@ private void startLegacyMiniCluster() throws Exception {
if (enableClusterClient) {
clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
}
Configuration restClientConfig = new Configuration();
restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
}

private void startMiniCluster() throws Exception {
Expand Down Expand Up @@ -229,6 +240,10 @@ private void startMiniCluster() throws Exception {
if (enableClusterClient) {
clusterClient = new MiniClusterClient(configuration, miniCluster);
}
Configuration restClientConfig = new Configuration();
restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
}

/**
Expand Down

0 comments on commit b1f3ca3

Please sign in to comment.