Skip to content

Commit

Permalink
[hotfix] [network] Release unpooled buffer for events.
Browse files Browse the repository at this point in the history
So far, these buffers never needed to be released, because they do not come from a buffer pool.
They were simply garbage collected.

When changing the blocking partitions to use memory mapped files, these buffer were refering
for a short time to an unmapped memory region (after the partition is released). Because the buffers
were not accessed any more by any code, it did not matter when regularly running Flink.

But, it did segfault the JVM when attaching a debugger and exploring just that part of the code.
This happens because the debugger calls toString() on the buffer object as part of its rendering of the current
stack frame. The toString() method access the buffer contents, which is an unmapped region of memory,
and boom!
  • Loading branch information
StephanEwen committed May 10, 2019
1 parent 541e413 commit 826daab
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,13 @@ private BufferOrEvent transformToBufferOrEvent(
return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
final AbstractEvent event;
try {
event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
}
finally {
buffer.recycleBuffer();
}

if (event.getClass() == EndOfPartitionEvent.class) {
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public void testBasicGetNextLogic() throws Exception {

// Return null when the input gate has received all end-of-partition events
assertTrue(inputGate.isFinished());

for (TestInputChannel ic : inputChannels) {
ic.assertReturnedEventsAreRecycled();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -43,6 +46,8 @@ public class TestInputChannel extends InputChannel {

private final Queue<BufferAndAvailabilityProvider> buffers = new ConcurrentLinkedQueue<>();

private final Collection<Buffer> allReturnedBuffers = new ArrayList<>();

private BufferAndAvailabilityProvider lastProvider = null;

private boolean isReleased = false;
Expand Down Expand Up @@ -125,7 +130,9 @@ Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedE

if (provider != null) {
lastProvider = provider;
return provider.getBufferAvailability();
Optional<BufferAndAvailability> baa = provider.getBufferAvailability();
baa.ifPresent((v) -> allReturnedBuffers.add(v.buffer()));
return baa;
} else if (lastProvider != null) {
return lastProvider.getBufferAvailability();
} else {
Expand Down Expand Up @@ -162,6 +169,29 @@ protected void notifyChannelNonEmpty() {

}

public void assertReturnedDataBuffersAreRecycled() {
assertReturnedBuffersAreRecycled(true, false);
}

public void assertReturnedEventsAreRecycled() {
assertReturnedBuffersAreRecycled(false, true);
}

public void assertAllReturnedBuffersAreRecycled() {
assertReturnedBuffersAreRecycled(true, true);
}

private void assertReturnedBuffersAreRecycled(boolean assertBuffers, boolean assertEvents) {
for (Buffer b : allReturnedBuffers) {
if (b.isBuffer() && assertBuffers && !b.isRecycled()) {
fail("Data Buffer " + b + " not recycled");
}
if (!b.isBuffer() && assertEvents && !b.isRecycled()) {
fail("Event Buffer " + b + " not recycled");
}
}
}

interface BufferAndAvailabilityProvider {
Optional<BufferAndAvailability> getBufferAvailability() throws IOException, InterruptedException;
}
Expand Down

0 comments on commit 826daab

Please sign in to comment.