From f2dd4b8500a82532dae17087c227ce34e1aeac9b Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Tue, 2 Jun 2020 14:10:16 +0200 Subject: [PATCH] [FLINK-18050][task][checkpointing] Simplify ChannelStateCheckpointWriter interface --- .../channel/ChannelStateCheckpointWriter.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java index 9643d4cc4c754..8ce8fc01a65d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java @@ -108,15 +108,15 @@ class ChannelStateCheckpointWriter { runWithChecks(() -> serializer.writeHeader(dataStream)); } - void writeInput(InputChannelInfo info, Buffer... flinkBuffers) throws Exception { - write(inputChannelOffsets, info, flinkBuffers, !allInputsReceived); + void writeInput(InputChannelInfo info, Buffer buffer) throws Exception { + write(inputChannelOffsets, info, buffer, !allInputsReceived); } - void writeOutput(ResultSubpartitionInfo info, Buffer... flinkBuffers) throws Exception { - write(resultSubpartitionOffsets, info, flinkBuffers, !allOutputsReceived); + void writeOutput(ResultSubpartitionInfo info, Buffer buffer) throws Exception { + write(resultSubpartitionOffsets, info, buffer, !allOutputsReceived); } - private void write(Map offsets, K key, Buffer[] flinkBuffers, boolean precondition) throws Exception { + private void write(Map offsets, K key, Buffer buffer, boolean precondition) throws Exception { try { if (result.isDone()) { return; @@ -124,16 +124,14 @@ private void write(Map offsets, K key, Buffer[] fli runWithChecks(() -> { checkState(precondition); long offset = checkpointStream.getPos(); - serializer.writeData(dataStream, flinkBuffers); + serializer.writeData(dataStream, buffer); long size = checkpointStream.getPos() - offset; offsets .computeIfAbsent(key, unused -> new StateContentMetaInfo()) .withDataAdded(offset, size); }); } finally { - for (Buffer flinkBuffer : flinkBuffers) { - flinkBuffer.recycleBuffer(); - } + buffer.recycleBuffer(); } }