diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java new file mode 100644 index 0000000000000..fe94a179c770c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** Default handler for the {@link OperatorCoordinator OperatorCoordinators}. */ +public class DefaultOperatorCoordinatorHandler implements OperatorCoordinatorHandler { + private final ExecutionGraph executionGraph; + + private final Map coordinatorMap; + + private final Consumer globalFailureHandler; + + public DefaultOperatorCoordinatorHandler( + ExecutionGraph executionGraph, Consumer globalFailureHandler) { + this.executionGraph = executionGraph; + + this.coordinatorMap = createCoordinatorMap(executionGraph); + this.globalFailureHandler = globalFailureHandler; + } + + private static Map createCoordinatorMap( + ExecutionGraph executionGraph) { + Map coordinatorMap = new HashMap<>(); + for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) { + for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) { + coordinatorMap.put(holder.operatorId(), holder); + } + } + return coordinatorMap; + } + + @Override + public void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) { + for (OperatorCoordinatorHolder coordinatorHolder : coordinatorMap.values()) { + coordinatorHolder.lazyInitialize(globalFailureHandler, mainThreadExecutor); + } + } + + @Override + public void startAllOperatorCoordinators() { + final Collection coordinators = coordinatorMap.values(); + try { + for (OperatorCoordinatorHolder coordinator : coordinators) { + coordinator.start(); + } + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + coordinators.forEach(IOUtils::closeQuietly); + throw new FlinkRuntimeException("Failed to start the operator coordinators", t); + } + } + + @Override + public void disposeAllOperatorCoordinators() { + coordinatorMap.values().forEach(IOUtils::closeQuietly); + } + + @Override + public void deliverOperatorEventToCoordinator( + final ExecutionAttemptID taskExecutionId, + final OperatorID operatorId, + final OperatorEvent evt) + throws FlinkException { + + // Failure semantics (as per the javadocs of the method): + // If the task manager sends an event for a non-running task or an non-existing operator + // coordinator, then respond with an exception to the call. If task and coordinator exist, + // then we assume that the call from the TaskManager was valid, and any bubbling exception + // needs to cause a job failure. + + final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId); + if (exec == null || exec.getState() != ExecutionState.RUNNING) { + // This situation is common when cancellation happens, or when the task failed while the + // event was just being dispatched asynchronously on the TM side. + // It should be fine in those expected situations to just ignore this event, but, to be + // on the safe, we notify the TM that the event could not be delivered. + throw new TaskNotRunningException( + "Task is not known or in state running on the JobManager."); + } + + final OperatorCoordinatorHolder coordinator = coordinatorMap.get(operatorId); + if (coordinator == null) { + throw new FlinkException("No coordinator registered for operator " + operatorId); + } + + try { + coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + globalFailureHandler.accept(t); + } + } + + @Override + public CompletableFuture deliverCoordinationRequestToCoordinator( + OperatorID operator, CoordinationRequest request) throws FlinkException { + + final OperatorCoordinatorHolder coordinatorHolder = coordinatorMap.get(operator); + if (coordinatorHolder == null) { + throw new FlinkException("Coordinator of operator " + operator + " does not exist"); + } + + final OperatorCoordinator coordinator = coordinatorHolder.coordinator(); + if (coordinator instanceof CoordinationRequestHandler) { + return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request); + } else { + throw new FlinkException( + "Coordinator of operator " + operator + " cannot handle client event"); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java index d9ca356e523b6..839086d956f6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorHandler.java @@ -19,129 +19,52 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; -import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.IOUtils; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; /** Handler for the {@link OperatorCoordinator OperatorCoordinators}. */ -public class OperatorCoordinatorHandler { - private final ExecutionGraph executionGraph; - - private final Map coordinatorMap; - - private final Consumer globalFailureHandler; - - public OperatorCoordinatorHandler( - ExecutionGraph executionGraph, Consumer globalFailureHandler) { - this.executionGraph = executionGraph; - - this.coordinatorMap = createCoordinatorMap(executionGraph); - this.globalFailureHandler = globalFailureHandler; - } - - private Map createCoordinatorMap( - ExecutionGraph executionGraph) { - Map coordinatorMap = new HashMap<>(); - for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) { - for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) { - coordinatorMap.put(holder.operatorId(), holder); - } - } - return coordinatorMap; - } - - public void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) { - for (OperatorCoordinatorHolder coordinatorHolder : coordinatorMap.values()) { - coordinatorHolder.lazyInitialize(globalFailureHandler, mainThreadExecutor); - } - } - - public void startAllOperatorCoordinators() { - final Collection coordinators = coordinatorMap.values(); - try { - for (OperatorCoordinatorHolder coordinator : coordinators) { - coordinator.start(); - } - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - coordinators.forEach(IOUtils::closeQuietly); - throw new FlinkRuntimeException("Failed to start the operator coordinators", t); - } - } - - public void disposeAllOperatorCoordinators() { - coordinatorMap.values().forEach(IOUtils::closeQuietly); - } - - public void deliverOperatorEventToCoordinator( - final ExecutionAttemptID taskExecutionId, - final OperatorID operatorId, - final OperatorEvent evt) - throws FlinkException { - - // Failure semantics (as per the javadocs of the method): - // If the task manager sends an event for a non-running task or an non-existing operator - // coordinator, then respond with an exception to the call. If task and coordinator exist, - // then we assume that the call from the TaskManager was valid, and any bubbling exception - // needs to cause a job failure. - - final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId); - if (exec == null || exec.getState() != ExecutionState.RUNNING) { - // This situation is common when cancellation happens, or when the task failed while the - // event was just being dispatched asynchronously on the TM side. - // It should be fine in those expected situations to just ignore this event, but, to be - // on the safe, we notify the TM that the event could not be delivered. - throw new TaskNotRunningException( - "Task is not known or in state running on the JobManager."); - } - - final OperatorCoordinatorHolder coordinator = coordinatorMap.get(operatorId); - if (coordinator == null) { - throw new FlinkException("No coordinator registered for operator " + operatorId); - } - - try { - coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt); - } catch (Throwable t) { - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - globalFailureHandler.accept(t); - } - } - - public CompletableFuture deliverCoordinationRequestToCoordinator( - OperatorID operator, CoordinationRequest request) throws FlinkException { - - final OperatorCoordinatorHolder coordinatorHolder = coordinatorMap.get(operator); - if (coordinatorHolder == null) { - throw new FlinkException("Coordinator of operator " + operator + " does not exist"); - } - - final OperatorCoordinator coordinator = coordinatorHolder.coordinator(); - if (coordinator instanceof CoordinationRequestHandler) { - return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request); - } else { - throw new FlinkException( - "Coordinator of operator " + operator + " cannot handle client event"); - } - } +public interface OperatorCoordinatorHandler { + + /** + * Initialize operator coordinators. + * + * @param mainThreadExecutor Executor for submitting work to the main thread. + */ + void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor); + + /** Start all operator coordinators. */ + void startAllOperatorCoordinators(); + + /** Dispose all operator coordinators. */ + void disposeAllOperatorCoordinators(); + + /** + * Delivers an OperatorEvent to a {@link OperatorCoordinator}. + * + * @param taskExecutionId Execution attempt id of the originating task. + * @param operatorId OperatorId of the target OperatorCoordinator. + * @param event Event to deliver to the OperatorCoordinator. + * @throws FlinkException If no coordinator is registered for operator. + */ + void deliverOperatorEventToCoordinator( + ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent event) + throws FlinkException; + + /** + * Deliver coordination request from the client to the coordinator. + * + * @param operator Id of target operator. + * @param request request for the operator. + * @return Future with the response. + * @throws FlinkException If the coordinator doesn't exist or if it can not handle the request. + */ + CompletableFuture deliverCoordinationRequestToCoordinator( + OperatorID operator, CoordinationRequest request) throws FlinkException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index c08ea6a246517..f75bde1ec7449 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -205,7 +205,7 @@ public SchedulerBase( new ExecutionGraphHandler(executionGraph, log, ioExecutor, this.mainThreadExecutor); this.operatorCoordinatorHandler = - new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); + new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor); exceptionHistory = new BoundedFIFOQueue<>( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index b1b6525e7ea11..4552e3aea1b16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -78,6 +78,7 @@ import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; @@ -646,7 +647,7 @@ public void goToExecuting(ExecutionGraph executionGraph) { new ExecutionGraphHandler( executionGraph, LOG, ioExecutor, componentMainThreadExecutor); final OperatorCoordinatorHandler operatorCoordinatorHandler = - new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); + new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); operatorCoordinatorHandler.initializeOperatorCoordinators(componentMainThreadExecutor); operatorCoordinatorHandler.startAllOperatorCoordinators(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java index 0d646dd625840..0d74a1053cac3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java @@ -140,11 +140,7 @@ private Canceling createCancelingState( ctx.getMainThreadExecutor(), ctx.getMainThreadExecutor()); final OperatorCoordinatorHandler operatorCoordinatorHandler = - new OperatorCoordinatorHandler( - executionGraph, - (throwable) -> { - throw new RuntimeException("Error in test", throwable); - }); + new TestingOperatorCoordinatorHandler(); executionGraph.transitionToRunning(); Canceling canceling = new Canceling( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index 5c2537f58fc5d..b60f0e2f91a2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -103,8 +103,8 @@ public void testExecutionGraphDeploymentOnEnter() throws Exception { public void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph() throws Exception { try (MockExecutingContext ctx = new MockExecutingContext()) { - MockOperatorCoordinatorHandler operatorCoordinator = - new MockOperatorCoordinatorHandler(); + TestingOperatorCoordinatorHandler operatorCoordinator = + new TestingOperatorCoordinatorHandler(); Executing exec = new ExecutingStateBuilder() .setOperatorCoordinatorHandler(operatorCoordinator) @@ -369,12 +369,7 @@ private final class ExecutingStateBuilder { private OperatorCoordinatorHandler operatorCoordinatorHandler; private ExecutingStateBuilder() throws JobException, JobExecutionException { - operatorCoordinatorHandler = - new OperatorCoordinatorHandler( - executionGraph, - (throwable) -> { - throw new RuntimeException("Error in test", throwable); - }); + operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler(); } public ExecutingStateBuilder setExecutionGraph(ExecutionGraph executionGraph) { @@ -689,28 +684,6 @@ public Logger getLogger() { } } - private static class MockOperatorCoordinatorHandler extends OperatorCoordinatorHandler { - - private boolean disposed = false; - - public MockOperatorCoordinatorHandler() throws JobException, JobExecutionException { - super( - TestingDefaultExecutionGraphBuilder.newBuilder().build(), - (throwable) -> { - throw new RuntimeException("Error in test", throwable); - }); - } - - @Override - public void disposeAllOperatorCoordinators() { - disposed = true; - } - - public boolean isDisposed() { - return disposed; - } - } - static class MockExecutionJobVertex extends ExecutionJobVertex { private final MockExecutionVertex mockExecutionVertex; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java index ce3dceb3346a5..1b347e012a4ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java @@ -145,11 +145,7 @@ private Failing createFailingState(MockFailingContext ctx, ExecutionGraph execut ctx.getMainThreadExecutor(), ctx.getMainThreadExecutor()); final OperatorCoordinatorHandler operatorCoordinatorHandler = - new OperatorCoordinatorHandler( - executionGraph, - (throwable) -> { - throw new RuntimeException("Error in test", throwable); - }); + new TestingOperatorCoordinatorHandler(); executionGraph.transitionToRunning(); return new Failing( ctx, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index 14aa3041c0461..d1d63953b07c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -119,11 +119,7 @@ public Restarting createRestartingState( ctx.getMainThreadExecutor(), ctx.getMainThreadExecutor()); final OperatorCoordinatorHandler operatorCoordinatorHandler = - new OperatorCoordinatorHandler( - executionGraph, - (throwable) -> { - throw new RuntimeException("Error in test", throwable); - }); + new TestingOperatorCoordinatorHandler(); executionGraph.transitionToRunning(); return new Restarting( ctx, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java index 22d2d1c48a2d7..443137e09454e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -75,34 +74,107 @@ public void testSuspendCanBeCalledWhenExecutionGraphHasReachedGloballyTerminalSt } } + @Test + public void testOperatorCoordinatorShutdownOnLeave() throws Exception { + try (MockStateWithExecutionGraphContext context = + new MockStateWithExecutionGraphContext()) { + + final TestingOperatorCoordinatorHandler testingOperatorCoordinatorHandler = + new TestingOperatorCoordinatorHandler(); + final TestingStateWithExecutionGraph stateWithExecutionGraph = + createStateWithExecutionGraph(context, testingOperatorCoordinatorHandler); + + stateWithExecutionGraph.onLeave(AdaptiveSchedulerTest.DummyState.class); + + assertThat(testingOperatorCoordinatorHandler.isDisposed(), is(true)); + } + } + + @Test + public void testSuspendToFinished() throws Exception { + try (MockStateWithExecutionGraphContext context = + new MockStateWithExecutionGraphContext()) { + + final TestingStateWithExecutionGraph stateWithExecutionGraph = + createStateWithExecutionGraph(context); + + context.setExpectFinished(aeg -> assertThat(aeg.getState(), is(JobStatus.SUSPENDED))); + + stateWithExecutionGraph.suspend(new RuntimeException()); + } + } + + @Test + public void testOnGloballyTerminalStateCalled() throws Exception { + MockStateWithExecutionGraphContext context = new MockStateWithExecutionGraphContext(); + + StateTrackingMockExecutionGraph mockExecutionGraph = new StateTrackingMockExecutionGraph(); + final TestingStateWithExecutionGraph stateWithExecutionGraph = + createStateWithExecutionGraph(context, mockExecutionGraph); + + mockExecutionGraph.completeTerminationFuture(JobStatus.FINISHED); + + context.close(); + + assertThat( + stateWithExecutionGraph.getGloballyTerminalStateFuture().get(), + is(JobStatus.FINISHED)); + } + + @Test + public void testOnGloballyTerminalStateNotCalledOnNonGloballyTerminalState() throws Exception { + MockStateWithExecutionGraphContext context = new MockStateWithExecutionGraphContext(); + + StateTrackingMockExecutionGraph mockExecutionGraph = new StateTrackingMockExecutionGraph(); + final TestingStateWithExecutionGraph stateWithExecutionGraph = + createStateWithExecutionGraph(context, mockExecutionGraph); + + mockExecutionGraph.completeTerminationFuture(JobStatus.SUSPENDED); + + context.close(); + + assertThat(stateWithExecutionGraph.getGloballyTerminalStateFuture().isDone(), is(false)); + } + + private TestingStateWithExecutionGraph createStateWithExecutionGraph( + MockStateWithExecutionGraphContext context) { + final ExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); + return createStateWithExecutionGraph(context, executionGraph); + } + private TestingStateWithExecutionGraph createStateWithExecutionGraph( MockStateWithExecutionGraphContext context, - StateTrackingMockExecutionGraph testingExecutionGraph) { + OperatorCoordinatorHandler operatorCoordinatorHandler) { + final ExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); + return createStateWithExecutionGraph(context, executionGraph, operatorCoordinatorHandler); + } + + private TestingStateWithExecutionGraph createStateWithExecutionGraph( + MockStateWithExecutionGraphContext context, ExecutionGraph executionGraph) { + final OperatorCoordinatorHandler operatorCoordinatorHandler = + new TestingOperatorCoordinatorHandler(); + return createStateWithExecutionGraph(context, executionGraph, operatorCoordinatorHandler); + } + + private TestingStateWithExecutionGraph createStateWithExecutionGraph( + MockStateWithExecutionGraphContext context, + ExecutionGraph executionGraph, + OperatorCoordinatorHandler operatorCoordinatorHandler) { final ExecutionGraphHandler executionGraphHandler = new ExecutionGraphHandler( - testingExecutionGraph, + executionGraph, log, context.getMainThreadExecutor(), context.getMainThreadExecutor()); - final OperatorCoordinatorHandler operatorCoordinatorHandler = - new OperatorCoordinatorHandler( - testingExecutionGraph, - globalFailure -> { - throw new FlinkRuntimeException( - "No global failures are expected", globalFailure); - }); + executionGraph.transitionToRunning(); return new TestingStateWithExecutionGraph( - context, - testingExecutionGraph, - executionGraphHandler, - operatorCoordinatorHandler, - log); + context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, log); } - private final class TestingStateWithExecutionGraph extends StateWithExecutionGraph { + private static final class TestingStateWithExecutionGraph extends StateWithExecutionGraph { private final CompletableFuture globallyTerminalStateFuture = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index 357cf5de4f830..e0383fa33dcad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -352,11 +352,7 @@ private StopWithSavepoint createStopWithSavepoint( ctx.getMainThreadExecutor(), ctx.getMainThreadExecutor()); OperatorCoordinatorHandler operatorCoordinatorHandler = - new OperatorCoordinatorHandler( - executionGraph, - (throwable) -> { - throw new RuntimeException("Error in test", throwable); - }); + new TestingOperatorCoordinatorHandler(); executionGraph.transitionToRunning(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java new file mode 100644 index 0000000000000..fc9325b85855d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/TestingOperatorCoordinatorHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.util.FlinkException; + +import java.util.concurrent.CompletableFuture; + +class TestingOperatorCoordinatorHandler implements OperatorCoordinatorHandler { + private boolean disposed = false; + + public boolean isDisposed() { + return disposed; + } + + @Override + public void disposeAllOperatorCoordinators() { + disposed = true; + } + + @Override + public void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) { + throw new UnsupportedOperationException(); + } + + @Override + public void startAllOperatorCoordinators() { + throw new UnsupportedOperationException(); + } + + @Override + public void deliverOperatorEventToCoordinator( + ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent evt) + throws FlinkException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture deliverCoordinationRequestToCoordinator( + OperatorID operator, CoordinationRequest request) throws FlinkException { + throw new UnsupportedOperationException(); + } +}