Skip to content

Commit

Permalink
[FLINK-16587][task] Moving event creation from OperatorChain to Subta…
Browse files Browse the repository at this point in the history
…skCheckpointCoordinatorImpl.
  • Loading branch information
Arvid Heise authored and pnowojski committed Apr 17, 2020
1 parent 4538321 commit 825cb25
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand Down Expand Up @@ -249,17 +247,9 @@ public void toggleStreamStatus(StreamStatus status) {
}
}

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
public void broadcastEvent(AbstractEvent event) throws IOException {
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}

public void broadcastCheckpointCancelMarker(long id) throws IOException {
CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
streamOutput.broadcastEvent(event);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
Expand Down Expand Up @@ -99,7 +101,7 @@ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause, Operato
env.declineCheckpoint(checkpointId, cause);

// notify all downstream operators that they should not wait for a barrier from us
actionExecutor.runThrowing(() -> operatorChain.broadcastCheckpointCancelMarker(checkpointId));
actionExecutor.runThrowing(() -> operatorChain.broadcastEvent(new CancelCheckpointMarker(checkpointId)));
}

@Override
Expand Down Expand Up @@ -133,7 +135,8 @@ public void checkpointState(
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));

// Step (3): Take the state snapshot. This should be largely asynchronous, to not impact progress of the streaming topology

Expand Down

0 comments on commit 825cb25

Please sign in to comment.