Skip to content

Commit

Permalink
[FLINK-7310] [core] always use the HybridMemorySegment
Browse files Browse the repository at this point in the history
Since we'd like to use our own off-heap buffers for network communication, we
cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment.
We thus drop any code that loads the HeapMemorySegment (it is still available
if needed) in favour of the HybridMemorySegment which is able to work on both
heap and off-heap memory.

For the performance penalty of this change compared to using HeapMemorySegment
alone, see this interesting blob article (from 2015):
https://flink.apache.org/news/2015/09/16/off-heap-memory.html

This closes apache#4445
  • Loading branch information
Nico Kruber authored and StephanEwen committed Oct 6, 2017
1 parent c3235c3 commit e14caef
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* <p>Note that memory segments should usually not be allocated manually, but rather through the
* {@link MemorySegmentFactory}.
*/
@SuppressWarnings("unused")
@Internal
public final class HeapMemorySegment extends MemorySegment {

Expand Down Expand Up @@ -177,29 +178,44 @@ public final void put(int offset, ByteBuffer source, int numBytes) {
* A memory segment factory that produces heap memory segments. Note that this factory does not
* support to allocate off-heap memory.
*/
public static final class HeapMemorySegmentFactory implements MemorySegmentFactory.Factory {
public static final class HeapMemorySegmentFactory {

@Override
/**
* Creates a new memory segment that targets the given heap memory region.
*
* @param memory The heap memory region.
* @return A new memory segment that targets the given heap memory region.
*/
public HeapMemorySegment wrap(byte[] memory) {
return new HeapMemorySegment(memory);
}

@Override
/**
* Allocates some unpooled memory and creates a new memory segment that represents
* that memory.
*
* @param size The size of the memory segment to allocate.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment, backed by unpooled heap memory.
*/
public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
return new HeapMemorySegment(new byte[size], owner);
}

@Override
/**
* Creates a memory segment that wraps the given byte array.
*
* <p>This method is intended to be used for components which pool memory and create
* memory segments around long-lived memory regions.
*
* @param memory The heap memory to be represented by the memory segment.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment representing the given heap memory.
*/
public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
return new HeapMemorySegment(memory, owner);
}

@Override
public HeapMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
throw new UnsupportedOperationException(
"The MemorySegment factory was not initialized for off-heap memory.");
}

/**
* Prevent external instantiation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public ByteBuffer wrap(int offset, int length) {
// ------------------------------------------------------------------------

@Override
public byte get(int index) {
public final byte get(int index) {
final long pos = address + index;
if (index >= 0 && pos < addressLimit) {
return UNSAFE.getByte(heapMemory, pos);
Expand All @@ -173,7 +173,7 @@ else if (address > addressLimit) {
}

@Override
public void put(int index, byte b) {
public final void put(int index, byte b) {
final long pos = address + index;
if (index >= 0 && pos < addressLimit) {
UNSAFE.putByte(heapMemory, pos, b);
Expand All @@ -188,17 +188,17 @@ else if (address > addressLimit) {
}

@Override
public void get(int index, byte[] dst) {
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}

@Override
public void put(int index, byte[] src) {
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}

@Override
public void get(int index, byte[] dst, int offset, int length) {
public final void get(int index, byte[] dst, int offset, int length) {
// check the byte array offset and length and the status
if ((offset | length | (offset + length) | (dst.length - (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
Expand All @@ -219,7 +219,7 @@ else if (address > addressLimit) {
}

@Override
public void put(int index, byte[] src, int offset, int length) {
public final void put(int index, byte[] src, int offset, int length) {
// check the byte array offset and length
if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
Expand All @@ -241,12 +241,12 @@ else if (address > addressLimit) {
}

@Override
public boolean getBoolean(int index) {
public final boolean getBoolean(int index) {
return get(index) != 0;
}

@Override
public void putBoolean(int index, boolean value) {
public final void putBoolean(int index, boolean value) {
put(index, (byte) (value ? 1 : 0));
}

Expand Down Expand Up @@ -436,41 +436,4 @@ private static long checkBufferAndGetAddress(ByteBuffer buffer) {
}
return getAddress(buffer);
}

// -------------------------------------------------------------------------
// Factoring
// -------------------------------------------------------------------------

/**
* Base factory for hybrid memory segments.
*/
public static final class HybridMemorySegmentFactory implements MemorySegmentFactory.Factory {

@Override
public HybridMemorySegment wrap(byte[] memory) {
return new HybridMemorySegment(memory);
}

@Override
public HybridMemorySegment allocateUnpooledSegment(int size, Object owner) {
return new HybridMemorySegment(new byte[size], owner);
}

@Override
public HybridMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
return new HybridMemorySegment(memory, owner);
}

@Override
public HybridMemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
return new HybridMemorySegment(memory, owner);
}

/**
* Prevent external instantiation.
*/
HybridMemorySegmentFactory() {}
}

public static final HybridMemorySegmentFactory FACTORY = new HybridMemorySegmentFactory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,27 @@

import java.nio.ByteBuffer;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A factory for memory segments. The purpose of this factory is to make sure that all memory segments
* for heap data are of the same type. That way, the runtime does not mix the various specializations
* of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
* to method specialization by the JIT and to overall performance.
* A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
*
* <p>Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
* if a request to create a segment comes before the initialization.
* <p>The purpose of this factory is to make sure that all memory segments for heap data are of the
* same type. That way, the runtime does not mix the various specializations of the {@link
* MemorySegment}. Not mixing them has shown to be beneficial to method specialization by the JIT
* and to overall performance.
*/
@Internal
public class MemorySegmentFactory {

/**
* The factory to use.
*/
private static volatile Factory factory;
public final class MemorySegmentFactory {

/**
* Creates a new memory segment that targets the given heap memory region.
* This method should be used to turn short lived byte arrays into memory segments.
*
* <p>This method should be used to turn short lived byte arrays into memory segments.
*
* @param buffer The heap memory region.
* @return A new memory segment that targets the given heap memory region.
*/
public static MemorySegment wrap(byte[] buffer) {
ensureInitialized();
return factory.wrap(buffer);
return new HybridMemorySegment(buffer);
}

/**
Expand Down Expand Up @@ -79,8 +71,7 @@ public static MemorySegment allocateUnpooledSegment(int size) {
* @return A new memory segment, backed by unpooled heap memory.
*/
public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
ensureInitialized();
return factory.allocateUnpooledSegment(size, owner);
return new HybridMemorySegment(new byte[size], owner);
}

/**
Expand All @@ -94,8 +85,7 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
* @return A new memory segment representing the given heap memory.
*/
public static MemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
ensureInitialized();
return factory.wrapPooledHeapMemory(memory, owner);
return new HybridMemorySegment(memory, owner);
}

/**
Expand All @@ -110,117 +100,7 @@ public static MemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
* @return A new memory segment representing the given off-heap memory.
*/
public static MemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner) {
ensureInitialized();
return factory.wrapPooledOffHeapMemory(memory, owner);
}

// ------------------------------------------------------------------------

/**
* Initializes this factory with the given concrete factory, iff it is not yet initialized.
* This also checks if the factory is already initialized to the exact same concrete factory
* as given.
*
* @param f The concrete factory to use.
* @return True, if the factory is initialized with the given concrete factory, or if it was already
* initialized with the exact same concrete factory. False, if it is already initialized with
* a different concrete factory.
*/
public static boolean initializeIfNotInitialized(Factory f) {
checkNotNull(f);

synchronized (MemorySegmentFactory.class) {
if (factory == null) {
factory = f;
return true;
} else {
return factory == f;
}
}
}

/**
* Checks whether this memory segment factory has been initialized (with a type to produce).
*
* @return True, if the factory has been initialized, false otherwise.
*/
public static boolean isInitialized() {
return factory != null;
return new HybridMemorySegment(memory, owner);
}

/**
* Gets the factory. May return null, if the factory has not been initialized.
*
* @return The factory, or null, if the factory has not been initialized.
*/
public static Factory getFactory() {
return factory;
}

/**
* Sets the factory to the {@link HeapMemorySegment#FACTORY} if no factory has been initialized, yet.
*/
private static void ensureInitialized() {
if (factory == null) {
synchronized (MemorySegmentFactory.class) {
if (factory == null) {
factory = HeapMemorySegment.FACTORY;
}
}
}
}

// ------------------------------------------------------------------------
// Internal factory
// ------------------------------------------------------------------------

/**
* A concrete factory for memory segments.
*/
public interface Factory {

/**
* Creates a new memory segment that targets the given heap memory region.
*
* @param memory The heap memory region.
* @return A new memory segment that targets the given heap memory region.
*/
MemorySegment wrap(byte[] memory);

/**
* Allocates some unpooled memory and creates a new memory segment that represents
* that memory.
*
* @param size The size of the memory segment to allocate.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment, backed by unpooled heap memory.
*/
MemorySegment allocateUnpooledSegment(int size, Object owner);

/**
* Creates a memory segment that wraps the given byte array.
*
* <p>This method is intended to be used for components which pool memory and create
* memory segments around long-lived memory regions.
*
*
* @param memory The heap memory to be represented by the memory segment.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment representing the given heap memory.
*/
MemorySegment wrapPooledHeapMemory(byte[] memory, Object owner);

/**
* Creates a memory segment that wraps the off-heap memory backing the given ByteBuffer.
* Note that the ByteBuffer needs to be a <i>direct ByteBuffer</i>.
*
* <p>This method is intended to be used for components which pool memory and create
* memory segments around long-lived memory regions.
*
* @param memory The byte buffer with the off-heap memory to be represented by the memory segment.
* @param owner The owner to associate with the memory segment.
* @return A new memory segment representing the given off-heap memory.
*/
MemorySegment wrapPooledOffHeapMemory(ByteBuffer memory, Object owner);
}
}
Loading

0 comments on commit e14caef

Please sign in to comment.