Skip to content

Commit

Permalink
[FLINK-14894][core][mem] Do not explicitly release unsafe memory when…
Browse files Browse the repository at this point in the history
… managed segment is freed

The conclusion at the moment is that releasing unsafe memory, while potentially having a link to it in Java code, is dangerous. We revert this to rely only on GC when there are no links in Java code. The problem can happen e.g. if task thread exits w/o joining with IO threads (e.g. spilling in batch job) then the unsafe memory is released but it can be written w/o segfault by IO thread. At the same time, other task can allocate interleaving memory which can be spoiled by that IO thread. We still keep it unsafe to allocate it outside of JVM direct memory limit to not interfere with direct allocations, also it does not make sense for RocksDB native memory (also accounted in MemoryManager) to be part of direct memory limit.

The potential downside can be that over-allocating of unsafe memory will not hit the direct limit and will not cause GC immediately which will be the only way to release it. In this case, it can cause out-of-memory failures w/o triggering GC to release a lot of potentially already unused memory.

If we see the delayed release as a problem then we can investigate further optimisations, like:
- directly monitoring phantom reference queue of the cleaner (if JVM detects quickly that there are no more reference to the memory) and explicitly release memory ready for GC asap, e.g. after Task exit
- monitor allocated memory amount and block allocation until GC releases occupied memory instead of failing with out-of-memory immediately

This closes apache#10940.
  • Loading branch information
azagrebin authored and tillrohrmann committed Jan 24, 2020
1 parent 90f638c commit 6dcaae0
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ public final class HybridMemorySegment extends MemorySegment {
@Nullable
private final ByteBuffer offHeapBuffer;

/** The cleaner is called to free the underlying native memory. */
@Nullable
private final Runnable cleaner;

/**
* Creates a new memory segment that represents the memory backing the given direct byte buffer.
* Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
Expand All @@ -70,13 +66,11 @@ public final class HybridMemorySegment extends MemorySegment {
*
* @param buffer The byte buffer whose memory is represented by this memory segment.
* @param owner The owner references by this memory segment.
* @param cleaner optional action to run upon freeing the segment.
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable Runnable cleaner) {
HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
this.cleaner = cleaner;
}

/**
Expand All @@ -90,7 +84,6 @@ public final class HybridMemorySegment extends MemorySegment {
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
this.cleaner = null;
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -133,14 +126,6 @@ public ByteBuffer wrap(int offset, int length) {
}
}

@Override
public void free() {
super.free();
if (cleaner != null) {
cleaner.run();
}
}

// ------------------------------------------------------------------------
// Random Access get() and put() methods
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static MemorySegment allocateUnpooledOffHeapMemory(int size) {
*/
public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
ByteBuffer memory = ByteBuffer.allocateDirect(size);
return new HybridMemorySegment(memory, owner, null);
return new HybridMemorySegment(memory, owner);
}

/**
Expand All @@ -112,7 +112,8 @@ public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner
public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
return new HybridMemorySegment(offHeapBuffer, owner, MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address));
MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address);
return new HybridMemorySegment(offHeapBuffer, owner);
}

/**
Expand All @@ -126,7 +127,7 @@ public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner)
* @return A new memory segment representing the given off-heap memory.
*/
public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
return new HybridMemorySegment(memory, null, null);
return new HybridMemorySegment(memory, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public void testHybridHeapNullBuffer2() {

@Test(expected = NullPointerException.class)
public void testHybridOffHeapNullBuffer2() {
new HybridMemorySegment(null, new Object(), () -> {});
new HybridMemorySegment((ByteBuffer) null, new Object());
}

@Test(expected = IllegalArgumentException.class)
public void testHybridNonDirectBuffer() {
new HybridMemorySegment(ByteBuffer.allocate(1024), new Object(), () -> {});
new HybridMemorySegment(ByteBuffer.allocate(1024), new Object());
}

@Test(expected = IllegalArgumentException.class)
Expand Down

0 comments on commit 6dcaae0

Please sign in to comment.