Skip to content

Commit

Permalink
[FLINK-10712] Support state restore for RestartPipelinedRegionStrategy
Browse files Browse the repository at this point in the history
This closes apache#7813.
  • Loading branch information
Myasuka authored and StefanRRichter committed Apr 17, 2019
1 parent a670b97 commit c3a9293
Show file tree
Hide file tree
Showing 11 changed files with 748 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

Expand Down Expand Up @@ -77,8 +77,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas

private static String outPath;

@BeforeClass
public static void createHDFS() throws IOException {
@Before
public void createHDFS() throws IOException {
Configuration conf = new Configuration();

File dataDir = tempFolder.newFolder();
Expand All @@ -94,8 +94,8 @@ public static void createHDFS() throws IOException {
+ "/string-non-rolling-out";
}

@AfterClass
public static void destroyHDFS() {
@After
public void destroyHDFS() {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;

Expand Down Expand Up @@ -77,8 +77,8 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB

private static String outPath;

@BeforeClass
public static void createHDFS() throws IOException {
@Before
public void createHDFS() throws IOException {
Configuration conf = new Configuration();

File dataDir = tempFolder.newFolder();
Expand All @@ -94,8 +94,8 @@ public static void createHDFS() throws IOException {
+ "/string-non-rolling-out";
}

@AfterClass
public static void destroyHDFS() {
@After
public void destroyHDFS() {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1202,12 +1202,23 @@ public void stopCheckpointScheduler() {
currentPeriodicTrigger = null;
}

abortPendingCheckpoints(new Exception("Checkpoint Coordinator is suspending."));

numUnsuccessfulCheckpointsTriggers.set(0);
}
}

/**
* Aborts all the pending checkpoints due to en exception.
* @param exception The exception.
*/
public void abortPendingCheckpoints(Exception exception) {
synchronized (lock) {
for (PendingCheckpoint p : pendingCheckpoints.values()) {
p.abortError(new Exception("Checkpoint Coordinator is suspending."));
p.abortError(exception);
}

pendingCheckpoints.clear();
numUnsuccessfulCheckpointsTriggers.set(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times
Expand Down Expand Up @@ -365,7 +364,6 @@ public JobManagerTaskRestore getTaskRestore() {
* @param taskRestore information to restore the state
*/
public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
this.taskRestore = taskRestore;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.util.AbstractID;
Expand All @@ -38,6 +40,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand All @@ -64,15 +67,19 @@ public class FailoverRegion {

private final List<ExecutionVertex> connectedExecutionVertexes;

private final Map<JobVertexID, ExecutionJobVertex> tasks;

/** Current status of the job execution */
private volatile JobStatus state = JobStatus.RUNNING;

public FailoverRegion(
ExecutionGraph executionGraph,
List<ExecutionVertex> connectedExecutions) {
List<ExecutionVertex> connectedExecutions,
Map<JobVertexID, ExecutionJobVertex> tasks) {

this.executionGraph = checkNotNull(executionGraph);
this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
this.tasks = checkNotNull(tasks);

LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
}
Expand Down Expand Up @@ -108,14 +115,7 @@ public JobStatus getState() {
return state;
}

/**
* get all execution vertexes contained in this region
*/
public List<ExecutionVertex> getAllExecutionVertexes() {
return connectedExecutionVertexes;
}

// Notice the region to failover,
// Notice the region to failover,
private void failover(long globalModVersionOfFailover) {
if (!executionGraph.getRestartStrategy().canRestart()) {
executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail"));
Expand Down Expand Up @@ -206,13 +206,15 @@ private void restart(long globalModVersionOfFailover) {
try {
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
// if we have checkpointed state, reload it into the executions
//TODO: checkpoint support restore part ExecutionVertex cp
/**
if (executionGraph.getCheckpointCoordinator() != null) {
// we restart the checkpoint scheduler for
// i) enable new checkpoint could be triggered without waiting for last checkpoint expired.
// ii) ensure the EXACTLY_ONCE semantics if needed.
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new Exception("FailoverRegion is restarting."));

executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
connectedExecutionVertexes, false, false);
tasks, false, true);
}
*/

HashSet<AllocationID> previousAllocationsInRegion = new HashSet<>(connectedExecutionVertexes.size());
for (ExecutionVertex connectedExecutionVertex : connectedExecutionVertexes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;

Expand All @@ -36,11 +37,12 @@
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A failover strategy that restarts regions of the ExecutionGraph. A region is defined
* A failover strategy that restarts regions of the ExecutionGraph with state. A region is defined
* by this strategy as the weakly connected component of tasks that communicate via pipelined
* data exchange.
*/
Expand Down Expand Up @@ -222,7 +224,19 @@ private void makeAllOneRegion(List<ExecutionJobVertex> jobVertices) {

@VisibleForTesting
protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) {
return new FailoverRegion(eg, connectedExecutions);
Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions);
return new FailoverRegion(eg, connectedExecutions, tasks);
}

@VisibleForTesting
protected Map<JobVertexID, ExecutionJobVertex> initTasks(List<ExecutionVertex> connectedExecutions) {
Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(connectedExecutions.size());
for (ExecutionVertex executionVertex : connectedExecutions) {
JobVertexID jobvertexId = executionVertex.getJobvertexId();
ExecutionJobVertex jobVertex = executionVertex.getJobVertex();
tasks.putIfAbsent(jobvertexId, jobVertex);
}
return tasks;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
Expand Down Expand Up @@ -521,7 +522,8 @@ public void setBlockerFuture(@Nonnull CompletableFuture<?> blockerFuture) {

@Override
protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) {
return new FailoverRegion(eg, connectedExecutions) {
Map<JobVertexID, ExecutionJobVertex> tasks = initTasks(connectedExecutions);
return new FailoverRegion(eg, connectedExecutions, tasks) {
@Override
protected CompletableFuture<Void> createTerminationFutureOverAllConnectedVertexes() {
ArrayList<CompletableFuture<?>> terminationAndBlocker = new ArrayList<>(2);
Expand Down
Loading

0 comments on commit c3a9293

Please sign in to comment.