[FLINK-2097] Implement job session management #858
I just had a look at the JobManager in a different context and thought about the following, which might be relevant here: when a new JobGraph is submitted and attached to an existing ExecutionGraph, some ExecutionGraph state is overwritten by the new JobGraph. This can lead to possibly unexpected behaviour, such as resetting the number of remaining execution retries or creating a new CheckpointCoordinator for the ExecutionGraph. What's the intended behaviour of attaching to an existing ExecutionGraph? Is there an implicit assumption that the existing ExecutionGraph needs to be finished already?
I think right now, it pretty much behaves as if someone started a new job, with the "grown" execution graph.
It should just add more nodes to the ExecutionGraph. Existing ones should not be modified. For batch, I think the assumption is that it needs to be finished. For streaming, I could also picture attaching nodes at runtime, but this has to be carefully implemented.
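The "add nodes, never modify existing ones" behaviour described above can be sketched as follows. This is only an illustration under stated assumptions: `GrowOnlyGraph`, its methods, and the string-based vertices are hypothetical and are not Flink's `ExecutionGraph` API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a graph that only grows; NOT Flink's ExecutionGraph. */
class GrowOnlyGraph {
    // Vertex id -> operator description (assumed representation, for illustration only).
    private final Map<String, String> vertices = new LinkedHashMap<>();
    private int retriesLeft;

    GrowOnlyGraph(int retries) {
        this.retriesLeft = retries;
    }

    /** Adds only vertices that are not yet present and returns how many were added. */
    int attach(Map<String, String> newVertices) {
        int added = 0;
        for (Map.Entry<String, String> e : newVertices.entrySet()) {
            // putIfAbsent leaves an existing vertex untouched.
            if (vertices.putIfAbsent(e.getKey(), e.getValue()) == null) {
                added++;
            }
        }
        // Graph-level state such as retriesLeft is deliberately not reset here.
        return added;
    }

    /** Consumes one retry, if any are left. */
    void recordFailure() {
        if (retriesLeft > 0) {
            retriesLeft--;
        }
    }

    int retriesLeft() {
        return retriesLeft;
    }

    String operatorOf(String vertexId) {
        return vertices.get(vertexId);
    }

    int size() {
        return vertices.size();
    }
}
```

In this sketch, attaching a second job that redefines an existing vertex leaves the original vertex and the remaining retry count unchanged, which is the behaviour asked for in the comment above.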
Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it around for further operations to be attached to the graph. That is the basis for interactive sessions.

This pull request implements rudimentary session management. Together with the backtracking in apache#640, this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are cleared eventually.

ExecutionGraphs are kept as long as
- no timeout has occurred and
- the session has not been explicitly ended

The following changes have also been made in this pull request:
- The Job ID is created through the ExecutionEnvironment and passed through
- Sessions can be terminated by the ExecutionEnvironment or directly through the executor
- The environments use reapers (local) and shutdown hooks (remote) to ensure session termination when the environment runs out of scope
- The Client manages only connections to the JobManager; it is not job-specific

This closes apache#858.
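The retention rule in the commit message (a graph is kept until its session times out or is explicitly ended) could be sketched like this. All names here (`SessionRegistry`, `keep`, `lookup`, `endSession`, `removeExpired`) are hypothetical and do not reflect the JobManager's actual implementation; the clock is injected only to keep the sketch testable, and the class is not thread-safe.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical registry that holds finished graphs per job id; not Flink code. */
class SessionRegistry<G> {

    /** A retained graph together with the time it was last used. */
    private static final class Held<T> {
        final T graph;
        long lastAccess;

        Held(T graph, long lastAccess) {
            this.graph = graph;
            this.lastAccess = lastAccess;
        }
    }

    private final Map<String, Held<G>> sessions = new HashMap<>();
    private final long timeoutMillis;
    private final LongSupplier clock;

    SessionRegistry(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    /** Keeps the graph around after execution instead of discarding it. */
    void keep(String jobId, G graph) {
        sessions.put(jobId, new Held<>(graph, clock.getAsLong()));
    }

    /** Returns the retained graph (or null) and refreshes its last-access time. */
    G lookup(String jobId) {
        Held<G> held = sessions.get(jobId);
        if (held == null) {
            return null;
        }
        held.lastAccess = clock.getAsLong();
        return held.graph;
    }

    /** Explicitly ends a session; returns true if a graph was released. */
    boolean endSession(String jobId) {
        return sessions.remove(jobId) != null;
    }

    /** Releases every graph whose session has been idle for at least the timeout. */
    int removeExpired() {
        long now = clock.getAsLong();
        int removed = 0;
        Iterator<Held<G>> it = sessions.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastAccess >= timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

A caller would run `removeExpired()` periodically and invoke `endSession(jobId)` from whatever cleanup path the environment uses, for example a shutdown hook registered with `Runtime.getRuntime().addShutdownHook(...)`.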
I've ported this pull request to the latest master. It was a lot more work than I anticipated because some classes had diverged significantly and merging them was a bit hard. Due to some refactoring, the changes have grown quite large again, and I know that makes reviewing hard. Despite that, I wouldn't delay merging this pull request much further. We can disable the session management until it is integrated with the rest of the system (intermediate results) by throwing an exception in the interface methods. If we decide later that we want to delay this feature, we could also remove the session code. In that case, it would still make sense to merge this pull request because it contains a lot of nice refactoring. With the session management in place, we can reuse already computed intermediate results without too much effort. Actually, only some API changes remain to expose the session management to the user in production.
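Disabling the feature "by throwing an exception" could look roughly like the following sketch. The interface `SessionControl`, its method names, and the message text are assumptions made for illustration; they are not the methods this pull request adds.

```java
/** Hypothetical user-facing session methods; names are assumptions, not Flink's API. */
interface SessionControl {
    void startNewSession();

    void setSessionTimeout(long seconds);
}

/** Keeps the interface in place but rejects every call until the feature is integrated. */
class DisabledSessionControl implements SessionControl {
    private static final String MESSAGE = "Session management is not yet supported.";

    @Override
    public void startNewSession() {
        throw new UnsupportedOperationException(MESSAGE);
    }

    @Override
    public void setSessionTimeout(long seconds) {
        throw new UnsupportedOperationException(MESSAGE);
    }
}
```

The advantage of this approach is that the refactored plumbing can be merged and exercised while users get a clear error instead of half-working behaviour.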
cluster.stop()
}
}
// "detect a lost connection to the JobManager and try to reconnect to it" in {
Re-enable the test?
Thanks.
Could you elaborate a little bit on what you refactored and which components would be important to review?
Of course! The following classes have been refactored in the course of integrating them with the session management:
- Client
- CliFrontend
- ExecutionEnvironment, LocalEnvironment, RemoteEnvironment
- PlanExecutor, LocalExecutor, RemoteExecutor
- JobManager
Future issues:
Thanks Max for the detailed description. On Wed, Sep 9, 2015 at 11:12 AM, Max [email protected] wrote:
I've rebased again... If nobody objects, I will merge this soon. The new API-facing methods on
This is a joint effort by @StephanEwen and me to introduce session management in Flink. Sessions are used to keep a copy of the ExecutionGraph in the job manager for the session lifetime. It is important that the ExecutionGraph is not kept around longer because it consumes a lot of memory. Its intermediate results can also be freed. To integrate sessions properly into Flink, some refactoring was necessary. In particular these are:
With the session management, we will be able to properly support backtracking of produced intermediate results. This makes calls to count()/collect()/print() efficient and enables writing incremental/interactive jobs.