From ed7b0b1bea84a10ee45d10343f239cd183659a74 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Tue, 2 Jun 2020 14:04:00 +0200 Subject: [PATCH] [FLINK-18050][task][checkpointing] Use CloseableIterator to write ResultSubpartition state Currently, buffers passed to ChannelStateWriterImpl can be recycled twice: once in the normal case after writing, and a second time in CheckpointInProgressRequest.cancel (called from ChannelStateWriteRequestDispatcher and other places). This change prevents the double recycling by using CloseableIterator, which distinguishes used and unused elements. --- .../channel/ChannelStateWriteRequest.java | 22 ++++-- .../channel/ChannelStateWriterImpl.java | 23 +------ ...elStateWriteRequestDispatcherImplTest.java | 69 +++++++++++++++++++ ...hannelStateWriteRequestDispatcherTest.java | 2 +- 4 files changed, 87 insertions(+), 29 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java index 084869867fca3..13242a4d31e19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.ThrowingConsumer; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +32,7 @@ import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.EXECUTING; import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.FAILED; import static org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.NEW; +import static org.apache.flink.util.CloseableIterator.ofElements; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,8 +50,20 @@ static CheckpointInProgressRequest completeOutput(long checkpointId) { } static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, CloseableIterator iterator) { + return buildWriteRequest(checkpointId, "writeInput", iterator, (writer, buffer) -> writer.writeInput(info, buffer)); + } + + static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, Buffer... buffers) { + return buildWriteRequest(checkpointId, "writeOutput", ofElements(Buffer::recycleBuffer, buffers), (writer, buffer) -> writer.writeOutput(info, buffer)); + } + + static ChannelStateWriteRequest buildWriteRequest( + long checkpointId, + String name, + CloseableIterator iterator, + BiConsumerWithException bufferConsumer) { return new CheckpointInProgressRequest( - "writeInput", + name, checkpointId, writer -> { while (iterator.hasNext()) { @@ -60,17 +74,13 @@ static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, buffer.recycleBuffer(); throw e; } - writer.writeInput(info, buffer); + bufferConsumer.accept(writer, buffer); } }, throwable -> iterator.close(), false); } - static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, Buffer... 
flinkBuffers) { - return new CheckpointInProgressRequest("writeOutput", checkpointId, writer -> writer.writeOutput(info, flinkBuffers), recycle(flinkBuffers), false); - } - static ChannelStateWriteRequest start(long checkpointId, ChannelStateWriteResult targetResult, CheckpointStorageLocationReference locationReference) { return new CheckpointStartRequest(checkpointId, targetResult, locationReference); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index fbb8ebca1fb15..e6aa9dca2ca97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -129,7 +129,7 @@ public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int st info, startSeqNum, data == null ? 0 : data.length); - enqueue(write(checkpointId, info, checkBufferType(data)), false); + enqueue(write(checkpointId, info, data), false); } @Override @@ -196,27 +196,6 @@ private void enqueue(ChannelStateWriteRequest request, boolean atTheFront) { } } - private static Buffer[] checkBufferType(Buffer... 
data) { - if (data == null) { - return new Buffer[0]; - } - try { - for (Buffer buffer : data) { - if (!buffer.isBuffer()) { - throw new IllegalArgumentException(buildBufferTypeErrorMessage(buffer)); - } - } - } catch (Exception e) { - for (Buffer buffer : data) { - if (buffer.isBuffer()) { - buffer.recycleBuffer(); - } - } - throw e; - } - return data; - } - private static String buildBufferTypeErrorMessage(Buffer buffer) { try { AbstractEvent event = EventSerializer.fromBuffer(buffer, ChannelStateWriterImpl.class.getClassLoader()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java new file mode 100644 index 0000000000000..8f1d02f45743c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; + +import org.junit.Test; + +import java.util.function.Function; + +import static org.apache.flink.util.CloseableIterator.ofElements; +import static org.junit.Assert.assertTrue; + +/** + * {@link ChannelStateWriteRequestDispatcherImpl} test. + */ +public class ChannelStateWriteRequestDispatcherImplTest { + + @Test + public void testPartialInputChannelStateWrite() throws Exception { + testBuffersRecycled(buffers -> ChannelStateWriteRequest.write(1L, new InputChannelInfo(1, 2), ofElements(Buffer::recycleBuffer, buffers))); + } + + @Test + public void testPartialResultSubpartitionStateWrite() throws Exception { + testBuffersRecycled(buffers -> ChannelStateWriteRequest.write(1L, new ResultSubpartitionInfo(1, 2), buffers)); + } + + private void testBuffersRecycled(Function requestBuilder) throws Exception { + ChannelStateWriteRequestDispatcher dispatcher = new ChannelStateWriteRequestDispatcherImpl(new MemoryBackendCheckpointStorage(new JobID(), null, null, 1), new ChannelStateSerializerImpl()); + ChannelStateWriteResult result = new ChannelStateWriteResult(); + dispatcher.dispatch(ChannelStateWriteRequest.start(1L, result, CheckpointStorageLocationReference.getDefault())); + + result.getResultSubpartitionStateHandles().completeExceptionally(new TestException()); + result.getInputChannelStateHandles().completeExceptionally(new TestException()); + + NetworkBuffer[] 
buffers = new NetworkBuffer[]{buffer(), buffer()}; + dispatcher.dispatch(requestBuilder.apply(buffers)); + for (NetworkBuffer buffer : buffers) { + assertTrue(buffer.isRecycled()); + } + } + + private NetworkBuffer buffer() { + return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(10), FreeingBufferRecycler.INSTANCE); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java index 00c8ca75d1cbc..9bb6748c16894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java @@ -95,7 +95,7 @@ private static ChannelStateWriteRequest writeIn() { } private static ChannelStateWriteRequest writeOut() { - return write(CHECKPOINT_ID, new ResultSubpartitionInfo(1, 1)); + return write(CHECKPOINT_ID, new ResultSubpartitionInfo(1, 1), new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1), FreeingBufferRecycler.INSTANCE)); } private static CheckpointStartRequest start() {