Skip to content

Commit

Permalink
[FLINK-17075][coordination] Reconcile deployed Executions
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jul 8, 2020
1 parent 9896c9b commit 2210aff
Show file tree
Hide file tree
Showing 34 changed files with 1,360 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,9 @@ public void deploy() throws JobException {
.thenCompose(Function.identity())
.whenCompleteAsync(
(ack, failure) -> {
// only respond to the failure case
if (failure != null) {
if (failure == null) {
vertex.notifyDeployment(this);
} else {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.clusterframework.types.ResourceID;

/**
 * A listener that is called when an execution has been deployed.
 *
 * <p>The interface declares a single abstract method so that it can be implemented
 * with a lambda expression.
 */
@FunctionalInterface
public interface ExecutionDeploymentListener {

    /**
     * Called once the deployment of an execution has completed.
     *
     * @param execution ID of the execution attempt that was deployed
     * @param host ID of the resource the execution was deployed to
     */
    void onCompletedDeployment(ExecutionAttemptID execution, ResourceID host);
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
/** Shuffle master to register partitions for task deployment. */
private final ShuffleMaster<?> shuffleMaster;

private final ExecutionDeploymentListener executionDeploymentListener;
private final ExecutionStateUpdateListener executionStateUpdateListener;

// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
Expand All @@ -332,7 +335,9 @@ public ExecutionGraph(
PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ScheduleMode scheduleMode) throws IOException {
ScheduleMode scheduleMode,
ExecutionDeploymentListener executionDeploymentListener,
ExecutionStateUpdateListener executionStateUpdateListener) throws IOException {

this.jobInformation = Preconditions.checkNotNull(jobInformation);

Expand Down Expand Up @@ -390,6 +395,9 @@ public ExecutionGraph(
this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(
this::createResultPartitionId,
partitionTracker);

this.executionDeploymentListener = executionDeploymentListener;
this.executionStateUpdateListener = executionStateUpdateListener;
}

public void start(@Nonnull ComponentMainThreadExecutor jobMasterMainThreadExecutor) {
Expand Down Expand Up @@ -1675,6 +1683,8 @@ void notifyExecutionChange(
return;
}

executionStateUpdateListener.onStateUpdate(execution.getAttemptId(), newExecutionState);

// see what this means for us. currently, the first FAILED state means -> FAILED
if (newExecutionState == ExecutionState.FAILED) {
final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
Expand Down Expand Up @@ -1726,4 +1736,8 @@ public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker(
/**
 * Returns the partition release strategy held by this graph.
 *
 * @return the value of the {@code partitionReleaseStrategy} field
 */
PartitionReleaseStrategy getPartitionReleaseStrategy() {
    return this.partitionReleaseStrategy;
}

/**
 * Returns the listener that is notified about completed deployments.
 *
 * @return the value of the {@code executionDeploymentListener} field
 */
ExecutionDeploymentListener getExecutionDeploymentListener() {
    return this.executionDeploymentListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class ExecutionGraphBuilder {
* If a prior execution graph exists, the JobGraph will be attached. If no prior execution
* graph exists, then the JobGraph will be attached to a new empty execution graph.
*/
@VisibleForTesting
public static ExecutionGraph buildGraph(
@Nullable ExecutionGraph prior,
JobGraph jobGraph,
Expand Down Expand Up @@ -119,7 +121,9 @@ public static ExecutionGraph buildGraph(
log,
shuffleMaster,
partitionTracker,
failoverStrategy);
failoverStrategy,
(execution, host) -> {},
(execution, newState) -> {});
}

public static ExecutionGraph buildGraph(
Expand All @@ -139,7 +143,9 @@ public static ExecutionGraph buildGraph(
Logger log,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
FailoverStrategy.Factory failoverStrategyFactory) throws JobExecutionException, JobException {
FailoverStrategy.Factory failoverStrategyFactory,
ExecutionDeploymentListener executionDeploymentListener,
ExecutionStateUpdateListener executionStateUpdateListener) throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");

Expand Down Expand Up @@ -179,7 +185,9 @@ public static ExecutionGraph buildGraph(
partitionReleaseStrategyFactory,
shuffleMaster,
partitionTracker,
jobGraph.getScheduleMode());
jobGraph.getScheduleMode(),
executionDeploymentListener,
executionStateUpdateListener);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.execution.ExecutionState;

/**
 * A listener that is called when an execution switched to a new state.
 *
 * <p>The interface declares a single abstract method so that it can be implemented
 * with a lambda expression.
 */
@FunctionalInterface
public interface ExecutionStateUpdateListener {

    /**
     * Called when an execution has switched to a new state.
     *
     * @param execution ID of the execution attempt whose state changed
     * @param newState state the execution switched to
     */
    void onStateUpdate(ExecutionAttemptID execution, ExecutionState newState);
}
Original file line number Diff line number Diff line change
Expand Up @@ -874,17 +874,31 @@ void executionFinished(Execution execution) {
// Miscellaneous
// --------------------------------------------------------------------------------------------

/**
 * Reports a completed deployment to the execution graph's {@link ExecutionDeploymentListener}.
 *
 * <p>The notification is dropped if the given execution is no longer the current execution,
 * since it is outdated in that case.
 *
 * @param execution execution whose deployment completed
 */
void notifyDeployment(Execution execution) {
    if (!isCurrentExecution(execution)) {
        // outdated execution, nothing to forward
        return;
    }

    final ExecutionDeploymentListener deploymentListener = getExecutionGraph().getExecutionDeploymentListener();
    deploymentListener.onCompletedDeployment(
        execution.getAttemptId(),
        execution.getAssignedResourceLocation().getResourceID());
}

/**
* Forwards this notification to the execution graph if the given execution is still the current one.
*/
void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
// only forward this notification if the execution is still the current execution
// otherwise we have an outdated execution
if (currentExecution == execution) {
if (isCurrentExecution(execution)) {
getExecutionGraph().notifyExecutionChange(execution, newState, error);
}
}

/**
 * Checks whether the given execution is the one referenced by {@code currentExecution}.
 * The comparison is by reference identity, not by {@code equals}.
 *
 * @param execution execution to check
 * @return {@code true} if the given execution is the same instance as {@code currentExecution}
 */
private boolean isCurrentExecution(Execution execution) {
    return execution == currentExecution;
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;

import java.util.HashSet;
import java.util.Set;

/**
 * Default {@link ExecutionDeploymentReconciler} implementation. Compares the executions reported by a
 * task executor against the expected ones and hands any discrepancy to the configured
 * {@link ExecutionDeploymentReconciliationHandler}.
 */
public class DefaultExecutionDeploymentReconciler implements ExecutionDeploymentReconciler {

    /** Receives the unknown/missing deployments detected during reconciliation. */
    private final ExecutionDeploymentReconciliationHandler handler;

    public DefaultExecutionDeploymentReconciler(ExecutionDeploymentReconciliationHandler handler) {
        this.handler = handler;
    }

    /**
     * Splits the discrepancies between report and expectation into two groups and notifies the handler
     * about each non-empty group: first the reported-but-unexpected executions ("unknown"), then the
     * expected-but-unreported executions ("missing").
     *
     * @param taskExecutorHost hosting task executor
     * @param executionDeploymentReport task executor report for deployed executions
     * @param expectedDeployedExecutions set of expected deployed executions; it is copied, not modified
     */
    @Override
    public void reconcileExecutionDeployments(
            ResourceID taskExecutorHost,
            ExecutionDeploymentReport executionDeploymentReport,
            Set<ExecutionAttemptID> expectedDeployedExecutions) {

        final Set<ExecutionAttemptID> reportedButUnexpected = new HashSet<>();
        // working copy: whatever is left after removing all reported executions was not deployed
        final Set<ExecutionAttemptID> expectedButUnreported = new HashSet<>(expectedDeployedExecutions);

        for (ExecutionAttemptID reportedExecution : executionDeploymentReport.getExecutions()) {
            if (expectedButUnreported.remove(reportedExecution)) {
                // reported and expected, nothing to reconcile
                continue;
            }
            reportedButUnexpected.add(reportedExecution);
        }

        if (!reportedButUnexpected.isEmpty()) {
            handler.onUnknownDeploymentsOf(reportedButUnexpected, taskExecutorHost);
        }
        if (!expectedButUnreported.isEmpty()) {
            handler.onMissingDeploymentsOf(expectedButUnreported, taskExecutorHost);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Default {@link ExecutionDeploymentTracker} implementation.
 *
 * <p>Tracked deployments are kept in two indexes that are updated together: the executions per host
 * and the host per execution. Both are plain {@link HashMap}s without any synchronization.
 */
public class DefaultExecutionDeploymentTracker implements ExecutionDeploymentTracker {

    private final Map<ResourceID, Set<ExecutionAttemptID>> executionsByHost = new HashMap<>();
    private final Map<ExecutionAttemptID, ResourceID> hostByExecution = new HashMap<>();

    /**
     * Starts tracking the given execution as deployed on the given host.
     *
     * <p>If the execution is already tracked on a different host, it is moved to the new host so that
     * both indexes stay consistent.
     *
     * @param executionAttemptId execution to track
     * @param host host the execution is deployed on
     */
    @Override
    public void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) {
        final ResourceID previousHost = hostByExecution.put(executionAttemptId, host);
        if (previousHost != null && !previousHost.equals(host)) {
            // without this the old host would keep a stale entry that can never be removed again,
            // because stopTrackingDeploymentOf only cleans up the host stored in hostByExecution
            removeFromHost(previousHost, executionAttemptId);
        }
        executionsByHost.computeIfAbsent(host, ignored -> new HashSet<>()).add(executionAttemptId);
    }

    /**
     * Stops tracking the given execution. Does nothing if the execution is not tracked.
     *
     * @param executionAttemptId execution to stop tracking
     */
    @Override
    public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
        final ResourceID host = hostByExecution.remove(executionAttemptId);
        if (host != null) {
            removeFromHost(host, executionAttemptId);
        }
    }

    /**
     * Returns the executions currently tracked for the given host.
     *
     * @param host host to look up
     * @return unmodifiable view of the tracked executions; empty if none are tracked for the host
     */
    @Override
    public Set<ExecutionAttemptID> getExecutionsOn(ResourceID host) {
        final Set<ExecutionAttemptID> executions = executionsByHost.get(host);
        // never hand out the internal mutable set; callers could otherwise desynchronize the indexes
        return executions == null
            ? Collections.emptySet()
            : Collections.unmodifiableSet(executions);
    }

    /** Removes the execution from the host's set and drops the host entry once its set is empty. */
    private void removeFromHost(ResourceID host, ExecutionAttemptID executionAttemptId) {
        executionsByHost.computeIfPresent(host, (resourceID, executionAttemptIds) -> {
            executionAttemptIds.remove(executionAttemptId);

            return executionAttemptIds.isEmpty()
                ? null
                : executionAttemptIds;
        });
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;

import java.util.Set;

/**
 * Component that reconciles the deployment state of executions.
 */
public interface ExecutionDeploymentReconciler {

    /**
     * Reconciles the deployment states between all reported/expected executions for the given task executor.
     *
     * @param taskExecutorHost hosting task executor
     * @param executionDeploymentReport task executor report for deployed executions
     * @param expectedDeployedExecutionIds set of expected deployed executions
     */
    void reconcileExecutionDeployments(
        ResourceID taskExecutorHost,
        ExecutionDeploymentReport executionDeploymentReport,
        Set<ExecutionAttemptID> expectedDeployedExecutionIds);

    /**
     * Factory for {@link ExecutionDeploymentReconciler} instances.
     */
    interface Factory {

        /**
         * Creates a reconciler for the given handler.
         *
         * @param reconciliationHandler handler passed to the created reconciler
         * @return the created reconciler
         */
        ExecutionDeploymentReconciler create(ExecutionDeploymentReconciliationHandler reconciliationHandler);
    }
}
Loading

0 comments on commit 2210aff

Please sign in to comment.