Skip to content

Commit

Permalink
[FLINK-28586][runtime] Let SourceCoordinator context know whether `su…
Browse files Browse the repository at this point in the history
…pportsConcurrentExecutionAttempts`
  • Loading branch information
zhuzhurk committed Jul 22, 2022
1 parent 79d93f2 commit 9af271f
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ protected void initialize(
for (final SerializedValue<OperatorCoordinator.Provider> provider :
coordinatorProviders) {
coordinators.add(
OperatorCoordinatorHolder.create(
provider, this, graph.getUserClassLoader(), coordinatorStore));
createOperatorCoordinatorHolder(
provider, graph.getUserClassLoader(), coordinatorStore));
}
} catch (Exception | LinkageError e) {
IOUtils.closeAllQuietly(coordinators);
Expand Down Expand Up @@ -278,6 +278,15 @@ protected ExecutionVertex createExecutionVertex(
initialAttemptCount);
}

/**
 * Creates the {@link OperatorCoordinatorHolder} for a single serialized coordinator provider of
 * this job vertex.
 *
 * <p>This implementation always creates the holder with concurrent execution attempts marked as
 * not supported. The method is {@code protected} so that subclasses can create the holder with a
 * different setting.
 *
 * @param provider the serialized coordinator provider handed to the holder factory
 * @param classLoader the class loader handed to the holder factory
 * @param coordinatorStore the store handed to the holder factory
 * @return the holder returned by {@code OperatorCoordinatorHolder.create}
 * @throws Exception propagated from {@code OperatorCoordinatorHolder.create}
 */
protected OperatorCoordinatorHolder createOperatorCoordinatorHolder(
        SerializedValue<OperatorCoordinator.Provider> provider,
        ClassLoader classLoader,
        CoordinatorStore coordinatorStore)
        throws Exception {
    // Named local instead of a bare literal so the meaning of the last argument is visible.
    final boolean supportsConcurrentExecutionAttempts = false;
    return OperatorCoordinatorHolder.create(
            provider, this, classLoader, coordinatorStore, supportsConcurrentExecutionAttempts);
}

/** Returns {@code true} once {@code taskVertices} has been set, i.e. is no longer {@code null}. */
public boolean isInitialized() {
return taskVertices != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.util.SerializedValue;

/** The ExecutionJobVertex which supports speculative execution. */
public class SpeculativeExecutionJobVertex extends ExecutionJobVertex {
Expand Down Expand Up @@ -53,6 +57,16 @@ protected ExecutionVertex createExecutionVertex(
initialAttemptCount);
}

/**
 * Creates the {@link OperatorCoordinatorHolder} for a single serialized coordinator provider of
 * this speculative job vertex.
 *
 * <p>This override always creates the holder with concurrent execution attempts marked as
 * supported.
 *
 * @param provider the serialized coordinator provider handed to the holder factory
 * @param classLoader the class loader handed to the holder factory
 * @param coordinatorStore the store handed to the holder factory
 * @return the holder returned by {@code OperatorCoordinatorHolder.create}
 * @throws Exception propagated from {@code OperatorCoordinatorHolder.create}
 */
@Override
protected OperatorCoordinatorHolder createOperatorCoordinatorHolder(
        SerializedValue<OperatorCoordinator.Provider> provider,
        ClassLoader classLoader,
        CoordinatorStore coordinatorStore)
        throws Exception {
    // Named local instead of a bare literal so the meaning of the last argument is visible.
    final boolean supportsConcurrentExecutionAttempts = true;
    return OperatorCoordinatorHolder.create(
            provider, this, classLoader, coordinatorStore, supportsConcurrentExecutionAttempts);
}

/** Factory to create {@link SpeculativeExecutionJobVertex}. */
public static class Factory extends ExecutionJobVertex.Factory {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,16 @@ interface Context {
ClassLoader getUserCodeClassloader();

/**
* Get the {@link CoordinatorStore} instance for sharring information between {@link
* Gets the {@link CoordinatorStore} instance for sharing information between {@link
* OperatorCoordinator}s.
*/
CoordinatorStore getCoordinatorStore();

/**
* Gets whether the coordinator supports an execution vertex having multiple concurrent
* running execution attempts.
*
* @return {@code true} if multiple concurrent running execution attempts of one execution
*     vertex are supported, {@code false} otherwise
*/
boolean isConcurrentExecutionAttemptsSupported();
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ public static OperatorCoordinatorHolder create(
SerializedValue<OperatorCoordinator.Provider> serializedProvider,
ExecutionJobVertex jobVertex,
ClassLoader classLoader,
CoordinatorStore coordinatorStore)
CoordinatorStore coordinatorStore,
boolean supportsConcurrentExecutionAttempts)
throws Exception {

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
Expand All @@ -444,7 +445,8 @@ public static OperatorCoordinatorHolder create(
jobVertex.getGraph().getUserClassLoader(),
jobVertex.getParallelism(),
jobVertex.getMaxParallelism(),
taskAccesses);
taskAccesses,
supportsConcurrentExecutionAttempts);
}
}

Expand All @@ -457,7 +459,8 @@ static OperatorCoordinatorHolder create(
final ClassLoader userCodeClassLoader,
final int operatorParallelism,
final int operatorMaxParallelism,
final SubtaskAccess.SubtaskAccessFactory taskAccesses)
final SubtaskAccess.SubtaskAccessFactory taskAccesses,
final boolean supportsConcurrentExecutionAttempts)
throws Exception {

final LazyInitializedCoordinatorContext context =
Expand All @@ -466,7 +469,8 @@ static OperatorCoordinatorHolder create(
operatorName,
userCodeClassLoader,
operatorParallelism,
coordinatorStore);
coordinatorStore,
supportsConcurrentExecutionAttempts);

final OperatorCoordinator coordinator = coordinatorProvider.create(context);

Expand Down Expand Up @@ -503,6 +507,7 @@ private static final class LazyInitializedCoordinatorContext
private final ClassLoader userCodeClassLoader;
private final int operatorParallelism;
private final CoordinatorStore coordinatorStore;
private final boolean supportsConcurrentExecutionAttempts;

private GlobalFailureHandler globalFailureHandler;
private Executor schedulerExecutor;
Expand All @@ -514,12 +519,14 @@ public LazyInitializedCoordinatorContext(
final String operatorName,
final ClassLoader userCodeClassLoader,
final int operatorParallelism,
final CoordinatorStore coordinatorStore) {
final CoordinatorStore coordinatorStore,
final boolean supportsConcurrentExecutionAttempts) {
this.operatorId = checkNotNull(operatorId);
this.operatorName = checkNotNull(operatorName);
this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
this.operatorParallelism = operatorParallelism;
this.coordinatorStore = checkNotNull(coordinatorStore);
this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
}

void lazyInitialize(GlobalFailureHandler globalFailureHandler, Executor schedulerExecutor) {
Expand Down Expand Up @@ -588,5 +595,10 @@ public ClassLoader getUserCodeClassloader() {
public CoordinatorStore getCoordinatorStore() {
return coordinatorStore;
}

/** Returns the {@code supportsConcurrentExecutionAttempts} flag held in this context's field. */
@Override
public boolean isConcurrentExecutionAttemptsSupported() {
return supportsConcurrentExecutionAttempts;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ public CoordinatorStore getCoordinatorStore() {
return context.getCoordinatorStore();
}

/** Delegates to the wrapped {@code context} and returns its answer unchanged. */
@Override
public boolean isConcurrentExecutionAttemptsSupported() {
return context.isConcurrentExecutionAttemptsSupported();
}

@VisibleForTesting
synchronized void quiesce() {
quiesced = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,15 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
coordinatorThreadFactory;
private final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
private final String coordinatorThreadName;
private final boolean supportsConcurrentExecutionAttempts;
private volatile boolean closed;

public SourceCoordinatorContext(
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
int numWorkerThreads,
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer) {
SimpleVersionedSerializer<SplitT> splitSerializer,
boolean supportsConcurrentExecutionAttempts) {
this(
Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
Executors.newScheduledThreadPool(
Expand All @@ -110,7 +112,8 @@ public SourceCoordinatorContext(
coordinatorThreadFactory,
operatorCoordinatorContext,
splitSerializer,
new SplitAssignmentTracker<>());
new SplitAssignmentTracker<>(),
supportsConcurrentExecutionAttempts);
}

// Package private method for unit test.
Expand All @@ -121,7 +124,8 @@ public SourceCoordinatorContext(
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
OperatorCoordinator.Context operatorCoordinatorContext,
SimpleVersionedSerializer<SplitT> splitSerializer,
SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
SplitAssignmentTracker<SplitT> splitAssignmentTracker,
boolean supportsConcurrentExecutionAttempts) {
this.workerExecutor = workerExecutor;
this.coordinatorExecutor = coordinatorExecutor;
this.coordinatorThreadFactory = coordinatorThreadFactory;
Expand All @@ -130,6 +134,7 @@ public SourceCoordinatorContext(
this.registeredReaders = new ConcurrentHashMap<>();
this.assignmentTracker = splitAssignmentTracker;
this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
this.subtaskGateways =
new OperatorCoordinator.SubtaskGateway
[operatorCoordinatorContext.currentParallelism()];
Expand All @@ -143,6 +148,10 @@ public SourceCoordinatorContext(
this.notifier = new ExecutorNotifier(workerExecutor, errorHandlingCoordinatorExecutor);
}

/**
* Returns the {@code supportsConcurrentExecutionAttempts} flag stored in this context.
* Package-private: no access modifier is declared.
*/
boolean isConcurrentExecutionAttemptsSupported() {
return supportsConcurrentExecutionAttempts;
}

@Override
public SplitEnumeratorMetricGroup metricGroup() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
new SourceCoordinatorContext<>(
coordinatorThreadFactory, numWorkerThreads, context, splitSerializer);
coordinatorThreadFactory,
numWorkerThreads,
context,
splitSerializer,
context.isConcurrentExecutionAttemptsSupported());
return new SourceCoordinator<>(
operatorName,
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public CoordinatorStore getCoordinatorStore() {
return coordinatorStore;
}

/** This implementation always reports concurrent execution attempts as not supported. */
@Override
public boolean isConcurrentExecutionAttemptsSupported() {
return false;
}

// -------------------------------

public boolean isJobFailed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,8 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) {
getClass().getClassLoader(),
3,
1775,
eventTarget);
eventTarget,
false);

holder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
holder.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ void testExceptionInRunnableFailsTheJob() throws InterruptedException, Execution
coordinatorThreadFactory,
operatorCoordinatorContext,
new MockSourceSplitSerializer(),
splitSplitAssignmentTracker);
splitSplitAssignmentTracker,
false);

testingContext.runInCoordinatorThread(
() -> {
Expand Down Expand Up @@ -200,7 +201,8 @@ void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedExcep
TEST_OPERATOR_ID.toHexString(), operatorCoordinatorContext),
operatorCoordinatorContext,
new MockSourceSplitSerializer(),
splitSplitAssignmentTracker);
splitSplitAssignmentTracker,
false);

testingContext.callAsync(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ protected SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorConte
coordinatorThreadFactory,
operatorCoordinatorContext,
new MockSourceSplitSerializer(),
splitSplitAssignmentTracker);
splitSplitAssignmentTracker,
false);
}
}

0 comments on commit 9af271f

Please sign in to comment.