[FLINK-2097] Implement job session management #858

Closed
wants to merge 2 commits

Conversation

@mxm (Contributor) commented Jun 22, 2015

This is a joint effort by @StephanEwen and me to introduce session management in Flink. Sessions are used to keep a copy of the ExecutionGraph in the JobManager for the lifetime of the session. It is important that the ExecutionGraph is not kept around longer than that, because it consumes a lot of memory; once the session ends, its intermediate results can also be freed. To integrate sessions properly into Flink, some refactoring was necessary. In particular:

  • The JobID is created through the ExecutionEnvironment and passed through
  • Sessions can be terminated by the ExecutionEnvironment or directly through the executor
  • Sessions are cancelled implicitly through "reapers" or shutdown hooks in the ExecutionEnvironment; otherwise they time out
  • LocalExecutor and RemoteExecutor manage sessions
  • The Client only deals with communication with the JobManager and is agnostic of session management

With session management in place, we will be able to properly support backtracking of produced intermediate results. This makes calls to count()/collect()/print() efficient and enables writing incremental/interactive jobs.
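
To make the intended workflow concrete, here is a minimal usage sketch. The session-related method name (setSessionTimeout) and its timeout unit are assumptions about the API shape described above, not necessarily the final interface.

    // Minimal usage sketch under the assumptions stated above; not the final API.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class SessionUsageSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setSessionTimeout(3600); // assumption: keep the session's ExecutionGraph for up to an hour

            DataSet<String> text = env.readTextFile("hdfs:///path/to/input");

            // Both calls submit a job under the same session JobID, so the second one
            // could back-track to intermediate results produced by the first.
            long lines = text.count();
            long distinctLines = text.distinct().count();
            System.out.println(lines + " lines, " + distinctLines + " distinct");

            // The session ends when the environment goes out of scope (reaper/shutdown hook),
            // when it is ended explicitly, or when the timeout expires.
        }
    }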

@uce (Contributor) commented Jul 22, 2015

I just had a look at the JobManager in a different context and thought about the following, which might be relevant here: when submitting a new JobGraph that is attached to an existing ExecutionGraph, some ExecutionGraph state is overwritten by the new JobGraph. This might lead to unexpected behaviour, like resetting the number of remaining execution retries or creating a new CheckpointCoordinator for the ExecutionGraph.

What's the intended behaviour of attaching to an existing ExecutionGraph? Is there an implicit assumption that the existing ExecutionGraph needs to be finished already?

@StephanEwen (Contributor)

I think right now, it pretty much behaves as if someone started a new job, with the "grown" execution graph.

@mxm (Contributor, Author) commented Jul 22, 2015

It should just add more nodes to the ExecutionGraph. Existing ones should not be modified. For batch, I think the assumption is that it needs to be finished. For streaming, I could also picture attaching nodes at runtime, but this has to be carefully implemented.

mxm added a commit to mxm/flink that referenced this pull request Sep 8, 2015
Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.

This pull request implements rudimentary session management. Together
with backtracking (apache#640), this will enable users to submit jobs to the
cluster and access intermediate results. Session handling ensures that
the results are eventually cleared.

ExecutionGraphs are kept as long as
  - no timeout has occurred and
  - the session has not been explicitly ended

The following changes have also been made in this pull request:

- The Job ID is created through the ExecutionEnvironment and passed through

- Sessions can be terminated by the ExecutionEnvironment or directly
  through the executor

- The environments use reapers (local) and shutdown hooks (remote) to
  ensure session termination when the environment goes out of scope

- The Client manages only connections to the JobManager; it is not
  job-specific

This closes apache#858.
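
As a hedged illustration of that retention rule, the following is a hypothetical bookkeeping sketch, not the actual JobManager code; all names are made up.

    // Hypothetical sketch: a graph is dropped once its session is ended explicitly
    // or its timeout has expired. Names are illustrative only.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class SessionRegistrySketch<G> {

        private static final class Entry<G> {
            final G graph;
            final long timeoutMillis;
            volatile long lastAccessMillis = System.currentTimeMillis();
            volatile boolean ended;

            Entry(G graph, long timeoutMillis) {
                this.graph = graph;
                this.timeoutMillis = timeoutMillis;
            }
        }

        private final Map<String, Entry<G>> sessions = new ConcurrentHashMap<>();

        void register(String jobId, G graph, long timeoutMillis) {
            sessions.put(jobId, new Entry<>(graph, timeoutMillis));
        }

        void touch(String jobId) {
            Entry<G> e = sessions.get(jobId);
            if (e != null) {
                e.lastAccessMillis = System.currentTimeMillis();
            }
        }

        void endSession(String jobId) {
            Entry<G> e = sessions.get(jobId);
            if (e != null) {
                e.ended = true;
            }
        }

        /** Drops graphs whose session was ended explicitly or whose timeout expired. */
        void cleanup() {
            long now = System.currentTimeMillis();
            sessions.entrySet().removeIf(e ->
                e.getValue().ended || now - e.getValue().lastAccessMillis > e.getValue().timeoutMillis);
        }
    }
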
mxm added a commit to mxm/flink that referenced this pull request Sep 8, 2015

@mxm (Contributor, Author) commented Sep 8, 2015

I've ported this pull request to the latest master. It was a lot more work than I anticipated because some classes had diverged significantly and merging them was a bit hard.

Due to some refactoring, the changes have grown quite large again, and I know that makes reviewing hard. Despite that, I wouldn't delay merging this pull request much further. We can disable session management until it is integrated with the rest of the system (intermediate results) by throwing an exception on the interface methods. If we decide later that we want to delay this feature, we could also remove the session code. In that case, it would still make sense to merge this pull request because it contains a lot of nice refactoring.

With session management in place, we can reuse already-computed intermediate results without too much effort. Only a few API changes remain to expose session management to the user in production.

cluster.stop()
}
}
// "detect a lost connection to the JobManager and try to reconnect to it" in {

Contributor (commenting on the code excerpt above):

Re-enable the test?

Contributor (Author):

Thanks.

mxm added a commit to mxm/flink that referenced this pull request Sep 9, 2015
mxm added a commit to mxm/flink that referenced this pull request Sep 9, 2015

@tillrohrmann (Contributor)

Could you elaborate a little bit on what you refactored and which components would be important to review?

mxm added a commit to mxm/flink that referenced this pull request Sep 9, 2015

@mxm (Contributor, Author) commented Sep 9, 2015

Of course! The following classes have been refactored in the course of integrating them with session management:

Client

  • Establish connection to JobManager on creation
  • Refactor run method into runBlocking and runDetached
  • Extract helper classes to generate the Plan
  • Make Optimizer and JobGraph generation methods static
  • Pass ClassLoader correctly (do not keep one per Client but rather let it be passed before submission)

CliFrontend

  • runBlocking and runDetached methods by analogy with the Client class

ExecutionEnvironment, LocalEnvironment, RemoteEnvironment

  • modified abstract class to support sessions (timeout and jobID generation)
  • handle session management via reapers and shutdown hooks (see the sketch after this list)

PlanExecutor, LocalExecutor, RemoteExecutor

  • modified interface
  • support session termination
  • set JobID on Plan

JobManager

  • keep ExecutionGraph as long as session has not expired

Future issues:

  • Support for sessions in streaming. Currently streaming jobs are agnostic of sessions.
  • Representation of sessions in the JobManager web frontend. How do we represent updates to the ExecutionGraph in sessions?
  • Build features on top of session management (e.g. intermediate results)
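
The reaper/shutdown-hook termination mentioned under ExecutionEnvironment above can be pictured with a small sketch like the following; the SessionClient interface and all names are hypothetical, not the PR's actual classes.

    // Hypothetical sketch of ensuring session termination when the environment goes
    // out of scope; SessionClient and the hook name are illustrative, not the PR's code.
    public final class SessionShutdownHookSketch {

        interface SessionClient {
            void endSession(String jobId) throws Exception;
        }

        /** Installs a JVM shutdown hook that ends the session if the JVM exits first. */
        static Thread install(SessionClient client, String jobId) {
            Thread hook = new Thread(() -> {
                try {
                    client.endSession(jobId);
                } catch (Exception e) {
                    // Best effort: if this fails, the JobManager's session timeout still applies.
                    System.err.println("Could not end session " + jobId + ": " + e.getMessage());
                }
            }, "session-shutdown-hook-" + jobId);
            Runtime.getRuntime().addShutdownHook(hook);
            return hook;
        }

        private SessionShutdownHookSketch() {}
    }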

mxm added a commit to mxm/flink that referenced this pull request Sep 9, 2015

@tillrohrmann (Contributor)

Thanks Max for the detailed description.


mxm added a commit to mxm/flink that referenced this pull request Sep 10, 2015
mxm added a commit to mxm/flink that referenced this pull request Sep 10, 2015
mxm added a commit to mxm/flink that referenced this pull request Sep 15, 2015
mxm added a commit to mxm/flink that referenced this pull request Sep 15, 2015

@mxm (Contributor, Author) commented Sep 15, 2015

I've rebased again. If nobody objects, I will merge this soon. The new API-facing methods on ExecutionEnvironment will be disabled until we implement the first applications of session management; I've added a separate commit that does that.
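
For illustration, "disabling" such a method could look roughly like this minimal sketch; the class and method names are placeholders, not the exact code in that separate commit.

    // Minimal sketch of disabling a session-related API method until the feature is
    // usable end-to-end; names are placeholders for illustration only.
    public abstract class SessionApiSketch {

        public void startNewSession() {
            throw new UnsupportedOperationException(
                "Session management is not yet fully supported. It will be enabled once "
                + "intermediate-result backtracking is integrated.");
        }
    }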

mxm added a commit to mxm/flink that referenced this pull request Sep 15, 2015
mxm added a commit to mxm/flink that referenced this pull request Sep 21, 2015
@asfgit asfgit closed this in 71bf2f5 Sep 22, 2015
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
lofifnc pushed a commit to lofifnc/flink that referenced this pull request Oct 8, 2015