Skip to content

Commit

Permalink
[FLINK-5407] Handle snapshoting null-operator in chain
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter authored and aljoscha committed Jan 12, 2017
1 parent fc343e0 commit 9c6eb57
Showing 1 changed file with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,15 @@ public void run() {
List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size());

for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) {
operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
if (null != snapshotInProgress) {
operatorStatesBackend.add(
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
operatorStatesStream.add(
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
} else {
operatorStatesBackend.add(null);
operatorStatesStream.add(null);
}
}

final long asyncEndNanos = System.nanoTime();
Expand Down Expand Up @@ -950,7 +957,9 @@ public void run() {
public void close() {
// cleanup/release ongoing snapshot operations
for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
snapshotResult.cancel();
if (null != snapshotResult) {
snapshotResult.cancel();
}
}
}
}
Expand Down Expand Up @@ -995,14 +1004,7 @@ public void executeCheckpointing() throws Exception {
try {

for (StreamOperator<?> op : allOperators) {

createStreamFactory(op);
snapshotNonPartitionableState(op);

OperatorSnapshotResult snapshotInProgress =
op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory);

snapshotInProgressList.add(snapshotInProgress);
checkpointStreamOperator(op);
}

if (LOG.isDebugEnabled()) {
Expand All @@ -1029,7 +1031,9 @@ public void executeCheckpointing() throws Exception {
if (failed) {
// Cleanup to release resources
for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
operatorSnapshotResult.cancel();
if (null != operatorSnapshotResult) {
operatorSnapshotResult.cancel();
}
}

if (LOG.isDebugEnabled()) {
Expand All @@ -1039,7 +1043,24 @@ public void executeCheckpointing() throws Exception {
}
}
}
}

private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
createStreamFactory(op);
snapshotNonPartitionableState(op);

OperatorSnapshotResult snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
streamFactory);

snapshotInProgressList.add(snapshotInProgress);
} else {
nonPartitionedStates.add(null);
OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
snapshotInProgressList.add(emptySnapshotInProgress);
}
}

private void createStreamFactory(StreamOperator<?> operator) throws IOException {
Expand Down

0 comments on commit 9c6eb57

Please sign in to comment.