Skip to content

Commit

Permalink
[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TA…
Browse files Browse the repository at this point in the history
…SK_SLOTS

This closes apache#5731.
  • Loading branch information
zhouhai02 authored and zentol committed Apr 24, 2018
1 parent 1a9675d commit 73e9f90
Show file tree
Hide file tree
Showing 36 changed files with 63 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
Expand Down Expand Up @@ -139,7 +140,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1))
TaskManagerOptions.NUM_TASK_SLOTS, 1))
.build();

final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
Expand Down Expand Up @@ -220,7 +221,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {

try {
// TODO: Set job's default parallelism to max number of slots
final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

Expand Down Expand Up @@ -265,7 +266,7 @@ public void endSession(JobID jobID) throws Exception {

private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());

newConfiguration.addAll(baseConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.client.deployment;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
Expand Down Expand Up @@ -66,7 +65,7 @@ public String toString() {
}

public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception {

final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static void main(String[] args) throws Exception {

final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.storm.api;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down Expand Up @@ -90,7 +89,7 @@ public void submitTopologyWithOpts(final String topologyName, final Map conf, fi
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

this.flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public static Configuration generateTaskManagerConfiguration(

cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
if (numSlots != -1){
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}

return cfg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -167,7 +168,7 @@ public Builder setCommonBindAddress(String commonBindAddress) {

public MiniClusterConfiguration build() {
final Configuration modifiedConfiguration = new Configuration(configuration);
modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
modifiedConfiguration.setString(
RestOptions.ADDRESS,
modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public String getTaskManagerStdoutPath() {
// --------------------------------------------------------------------------------------------

public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

if (numberSlots == -1) {
numberSlots = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public static TaskManagerServicesConfiguration fromConfiguration(
boolean localCommunication) throws Exception {

// we need this because many configs have been written with a "-1" entry
int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
if (slots == -1) {
slots = 1;
}
Expand Down Expand Up @@ -290,7 +290,7 @@ private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfigurat
checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");

checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
"Number of task slots must be at least one.");

final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void testCoordinatorShutsDownOnFailure() {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();

Expand Down Expand Up @@ -128,7 +129,7 @@ public void testCoordinatorShutsDownOnSuccess() {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
public static void setUp() throws Exception {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
Expand Down Expand Up @@ -146,7 +147,7 @@ protected void run() {

try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
Expand Down Expand Up @@ -174,7 +174,7 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {

flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
Expand Down Expand Up @@ -641,7 +640,7 @@ public void testKvStateMessages() throws Exception {

Configuration tmConfig = new Configuration();
tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);

ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
Expand Down Expand Up @@ -1300,7 +1299,7 @@ public void testSavepointRestoreSettings() throws Exception {
archiver = new AkkaActorGateway(master._2(), leaderId);

Configuration tmConfig = new Configuration();
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void before() throws TimeoutException, InterruptedException {

configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);

configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void before() throws Exception {

configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);

highAvailabilityServices = new TestingManualHighAvailabilityServices();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testBackPressuredProducer() throws Exception {
config,
highAvailabilityServices);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);

taskManager = TestingUtils.createTaskManager(
testActorSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testTaskClearedWhileSampling() throws Exception {
config,
highAvailabilityServices);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);

taskManager = TestingUtils.createTaskManager(
testActorSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)

config.setBoolean(SecurityOptions.SSL_ENABLED, true)
Expand All @@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)

config.setBoolean(SecurityOptions.SSL_ENABLED, true)
Expand All @@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem)
"start with akka ssl disabled" in {

val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, false)

Expand All @@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {

val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")

Expand All @@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {

val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions}
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
Expand Down Expand Up @@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem)
heartbeatTimeout: String)
: TestingCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
new TestingCluster(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object TestingUtils {
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
config.setString(AkkaOptions.ASK_TIMEOUT, timeout)

Expand Down
Loading

0 comments on commit 73e9f90

Please sign in to comment.