Skip to content

Commit

Permalink
[FLINK-16600][k8s] Respect the rest.bind-port config option for the K…
Browse files Browse the repository at this point in the history
…ubernetes setup

This closes apache#11705 .
  • Loading branch information
zhengcanbin authored and tisonkun committed Apr 18, 2020
1 parent 6b3e037 commit 7a7aaec
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,8 @@ private ClusterClientProvider<String> deployClusterInternal(

// Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig,
TaskManagerOptions.RPC_PORT,
Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);

if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
.addNewPort()
.withName(Constants.REST_PORT_NAME)
.withPort(kubernetesJobManagerParameters.getRestPort())
.withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
.endPort()
.endSpec()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public int getRestPort() {
return flinkConfig.getInteger(RestOptions.PORT);
}

public int getRestBindPort() {
return Integer.valueOf(flinkConfig.getString(RestOptions.BIND_PORT));
}

public int getRPCPort() {
return flinkConfig.getInteger(JobManagerOptions.PORT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class Constants {
public static final String LABEL_COMPONENT_TASK_MANAGER = "taskmanager";

// Use fixed port in kubernetes, it needs to be exposed.
public static final int REST_PORT = 8081;
public static final int BLOB_SERVER_PORT = 6124;
public static final int TASK_MANAGER_RPC_PORT = 6122;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class KubernetesJobManagerTestBase extends KubernetesTestBase {
protected static final int JOB_MANAGER_MEMORY = 768;

protected static final int REST_PORT = 9081;
protected static final String REST_BIND_PORT = "9082";
protected static final int RPC_PORT = 7123;
protected static final int BLOB_SERVER_PORT = 8346;

Expand Down Expand Up @@ -74,6 +75,7 @@ public void setup() throws Exception {
super.setup();

this.flinkConfig.set(RestOptions.PORT, REST_PORT);
this.flinkConfig.set(RestOptions.BIND_PORT, REST_BIND_PORT);
this.flinkConfig.set(JobManagerOptions.PORT, RPC_PORT);
this.flinkConfig.set(BlobServerOptions.PORT, Integer.toString(BLOB_SERVER_PORT));
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, JOB_MANAGER_CPU);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testBuildAccompanyingKubernetesResources() throws IOException {
new ServicePortBuilder()
.withName(Constants.REST_PORT_NAME)
.withPort(REST_PORT)
.withNewTargetPort(Integer.valueOf(REST_BIND_PORT))
.build());
assertEquals(expectedServicePorts, restService.getSpec().getPorts());

Expand Down

0 comments on commit 7a7aaec

Please sign in to comment.