Skip to content

Commit

Permalink
[FLINK-17467][task][checkpointing] Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan authored and pnowojski committed May 14, 2020
1 parent 2c34a8c commit d1db7be
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)

// -- general code path for multiple input channels --

if (numBarriersReceived > 0) {
if (isCheckpointPending()) {
// this is only true if some alignment is already progress and was not canceled

if (barrierId == currentCheckpointId) {
Expand Down Expand Up @@ -243,7 +243,7 @@ public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) thr

// -- general code path for multiple input channels --

if (numBarriersReceived > 0) {
if (isCheckpointPending()) {
// this is only true if some alignment is in progress and nothing was canceled

if (barrierId == currentCheckpointId) {
Expand Down Expand Up @@ -303,7 +303,7 @@ else if (barrierId > currentCheckpointId) {
public void processEndOfPartition() throws Exception {
numClosedChannels++;

if (numBarriersReceived > 0) {
if (isCheckpointPending()) {
// let the task know we skip a checkpoint
notifyAbort(currentCheckpointId,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {

@Override
public void releaseBlocksAndResetBarriers() {
if (numBarrierConsumed > 0) {
if (isCheckpointPending()) {
// make sure no additional data is persisted
Arrays.fill(hasInflightBuffers, false);
// the next barrier that comes must assume it is the first
Expand Down Expand Up @@ -150,7 +150,7 @@ public void processBarrier(
CheckpointBarrier receivedBarrier,
int channelIndex) throws Exception {
long barrierId = receivedBarrier.getId();
if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && numBarrierConsumed == 0)) {
if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
return;
}
Expand All @@ -173,11 +173,11 @@ public void processBarrier(
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
final long barrierId = cancelBarrier.getCheckpointId();

if (currentConsumedCheckpointId >= barrierId && numBarrierConsumed == 0) {
if (currentConsumedCheckpointId >= barrierId && !isCheckpointPending()) {
return;
}

if (numBarrierConsumed > 0) {
if (isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
Expand All @@ -196,7 +196,7 @@ public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) thr
public void processEndOfPartition() throws Exception {
threadSafeUnaligner.onChannelClosed();

if (numBarrierConsumed > 0) {
if (isCheckpointPending()) {
// let the task know we skip a checkpoint
notifyAbort(
currentConsumedCheckpointId,
Expand Down

0 comments on commit d1db7be

Please sign in to comment.