[scheduling] implement backtracking of intermediate results #640
Conversation
Force-pushed cc0d3e0 to 9ac127e.
if (visitedPartition) {
    availableInputs++;
}
pendingInputs.remove(ir);
As far as I know, it's unsafe to modify a collection while iterating over it unless you use the remove method of the iterator directly.
Quote from the Iterator.remove() JavaDoc: "The behavior of an iterator is unspecified if the underlying collection is modified while the iteration is in progress in any way other than by calling this method."
You're right, that's not good practice. I changed that.
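For reference, a minimal sketch of the safe removal pattern the JavaDoc describes (names are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SafeRemoval {
    // Remove matching elements while iterating, using Iterator.remove()
    // as the only legal structural modification during iteration.
    static List<Integer> dropEven(List<Integer> input) {
        List<Integer> result = new ArrayList<>(input);
        Iterator<Integer> it = result.iterator();
        while (it.hasNext()) {
            if (it.next() % 2 == 0) {
                it.remove(); // safe: goes through the iterator itself
            }
        }
        return result;
    }
}
```

Calling `collection.remove(element)` inside the loop instead would risk a ConcurrentModificationException or, per the JavaDoc, unspecified behavior.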
Force-pushed 4d17eff to c036c1a.
);

/** BEGIN Asynchronous callback **/
future.onComplete(new OnComplete<Object>() {
It's an asynchronous callback, but what we're effectively doing is executing the LockResultPartition for each producer sequentially, right? For large degrees of parallelism this can take quite some time, because it is dop * lockTime instead of lockTime if we executed the LockResultPartition calls in parallel. Why not do all the LockResultPartition calls in parallel?
The callback is asynchronous to prevent the job manager from blocking or waiting in the dispatcher threads. You are right that the locking would be faster if executed in parallel. To avoid having to deal with the concurrency-related issues, I chose not to implement it in parallel. Thinking about it now, it is probably not too hard to change that.
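The parallel variant discussed here could look roughly like the following sketch. It uses plain CompletableFuture rather than the Akka futures the PR actually uses, and lockPartition is a hypothetical stand-in for sending a LockResultPartition message to one producer; the point is only that all requests are issued up front and the replies are combined afterwards:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelLock {
    // Hypothetical stand-in for one LockResultPartition round-trip.
    static CompletableFuture<Boolean> lockPartition(String producerId) {
        return CompletableFuture.supplyAsync(() -> {
            // simulate the lock request succeeding on the TaskManager
            return true;
        });
    }

    // Issue all lock requests at once and combine the replies, instead of
    // awaiting each reply before sending the next request (dop * lockTime).
    static boolean lockAll(List<String> producers) {
        List<CompletableFuture<Boolean>> futures = producers.stream()
                .map(ParallelLock::lockPartition)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream().allMatch(CompletableFuture::join))
                .join();
    }
}
```

With Akka, the analogous composition would be Futures.sequence over the per-producer ask futures.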
Force-pushed c036c1a to 4b615b2.
The backtracking looks good @mxm. I have some remarks concerning the way the locking of the partitions on the TMs works. At the moment this happens sequentially. Furthermore, we could think about doing the backtracking in parallel. This could also speed up the scheduling process.
Thank you for your valuable comments @tillrohrmann. I haven't worked with Akka's future compositions, but it seems a sophisticated way to parallelize and combine the actor replies.
Force-pushed 4b615b2 to d5f678d.
Force-pushed d5f678d to 11e9b12.
Force-pushed 168e5d7 to bdd78e5.
For batch programs, we currently schedule all tasks which are sources and let them kick off the execution of the connected tasks. This approach bears some problems when executing large dataflows with many branches. With backtracking, we traverse the execution graph output-centrically (from the sinks) in a depth-first manner. This enables us to use resources differently.

In the course of backtracking, only tasks will be executed that are required to supply inputs to the current task. When a job is newly submitted, this means that the backtracking will reach the sources. When the job has been previously executed and intermediate results are available, old ResultPartitions to resume from can be requested while backtracking.

Backtracking is disabled by default. It can be enabled by setting the ScheduleMode in JobGraph to BACKTRACKING.

CHANGELOG
- new scheduling mode: backtracking
- backtracks from the sinks of an ExecutionGraph
- checks the availability of IntermediatePartitionResults
- marks ExecutionVertex to be scheduled
- caches ResultPartitions and reloads them
- resumes from intermediate results
- test for general behavior of backtracking (BacktrackingTest)
- test for resuming from an intermediate result (ResumeITCase)
- test for releasing of cached ResultPartitions (ResultPartitionManagerTest)
- allow multiple consumers per blocking intermediate result (batch)
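The traversal described above can be sketched in a few lines. This is a simplified model, not the PR's actual ExecutionGraph code: tasks are plain strings, and cachedResults stands in for the availability check on IntermediatePartitionResults. The key property is that the walk starts at the sinks and does not descend past a producer whose result is already cached:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BacktrackSketch {
    // task -> the producers of its inputs
    static Map<String, List<String>> inputs = new HashMap<>();
    // tasks whose ResultPartition is cached and can be resumed from
    static Set<String> cachedResults = new HashSet<>();

    // Depth-first walk from the sinks towards the sources; a producer with
    // a cached result is not scheduled, its partition is reused instead.
    static List<String> tasksToSchedule(List<String> sinks) {
        List<String> toSchedule = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>(sinks);
        while (!stack.isEmpty()) {
            String task = stack.pop();
            if (!visited.add(task)) {
                continue; // already handled via another branch
            }
            toSchedule.add(task);
            for (String producer : inputs.getOrDefault(task, Collections.emptyList())) {
                if (!cachedResults.contains(producer)) {
                    stack.push(producer); // must be (re)executed
                }
            }
        }
        return toSchedule;
    }
}
```

On a fresh job cachedResults is empty, so the walk reaches the sources, matching the behavior described for newly submitted jobs.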
Force-pushed bdd78e5 to 81c0078.
This pull request implements a rudimentary session management. Together with the backtracking apache#640, this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are cleared eventually. ExecutionGraphs are kept as long as
- no timeout occurred, or
- the session has not been explicitly ended
public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition>
        registeredPartitions = HashBasedTable.create();

/**
 * Cached ResultPartitions which are used to resume/recover from
Let's add a comment that the LinkedHashMap implements an LRU policy.
There are two unused classes LRUCache and LRUCacheMap, which can be removed. I think this solution is fine. :)
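For context, the LRU behavior being discussed falls out of LinkedHashMap directly, which is presumably why the dedicated LRUCache classes became redundant. A minimal sketch of that standard pattern (not the PR's actual class):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A LinkedHashMap in access order with removeEldestEntry overridden
// behaves as a fixed-capacity LRU cache.
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    public LruCache(int capacity) {
        // accessOrder = true: iteration order runs from least- to
        // most-recently accessed entry
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // evict the least-recently accessed entry once over capacity
        return size() > capacity;
    }
}
```

Both put and get count as accesses, so recently read entries survive eviction.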
commit ceb2d57
Author: Maximilian Michels <[email protected]>
Date: Thu Jun 18 16:38:09 2015 +0200

    [FLINK-2097] [core] Finalize session management

commit 30f78f0
Author: Stephan Ewen <[email protected]>
Date: Fri May 29 14:35:33 2015 +0200

    [FLINK-2097] [core] Improve session management.
    - The Client manages only connections to the JobManager, it is not job specific
    - Executors provide a more explicit life cycle and methods to start new sessions
    - Sessions are handled by the environments
    - The environments use reapers (local) and shutdown hooks (remote) to ensure session termination when the environment runs out of scope

commit b6bb34e
Author: Maximilian Michels <[email protected]>
Date: Wed May 13 17:06:47 2015 +0200

    [FLINK-2097] [core] Implement job session management.
    Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it around for further operations to be attached to the graph. That is the basis for interactive sessions. This pull request implements a rudimentary session management. Together with the backtracking apache#640, this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are cleared eventually. ExecutionGraphs are kept as long as
    - no timeout occurred, or
    - the session has not been explicitly ended
Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it around for further operations to be attached to the graph. That is the basis for interactive sessions. This pull request implements a rudimentary session management. Together with the backtracking apache#640, this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are cleared eventually. ExecutionGraphs are kept as long as
- no timeout occurred, or
- the session has not been explicitly ended

The following changes have also been made in this pull request:
- The Job ID is created through the ExecutionEnvironment and passed through
- Sessions can be terminated by the ExecutionEnvironment or directly through the executor
- The environments use reapers (local) and shutdown hooks (remote) to ensure session termination when the environment runs out of scope
- The Client manages only connections to the JobManager, it is not job specific

This closes apache#858.
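The retention rule stated above (keep a graph until its session times out or is explicitly ended) can be sketched as follows. This is an illustrative model, not the PR's JobManager code; SessionRegistry and its method names are made up for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the retention rule: an ExecutionGraph is kept
// as long as its session has neither timed out nor been explicitly ended.
public class SessionRegistry {
    static class Entry {
        long lastAccess;
        boolean ended;
        Entry(long t) { lastAccess = t; }
    }

    private final Map<String, Entry> graphs = new HashMap<>();
    private final long timeoutMillis;

    SessionRegistry(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    void register(String jobId, long now) { graphs.put(jobId, new Entry(now)); }

    void endSession(String jobId) {
        Entry e = graphs.get(jobId);
        if (e != null) { e.ended = true; }
    }

    // Discard graphs whose session ended or whose timeout elapsed.
    void cleanup(long now) {
        graphs.values().removeIf(e -> e.ended || now - e.lastAccess > timeoutMillis);
    }

    boolean isRetained(String jobId) { return graphs.containsKey(jobId); }
}
```

In the actual implementation the cleanup side would be driven by the reapers and shutdown hooks mentioned in the change list.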
While this may provide some help for future implementations, the code is outdated and needs an overhaul. Closing for now.