diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 7ac2156b5bc2d..1a24eda0c672c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -220,8 +220,8 @@ protected void initialize( for (final SerializedValue provider : coordinatorProviders) { coordinators.add( - OperatorCoordinatorHolder.create( - provider, this, graph.getUserClassLoader(), coordinatorStore)); + createOperatorCoordinatorHolder( + provider, graph.getUserClassLoader(), coordinatorStore)); } } catch (Exception | LinkageError e) { IOUtils.closeAllQuietly(coordinators); @@ -278,6 +278,15 @@ protected ExecutionVertex createExecutionVertex( initialAttemptCount); } + protected OperatorCoordinatorHolder createOperatorCoordinatorHolder( + SerializedValue provider, + ClassLoader classLoader, + CoordinatorStore coordinatorStore) + throws Exception { + return OperatorCoordinatorHolder.create( + provider, this, classLoader, coordinatorStore, false); + } + public boolean isInitialized() { return taskVertices != null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java index 76118ef5544ad..b7493274105a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java @@ -21,7 +21,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.operators.coordination.CoordinatorStore; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.util.SerializedValue; /** The ExecutionJobVertex which supports speculative execution. */ public class SpeculativeExecutionJobVertex extends ExecutionJobVertex { @@ -53,6 +57,16 @@ protected ExecutionVertex createExecutionVertex( initialAttemptCount); } + @Override + protected OperatorCoordinatorHolder createOperatorCoordinatorHolder( + SerializedValue provider, + ClassLoader classLoader, + CoordinatorStore coordinatorStore) + throws Exception { + return OperatorCoordinatorHolder.create( + provider, this, classLoader, coordinatorStore, true); + } + /** Factory to create {@link SpeculativeExecutionJobVertex}. */ public static class Factory extends ExecutionJobVertex.Factory { @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index b00bfc91e8bd4..cf0e40d25f6ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -256,10 +256,16 @@ interface Context { ClassLoader getUserCodeClassloader(); /** - * Get the {@link CoordinatorStore} instance for sharring information between {@link + * Gets the {@link CoordinatorStore} instance for sharing information between {@link * OperatorCoordinator}s. */ CoordinatorStore getCoordinatorStore(); + + /** + * Gets that whether the coordinator supports an execution vertex to have multiple + * concurrent running execution attempts. + */ + boolean isConcurrentExecutionAttemptsSupported(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index f2112cfd75177..7049a6b7a30c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -425,7 +425,8 @@ public static OperatorCoordinatorHolder create( SerializedValue serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader, - CoordinatorStore coordinatorStore) + CoordinatorStore coordinatorStore, + boolean supportsConcurrentExecutionAttempts) throws Exception { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) { @@ -444,7 +445,8 @@ public static OperatorCoordinatorHolder create( jobVertex.getGraph().getUserClassLoader(), jobVertex.getParallelism(), jobVertex.getMaxParallelism(), - taskAccesses); + taskAccesses, + supportsConcurrentExecutionAttempts); } } @@ -457,7 +459,8 @@ static OperatorCoordinatorHolder create( final ClassLoader userCodeClassLoader, final int operatorParallelism, final int operatorMaxParallelism, - final SubtaskAccess.SubtaskAccessFactory taskAccesses) + final SubtaskAccess.SubtaskAccessFactory taskAccesses, + final boolean supportsConcurrentExecutionAttempts) throws Exception { final LazyInitializedCoordinatorContext context = @@ -466,7 +469,8 @@ static OperatorCoordinatorHolder create( operatorName, userCodeClassLoader, operatorParallelism, - coordinatorStore); + coordinatorStore, + supportsConcurrentExecutionAttempts); final OperatorCoordinator coordinator = coordinatorProvider.create(context); @@ -503,6 +507,7 @@ private static final class LazyInitializedCoordinatorContext private final ClassLoader userCodeClassLoader; private final int operatorParallelism; private final CoordinatorStore coordinatorStore; + private final boolean supportsConcurrentExecutionAttempts; private GlobalFailureHandler globalFailureHandler; private Executor schedulerExecutor; @@ -514,12 +519,14 @@ public LazyInitializedCoordinatorContext( final String operatorName, final ClassLoader userCodeClassLoader, final int operatorParallelism, - final CoordinatorStore coordinatorStore) { + final CoordinatorStore coordinatorStore, + final boolean supportsConcurrentExecutionAttempts) { this.operatorId = checkNotNull(operatorId); this.operatorName = checkNotNull(operatorName); this.userCodeClassLoader = checkNotNull(userCodeClassLoader); this.operatorParallelism = operatorParallelism; this.coordinatorStore = checkNotNull(coordinatorStore); + this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; } void lazyInitialize(GlobalFailureHandler globalFailureHandler, Executor schedulerExecutor) { @@ -588,5 +595,10 @@ public ClassLoader getUserCodeClassloader() { public CoordinatorStore getCoordinatorStore() { return coordinatorStore; } + + @Override + public boolean isConcurrentExecutionAttemptsSupported() { + return supportsConcurrentExecutionAttempts; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index ffab3ffe7dbfd..271eedf9a4142 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -246,6 +246,11 @@ public CoordinatorStore getCoordinatorStore() { return context.getCoordinatorStore(); } + @Override + public boolean isConcurrentExecutionAttemptsSupported() { + return context.isConcurrentExecutionAttemptsSupported(); + } + @VisibleForTesting synchronized void quiesce() { quiesced = true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 4434a18b5dc56..5724ac8393612 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -94,13 +94,15 @@ public class SourceCoordinatorContext coordinatorThreadFactory; private final OperatorCoordinator.SubtaskGateway[] subtaskGateways; private final String coordinatorThreadName; + private final boolean supportsConcurrentExecutionAttempts; private volatile boolean closed; public SourceCoordinatorContext( SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, int numWorkerThreads, OperatorCoordinator.Context operatorCoordinatorContext, - SimpleVersionedSerializer splitSerializer) { + SimpleVersionedSerializer splitSerializer, + boolean supportsConcurrentExecutionAttempts) { this( Executors.newScheduledThreadPool(1, coordinatorThreadFactory), Executors.newScheduledThreadPool( @@ -110,7 +112,8 @@ public SourceCoordinatorContext( coordinatorThreadFactory, operatorCoordinatorContext, splitSerializer, - new SplitAssignmentTracker<>()); + new SplitAssignmentTracker<>(), + supportsConcurrentExecutionAttempts); } // Package private method for unit test. @@ -121,7 +124,8 @@ public SourceCoordinatorContext( SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer splitSerializer, - SplitAssignmentTracker splitAssignmentTracker) { + SplitAssignmentTracker splitAssignmentTracker, + boolean supportsConcurrentExecutionAttempts) { this.workerExecutor = workerExecutor; this.coordinatorExecutor = coordinatorExecutor; this.coordinatorThreadFactory = coordinatorThreadFactory; @@ -130,6 +134,7 @@ public SourceCoordinatorContext( this.registeredReaders = new ConcurrentHashMap<>(); this.assignmentTracker = splitAssignmentTracker; this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName(); + this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; this.subtaskGateways = new OperatorCoordinator.SubtaskGateway [operatorCoordinatorContext.currentParallelism()]; @@ -143,6 +148,10 @@ public SourceCoordinatorContext( this.notifier = new ExecutorNotifier(workerExecutor, errorHandlingCoordinatorExecutor); } + boolean isConcurrentExecutionAttemptsSupported() { + return supportsConcurrentExecutionAttempts; + } + @Override public SplitEnumeratorMetricGroup metricGroup() { return null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index 9c2d44d029da8..3575c13c97411 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -77,7 +77,11 @@ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) { SimpleVersionedSerializer splitSerializer = source.getSplitSerializer(); SourceCoordinatorContext sourceCoordinatorContext = new SourceCoordinatorContext<>( - coordinatorThreadFactory, numWorkerThreads, context, splitSerializer); + coordinatorThreadFactory, + numWorkerThreads, + context, + splitSerializer, + context.isConcurrentExecutionAttemptsSupported()); return new SourceCoordinator<>( operatorName, source, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java index bf1498d9a6f83..9bb756e157765 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java @@ -78,6 +78,11 @@ public CoordinatorStore getCoordinatorStore() { return coordinatorStore; } + @Override + public boolean isConcurrentExecutionAttemptsSupported() { + return false; + } + // ------------------------------- public boolean isJobFailed() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index b2694c625cefe..730b9930ab530 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -511,7 +511,8 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) { getClass().getClassLoader(), 3, 1775, - eventTarget); + eventTarget, + false); holder.lazyInitialize(globalFailureHandler, mainThreadExecutor); holder.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java index 2ada5813d7d62..94a11251c9034 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java @@ -166,7 +166,8 @@ void testExceptionInRunnableFailsTheJob() throws InterruptedException, Execution coordinatorThreadFactory, operatorCoordinatorContext, new MockSourceSplitSerializer(), - splitSplitAssignmentTracker); + splitSplitAssignmentTracker, + false); testingContext.runInCoordinatorThread( () -> { @@ -200,7 +201,8 @@ void testCallableInterruptedDuringShutdownDoNotFailJob() throws InterruptedExcep TEST_OPERATOR_ID.toHexString(), operatorCoordinatorContext), operatorCoordinatorContext, new MockSourceSplitSerializer(), - splitSplitAssignmentTracker); + splitSplitAssignmentTracker, + false); testingContext.callAsync( () -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index 6fd36c9f3a7aa..34ea16e1646f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -177,6 +177,7 @@ protected SourceCoordinatorContext getNewSourceCoordinatorConte coordinatorThreadFactory, operatorCoordinatorContext, new MockSourceSplitSerializer(), - splitSplitAssignmentTracker); + splitSplitAssignmentTracker, + false); } }