[FLINK-12529][runtime] Release record-deserializer buffers timely to improve the usage efficiency of heap on taskmanager (apache#8471)

This pull request releases the buffers of the corresponding record deserializer as soon as an EndOfPartitionEvent is received from an input channel, which improves the efficiency of heap memory usage on the TaskManager.
sunhaibotb authored and pnowojski committed Aug 16, 2019
1 parent 3e0e030 commit 7ed0df0
Showing 3 changed files with 117 additions and 10 deletions.
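For orientation, here is a condensed sketch of the new code path in StreamTaskNetworkInput, assembled from the diff below. It is an illustrative outline only (surrounding declarations and unrelated branches are omitted), not the exact committed code; every identifier in it is taken from the diff.

private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
	if (bufferOrEvent.isBuffer()) {
		lastChannel = bufferOrEvent.getChannelIndex();
		currentRecordDeserializer = recordDeserializers[lastChannel];
		// a buffer arriving on an already-finished channel is a bug, so fail fast
		checkState(currentRecordDeserializer != null,
			"currentRecordDeserializer has already been released");
		currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
	} else {
		AbstractEvent event = bufferOrEvent.getEvent();
		if (event.getClass() != EndOfPartitionEvent.class) {
			throw new IOException("Unexpected event: " + event);
		}
		// release the deserializer as soon as the channel is finished,
		// which matters most for bounded streams
		releaseDeserializer(bufferOrEvent.getChannelIndex());
	}
}

private void releaseDeserializer(int channelIndex) {
	RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
	if (deserializer != null) {
		// recycle the buffer the deserializer still holds, then clear its state
		Buffer buffer = deserializer.getCurrentBuffer();
		if (buffer != null && !buffer.isRecycled()) {
			buffer.recycleBuffer();
		}
		deserializer.clear();
	}
	// drop the reference so the heap memory becomes reclaimable
	recordDeserializers[channelIndex] = null;
}

close() now simply calls releaseDeserializer(channelIndex) for every channel before cleaning up the checkpointed input gate, and the two-input processor in the second file gets the same treatment, with barrierHandler.cleanup() in place of the input-gate cleanup.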
StreamTaskNetworkInput.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -81,6 +82,21 @@ public StreamTaskNetworkInput(
this.inputIndex = inputIndex;
}

@VisibleForTesting
StreamTaskNetworkInput(
CheckpointedInputGate checkpointedInputGate,
TypeSerializer<?> inputSerializer,
IOManager ioManager,
int inputIndex,
RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers) {

this.checkpointedInputGate = checkpointedInputGate;
this.deserializationDelegate = new NonReusingDeserializationDelegate<>(
new StreamElementSerializer<>(inputSerializer));
this.recordDeserializers = recordDeserializers;
this.inputIndex = inputIndex;
}

@Override
@Nullable
public StreamElement pollNextNullable() throws Exception {
@@ -119,6 +135,9 @@ private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOExceptio
if (bufferOrEvent.isBuffer()) {
lastChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[lastChannel];
checkState(currentRecordDeserializer != null,
"currentRecordDeserializer has already been released");

currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
@@ -128,6 +147,10 @@ private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOExceptio
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}

// release the record deserializer immediately,
// which is very valuable in case of bounded stream
releaseDeserializer(bufferOrEvent.getChannelIndex());
}
}

@@ -156,15 +179,26 @@ public CompletableFuture<?> isAvailable() {

@Override
public void close() throws IOException {
-// clear the buffers. this part should not ever fail
-for (RecordDeserializer<?> deserializer : recordDeserializers) {
+// release the deserializers . this part should not ever fail
+for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
+releaseDeserializer(channelIndex);
+}
+
+// cleanup the resources of the checkpointed input gate
+checkpointedInputGate.cleanup();
+}
+
+private void releaseDeserializer(int channelIndex) {
+RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
+if (deserializer != null) {
+// recycle buffers and clear the deserializer.
Buffer buffer = deserializer.getCurrentBuffer();
if (buffer != null && !buffer.isRecycled()) {
buffer.recycleBuffer();
}
deserializer.clear();
}

-checkpointedInputGate.cleanup();
+recordDeserializers[channelIndex] = null;
}
}
StreamTwoInputProcessor.java
@@ -332,8 +332,13 @@ private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws Exception
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
+int channelIndex = bufferOrEvent.getChannelIndex();

-handleEndOfPartitionEvent(bufferOrEvent.getChannelIndex());
+handleEndOfPartitionEvent(channelIndex);
+
+// release the record deserializer immediately,
+// which is very valuable in case of bounded stream
+releaseDeserializer(channelIndex);
}
}

@@ -361,17 +366,27 @@ private void handleEndOfPartitionEvent(int channelIndex) throws Exception {

@Override
public void close() throws IOException {
-// clear the buffers first. this part should not ever fail
-for (RecordDeserializer<?> deserializer : recordDeserializers) {
+// release the deserializers first. this part should not ever fail
+for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
+releaseDeserializer(channelIndex);
+}
+
+// cleanup the barrier handler resources
+barrierHandler.cleanup();
+}
+
+private void releaseDeserializer(int channelIndex) {
+RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
+if (deserializer != null) {
+// recycle buffers and clear the deserializer.
Buffer buffer = deserializer.getCurrentBuffer();
if (buffer != null && !buffer.isRecycled()) {
buffer.recycleBuffer();
}
deserializer.clear();
}

-// cleanup the barrier handler resources
-barrierHandler.cleanup();
+recordDeserializers[channelIndex] = null;
}

private class ForwardingValveOutputHandler1 implements StatusWatermarkValve.ValveOutputHandler {
StreamTaskNetworkInputTest.java
@@ -21,12 +21,16 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
@@ -36,11 +40,13 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
@@ -81,6 +87,39 @@ public void testIsAvailableWithBufferedDataInDeserializer() throws Exception {
assertHasNextElement(input);
}

@Test
public void testReleasingDeserializerTimely()
throws Exception {

int numInputChannels = 2;
LongSerializer inSerializer = LongSerializer.INSTANCE;
StreamTestSingleInputGate inputGate = new StreamTestSingleInputGate<>(numInputChannels, 1024, inSerializer);

TestRecordDeserializer[] deserializers = new TestRecordDeserializer[numInputChannels];
for (int i = 0; i < deserializers.length; i++) {
deserializers[i] = new TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths());
}

TestRecordDeserializer[] copiedDeserializers = Arrays.copyOf(deserializers, deserializers.length);
StreamTaskNetworkInput input = new StreamTaskNetworkInput(
new CheckpointedInputGate(
inputGate.getInputGate(),
new EmptyBufferStorage(),
new CheckpointBarrierTracker(1)),
inSerializer,
ioManager,
0,
deserializers);

for (int i = 0; i < numInputChannels; i++) {
assertNotNull(deserializers[i]);
inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, i);
input.pollNextNullable();
assertNull(deserializers[i]);
assertTrue(copiedDeserializers[i].isCleared());
}
}

private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
RecordSerializer<SerializationDelegate<StreamElement>> serializer = new SpanningRecordSerializer<>();
SerializationDelegate<StreamElement> serializationDelegate =
@@ -98,4 +137,23 @@ private static void assertHasNextElement(StreamTaskNetworkInput input) throws Ex
assertNotNull(element);
assertTrue(element.isRecord());
}

private static class TestRecordDeserializer
extends SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>> {

private boolean cleared = false;

public TestRecordDeserializer(String[] tmpDirectories) {
super(tmpDirectories);
}

@Override
public void clear() {
cleared = true;
}

public boolean isCleared() {
return cleared;
}
}
}
