
Commit

[FLINK-23527][core] Clarify semantics of SourceFunction.cancel() with respect to thread interruptions.
StephanEwen committed Sep 7, 2021
1 parent 650cfcf commit 9136c40
Showing 1 changed file with 21 additions and 5 deletions.
@@ -123,11 +123,27 @@ public interface SourceFunction<T> extends Function, Serializable {
  * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
  * {@code false} in this method. That flag is checked in the loop condition.
  *
- * <p>When a source is canceled, the executing thread will also be interrupted (via {@link
- * Thread#interrupt()}). The interruption happens strictly after this method has been called, so
- * any interruption handler can rely on the fact that this method has completed. It is good
- * practice to make any flags altered by this method "volatile", in order to guarantee the
- * visibility of the effects of this method to any interruption handler.
+ * <p>In case of an ungraceful shutdown (cancellation of the source operator, possibly for
+ * failover), the thread that calls {@link #run(SourceContext)} will also be {@link
+ * Thread#interrupt() interrupted}) by the Flink runtime, in order to speed up the cancellation
+ * (to ensure threads exit blocking methods fast, like I/O, blocking queues, etc.). The
+ * interruption happens strictly after this method has been called, so any interruption handler
+ * can rely on the fact that this method has completed (for example to ignore exceptions that
+ * happen after cancellation).
+ *
+ * <p>During graceful shutdown (for example stopping a job with a savepoint), the program must
+ * cleanly exit the {@link #run(SourceContext)} method soon after this method was called. The
+ * Flink runtime will NOT interrupt the source thread during graceful shutdown. Source
+ * implementors must ensure that no thread interruption happens on any thread that emits records
+ * through the {@code SourceContext} from the {@link #run(SourceContext)} method; otherwise the
+ * clean shutdown may fail when threads are interrupted while processing the final records.
+ *
+ * <p>Because the {@code SourceFunction} cannot easily differentiate whether the shutdown should
+ * be graceful or ungraceful, we recommend that implementors refrain from interrupting any
+ * threads that interact with the {@code SourceContext} at all. You can rely on the Flink
+ * runtime to interrupt the source thread in case of ungraceful cancellation. Any additionally
+ * spawned threads that directly emit records through the {@code SourceContext} should use a
+ * shutdown method that does not rely on thread interruption.
  */
  void cancel();
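
The contract described in the new Javadoc can be illustrated with a small standalone sketch. It is not Flink code and is not part of this commit: `CancelSketch`, `Context`, `QueueSource`, and `runAndCancel` are hypothetical names, and `Context` only stands in for the `collect` call of the real `SourceContext`. The sketch assumes a source that reads from a blocking queue, exits on a `volatile` flag without needing an interrupt (graceful case), and treats an interrupt that arrives after `cancel()` as expected (ungraceful case).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class CancelSketch {

    /** Hypothetical stand-in for Flink's SourceContext; only collect() is modeled. */
    interface Context<T> {
        void collect(T record);
    }

    /** Hypothetical source that follows the flag pattern described in the Javadoc. */
    static final class QueueSource {
        private final BlockingQueue<String> input;

        // volatile so the write in cancel() is visible to the thread executing run()
        private volatile boolean isRunning = true;

        QueueSource(BlockingQueue<String> input) {
            this.input = input;
        }

        void run(Context<String> ctx) throws InterruptedException {
            while (isRunning) {
                try {
                    // Bounded wait: the loop re-checks the flag even if nobody interrupts us.
                    String next = input.poll(50, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        ctx.collect(next);
                    }
                } catch (InterruptedException e) {
                    if (isRunning) {
                        // Interrupted without a preceding cancel(): not expected, propagate.
                        throw e;
                    }
                    // cancel() has already completed, so this interrupt is part of an
                    // ungraceful shutdown; the loop condition ends the method.
                }
            }
        }

        void cancel() {
            isRunning = false;
        }
    }

    /** Runs the source on its own thread, cancels it, and returns the number of emitted records. */
    static long runAndCancel(boolean interruptAfterCancel) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");

        QueueSource source = new QueueSource(queue);
        AtomicLong emitted = new AtomicLong();

        Thread sourceThread = new Thread(() -> {
            try {
                source.run(record -> emitted.incrementAndGet());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();

        try {
            while (emitted.get() < 3) {
                Thread.sleep(5); // wait until all queued records were emitted
            }
            source.cancel();
            if (interruptAfterCancel) {
                // Simulates the runtime's behavior on ungraceful cancellation:
                // the interrupt happens strictly after cancel() has returned.
                sourceThread.interrupt();
            }
            sourceThread.join(2000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sourceThread.isAlive() ? -1 : emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("graceful, records emitted: " + runAndCancel(false));
        System.out.println("ungraceful, records emitted: " + runAndCancel(true));
    }
}
```

The only `interrupt()` call in the sketch plays the role of the runtime; the source itself never interrupts the emitting thread. The bounded `poll` is one possible way to leave a blocking call without relying on interruption, which is the property the Javadoc asks for in threads that emit through the `SourceContext`.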
