From c036c1acf80f55161d95921afb6137c5a58052e6 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 13 Apr 2015 19:32:40 +0200 Subject: [PATCH] [scheduling] implement backtracking of intermediate results For batch programs, we currently schedule all tasks which are sources and let them kick off the execution of the connected tasks. This approach bears some problems when executing large dataflows with many branches. With backtracking, we traverse the execution graph output-centrically (from the sinks) in a depth-first manner. This enables us to use resources differently. In the course of backtracking, only tasks will be executed that are required to supply inputs to the current task. When a job is newly submitted, this means that the backtracking will reach the sources. When the job has been previously executed and intermediate results are available, old ResultPartitions to resume from can be requested while backtracking. Backtracking is disabled by default. It can be enabled by setting the ScheduleMode in JobGraph to BACKTRACKING. CHANGELOG - new scheduling mode: backtracking - backtracks from the sinks of an ExecutionGraph - checks the availability of IntermediatePartitionResults - marks ExecutionVertex to be scheduled - caches ResultPartitions and reloads them - resumes from intermediate results - test for general behavior of backtracking (BacktrackingTest) - test for resuming from an intermediate result (ResumeITCase) - test for releasing of cached ResultPartitions (ResultPartitionManagerTest) - allow multiple consumers per blocking intermediate result (batch) --- .../runtime/executiongraph/Backtracking.java | 254 ++++++++++++ .../runtime/executiongraph/Execution.java | 73 +++- .../executiongraph/ExecutionGraph.java | 107 +++-- .../executiongraph/ExecutionJobVertex.java | 4 +- .../executiongraph/ExecutionVertex.java | 34 +- .../executiongraph/IntermediateResult.java | 3 +- .../IntermediateResultPartition.java | 26 +- .../io/network/NetworkEnvironment.java | 2 + .../io/network/buffer/NetworkBufferPool.java | 43 +- .../io/network/partition/ResultPartition.java | 17 + .../network/partition/ResultPartitionID.java | 2 +- .../partition/ResultPartitionManager.java | 117 +++++- .../network/partition/ResultSubpartition.java | 5 + .../partition/SpillableSubpartition.java | 50 +-- .../runtime/jobgraph/AbstractJobVertex.java | 13 +- .../flink/runtime/jobgraph/JobEdge.java | 13 +- .../flink/runtime/jobgraph/JobGraph.java | 12 +- .../flink/runtime/jobgraph/JobStatus.java | 2 +- .../flink/runtime/jobgraph/ScheduleMode.java | 3 + .../jobmanager/scheduler/Scheduler.java | 4 +- .../flink/runtime/jobmanager/JobManager.scala | 7 +- .../messages/TaskManagerMessages.scala | 2 + .../flink/runtime/messages/TaskMessages.scala | 17 +- .../runtime/taskmanager/TaskManager.scala | 11 +- .../ExecutionVertexSchedulingTest.java | 4 + .../network/buffer/BufferPoolFactoryTest.java | 1 - .../partition/ResultPartitionManagerTest.java | 80 ++++ .../scheduler/BacktrackingTest.java | 386 ++++++++++++++++++ .../scheduler/SchedulerTestUtils.java | 1 - .../runtime/taskmanager/TaskManagerTest.java | 36 +- .../testingUtils/TestingJobManager.scala | 11 +- .../testingUtils/TestingTaskManager.scala | 33 +- .../TestingTaskManagerMessages.scala | 13 +- .../flink/test/resume/ResumeITCase.java | 253 ++++++++++++ pom.xml | 2 +- 35 files changed, 1506 insertions(+), 135 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BacktrackingTest.java create mode 100644 flink-tests/src/test/java/org/apache/flink/test/resume/ResumeITCase.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java new file mode 100644 index 0000000000000..e9e892f0ae2e9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. 
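+ *
+ * <p>Backtracking is disabled by default; a job opts in through its ScheduleMode. The following
+ * is only a rough sketch: the way the ScheduleMode is actually set on the JobGraph is not shown
+ * in this patch, so the setter used below is an assumption.
+ *
+ * <pre>{@code
+ * AbstractJobVertex source = new AbstractJobVertex("source");
+ * AbstractJobVertex sink = new AbstractJobVertex("sink");
+ * // connect the vertices and set their invokable classes as usual (omitted)
+ *
+ * JobGraph jobGraph = new JobGraph("resumable batch job", source, sink);
+ * jobGraph.setScheduleMode(ScheduleMode.BACKTRACKING); // assumed setter, not part of this patch
+ * // resubmitting the same job reuses cached blocking results where they are still available
+ * }</pre>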
+ * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** + * Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed + */ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** + * A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which + * are required for execution. + */ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { + return executionVertex; + } + + public boolean pendingRequirements() { + Iterator iter = pendingInputs.iterator(); + while (iter.hasNext()) { + Boolean visitedPartition = visitedPartitions.get(iter.next().getPartitionId()); + if (visitedPartition == null) { + return true; + } else { + if (visitedPartition) { + availableInputs++; + } + iter.remove(); + } + } + return false; + } + + public IntermediateResultPartition getNextRequirement() { + return pendingInputs.pop(); + } + + public boolean needsToBeScheduled() { + return numInputs == availableInputs; + } + + public void inputFound() { + availableInputs++; + } + + @Override + public String toString() { + return "TaskRequirement{" + + "executionVertex=" + executionVertex + + ", pendingInputs=" + pendingInputs.size() + + '}'; + } + } + + /** + * Action to be performed on an ExecutionVertex when it is determined to be scheduled. + * @param scheduleAction A ScheduleAction which receives an ExecutionVertex. + */ + public void setScheduleAction(ScheduleAction scheduleAction) { + Preconditions.checkNotNull(scheduleAction); + this.scheduleAction = scheduleAction; + } + + /** + * Hook executed after backtracking finishes. Note that because of the use of futures, this may + * not be when the scheduleUsingBacktracking() method returns. + * @param postBacktrackingHook A Runnable that is executed after backtracking finishes. + */ + public void setPostBacktrackingHook(Runnable postBacktrackingHook) { + Preconditions.checkNotNull(postBacktrackingHook); + this.postBacktrackingHook = postBacktrackingHook; + } + + /* Visit the ExecutionGraph from the previously determined sinks using a pre-order depth-first + * iterative traversal. 
+ */ + public void scheduleUsingBacktracking() { + + while (!taskRequirements.isEmpty()) { + + final TaskRequirement taskRequirement = taskRequirements.peek(); + final ExecutionVertex task = taskRequirement.getExecutionVertex(); + task.getCurrentExecutionAttempt().setScheduled(); + + if (task.getExecutionState() != ExecutionState.CREATED && task.getExecutionState() != ExecutionState.DEPLOYING) { + LOG.debug("Resetting ExecutionVertex {} from {} to CREATED.", task, task.getExecutionState()); + task.resetForNewExecution(); + task.getCurrentExecutionAttempt().setScheduled(); + } + + if (taskRequirement.pendingRequirements()) { + + final IntermediateResultPartition resultRequired = taskRequirement.getNextRequirement(); + + if (resultRequired.isLocationAvailable()) { + ActorRef taskManager = resultRequired.getLocation().getTaskManager(); + + LOG.debug("Requesting availability of IntermediateResultPartition " + resultRequired.getPartitionId()); + // pin ResulPartition for this intermediate result + Future future = Patterns.ask( + taskManager, + new TaskMessages.LockResultPartition( + resultRequired.getPartitionId(), + // We lock this result for all consumers. We have to make sure + // to release it once a job finishes. + resultRequired.getNumConsumers() + ), + 5000 // 5 seconds timeout + ); + + /** BEGIN Asynchronous callback **/ + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object success) { + if (success instanceof TaskMessages.LockResultPartitionReply && + ((TaskMessages.LockResultPartitionReply) success).locked()) { + LOG.debug("Resuming from IntermediateResultPartition " + resultRequired.getPartitionId()); + visitedPartitions.put(resultRequired.getPartitionId(), true); + taskRequirement.inputFound(); + } else { + // intermediate result not available + visitedPartitions.put(resultRequired.getPartitionId(), false); + taskRequirements.push(new TaskRequirement(resultRequired.getProducer())); + } + // TODO try again in case of errors? 
+ // continue with backtracking + scheduleUsingBacktracking(); + } + }, AkkaUtils.globalExecutionContext()); + /** END Asynchronous callback **/ + + // interrupt backtracking here and continue once future is complete + return; + + } else { + visitedPartitions.put(resultRequired.getPartitionId(), false); + taskRequirements.push(new TaskRequirement(resultRequired.getProducer())); + } + + } else { + taskRequirements.pop(); + + if (taskRequirement.needsToBeScheduled()) { + scheduleAction.schedule(task); + } + + } + + } + + LOG.debug("Finished backtracking."); + postBacktrackingHook.run(); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 93b4f2f5506c7..1c80fc287c074 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; @@ -101,9 +102,9 @@ public class Execution implements Serializable { private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); - + private static final Logger LOG = ExecutionGraph.LOG; - + private static final int NUM_CANCEL_CALL_TRIES = 3; // -------------------------------------------------------------------------------------------- @@ -121,21 +122,29 @@ public class Execution implements Serializable { private ConcurrentLinkedQueue partialInputChannelDeploymentDescriptors; private volatile ExecutionState state = CREATED; - + + /** + * Flag indicating whether this Execution has been marked to be scheduled + */ + private volatile boolean scheduled; + private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived - + private volatile Throwable failureCause; // once assigned, never changes - + private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution - + + /** Hint for the backtracking to query for a ResultPartition */ + private volatile Instance resultPartitionLocation; + private StateHandle operatorState; // -------------------------------------------------------------------------------------------- - + public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) { this.vertex = checkNotNull(vertex); this.attemptId = new ExecutionAttemptID(); - + this.attemptNumber = attemptNumber; this.stateTimestamps = new long[ExecutionState.values().length]; @@ -145,7 +154,7 @@ public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue(); } - + // -------------------------------------------------------------------------------------------- // Properties // -------------------------------------------------------------------------------------------- @@ -166,6 +175,31 @@ public ExecutionState getState() { return state; } + 
public Instance getResultPartitionLocation() { + return resultPartitionLocation; + } + + public void setResultPartitionLocation(Instance instance) { + resultPartitionLocation = instance; + } + + /** + * Returns whether this Execution should be scheduled. Always true if schedule modes other than + * Backtracking are used. + * @return true when Execution should be scheduled, false otherwise + */ + public boolean isScheduled() { + return scheduled || getVertex().getExecutionGraph().getScheduleMode() != ScheduleMode.BACKTRACKING; + } + + /** + * Sets the current execution to the scheduled mode. This is only relevant for backtracking scheduling + * where we only want to trigger execution if the backtracking decided to mark this Execution as scheduled. + */ + public void setScheduled() { + this.scheduled = true; + } + public SimpleSlot getAssignedResource() { return assignedResource; } @@ -232,7 +266,7 @@ public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed."); } - if (transitionState(CREATED, SCHEDULED)) { + if (isScheduled() && transitionState(CREATED, SCHEDULED)) { ScheduledUnit toSchedule = locationConstraint == null ? new ScheduledUnit(this, sharingGroup) : @@ -318,14 +352,14 @@ public void deployToSlot(final SimpleSlot slot) throws JobException { slot.releaseSlot(); return; } - + if (LOG.isInfoEnabled()) { LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(), attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname())); } - + final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot); - + // register this execution at the execution graph, to receive call backs vertex.getExecutionGraph().registerExecution(this); @@ -421,7 +455,7 @@ else if (current == CREATED || current == SCHEDULED) { } void scheduleOrUpdateConsumers(List> allConsumers) { - if (allConsumers.size() != 1) { + if (allConsumers.size() > 1) { fail(new IllegalStateException("Currently, only a single consumer group per partition is supported.")); } @@ -438,6 +472,7 @@ void scheduleOrUpdateConsumers(List> allConsumers) { // descriptors if there is a deployment race // ---------------------------------------------------------------- if (consumerState == CREATED) { + final Execution partitionExecution = partition.getProducer() .getCurrentExecutionAttempt(); @@ -452,12 +487,10 @@ void scheduleOrUpdateConsumers(List> allConsumers) { // TODO The current approach may send many update messages even though the consuming // task has already been deployed with all necessary information. We have to check // whether this is a problem and fix it, if it is. 
- future(new Callable(){ + future(new Callable() { @Override public Boolean call() throws Exception { try { - final ExecutionGraph consumerGraph = consumerVertex.getExecutionGraph(); - consumerVertex.scheduleForExecution( consumerVertex.getExecutionGraph().getScheduler(), consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed()); @@ -471,7 +504,7 @@ public Boolean call() throws Exception { }, AkkaUtils.globalExecutionContext()); // double check to resolve race conditions - if(consumerVertex.getExecutionState() == RUNNING){ + if (consumerVertex.getExecutionState() == RUNNING) { consumerVertex.sendPartitionInfos(); } } @@ -572,6 +605,9 @@ void markFinished() { if (current == RUNNING || current == DEPLOYING) { if (transitionState(current, FINISHED)) { + // store the location of the ResultPartition + resultPartitionLocation = assignedResource.getInstance(); + try { for (IntermediateResultPartition finishedPartition : getVertex().finishAllBlockingPartitions()) { @@ -579,6 +615,7 @@ void markFinished() { IntermediateResultPartition[] allPartitions = finishedPartition .getIntermediateResult().getPartitions(); + for (IntermediateResultPartition partition : allPartitions) { scheduleOrUpdateConsumers(partition.getConsumers()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d38913e16d5db..844e2b14c0ca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.state.StateHandle; @@ -51,9 +52,10 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -123,9 +125,9 @@ public class ExecutionGraph implements Serializable { * inside the BlobService and are referenced via the BLOB keys. */ private final List requiredJarFiles; - private final List jobStatusListenerActors; + private final Set jobStatusListenerActors; - private final List executionListenerActors; + private final Set executionListenerActors; /** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when * the execution graph transitioned into a certain state. The index into this array is the @@ -155,7 +157,7 @@ public class ExecutionGraph implements Serializable { /** The mode of scheduling. Decides how to select the initial set of tasks to be deployed. * May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking - * from results than need to be materialized. */ + * from results that need to be materialized. 
*/ private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; @@ -212,8 +214,8 @@ public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, Fini this.verticesInCreationOrder = new ArrayList(); this.currentExecutions = new ConcurrentHashMap(); - this.jobStatusListenerActors = new CopyOnWriteArrayList(); - this.executionListenerActors = new CopyOnWriteArrayList(); + this.jobStatusListenerActors = new CopyOnWriteArraySet(); + this.executionListenerActors = new CopyOnWriteArraySet(); this.stateTimestamps = new long[JobStatus.values().length]; this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis(); @@ -224,7 +226,7 @@ public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, Fini } // -------------------------------------------------------------------------------------------- - + public void setStateCheckpointerActor(ActorRef stateCheckpointerActor) { this.stateCheckpointerActor = stateCheckpointerActor; } @@ -264,21 +266,21 @@ public void attachJobGraph(List topologiallySorted) throws Jo LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d " + "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size())); } - + final long createTimestamp = System.currentTimeMillis(); - + for (AbstractJobVertex jobVertex : topologiallySorted) { - + // create the execution job vertex and attach it to the graph ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); ejv.connectToPredecessors(this.intermediateResults); - + ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); if (previousTask != null) { throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), ejv, previousTask)); } - + for (IntermediateResult res : ejv.getProducedDataSets()) { IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); if (previousDataSet != null) { @@ -286,7 +288,7 @@ public void attachJobGraph(List topologiallySorted) throws Jo res.getId(), res, previousDataSet)); } } - + this.verticesInCreationOrder.add(ejv); } } @@ -349,7 +351,7 @@ public Iterable getVerticesTopologically() { // we return a specific iterator that does not fail with concurrent modifications // the list is append only, so it is safe for that final int numElements = this.verticesInCreationOrder.size(); - + return new Iterable() { @Override public Iterator iterator() { @@ -412,28 +414,29 @@ public ScheduleMode getScheduleMode() { return scheduleMode; } + + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- - public void scheduleForExecution(Scheduler scheduler) throws JobException { + public void scheduleForExecution(final Scheduler scheduler) throws JobException { if (scheduler == null) { throw new IllegalArgumentException("Scheduler must not be null."); } - + if (this.scheduler != null && this.scheduler != scheduler) { throw new IllegalArgumentException("Cannot use different schedulers for the same job"); } - + if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { this.scheduler = scheduler; switch (scheduleMode) { case FROM_SOURCES: - // initially, we simply take the ones without inputs. 
- // next, we implement the logic to go back from vertices that need computation - // to the ones we need to start running + // simply take the ones without inputs (sources) and let them initiate other + // task scheduling through the availability of their results for (ExecutionJobVertex ejv : this.tasks.values()) { if (ejv.getJobVertex().isInputVertex()) { ejv.scheduleAll(scheduler, allowQueuedScheduling); @@ -443,6 +446,7 @@ public void scheduleForExecution(Scheduler scheduler) throws JobException { break; case ALL: + // simply schedule all nodes in a topological order for (ExecutionJobVertex ejv : getVerticesTopologically()) { ejv.scheduleAll(scheduler, allowQueuedScheduling); } @@ -450,7 +454,31 @@ public void scheduleForExecution(Scheduler scheduler) throws JobException { break; case BACKTRACKING: - throw new JobException("BACKTRACKING is currently not supported as schedule mode."); + /** + * Start from the sinks that do not produce intermediate results and track + * back to find available intermediate results. + */ + + Backtracking backtracking = new Backtracking(this.tasks.values()); + + backtracking.setScheduleAction(new Backtracking.ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) { + try { + ev.scheduleForExecution(scheduler, allowQueuedScheduling); + } catch (NoResourceAvailableException e) { + e.printStackTrace(); + fail(e); + } + } + }); + + backtracking.scheduleUsingBacktracking(); + + break; + + default: + throw new JobException("Unsupported scheduling mode."); } if (checkpointingEnabled) { @@ -466,7 +494,7 @@ public void scheduleForExecution(Scheduler scheduler) throws JobException { public void cancel() { while (true) { JobStatus current = state; - + if (current == JobStatus.RUNNING || current == JobStatus.CREATED) { if (transitionState(current, JobStatus.CANCELLING)) { for (ExecutionJobVertex ejv : verticesInCreationOrder) { @@ -500,10 +528,10 @@ else if (transitionState(current, JobStatus.FAILING, t)) { // set the state of the job to failed transitionState(JobStatus.FAILING, JobStatus.FAILED, t); } - + return; } - + // no need to treat other states } } @@ -538,20 +566,20 @@ void jobVertexInFinalState(ExecutionJobVertex ev) { // - the second (after it could grab the lock) tries to advance the position again return; } - + // see if we are the next to finish and then progress until the next unfinished one if (verticesInCreationOrder.get(nextPos) == ev) { do { nextPos++; } while (nextPos < verticesInCreationOrder.size() && verticesInCreationOrder.get(nextPos).isInFinalState()); - + nextVertexToFinish = nextPos; - + if (nextPos == verticesInCreationOrder.size()) { - + // we are done, transition to the final state - + while (true) { JobStatus current = this.state; if (current == JobStatus.RUNNING && transitionState(current, JobStatus.FINISHED)) { @@ -623,7 +651,7 @@ public boolean updateState(TaskExecutionState state) { return false; } } - + public void loadOperatorStates(Map , StateHandle> states) { synchronized (this.progressLock) { for (Map.Entry, StateHandle> state : states.entrySet()) @@ -723,6 +751,13 @@ void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID } } + public void prepareForResuming() { + synchronized (progressLock) { + transitionState(JobStatus.FINISHED, JobStatus.CREATED); + this.currentExecutions.clear(); + } + } + public void restart() { try { if (state == JobStatus.FAILED) { @@ -730,7 +765,7 @@ public void restart() { throw new IllegalStateException("Execution Graph left the state FAILED while trying to 
restart."); } } - + synchronized (progressLock) { if (state != JobStatus.RESTARTING) { throw new IllegalStateException("Can only restart job from state restarting."); @@ -758,7 +793,7 @@ public void restart() { fail(t); } } - + /** * This method cleans fields that are irrelevant for the archived execution attempt. */ @@ -766,19 +801,19 @@ public void prepareForArchiving() { if (!state.isTerminalState()) { throw new IllegalStateException("Can only archive the job from a terminal state"); } - + userClassLoader = null; - + for (ExecutionJobVertex vertex : verticesInCreationOrder) { vertex.prepareForArchiving(); } - + intermediateResults.clear(); currentExecutions.clear(); requiredJarFiles.clear(); jobStatusListenerActors.clear(); executionListenerActors.clear(); - + scheduler = null; parentContext = null; stateCheckpointerActor = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index acbc17addde33..69a21c75c59ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -244,7 +244,7 @@ public void connectToPredecessors(Map } this.inputs.add(ires); - + int consumerIndex = ires.registerConsumer(); for (int i = 0; i < parallelism; i++) { @@ -257,7 +257,7 @@ public void connectToPredecessors(Map //--------------------------------------------------------------------------------------------- // Actions //--------------------------------------------------------------------------------------------- - + public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException { ExecutionVertex[] vertices = this.taskVertices; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a44fc6f49bc5a..95783cf3af200 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -220,7 +220,7 @@ public void setOperatorState(StateHandle operatorState) { public StateHandle getOperatorState() { return operatorState; } - + public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); } @@ -498,6 +498,11 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { else { throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" + "pipelined partitions."); + /** + * Scheduling of blocking partitions are triggered through the markFinished method in + * @see Execution + */ + } } @@ -539,24 +544,15 @@ void sendPartitionInfos() { * Returns all blocking result partitions whose receivers can be scheduled/updated. 
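+	 * @return the finished blocking partitions; with this change the list may be empty but is never {@code null}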
*/ List finishAllBlockingPartitions() { - List finishedBlockingPartitions = null; + List finishedBlockingPartitions = new LinkedList(); for (IntermediateResultPartition partition : resultPartitions.values()) { if (partition.getResultType().isBlocking() && partition.markFinished()) { - if (finishedBlockingPartitions == null) { - finishedBlockingPartitions = new LinkedList(); - } - finishedBlockingPartitions.add(partition); } } - if (finishedBlockingPartitions == null) { - return Collections.emptyList(); - } - else { - return finishedBlockingPartitions; - } + return finishedBlockingPartitions; } // -------------------------------------------------------------------------------------------- @@ -631,6 +627,20 @@ subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConf // Utilities // -------------------------------------------------------------------------------------------- + /** + * Gets all IntermediateResultPartitions required by this Execution Vertex + * @return list of intermediate result partitions + */ + public List getInputs() { + List intermediateResultPartitions = new ArrayList(); + for (ExecutionEdge[] edgeList : inputEdges) { + for (ExecutionEdge edge : edgeList) { + intermediateResultPartitions.add(edge.getSource()); + } + } + return intermediateResultPartitions; + } + /** * Creates a simple name representation in the style 'taskname (x/y)', where * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 658a06b5feef3..fc21b84bdbe9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -108,7 +108,8 @@ public int registerConsumer() { numConsumers++; for (IntermediateResultPartition p : partitions) { - if (p.addConsumerGroup() != index) { + int consumerGroupIndex = p.addConsumerGroup(); + if (consumerGroupIndex != index && !resultType.isPersistent()) { throw new RuntimeException("Inconsistent consumer mapping between intermediate result partitions."); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 124ceb2b6bcd0..2829b3f010f7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -60,6 +61,19 @@ public IntermediateResultPartitionID getPartitionId() { return partitionId; } + public boolean isLocationAvailable() { + Instance instance = getLocation(); + return instance != null && instance.isAlive(); + } + + /** + * Last reported location of a ResultPartition for this IntermediateResultPartition + * @return Instance which contains the task manager location + */ + public Instance getLocation() { + return producer.getCurrentExecutionAttempt().getResultPartitionLocation(); + } + 
ResultPartitionType getResultType() { return totalResult.getResultType(); } @@ -76,8 +90,8 @@ int addConsumerGroup() { int pos = consumers.size(); // NOTE: currently we support only one consumer per result!!! - if (pos != 0) { - throw new RuntimeException("Currently, each intermediate result can only have one consumer."); + if (!getResultType().isPersistent() && pos > 0) { + throw new RuntimeException("Currently, each intermediate result can only have one consumer (for non-blocking result partitions)."); } consumers.add(new ArrayList()); @@ -88,6 +102,14 @@ void addConsumer(ExecutionEdge edge, int consumerNumber) { consumers.get(consumerNumber).add(edge); } + public int getNumConsumers() { + int numConsumers = 0; + for (List consumerGroup : consumers) { + numConsumers += consumerGroup.size(); + } + return numConsumers; + } + boolean markFinished() { // Sanity check that this is only called on blocking partitions. if (!getResultType().isBlocking()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index af55ebf4de228..868a37d22bf60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -167,6 +167,8 @@ public void associateWithTaskManagerAndJobManager(ActorRef jobManagerRef, ActorR LOG.debug("Starting result partition manager and network connection manager"); this.partitionManager = new ResultPartitionManager(); + // inject partition manager + this.networkBufferPool.setResultPartitionManager(partitionManager); this.taskEventDispatcher = new TaskEventDispatcher(); this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier( jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index cb1f118730f98..f0f7c97b96229 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,12 +54,23 @@ public class NetworkBufferPool implements BufferPoolFactory { private final Object factoryLock = new Object(); + /** + * Buffer pools that do NOT have a fixed memory size + */ private final Set managedBufferPools = new HashSet(); + /** + * All available pools (non-fixed and fixed memory size) + */ public final Set allBufferPools = new HashSet(); private int numTotalRequiredBuffers; + /** + * Reference to the ResultPartitionManager which can be called to give back memory segments. + */ + private ResultPartitionManager resultPartitionManager; + /** * Allocates all {@link MemorySegment} instances managed by this pool. 
 */
@@ -102,6 +114,14 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 				allocatedMb, availableMemorySegments.size(), segmentSize);
 	}
 
+	/**
+	 * Sets the reference to the ResultPartitionManager, which is not available at constructor time.
+	 * @param resultPartitionManager the ResultPartitionManager that can free cached partitions
+	 */
+	public void setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+		this.resultPartitionManager = resultPartitionManager;
+	}
+
 	public MemorySegment requestMemorySegment() {
 		return availableMemorySegments.poll();
 	}
@@ -170,16 +190,25 @@ public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize)
 				throw new IllegalStateException("Network buffer pool has already been destroyed.");
 			}
 
+			// Ensure that the number of required buffers can be satisfied.
 			// With dynamic memory management this should become obsolete.
 			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
-				throw new IOException(String.format("Insufficient number of network buffers: " +
-						"required %d, but only %d available. The total number of network " +
-						"buffers is currently set to %d. You can increase this " +
-						"number by setting the configuration key '" +
-						ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY + "'.",
-						numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers,
-						totalNumberOfMemorySegments));
+				int remaining = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+				// try to free the amount of memory segments required for the buffer pool
+				if (resultPartitionManager != null) {
+					resultPartitionManager.releaseLeastRecentlyUsedCachedPartitions(numRequiredBuffers - remaining);
+				}
+				// check again if we were able to free enough memory
+				if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
+					throw new IOException(String.format("Insufficient number of network buffers: " +
+							"required %d, but only %d available. The total number of network " +
+							"buffers is currently set to %d. You can increase this " +
+							"number by setting the configuration key '" +
+							ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY + "'.",
+							numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers,
+							totalNumberOfMemorySegments));
+				}
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index f06c8fb077dc3..0a96ee90b5486 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -109,6 +109,9 @@ public class ResultPartition implements BufferPoolOwner {
 
 	private boolean hasNotifiedPipelinedConsumers;
 
+	/**
+	 * Flag indicating whether this ResultPartition has been completely produced.
+	 */
 	private boolean isFinished;
 
 	// - Statistics ----------------------------------------------------------
@@ -212,6 +215,11 @@ public long getTotalNumberOfBytes() {
 		return totalNumberOfBytes;
 	}
 
+	public boolean isFinished() {
+		return isFinished;
+	}
+
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -404,4 +412,13 @@ private void notifyPipelinedConsumers() throws IOException {
 			hasNotifiedPipelinedConsumers = true;
 		}
 	}
+
+	/**
+	 * Gets the type of the ResultPartition.
+	 * @return ResultPartitionType
+	 */
+	public ResultPartitionType getPartitionType() {
+		return partitionType;
+	}
+
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index af2970d694df4..2c5694dcc0b9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -35,7 +35,7 @@ public final class ResultPartitionID implements Serializable {
 
 	private final IntermediateResultPartitionID partitionId;
 
-	private final ExecutionAttemptID producerId;
+	private ExecutionAttemptID producerId;
 
 	public ResultPartitionID() {
 		this(new IntermediateResultPartitionID(), new ExecutionAttemptID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index a666208b383a3..96957a9e115d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -28,6 +28,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -40,9 +44,18 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);
 
+	/**
+	 * Table of running/finished ResultPartitions with corresponding IDs
+	 */
 	public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition> registeredPartitions = HashBasedTable.create();
 
+	/**
+	 * Cached ResultPartitions which are used to resume/recover from
+	 */
+	private final HashMap<IntermediateResultPartitionID, ResultPartition> cachedResultPartitions =
+			new LinkedHashMap<IntermediateResultPartitionID, ResultPartition>();
+
 	private boolean isShutdown;
 
 	public void registerResultPartition(ResultPartition partition) throws IOException {
@@ -88,7 +101,8 @@ public void releasePartitionsProducedBy(ExecutionAttemptID executionId) {
 					registeredPartitions.row(executionId);
 
 			for (ResultPartition partition : partitions.values()) {
-				partition.release();
+				// move to cache if cacheable
+				updateIntermediateResultPartitionCache(partition);
 			}
 
 			for (IntermediateResultPartitionID partitionId : ImmutableList
@@ -101,6 +115,26 @@ public void releasePartitionsProducedBy(ExecutionAttemptID executionId) {
 		}
 	}
 
+	/**
+	 * Moves a produced ResultPartition to the cache, where it can be retrieved and put back
+	 * into the registeredPartitions later on.
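+	 * <p>The cache is backed by a {@link java.util.LinkedHashMap} in insertion order, so removing
+	 * and re-inserting an entry moves it to the tail, and iteration starts at the least recently
+	 * used partition. A minimal, stand-alone illustration of that property (not part of this patch):
+	 * <pre>{@code
+	 * Map<String, Integer> cache = new LinkedHashMap<String, Integer>();
+	 * cache.put("a", 1);
+	 * cache.put("b", 2);
+	 * cache.remove("a");
+	 * cache.put("a", 1);                             // "a" becomes the most recently inserted entry
+	 * String lru = cache.keySet().iterator().next(); // "b" is now the least recently used entry
+	 * }</pre>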
+ * @param partition + */ + public void updateIntermediateResultPartitionCache(ResultPartition partition) { + synchronized (cachedResultPartitions) { + // cache only persistent results + if (partition.isFinished() && partition.getPartitionType().isPersistent()) { + IntermediateResultPartitionID intermediateResultPartitionID = partition.getPartitionId().getPartitionId(); + // remove if already registered + cachedResultPartitions.remove(intermediateResultPartitionID); + // add as most recently inserted element + cachedResultPartitions.put(intermediateResultPartitionID, partition); + } else { + partition.release(); + } + } + } + public void shutdown() { synchronized (registeredPartitions) { @@ -119,10 +153,50 @@ public void shutdown() { } } + /** + * Registers and pins a cached ResultPartition that holds the data for an IntermediateResultPartition. + * @param partitionID The IntermediateResultPartitionID to find a corresponding ResultPartition for. + * @param numConsumers The number of consumers that want to access the ResultPartition + * @return true if the registering/pinning succeeded, false otherwise. + */ + public boolean pinCachedResultPartition(IntermediateResultPartitionID partitionID, int numConsumers) { + synchronized (cachedResultPartitions) { + ResultPartition resultPartition = cachedResultPartitions.get(partitionID); + if (resultPartition != null) { + try { + // update its least recently used value + updateIntermediateResultPartitionCache(resultPartition); + + synchronized (registeredPartitions) { + if (!registeredPartitions.containsValue(resultPartition)) { + LOG.debug("Registered previously cached ResultPartition {}.", resultPartition); + registerResultPartition(resultPartition); + } + } + + for (int i = 0; i < numConsumers; i++) { + resultPartition.pin(); + } + + LOG.debug("Pinned the ResultPartition {} for the intermediate result {}.", resultPartition, partitionID); + return true; + } catch (IOException e) { + throw new IllegalStateException("Failed to pin the ResultPartition for the intermediate result partition " + partitionID); + } + } + return false; + } + } + + // ------------------------------------------------------------------------ // Notifications // ------------------------------------------------------------------------ + /** + * Notification from a @link{ResultPartition} when no pending references exist anymore + * @param partition + */ void onConsumedPartition(ResultPartition partition) { final ResultPartition previous; @@ -137,9 +211,46 @@ void onConsumedPartition(ResultPartition partition) { // Release the partition if it was successfully removed if (partition == previous) { - partition.release(); + // move to cache if cachable + updateIntermediateResultPartitionCache(partition); - LOG.debug("Released {}.", partition); + LOG.debug("Cached {}.", partition); + } + } + + /** + * Triggered by @link{NetworkBufferPool} when network buffers should be freed + * @param requiredBuffers The number of buffers that should be cleared. 
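+	 * @return true if at least {@code requiredBuffers} network buffers could be freed by
+	 *         releasing cached partitions, false otherwise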
+	 */
+	public boolean releaseLeastRecentlyUsedCachedPartitions(int requiredBuffers) {
+		synchronized (cachedResultPartitions) {
+			// make a list of ResultPartitions to release
+			List<ResultPartition> toBeReleased = new ArrayList<ResultPartition>();
+			int numBuffersToBeFreed = 0;
+
+			// traverse from the least recently used cached ResultPartition
+			for (Map.Entry<IntermediateResultPartitionID, ResultPartition> entry : cachedResultPartitions.entrySet()) {
+				ResultPartition cachedResult = entry.getValue();
+
+				synchronized (registeredPartitions) {
+					if (!registeredPartitions.containsValue(cachedResult)) {
+						if (numBuffersToBeFreed < requiredBuffers) {
+							toBeReleased.add(cachedResult);
+							numBuffersToBeFreed += cachedResult.getTotalNumberOfBuffers();
+						}
+					}
+				}
+
+				// check if we reached the desired number of buffers
+				if (numBuffersToBeFreed >= requiredBuffers) {
+					for (ResultPartition result : toBeReleased) {
+						result.release();
+						// evict the released partition from the cache by its own partition id
+						cachedResultPartitions.remove(result.getPartitionId().getPartitionId());
+					}
+					return true;
+				}
+			}
+			return false;
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index e2ce16e00a3a9..85e536695f9c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -71,6 +71,11 @@ protected void onConsumedSubpartition() {
 
 	abstract public void finish() throws IOException;
 
+	/**
+	 * Releases the memory of the subpartition.
+	 * Should only be called by the parent ResultPartition.
+	 * @throws IOException if releasing the subpartition's resources fails
+	 */
 	abstract public void release() throws IOException;
 
 	abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 7ec24ac062dd1..66f87c63e0114 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -29,7 +29,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Deque;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -62,7 +64,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	private boolean isReleased;
 
 	/** The read view to consume this subpartition. */
-	private ResultSubpartitionView readView;
+	private Deque<ResultSubpartitionView> readView = new ArrayDeque<ResultSubpartitionView>(1);
 
 	SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager, IOMode ioMode) {
 		super(index, parent);
@@ -110,7 +112,6 @@ public void finish() throws IOException {
 
 	@Override
 	public void release() throws IOException {
-		final ResultSubpartitionView view;
 
 		synchronized (buffers) {
 			if (isReleased) {
@@ -130,16 +131,12 @@ public void release() throws IOException {
 				spillWriter.closeAndDelete();
 			}
 
-			// Get the view...
- view = readView; - readView = null; - isReleased = true; } // Release the view outside of the synchronized block - if (view != null) { - view.notifySubpartitionConsumed(); + if (!readView.isEmpty()) { + readView.pop().notifySubpartitionConsumed(); } } @@ -176,38 +173,35 @@ public ResultSubpartitionView createReadView(BufferProvider bufferProvider) thro "been finished."); } - if (readView != null) { - throw new IllegalStateException("Subpartition is being or already has been " + - "consumed, but we currently allow subpartitions to only be consumed once."); - } - // Spilled if closed and no outstanding write requests boolean isSpilled = spillWriter != null && (spillWriter.isClosed() || spillWriter.getNumberOfOutstandingRequests() == 0); if (isSpilled) { if (ioMode.isSynchronous()) { - readView = new SpilledSubpartitionViewSyncIO( - this, - bufferProvider.getMemorySegmentSize(), - spillWriter.getChannelID(), - 0); + readView.push(new SpilledSubpartitionViewSyncIO( + this, + bufferProvider.getMemorySegmentSize(), + spillWriter.getChannelID(), + 0) + ); } else { - readView = new SpilledSubpartitionViewAsyncIO( - this, - bufferProvider, - ioManager, - spillWriter.getChannelID(), - 0); + readView.push(new SpilledSubpartitionViewAsyncIO( + this, + bufferProvider, + ioManager, + spillWriter.getChannelID(), + 0) + ); } } else { - readView = new SpillableSubpartitionView( - this, bufferProvider, buffers.size(), ioMode); + readView.push(new SpillableSubpartitionView( + this, bufferProvider, buffers.size(), ioMode)); } - return readView; + return readView.peek(); } } @@ -215,7 +209,7 @@ public ResultSubpartitionView createReadView(BufferProvider bufferProvider) thro public String toString() { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + "finished? %s, read view? %s, spilled? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, + getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, !readView.isEmpty(), spillWriter != null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java index c9481553764ec..444e02f2072e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java @@ -52,6 +52,9 @@ public class AbstractJobVertex implements java.io.Serializable { /** List of edges with incoming data. One per Reader. 
*/ private final ArrayList inputs = new ArrayList(); + /** Indicator whether this vertex reads directly from an intermediate result */ + private boolean resumesFromIntermediateResult = false; + /** Number of subtasks to split this task into at runtime.*/ private int parallelism = -1; @@ -364,7 +367,7 @@ public boolean isInputVertex() { public boolean isOutputVertex() { return this.results.isEmpty(); } - + public boolean hasNoConnectedInputs() { for (JobEdge edge : inputs) { if (!edge.isIdReference()) { @@ -374,6 +377,14 @@ public boolean hasNoConnectedInputs() { return true; } + + public void setResumeFromIntermediateResult() { + this.resumesFromIntermediateResult = true; + } + + public boolean resumesFromIntermediateResult() { + return this.resumesFromIntermediateResult; + } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java index 939f6c4df4ef2..2e491fe982e98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java @@ -34,7 +34,10 @@ public class JobEdge implements java.io.Serializable { /** The distribution pattern that should be used for this job edge. */ private final DistributionPattern distributionPattern; - /** The data set at the source of the edge, may be null if the edge is not yet connected*/ + /** + * The data set at the source of the edge, may be null if the edge is not yet connected + * or this edge belong to a Source + */ private IntermediateDataSet source; /** The id of the source intermediate data set */ @@ -111,9 +114,13 @@ public DistributionPattern getDistributionPattern(){ public IntermediateDataSetID getSourceId() { return sourceId; } - + + /** + * Returns true if this edge is simply referenced by an id and not by a produced intermediate result + * @return + */ public boolean isIdReference() { - return this.source == null; + return this.source == null && this.sourceId != null; } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 6d895f90b76f9..8774e76dcc2a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -303,7 +303,7 @@ public List getVerticesSortedTopologicallyFromSources() throw while (iter.hasNext()) { AbstractJobVertex vertex = iter.next(); - if (vertex.hasNoConnectedInputs()) { + if (vertex.isInputVertex() || vertex.resumesFromIntermediateResult()) { sorted.add(vertex); iter.remove(); } @@ -457,4 +457,14 @@ public void uploadRequiredJarFiles(InetSocketAddress serverAddress) throws IOExc } } } + + /** + * Sets the default parallelism of all job vertices in this JobGraph. 
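+	 * <p>Note that this overwrites any parallelism previously configured on the individual vertices.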
+ * @param parallelism + */ + public void setParallelism(int parallelism) { + for (AbstractJobVertex ejv : taskVertices.values()) { + ejv.setParallelism(parallelism); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 667a68e261cd6..43627b8ea982a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -43,7 +43,7 @@ public enum JobStatus { /** All of the job's tasks have successfully finished. */ FINISHED(true), - + /** The job is currently undergoing a reset and total restart */ RESTARTING(false); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java index 330519dbb6f4b..d4d244889ea60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java @@ -25,6 +25,9 @@ public enum ScheduleMode { */ FROM_SOURCES, + /** + * Schedule tasks with backtracking from the sinks towards the sources. + */ BACKTRACKING, /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 9b2000bf29643..dc278ee690861 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -112,7 +112,7 @@ public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvail return (SimpleSlot) ret; } else { - throw new RuntimeException(); + throw new RuntimeException("Failed to schedule task immediately."); } } @@ -125,7 +125,7 @@ public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResource return (SlotAllocationFuture) ret; } else { - throw new RuntimeException(); + throw new RuntimeException("Failed to schedule task queued."); } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 7e06e24634dee..8f62d925d9c4b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -47,7 +47,7 @@ import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} +import org.apache.flink.runtime.jobgraph.{ScheduleMode, JobGraph, JobStatus} import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -482,8 +482,9 @@ class JobManager(val flinkConfiguration: Configuration, executionGraph = currentJob match { case Some((graph, _)) if !graph.getState.isTerminalState => throw new Exception("Job still running") - case Some((graph, _)) if graph.getJobID == jobId => - // resume here + case 
Some((graph, _)) if graph.getJobID == jobId && + graph.getScheduleMode == ScheduleMode.BACKTRACKING => + graph.prepareForResuming() graph case _ => removeCurrentJob() diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index c81830c36b14e..cb6fe68abc1fc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -19,6 +19,8 @@ package org.apache.flink.runtime.messages import org.apache.flink.runtime.instance.InstanceID +import org.apache.flink.runtime.io.network.partition.ResultPartitionID +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID /** * Miscellaneous actor messages exchanged with the TaskManager. diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala index c8c57265cd51b..350b814f9b626 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala @@ -20,7 +20,7 @@ package org.apache.flink.runtime.messages import org.apache.flink.runtime.deployment.{TaskDeploymentDescriptor, InputChannelDeploymentDescriptor} import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID +import org.apache.flink.runtime.jobgraph.{IntermediateResultPartitionID, IntermediateDataSetID} import org.apache.flink.runtime.taskmanager.TaskExecutionState /** @@ -117,6 +117,21 @@ object TaskMessages { case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID) extends TaskMessage + /** + * Checks the availability of an intermediate result partition and asks the TaskManager to lock it + * @param partitionID id of the intermediate result partition to be locked + * @param numConsumers the number of consumers that want to access this intermediate result + */ + case class LockResultPartition(partitionID: IntermediateResultPartitionID, numConsumers: Int) + extends TaskMessage + + /** + * Reply to pinning/locking a ResultPartition for resuming + * @param locked true if a ResultPartition for the requested IntermediateResultPartition + * is available + */ + case class LockResultPartitionReply(locked: Boolean) + extends TaskMessage // -------------------------------------------------------------------------- // Report Messages diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a6b9133034712..26d42ffa32bb1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -52,7 +52,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync} import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.NettyConfig -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager +import org.apache.flink.runtime.jobgraph.{IntermediateResultPartitionID, IntermediateDataSetID} import
org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver} import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager} @@ -337,6 +338,14 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { } } + /** + * Pin a ResultPartition corresponding to an IntermediateResultPartition + */ + case LockResultPartition(partitionID, numConsumers) => + val partitionManager: ResultPartitionManager = this.network.getPartitionManager + val result: Boolean = partitionManager.pinCachedResultPartition(partitionID, numConsumers) + sender ! LockResultPartitionReply(result) + // notifies the TaskManager that the state of a task has changed. // the TaskManager informs the JobManager and cleans up in case the transition // was into a terminal state, or in case the JobManager cannot be informed of the diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 06f0e9dcce003..d88a9909583a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -78,10 +78,12 @@ public void testSlotReleasedWhenScheduledImmediately() { assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot + vertex.getCurrentExecutionAttempt().setScheduled(); vertex.scheduleForExecution(scheduler, false); // will have failed assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); + vertex.getCurrentExecutionAttempt().setScheduled(); assertTrue(slot.isReleased()); } catch (Exception e) { @@ -111,6 +113,7 @@ public void testSlotReleasedWhenScheduledQueued() { assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot + vertex.getCurrentExecutionAttempt().setScheduled(); vertex.scheduleForExecution(scheduler, true); // future has not yet a slot @@ -148,6 +151,7 @@ public void testScheduleToDeploying() { assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot + vertex.getCurrentExecutionAttempt().setScheduled(); vertex.scheduleForExecution(scheduler, false); assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java index 28862e8bbdb3a..2481655330583 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java @@ -128,5 +128,4 @@ public void testCreateDestroy() throws IOException { assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), second.getNumBuffers()); } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java new file mode 100644 index 0000000000000..545a7f04f1592 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManagerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + + +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertFalse; + +public class ResultPartitionManagerTest { + + @Test + public void testFreeCachedResultPartitions() { + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + + ResultPartition resultPartition1 = Mockito.mock(ResultPartition.class); + ResultPartition resultPartition2 = Mockito.mock(ResultPartition.class); + ResultPartition resultPartition3 = Mockito.mock(ResultPartition.class); + + ResultPartitionID partitionID1 = new ResultPartitionID(); + ResultPartitionID partitionID2 = new ResultPartitionID(); + ResultPartitionID partitionID3 = new ResultPartitionID(); + + Mockito.when(resultPartition1.getPartitionId()).thenReturn(partitionID1); + Mockito.when(resultPartition1.isFinished()).thenReturn(true); + Mockito.when(resultPartition1.getPartitionType()).thenReturn(ResultPartitionType.BLOCKING); + Mockito.when(resultPartition1.getTotalNumberOfBuffers()).thenReturn(5); + + + Mockito.when(resultPartition2.getPartitionId()).thenReturn(partitionID2); + Mockito.when(resultPartition2.isFinished()).thenReturn(true); + Mockito.when(resultPartition2.getPartitionType()).thenReturn(ResultPartitionType.BLOCKING); + Mockito.when(resultPartition2.getTotalNumberOfBuffers()).thenReturn(3); + + + Mockito.when(resultPartition3.getPartitionId()).thenReturn(partitionID3); + Mockito.when(resultPartition3.isFinished()).thenReturn(true); + Mockito.when(resultPartition3.getPartitionType()).thenReturn(ResultPartitionType.BLOCKING); + Mockito.when(resultPartition3.getTotalNumberOfBuffers()).thenReturn(2); + + try { + resultPartitionManager.registerResultPartition(resultPartition1); + resultPartitionManager.registerResultPartition(resultPartition2); + resultPartitionManager.registerResultPartition(resultPartition3); + } catch (IOException e) { + e.printStackTrace(); + fail("failed to register ResultPartition"); + } + + resultPartitionManager.releasePartitionsProducedBy(partitionID1.getProducerId()); + resultPartitionManager.releasePartitionsProducedBy(partitionID2.getProducerId()); + resultPartitionManager.releasePartitionsProducedBy(partitionID3.getProducerId()); + + assertFalse(resultPartitionManager.releaseLeastRecentlyUsedCachedPartitions(10000)); + assertFalse(resultPartitionManager.releaseLeastRecentlyUsedCachedPartitions(11)); + assertTrue(resultPartitionManager.releaseLeastRecentlyUsedCachedPartitions(10)); + assertFalse(resultPartitionManager.releaseLeastRecentlyUsedCachedPartitions(10)); + } +} diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BacktrackingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BacktrackingTest.java new file mode 100644 index 0000000000000..dd3bfebbe4a25 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BacktrackingTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.InstanceDiedException; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatVertex; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OutputFormatVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.messages.TaskMessages.*; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class BacktrackingTest { + + public static ActorSystem system; + + /** + * TaskManager stub which acknowledges/denies the availability of IntermediateResults + */ + public static class TestTaskManager extends UntypedActor { + + ActorRef testEnvironment; + private Set availableResults = new HashSet(); + + public TestTaskManager(ActorRef testEnvironment) { + this.testEnvironment = testEnvironment; + } + + @Override + 
public void onReceive(Object msg) throws Exception { + + if (msg instanceof TaskMessages.SubmitTask) { + //System.out.println("task submitted: " + msg); + testEnvironment.forward(msg, getContext()); + + } else if (msg instanceof IntermediateResultPartitionID) { + // collect all intermediate results ids sent by the testing system + availableResults.add((IntermediateResultPartitionID) msg); + + } else if (msg instanceof LockResultPartition) { + LockResultPartition unpacked = ((LockResultPartition) msg); + IntermediateResultPartitionID partitionID = unpacked.partitionID(); + System.out.println("intermediate partition requested: " + partitionID); + if (availableResults.contains(partitionID)) { + System.out.println("acknowledging result: " + partitionID); + getSender().tell(new LockResultPartitionReply(true), getSelf()); + } else { + System.out.println("denying result: " + partitionID); + getSender().tell(new LockResultPartitionReply(false), getSelf()); + } + testEnvironment.forward(msg, getContext()); + + } else { + System.out.println("Unknown msg " + msg); + } + } + + } + + @Before + public void setup(){ + system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); + TestingUtils.setCallingThreadDispatcher(system); + } + + @After + public void teardown(){ + TestingUtils.setGlobalExecutionContext(); + JavaTestKit.shutdownActorSystem(system); + } + + private AbstractJobVertex createNode(String name) { + AbstractJobVertex abstractJobVertex = new AbstractJobVertex(name); + abstractJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + return abstractJobVertex; + } + + private AbstractJobVertex createOutputNode(String name) { + AbstractJobVertex abstractJobVertex = new OutputFormatVertex(name); + abstractJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + return abstractJobVertex; + } + + private AbstractJobVertex createInputNode(String name) { + AbstractJobVertex abstractJobVertex = new InputFormatVertex(name); + abstractJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + return abstractJobVertex; + } + + @Test + public void testBacktrackingIntermediateResults() throws InstanceDiedException, NoResourceAvailableException { + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + final Configuration cfg = new Configuration(); + + // JobVertex which has intermediate results available + final AbstractJobVertex resumePoint; + + /* + sink1 sink2 + O O + ^ ^ + ´ ` ´ ` + ´ ` ´ ` + _´ `_ _´ `_ + O O O O <---- resume starts here + ^ ^ ^ ^ + ` ´ ` ´ + ` ´ ` ´ + `_´ `_´ + O O <----- result available + ^ ^ + ` ´ + ` ´ + ` ´ + ` ´ + ` ´ + `_´ + O + source + */ + + // topologically sorted list + final List list = new ArrayList(); + + final AbstractJobVertex source = createInputNode("source1"); + list.add(source); + + AbstractJobVertex node1 = createOutputNode("sink1"); + { + AbstractJobVertex child1 = createNode("sink1-child1"); + AbstractJobVertex child2 = createNode("sink1-child2"); + node1.connectNewDataSetAsInput(child1, DistributionPattern.ALL_TO_ALL); + node1.connectNewDataSetAsInput(child2, DistributionPattern.ALL_TO_ALL); + + AbstractJobVertex child1child2child = createNode("sink1-child1-child2-child"); + child1.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL); + child2.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL); + + child1child2child.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL); + + list.add(child1child2child); + list.add(child1); + list.add(child2); + } + + 
AbstractJobVertex node2 = createOutputNode("sink2"); + final AbstractJobVertex child1 = createNode("sink2-child1"); + final AbstractJobVertex child2 = createNode("sink2-child2"); + node2.connectNewDataSetAsInput(child1, DistributionPattern.ALL_TO_ALL); + node2.connectNewDataSetAsInput(child2, DistributionPattern.ALL_TO_ALL); + + // resume from this node + AbstractJobVertex child1child2child = createNode("sink1-child1-child2-child"); + resumePoint = child1child2child; + + child1.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL); + child2.connectNewDataSetAsInput(child1child2child, DistributionPattern.ALL_TO_ALL); + + child1child2child.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL); + + list.add(child1child2child); + list.add(child1); + list.add(child2); + + list.add(node1); + list.add(node2); + + final ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + + new JavaTestKit(system) { + { + + final Props props = Props.create(TestTaskManager.class, getRef()); + final ActorRef taskManagerActor = system.actorOf(props); + + eg.setScheduleMode(ScheduleMode.BACKTRACKING); + + try { + eg.attachJobGraph(list); + } catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + + // list if of partition that should be requested + final Set requestedPartitions = new HashSet(); + + for (ExecutionJobVertex ejv : eg.getAllVertices().values()) { + for (IntermediateResult result : ejv.getInputs()) { + for (IntermediateResultPartition partition : result.getPartitions()) { + if (result.getProducer().getJobVertex() == resumePoint) { + // mock an instance + Instance mockInstance = Mockito.mock(Instance.class); + Mockito.when(mockInstance.isAlive()).thenReturn(true); + Mockito.when(mockInstance.getTaskManager()).thenReturn(taskManagerActor); + InstanceConnectionInfo instanceConnectionInfo = new InstanceConnectionInfo(); + Mockito.when(mockInstance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo); + // set the mock as a location + partition.getProducer().getCurrentExecutionAttempt().setResultPartitionLocation(mockInstance); + // send the tests actor ref to the TestTaskManager to receive messages + taskManagerActor.tell(partition.getPartitionId(), getRef()); + requestedPartitions.add(partition.getPartitionId()); + } + } + } + } + + final Set schedulePoints = new HashSet(); + // list of execution vertices to be scheduled + schedulePoints.add(child1.getID()); + schedulePoints.add(child2.getID()); + schedulePoints.add(source.getID()); + + final Scheduler scheduler = new Scheduler();//Mockito.mock(Scheduler.class); + + Instance i1 = null; + try { + i1 = ExecutionGraphTestUtils.getInstance(taskManagerActor, 10); + } catch (Exception e) { + e.printStackTrace(); + fail("Couldn't get instance: " + e.getMessage()); + } + + scheduler.newInstanceAvailable(i1); + + try { + eg.scheduleForExecution(scheduler); + } catch (JobException e) { + e.printStackTrace(); + fail(); + } + + new Within(duration("5 seconds")) { + @Override + protected void run() { + Object[] messages = receiveN(schedulePoints.size() + requestedPartitions.size()); + for (Object msg : messages) { + if (msg instanceof LockResultPartition) { + LockResultPartition msg1 = (LockResultPartition) msg; + assertTrue("Available partition should have been requested.", + requestedPartitions.contains(msg1.partitionID())); + } else if (msg instanceof SubmitTask) { + SubmitTask msg1 = (SubmitTask) msg; + assertTrue("These nodes 
should have been scheduled", + schedulePoints.contains(msg1.tasks().getVertexID())); + } else { + fail("Unexpected message"); + } + } + } + }; + + } + }; + } + + @Test + public void testMassiveExecutionGraphExecutionVertexScheduling() { + + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + final Configuration cfg = new Configuration(); + + final int numSinks = 10; + final int depth = 50; + final int parallelism = 100; + + //Random rand = new Random(System.currentTimeMillis()); + + LinkedList allNodes = new LinkedList(); + + for (int s = 0; s < numSinks; s++) { + AbstractJobVertex node = new OutputFormatVertex("sink" + s); + node.setInvokableClass(Tasks.NoOpInvokable.class); + + //node.setParallelism(rand.nextInt(maxParallelism) + 1); + node.setParallelism(parallelism); + allNodes.addLast(node); + + for (int i = 0; i < depth; i++) { + AbstractJobVertex other = new AbstractJobVertex("vertex" + i + " sink" + s); + other.setParallelism(parallelism); + other.setInvokableClass(Tasks.NoOpInvokable.class); + node.connectNewDataSetAsInput(other, DistributionPattern.ALL_TO_ALL); + allNodes.addFirst(other); + node = other; + } + + } + + final ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + + eg.setScheduleMode(ScheduleMode.BACKTRACKING); + + try { + eg.attachJobGraph(allNodes); + } catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + + new JavaTestKit(system) { + { + + final Props props = Props.create(TestTaskManager.class, getRef()); + final ActorRef taskManagerActor = system.actorOf(props); + + + Scheduler scheduler = new Scheduler(); + + Instance i1 = null; + try { + i1 = ExecutionGraphTestUtils.getInstance(taskManagerActor, numSinks * parallelism); + } catch (Exception e) { + e.printStackTrace(); + } + + scheduler.newInstanceAvailable(i1); + + try { + eg.scheduleForExecution(scheduler); + } catch (JobException e) { + e.printStackTrace(); + fail("Failed to schedule ExecutionGraph"); + } + + for (int i=0; i < numSinks * parallelism; i++) { + // all sources should be scheduled + expectMsgClass(duration("1 second"), TaskMessages.SubmitTask.class); + } + + } + }; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index 776184ce40044..6086fb241c5a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -67,7 +67,6 @@ public static Instance getRandomInstance(int numSlots) { return new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, numSlots); } - public static Execution getDummyTask() { ExecutionVertex vertex = mock(ExecutionVertex.class); when(vertex.getJobId()).thenReturn(new JobID()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 056ed3707003c..8303217e10766 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -271,7 +271,41 @@ protected void run() { } }}; } - + + @Test + public void testPartitionReject() { + new JavaTestKit(system){{ + + 
ActorRef jobManager = null; + ActorRef taskManager = null; + jobManager = system.actorOf(Props.create(SimpleJobManager.class)); + taskManager = createTaskManager(jobManager); + + // send a non-existing partition id to the task manager + IntermediateResultPartitionID partitionID = new IntermediateResultPartitionID(); + taskManager.tell( + new TaskMessages.LockResultPartition(partitionID, 1), + getRef()); + + expectMsgEquals(duration("1 second"), + new TaskMessages.LockResultPartitionReply(false)); + }}; + } + + @Test + public void testPartitionAcknowledge() { + new JavaTestKit(system){{ + + ActorRef jobManager = null; + ActorRef taskManager = null; + jobManager = system.actorOf(Props.create(SimpleJobManager.class)); + taskManager = createTaskManager(jobManager); + + + }}; + } + + @Test public void testGateChannelEdgeMismatch() { new JavaTestKit(system){{ diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index c628954b66412..a8848a0a361d2 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -25,8 +25,11 @@ import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist} -import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged +import org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged, +JobStatusChanged} +import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers import org.apache.flink.runtime.messages.Messages.Disconnect +import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect @@ -55,6 +58,9 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { val waitForJobStatus = scala.collection.mutable.HashMap[JobID, collection.mutable.HashMap[JobStatus, Set[ActorRef]]]() + /** For ResumeItCase */ + val waitForTaskScheduled = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + var disconnectDisabled = false abstract override def receiveWithLogMessages: Receive = { @@ -179,9 +185,6 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { waitForJobStatus.remove(jobID) } - case DisableDisconnect => - disconnectDisabled = true - case msg: Disconnect => if (!disconnectDisabled) { super.receiveWithLogMessages(msg) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index bb0c1f90b25b2..0a3c1a409134a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -25,9 +25,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.instance.InstanceConnectionInfo import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment +import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.messages.Messages.Disconnect -import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, UnregisterTask} +import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, LockResultPartition, UpdateTaskExecutionState, UnregisterTask} import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, TaskManager} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect @@ -58,6 +59,10 @@ class TestingTaskManager(config: TaskManagerConfiguration, val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]() + val waitForResultPartitionChanges = + scala.collection.mutable.HashMap[IntermediateResultPartitionID, Set[ActorRef]]() + val waitForSubmitTask = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + var disconnectDisabled = false @@ -94,7 +99,19 @@ class TestingTaskManager(config: TaskManagerConfiguration, waitForRemoval += (executionID -> (set + sender)) } } - + + case NotifyWhenTaskSubmitted(jobID: JobID) => + val set = waitForSubmitTask.getOrElse(jobID, Set()) + waitForSubmitTask += (jobID -> (set + sender)) + + case msg: SubmitTask => + super.receiveWithLogMessages(msg) + waitForSubmitTask.get(msg.tasks.getJobID) match { + case Some(actors) => (actors) foreach (_ ! msg) + case None => + } + + case UnregisterTask(executionID) => super.receiveWithLogMessages(UnregisterTask(executionID)) waitForRemoval.remove(executionID) match { @@ -177,5 +194,17 @@ class TestingTaskManager(config: TaskManagerConfiguration, _ foreach (_ ! true) } } + + case NotifyWhenResultPartitionChanges(id: IntermediateResultPartitionID) => + val set = waitForResultPartitionChanges.getOrElse(id, Set()) + waitForResultPartitionChanges += (id -> (set + sender)) + + case msg: LockResultPartition => + super.receiveWithLogMessages(msg) + waitForResultPartitionChanges.remove(msg.partitionID) match { + case Some(actors) => for(actor <- actors) actor ! 
ResultPartitionLocked(msg.partitionID) + case None => + } + } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala index c9a2f73d892b3..9558de19ed2bf 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -20,7 +20,8 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionAttemptID} +import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID} import org.apache.flink.runtime.taskmanager.Task /** @@ -51,7 +52,15 @@ object TestingTaskManagerMessages { case class NotifyWhenJobManagerTerminated(jobManager: ActorRef) case class JobManagerTerminated(jobManager: ActorRef) - + + /** For ResumeITCase */ + case class NotifyWhenTaskSubmitted(jobID: JobID) + + case class NotifyWhenResultPartitionChanges(partitionID: IntermediateResultPartitionID) + + case class ResultPartitionCached(partitionID: IntermediateResultPartitionID) + case class ResultPartitionLocked(partitionID: IntermediateResultPartitionID) + // -------------------------------------------------------------------------- // Utility methods to allow simpler case object access from Java // -------------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/resume/ResumeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/resume/ResumeITCase.java new file mode 100644 index 0000000000000..1ff491d4d7e5a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/resume/ResumeITCase.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.resume; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import akka.testkit.JavaTestKit; +import akka.util.Timeout; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.messages.JobManagerMessages.*; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.*; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.types.IntValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import akka.actor.Status.Success; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class ResumeITCase { + + private static int PARALLELISM = 10; + ActorSystem system; + + @Before + public void setup(){ + system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); + TestingUtils.setCallingThreadDispatcher(system); + } + + @After + public void teardown(){ + TestingUtils.setGlobalExecutionContext(); + JavaTestKit.shutdownActorSystem(system); + } + + @Test + public void testResumeFromIntermediateResults() { + + final JobID jobId = new JobID(); + final String jobName = "Resume from intermediate result"; + final Configuration cfg = new Configuration(); + + /* Simple test to resume from an intermediate result cached as a ResultPartition at a task manager + + O receiver receiver O O sink + ^ | / + | |/ + ■ intermediate result ■ intermediate result + ^ | + | | + O source O source + + */ + + AbstractJobVertex source = new AbstractJobVertex("source"); + source.setInvokableClass(Sender.class); + + final IntermediateDataSet intermediateResult = source.createAndAddResultDataSet(ResultPartitionType.BLOCKING); + + final AbstractJobVertex receiver = new AbstractJobVertex("receiver"); + receiver.setInvokableClass(Receiver.class); + receiver.connectDataSetAsInput(intermediateResult, DistributionPattern.ALL_TO_ALL); + + // first job graph + final JobGraph jobGraph1 = new JobGraph(jobId, jobName, source, receiver); + jobGraph1.setScheduleMode(ScheduleMode.BACKTRACKING); + jobGraph1.setParallelism(PARALLELISM); + + final AbstractJobVertex sink = new AbstractJobVertex("sink"); + sink.setInvokableClass(Receiver.class); + sink.setResumeFromIntermediateResult(); + + 
sink.connectDataSetAsInput(intermediateResult, DistributionPattern.ALL_TO_ALL); + + // second job graph + final JobGraph jobGraph2 = new JobGraph(jobId, jobName, sink); + jobGraph2.setScheduleMode(ScheduleMode.BACKTRACKING); + jobGraph2.setParallelism(PARALLELISM); + + ForkableFlinkMiniCluster cluster = ForkableFlinkMiniCluster.startCluster(2 * PARALLELISM, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + + final ActorRef jobManager = cluster.getJobManager(); + final ActorRef taskManager = cluster.getTaskManagersAsJava().get(0); + + + new JavaTestKit(system) {{ + new Within(TestingUtils.TESTING_DURATION()) { + @Override + protected void run() { + taskManager.tell(new TestingTaskManagerMessages.NotifyWhenTaskSubmitted(jobId), getRef()); + + jobManager.tell(new SubmitJob(jobGraph1, false), getRef()); + expectMsgClass(Success.class); + + // TODO check caching of intermediate result + //expectMsgEquals(new TestingTaskManagerMessages.ResultPartitionCached(intermediateResult.getId())); + + for (int i = 0; i < PARALLELISM * 2; i++) { + expectMsgClass(TaskMessages.SubmitTask.class); + } + + expectMsgClass(JobResultSuccess.class); + + // request execution graph + FiniteDuration t = new FiniteDuration(5, TimeUnit.SECONDS); + Future future = Patterns.ask(jobManager, new RequestExecutionGraph(jobId), new Timeout(t)); + ExecutionGraph executionGraph = null; + try { + executionGraph = ((ExecutionGraphFound)Await.result(future, t)).executionGraph(); + } catch (Exception e) { + e.printStackTrace(); + fail("Failed to get ExecutionGraph of first job."); + } + + // list of intermediate result partitions that should be reused + Set resultsToBeAcknowledged = new HashSet(); + for (IntermediateResult result : executionGraph.getAllIntermediateResults().values()) { + if(result.getId().equals(intermediateResult.getId())) { + for (IntermediateResultPartition partition : result.getPartitions()) { + IntermediateResultPartitionID partitionId = partition.getPartitionId(); + resultsToBeAcknowledged.add(partitionId); + // call back for partition changes + taskManager.tell(new TestingTaskManagerMessages.NotifyWhenResultPartitionChanges(partitionId), getRef()); + } + } + } + + // submit second job and register for execution graph changes + jobManager.tell(new SubmitJob(jobGraph2, false), getRef()); + expectMsgClass(Success.class); + + + for (Object msg : receiveN(PARALLELISM * 2)) { + + if (msg instanceof TestingTaskManagerMessages.ResultPartitionLocked) { + assertTrue("Locked ResultPartitions should be correct.", + resultsToBeAcknowledged.contains(((TestingTaskManagerMessages.ResultPartitionLocked) msg).partitionID())); + } else if (msg instanceof TaskMessages.SubmitTask) { + assertTrue("Scheduled vertices should be correct.", + ((TaskMessages.SubmitTask) msg).tasks().getVertexID().equals(sink.getID())); + } else { + fail("Unknown message " + msg); + } + + } + + expectMsgClass(JobResultSuccess.class); + + } + }; + }}; + + } + + public static class Sender extends AbstractInvokable { + + private RecordWriter writer; + + @Override + public void registerInputOutput() { + writer = new RecordWriter(getEnvironment().getWriter(0)); + } + + @Override + public void invoke() throws Exception { + + try { + for (int i=0; i < PARALLELISM; i++) { + writer.emit(new IntValue(23)); + } + writer.flush(); + } + finally { + writer.clearBuffers(); + } + } + } + + public static class Receiver extends AbstractInvokable { + + private RecordReader reader; + + @Override + public void registerInputOutput() { + reader = new RecordReader( + 
getEnvironment().getInputGate(0), + IntValue.class); + } + + @Override + public void invoke() throws Exception { + try { + IntValue record; + int numValues = 0; + + while ((record = reader.next()) != null) { + record.getValue(); + numValues++; + } + + assertTrue("There should be no more values available.", numValues == PARALLELISM); + } + finally { + reader.clearBuffers(); + } + } + } + +} diff --git a/pom.xml b/pom.xml index cb3190895f1de..2a51e11ac71d9 100644 --- a/pom.xml +++ b/pom.xml @@ -732,7 +732,7 @@ under the License. flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text - + flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/** **/flink-bin/conf/slaves
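
The sketches below are illustrative notes rather than part of the patch; every class, method or field name that does not appear in the diff above is a placeholder. The first one is a toy model of what ScheduleMode.BACKTRACKING means for scheduling, as pinned down by BacktrackingTest: starting from the sinks, a vertex's producers are only visited if their intermediate results can no longer be pinned, so everything upstream of an available result is not executed again. The real traversal additionally has to lock partitions via LockResultPartition and defer deployment of consumers; none of that is modelled here.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /** Toy model of the traversal order (not the real Backtracking class). */
    class BacktrackingSketch {

        interface Node {
            List<Node> inputs();          // producers of this node's inputs
            boolean hasCachedResult();    // an intermediate result can still be pinned for it
        }

        /** Returns the vertices whose results are missing and therefore have to run again. */
        static Set<Node> nodesToSchedule(List<Node> sinks) {
            Set<Node> toSchedule = new HashSet<Node>();
            Set<Node> visited = new HashSet<Node>();
            Deque<Node> stack = new ArrayDeque<Node>(sinks);

            while (!stack.isEmpty()) {
                Node node = stack.pop();
                if (!visited.add(node)) {
                    continue;
                }
                toSchedule.add(node);
                for (Node producer : node.inputs()) {
                    // If the producer's result is still available we resume from it:
                    // the producer and everything upstream of it stay untouched.
                    if (!producer.hasCachedResult()) {
                        stack.push(producer);
                    }
                }
            }
            return toSchedule;
        }
    }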
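
The availability check itself is an actor round trip: the JobManager side sends TaskMessages.LockResultPartition(partitionID, numConsumers) to the TaskManager that produced the partition, which answers with LockResultPartitionReply after consulting ResultPartitionManager.pinCachedResultPartition. The asking side is not included in this excerpt, so the helper below is only a sketch of how such a probe could be written with Akka's Java API; PartitionAvailabilityProbe and its Runnable callbacks are hypothetical names.

    import akka.actor.ActorRef;
    import akka.dispatch.OnComplete;
    import akka.pattern.Patterns;
    import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
    import org.apache.flink.runtime.messages.TaskMessages;
    import scala.concurrent.ExecutionContext;
    import scala.concurrent.Future;

    /** Hypothetical helper: asks a TaskManager whether a cached partition can be pinned. */
    public class PartitionAvailabilityProbe {

        public static void probe(ActorRef taskManager, IntermediateResultPartitionID partitionId,
                int numConsumers, long timeoutMillis, ExecutionContext context,
                final Runnable onAvailable, final Runnable onUnavailable) {

            Future<Object> reply = Patterns.ask(
                    taskManager, new TaskMessages.LockResultPartition(partitionId, numConsumers), timeoutMillis);

            reply.onComplete(new OnComplete<Object>() {
                @Override
                public void onComplete(Throwable failure, Object result) {
                    // A missing or negative reply means the partition has to be recomputed,
                    // i.e. backtracking continues towards the producer of this partition.
                    if (failure == null && result instanceof TaskMessages.LockResultPartitionReply
                            && ((TaskMessages.LockResultPartitionReply) result).locked()) {
                        onAvailable.run();
                    } else {
                        onUnavailable.run();
                    }
                }
            }, context);
        }
    }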
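
The numConsumers argument of LockResultPartition suggests that pinning is reference counted: a cached ResultPartition should survive until every consumer that locked it has read it, in line with the changelog item "allow multiple consumers per blocking intermediate result". The toy class below is an assumption about that bookkeeping, not the actual ResultPartitionManager code.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /** Illustrative guess at the pinning bookkeeping (not the real ResultPartitionManager). */
    class PinnedPartitionSketch {

        private final Set<String> cached = new HashSet<String>();
        private final Map<String, Integer> pendingConsumers = new HashMap<String, Integer>();

        void cache(String partitionId) {
            cached.add(partitionId);
        }

        /** Pins a cached partition for numConsumers consumers; false if the partition is gone. */
        boolean pin(String partitionId, int numConsumers) {
            if (!cached.contains(partitionId)) {
                return false;
            }
            Integer pending = pendingConsumers.get(partitionId);
            pendingConsumers.put(partitionId, (pending == null ? 0 : pending) + numConsumers);
            return true;
        }

        /** One consumer has fully read the partition; it may be evicted once nobody waits for it. */
        void consumerDone(String partitionId) {
            Integer pending = pendingConsumers.get(partitionId);
            if (pending == null || pending <= 1) {
                pendingConsumers.remove(partitionId);
                cached.remove(partitionId);  // nothing pins it anymore, the cache may release it
            } else {
                pendingConsumers.put(partitionId, pending - 1);
            }
        }
    }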
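
ResultPartitionManagerTest fixes the eviction contract for the cache: with 5 + 3 + 2 = 10 buffers registered and released into the cache, releaseLeastRecentlyUsedCachedPartitions(10000) and (11) return false, (10) returns true, and a second call with 10 returns false. In other words, least recently used cached partitions are evicted until the requested number of buffers has been freed, and nothing is evicted when the target cannot be met. A standalone sketch of that contract, with a plain LinkedHashMap standing in for the real bookkeeping:

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Standalone sketch of the eviction contract asserted by ResultPartitionManagerTest. */
    class LruPartitionCacheSketch {

        // access-ordered map: iteration order is least recently used first
        private final LinkedHashMap<String, Integer> cachedBuffersPerPartition =
                new LinkedHashMap<String, Integer>(16, 0.75f, true);

        void cache(String partitionId, int numBuffers) {
            cachedBuffersPerPartition.put(partitionId, numBuffers);
        }

        /** Returns true iff at least buffersToFree buffers could be released. */
        boolean releaseLeastRecentlyUsed(int buffersToFree) {
            int available = 0;
            for (int buffers : cachedBuffersPerPartition.values()) {
                available += buffers;
            }
            if (available < buffersToFree) {
                // not enough cached data to satisfy the request, release nothing
                return false;
            }
            int freed = 0;
            Iterator<Map.Entry<String, Integer>> it = cachedBuffersPerPartition.entrySet().iterator();
            while (it.hasNext() && freed < buffersToFree) {
                freed += it.next().getValue();
                it.remove();  // the real manager would release the underlying ResultPartition here
            }
            return true;
        }
    }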
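
Finally, from the job-submission side the feature is driven by three JobGraph-level switches, exactly as in ResumeITCase: a BLOCKING result on the producer, setResumeFromIntermediateResult() on the consumer of the follow-up job, and ScheduleMode.BACKTRACKING on both graphs submitted under the same JobID. A condensed sketch of that wiring; the invokable classes (the test's Sender and Receiver) are omitted for brevity, so this compiles but is not a complete runnable job.

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.ScheduleMode;

    /** Condensed version of the job wiring used in ResumeITCase. */
    class ResumeWiringSketch {

        static JobGraph[] buildJobs(JobID jobId, int parallelism) {
            AbstractJobVertex source = new AbstractJobVertex("source");
            // BLOCKING results are the ones that can be cached and pinned after the first run
            IntermediateDataSet cachedResult =
                    source.createAndAddResultDataSet(ResultPartitionType.BLOCKING);

            AbstractJobVertex receiver = new AbstractJobVertex("receiver");
            receiver.connectDataSetAsInput(cachedResult, DistributionPattern.ALL_TO_ALL);

            JobGraph firstJob = new JobGraph(jobId, "produce", source, receiver);
            firstJob.setScheduleMode(ScheduleMode.BACKTRACKING);
            firstJob.setParallelism(parallelism);

            // The second job only contains the new consumer; it resumes from the cached
            // result instead of re-running the source.
            AbstractJobVertex sink = new AbstractJobVertex("sink");
            sink.setResumeFromIntermediateResult();
            sink.connectDataSetAsInput(cachedResult, DistributionPattern.ALL_TO_ALL);

            JobGraph secondJob = new JobGraph(jobId, "resume", sink);
            secondJob.setScheduleMode(ScheduleMode.BACKTRACKING);
            secondJob.setParallelism(parallelism);

            return new JobGraph[] { firstJob, secondJob };
        }
    }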