Skip to content

Commit

Permalink
[FLINK-21348] Add tests for AdaptiveScheduler's StateWithExecutionGraph
Browse files Browse the repository at this point in the history
This closes apache#15342
  • Loading branch information
rmetzger committed Mar 23, 2021
1 parent d67c4d9 commit 55a1d1b
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 182 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Default handler for the {@link OperatorCoordinator OperatorCoordinators}. */
public class DefaultOperatorCoordinatorHandler implements OperatorCoordinatorHandler {
private final ExecutionGraph executionGraph;

private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;

private final Consumer<Throwable> globalFailureHandler;

public DefaultOperatorCoordinatorHandler(
ExecutionGraph executionGraph, Consumer<Throwable> globalFailureHandler) {
this.executionGraph = executionGraph;

this.coordinatorMap = createCoordinatorMap(executionGraph);
this.globalFailureHandler = globalFailureHandler;
}

private static Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap(
ExecutionGraph executionGraph) {
Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
coordinatorMap.put(holder.operatorId(), holder);
}
}
return coordinatorMap;
}

@Override
public void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) {
for (OperatorCoordinatorHolder coordinatorHolder : coordinatorMap.values()) {
coordinatorHolder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
}
}

@Override
public void startAllOperatorCoordinators() {
final Collection<OperatorCoordinatorHolder> coordinators = coordinatorMap.values();
try {
for (OperatorCoordinatorHolder coordinator : coordinators) {
coordinator.start();
}
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
coordinators.forEach(IOUtils::closeQuietly);
throw new FlinkRuntimeException("Failed to start the operator coordinators", t);
}
}

@Override
public void disposeAllOperatorCoordinators() {
coordinatorMap.values().forEach(IOUtils::closeQuietly);
}

@Override
public void deliverOperatorEventToCoordinator(
final ExecutionAttemptID taskExecutionId,
final OperatorID operatorId,
final OperatorEvent evt)
throws FlinkException {

// Failure semantics (as per the javadocs of the method):
// If the task manager sends an event for a non-running task or an non-existing operator
// coordinator, then respond with an exception to the call. If task and coordinator exist,
// then we assume that the call from the TaskManager was valid, and any bubbling exception
// needs to cause a job failure.

final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId);
if (exec == null || exec.getState() != ExecutionState.RUNNING) {
// This situation is common when cancellation happens, or when the task failed while the
// event was just being dispatched asynchronously on the TM side.
// It should be fine in those expected situations to just ignore this event, but, to be
// on the safe, we notify the TM that the event could not be delivered.
throw new TaskNotRunningException(
"Task is not known or in state running on the JobManager.");
}

final OperatorCoordinatorHolder coordinator = coordinatorMap.get(operatorId);
if (coordinator == null) {
throw new FlinkException("No coordinator registered for operator " + operatorId);
}

try {
coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
globalFailureHandler.accept(t);
}
}

@Override
public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operator, CoordinationRequest request) throws FlinkException {

final OperatorCoordinatorHolder coordinatorHolder = coordinatorMap.get(operator);
if (coordinatorHolder == null) {
throw new FlinkException("Coordinator of operator " + operator + " does not exist");
}

final OperatorCoordinator coordinator = coordinatorHolder.coordinator();
if (coordinator instanceof CoordinationRequestHandler) {
return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
} else {
throw new FlinkException(
"Coordinator of operator " + operator + " cannot handle client event");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,129 +19,52 @@
package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Handler for the {@link OperatorCoordinator OperatorCoordinators}. */
public class OperatorCoordinatorHandler {
private final ExecutionGraph executionGraph;

private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;

private final Consumer<Throwable> globalFailureHandler;

public OperatorCoordinatorHandler(
ExecutionGraph executionGraph, Consumer<Throwable> globalFailureHandler) {
this.executionGraph = executionGraph;

this.coordinatorMap = createCoordinatorMap(executionGraph);
this.globalFailureHandler = globalFailureHandler;
}

private Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap(
ExecutionGraph executionGraph) {
Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
coordinatorMap.put(holder.operatorId(), holder);
}
}
return coordinatorMap;
}

public void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) {
for (OperatorCoordinatorHolder coordinatorHolder : coordinatorMap.values()) {
coordinatorHolder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
}
}

public void startAllOperatorCoordinators() {
final Collection<OperatorCoordinatorHolder> coordinators = coordinatorMap.values();
try {
for (OperatorCoordinatorHolder coordinator : coordinators) {
coordinator.start();
}
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
coordinators.forEach(IOUtils::closeQuietly);
throw new FlinkRuntimeException("Failed to start the operator coordinators", t);
}
}

public void disposeAllOperatorCoordinators() {
coordinatorMap.values().forEach(IOUtils::closeQuietly);
}

public void deliverOperatorEventToCoordinator(
final ExecutionAttemptID taskExecutionId,
final OperatorID operatorId,
final OperatorEvent evt)
throws FlinkException {

// Failure semantics (as per the javadocs of the method):
// If the task manager sends an event for a non-running task or an non-existing operator
// coordinator, then respond with an exception to the call. If task and coordinator exist,
// then we assume that the call from the TaskManager was valid, and any bubbling exception
// needs to cause a job failure.

final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId);
if (exec == null || exec.getState() != ExecutionState.RUNNING) {
// This situation is common when cancellation happens, or when the task failed while the
// event was just being dispatched asynchronously on the TM side.
// It should be fine in those expected situations to just ignore this event, but, to be
// on the safe, we notify the TM that the event could not be delivered.
throw new TaskNotRunningException(
"Task is not known or in state running on the JobManager.");
}

final OperatorCoordinatorHolder coordinator = coordinatorMap.get(operatorId);
if (coordinator == null) {
throw new FlinkException("No coordinator registered for operator " + operatorId);
}

try {
coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
globalFailureHandler.accept(t);
}
}

public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operator, CoordinationRequest request) throws FlinkException {

final OperatorCoordinatorHolder coordinatorHolder = coordinatorMap.get(operator);
if (coordinatorHolder == null) {
throw new FlinkException("Coordinator of operator " + operator + " does not exist");
}

final OperatorCoordinator coordinator = coordinatorHolder.coordinator();
if (coordinator instanceof CoordinationRequestHandler) {
return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
} else {
throw new FlinkException(
"Coordinator of operator " + operator + " cannot handle client event");
}
}
public interface OperatorCoordinatorHandler {

/**
* Initialize operator coordinators.
*
* @param mainThreadExecutor Executor for submitting work to the main thread.
*/
void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor);

/** Start all operator coordinators. */
void startAllOperatorCoordinators();

/** Dispose all operator coordinators. */
void disposeAllOperatorCoordinators();

/**
* Delivers an OperatorEvent to a {@link OperatorCoordinator}.
*
* @param taskExecutionId Execution attempt id of the originating task.
* @param operatorId OperatorId of the target OperatorCoordinator.
* @param event Event to deliver to the OperatorCoordinator.
* @throws FlinkException If no coordinator is registered for operator.
*/
void deliverOperatorEventToCoordinator(
ExecutionAttemptID taskExecutionId, OperatorID operatorId, OperatorEvent event)
throws FlinkException;

/**
* Deliver coordination request from the client to the coordinator.
*
* @param operator Id of target operator.
* @param request request for the operator.
* @return Future with the response.
* @throws FlinkException If the coordinator doesn't exist or if it can not handle the request.
*/
CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
OperatorID operator, CoordinationRequest request) throws FlinkException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public SchedulerBase(
new ExecutionGraphHandler(executionGraph, log, ioExecutor, this.mainThreadExecutor);

this.operatorCoordinatorHandler =
new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);
exceptionHistory =
new BoundedFIFOQueue<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
Expand Down Expand Up @@ -646,7 +647,7 @@ public void goToExecuting(ExecutionGraph executionGraph) {
new ExecutionGraphHandler(
executionGraph, LOG, ioExecutor, componentMainThreadExecutor);
final OperatorCoordinatorHandler operatorCoordinatorHandler =
new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
operatorCoordinatorHandler.initializeOperatorCoordinators(componentMainThreadExecutor);
operatorCoordinatorHandler.startAllOperatorCoordinators();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,7 @@ private Canceling createCancelingState(
ctx.getMainThreadExecutor(),
ctx.getMainThreadExecutor());
final OperatorCoordinatorHandler operatorCoordinatorHandler =
new OperatorCoordinatorHandler(
executionGraph,
(throwable) -> {
throw new RuntimeException("Error in test", throwable);
});
new TestingOperatorCoordinatorHandler();
executionGraph.transitionToRunning();
Canceling canceling =
new Canceling(
Expand Down
Loading

0 comments on commit 55a1d1b

Please sign in to comment.