Skip to content

Commit

Permalink
[FLINK-10425][runtime] Use TaskManagerOptions#HOST
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua authored and zentol committed Oct 24, 2018
1 parent 5953bb6 commit 5fa36c9
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.mesos.runtime.clusterframework;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.util.MesosArtifactResolver;
Expand Down Expand Up @@ -232,7 +232,7 @@ public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation al
.matcher(taskManagerHostnameOption.get())
.replaceAll(Matcher.quoteReplacement(taskID.getValue()));

dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname);
dynamicProperties.setString(TaskManagerOptions.HOST, taskManagerHostname);
}

// take needed ports for the TM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.minicluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -85,7 +84,7 @@ public String getJobManagerBindAddress() {
public String getTaskManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
configuration.getString(TaskManagerOptions.HOST, "localhost");
}

public String getResourceManagerBindAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
Expand Down Expand Up @@ -405,7 +404,7 @@ public static RpcService createRpcService(
checkNotNull(configuration);
checkNotNull(haServices);

String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);

if (taskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,8 +1735,7 @@ object TaskManager {
highAvailabilityServices: HighAvailabilityServices)
: (String, java.util.Iterator[Integer]) = {

var taskManagerHostname = configuration.getString(
ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
var taskManagerHostname = configuration.getString(TaskManagerOptions.HOST)

if (taskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskmanager;

import net.jcip.annotations.NotThreadSafe;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void testUsePreconfiguredNetworkInterface() throws Exception {
final String TEST_HOST_NAME = "testhostname";

Configuration config = new Configuration();
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);

Expand All @@ -89,7 +88,7 @@ public void testUsePreconfiguredNetworkInterface() throws Exception {
public void testActorSystemPortConfig() throws Exception {
// config with pre-configured hostname to speed up tests (no interface selection)
Configuration config = new Configuration();
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
config.setString(TaskManagerOptions.HOST, "localhost");
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.taskmanager;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand Down Expand Up @@ -247,7 +246,7 @@ public void testStartupWhenNetworkStackFailsToInitialize() throws Exception {
blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost"));

final Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
cfg.setString(TaskManagerOptions.HOST, "localhost");
cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort());
cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "1m");
ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(cfg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class AkkaSslITCase(_system: ActorSystem)

val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setString(TaskManagerOptions.HOST, "127.0.0.1")
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)

Expand All @@ -77,7 +77,7 @@ class AkkaSslITCase(_system: ActorSystem)

val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setString(TaskManagerOptions.HOST, "127.0.0.1")
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
Expand Down Expand Up @@ -77,7 +76,7 @@ private Configuration getConfiguration() {

Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, addressString);
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
config.setString(TaskManagerOptions.HOST, addressString);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.flink.yarn;

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand Down Expand Up @@ -133,7 +133,7 @@ private static void run(String[] args) {
// use the hostname passed by job manager
final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
if (taskExecutorHostname != null) {
configuration.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskExecutorHostname);
configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
}

SecurityUtils.install(sc);
Expand Down

0 comments on commit 5fa36c9

Please sign in to comment.