Skip to content

Commit

Permalink
[FLINK-21135][coord] Adjust parallelism of job for reactive mode
Browse files Browse the repository at this point in the history
This closes apache#15071
  • Loading branch information
rmetzger committed Mar 11, 2021
1 parent 282eb17 commit bc08cff
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.dispatcher;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -33,8 +35,10 @@
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.scheduler.adaptive.ReactiveModeUtils;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.util.Preconditions;

/** Singleton default factory for {@link JobManagerRunnerImpl}. */
public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
Expand All @@ -60,6 +64,15 @@ public JobManagerRunner createJobManagerRunner(
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
configuration, jobGraph.getJobType());

if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE) {
Preconditions.checkState(
slotPoolServiceSchedulerFactory.getSchedulerType()
== JobManagerOptions.SchedulerType.Adaptive,
"Adaptive Scheduler is required for reactive mode");
ReactiveModeUtils.configureJobGraphForReactiveMode(jobGraph);
}

final ShuffleMaster<?> shuffleMaster =
ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
.createShuffleMaster(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class JobVertex implements java.io.Serializable {

private static final String DEFAULT_NAME = "(unnamed vertex)";

/** Initial value of {@code maxParallelism}, i.e. the value before any max parallelism is set. */
public static final int MAX_PARALLELISM_DEFAULT = -1;

// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -77,7 +79,7 @@ public class JobVertex implements java.io.Serializable {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

/** Maximum number of subtasks to split this task into at runtime. */
private int maxParallelism = -1;
private int maxParallelism = MAX_PARALLELISM_DEFAULT;

/** The minimum resource of the vertex. */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public SlotPoolService createSlotPoolService(JobID jid) {
return slotPoolServiceFactory.createSlotPoolService(jid);
}

/** Returns the scheduler type reported by the underlying {@code schedulerNGFactory}. */
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
return schedulerNGFactory.getSchedulerType();
}

@Override
public SchedulerNG createScheduler(
Logger log,
Expand Down Expand Up @@ -143,7 +148,6 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
JobManagerOptions.SchedulerType schedulerType =
ClusterOptions.getSchedulerType(configuration);

if (schedulerType == JobManagerOptions.SchedulerType.Adaptive
&& jobType == JobType.BATCH) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -48,6 +49,13 @@ public interface SlotPoolServiceSchedulerFactory {
*/
SlotPoolService createSlotPoolService(JobID jid);

/**
* Returns the scheduler type this factory is creating.
*
* @return the scheduler type this factory is creating.
*/
JobManagerOptions.SchedulerType getSchedulerType();

/**
* Creates a {@link SchedulerNG}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -126,4 +127,9 @@ public SchedulerNG createInstance(
mainThreadExecutor,
jobStatusListener);
}

/** Always reports {@code JobManagerOptions.SchedulerType.Ng} as this factory's scheduler type. */
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
return JobManagerOptions.SchedulerType.Ng;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -62,4 +63,6 @@ SchedulerNG createInstance(
FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener)
throws Exception;

/**
* Returns the scheduler type of this factory.
*
* @return the scheduler type of this factory.
*/
JobManagerOptions.SchedulerType getSchedulerType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
Expand Down Expand Up @@ -252,12 +251,6 @@ public AdaptiveScheduler(
declarativeSlotPool::reserveFreeSlot,
declarativeSlotPool::freeReservedSlot);

for (JobVertex vertex : jobGraph.getVertices()) {
if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
vertex.setParallelism(1);
}
}

declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);

this.componentMainThreadExecutor = mainThreadExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -107,4 +108,9 @@ public SchedulerNG createInstance(
fatalErrorHandler,
jobStatusListener);
}

/** Always reports {@code JobManagerOptions.SchedulerType.Adaptive} as this factory's scheduler type. */
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
return JobManagerOptions.SchedulerType.Adaptive;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Static helpers for preparing a job to run in reactive mode. */
public final class ReactiveModeUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ReactiveModeUtils.class);

    /** Not instantiable: this class only hosts static helpers. */
    private ReactiveModeUtils() {}

    /**
     * Rewrites the parallelism of every vertex of the given JobGraph for reactive mode.
     *
     * <p>A vertex that already carries a max parallelism other than {@link
     * JobVertex#MAX_PARALLELISM_DEFAULT} gets its parallelism set to that max parallelism. Any
     * other vertex gets both its parallelism and its max parallelism set to {@link
     * Transformation#UPPER_BOUND_MAX_PARALLELISM}.
     *
     * @param jobGraph The JobGraph whose vertices are modified in place.
     */
    public static void configureJobGraphForReactiveMode(JobGraph jobGraph) {
        LOG.info("Modifying job parallelism for running in reactive mode.");
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            final boolean hasExplicitMaxParallelism =
                    jobVertex.getMaxParallelism() != JobVertex.MAX_PARALLELISM_DEFAULT;

            if (hasExplicitMaxParallelism) {
                // Respect the configured limit: scale up to it, but not beyond.
                jobVertex.setParallelism(jobVertex.getMaxParallelism());
                continue;
            }

            // No limit configured: use the upper bound for both values.
            jobVertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
            jobVertex.setMaxParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,26 @@ public void testFallsBackToDefaultSchedulerIfBatchJob() {
assertThat(
defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory(),
is(instanceOf(DefaultSchedulerFactory.class)));
assertThat(
defaultSlotPoolServiceSchedulerFactory.getSchedulerType(),
is(JobManagerOptions.SchedulerType.Ng));
}

@Test
public void testAdaptiveSchedulerForReactiveMode() {
    // Reactive scheduler mode with declarative resource management switched on.
    final Configuration reactiveConfig = new Configuration();
    reactiveConfig.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
    reactiveConfig.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);

    final DefaultSlotPoolServiceSchedulerFactory factory =
            DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                    reactiveConfig, JobType.STREAMING);

    // A streaming job in this setup must be handled by the adaptive scheduler factory ...
    assertThat(factory.getSchedulerNGFactory(), is(instanceOf(AdaptiveSchedulerFactory.class)));
    // ... and the factory must report the matching scheduler type.
    assertThat(factory.getSchedulerType(), is(JobManagerOptions.SchedulerType.Adaptive));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -119,5 +120,10 @@ public SchedulerNG createInstance(
})
.build();
}

/** Always reports {@code JobManagerOptions.SchedulerType.Ng} as this factory's scheduler type. */
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
return JobManagerOptions.SchedulerType.Ng;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -72,4 +73,9 @@ public SchedulerNG createInstance(
throws Exception {
return schedulerNG;
}

/** Always reports {@code JobManagerOptions.SchedulerType.Ng} as this factory's scheduler type. */
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
return JobManagerOptions.SchedulerType.Ng;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ public void testRestoringModifiedJobFromSavepointFails() throws Exception {
// create a new operator
final JobVertex jobVertex = new JobVertex("New operator");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(1);

// this test will fail in the end due to the previously created Savepoint having a state for
// a given OperatorID that does not match any operator of the newly created JobGraph
Expand Down Expand Up @@ -759,6 +760,8 @@ public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSuccee
// create a new operator
final JobVertex jobVertex = new JobVertex("New operator");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(1);

final JobGraph jobGraphWithNewOperator =
TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
savepointRestoreSettings, jobVertex);
Expand Down
Loading

0 comments on commit bc08cff

Please sign in to comment.