Skip to content

Commit

Permalink
[FLINK-18050][task][checkpointing] Simplify ChannelStateCheckpointWri…
Browse files Browse the repository at this point in the history
…ter interface
  • Loading branch information
rkhachatryan authored and zhijiangW committed Jun 8, 2020
1 parent ed7b0b1 commit f2dd4b8
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,30 @@ class ChannelStateCheckpointWriter {
runWithChecks(() -> serializer.writeHeader(dataStream));
}

void writeInput(InputChannelInfo info, Buffer... flinkBuffers) throws Exception {
write(inputChannelOffsets, info, flinkBuffers, !allInputsReceived);
void writeInput(InputChannelInfo info, Buffer buffer) throws Exception {
write(inputChannelOffsets, info, buffer, !allInputsReceived);
}

void writeOutput(ResultSubpartitionInfo info, Buffer... flinkBuffers) throws Exception {
write(resultSubpartitionOffsets, info, flinkBuffers, !allOutputsReceived);
void writeOutput(ResultSubpartitionInfo info, Buffer buffer) throws Exception {
write(resultSubpartitionOffsets, info, buffer, !allOutputsReceived);
}

private <K> void write(Map<K, StateContentMetaInfo> offsets, K key, Buffer[] flinkBuffers, boolean precondition) throws Exception {
private <K> void write(Map<K, StateContentMetaInfo> offsets, K key, Buffer buffer, boolean precondition) throws Exception {
try {
if (result.isDone()) {
return;
}
runWithChecks(() -> {
checkState(precondition);
long offset = checkpointStream.getPos();
serializer.writeData(dataStream, flinkBuffers);
serializer.writeData(dataStream, buffer);
long size = checkpointStream.getPos() - offset;
offsets
.computeIfAbsent(key, unused -> new StateContentMetaInfo())
.withDataAdded(offset, size);
});
} finally {
for (Buffer flinkBuffer : flinkBuffers) {
flinkBuffer.recycleBuffer();
}
buffer.recycleBuffer();
}
}

Expand Down

0 comments on commit f2dd4b8

Please sign in to comment.