Skip to content

Commit

Permalink
[FLINK-15172][table-blink] Imporve BytesHashMap to lazy allocate memory
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Feb 24, 2020
1 parent 56acfe9 commit f144e13
Showing 1 changed file with 15 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
Expand Down Expand Up @@ -109,8 +109,6 @@ public class BytesHashMap {
*/
private final LookupInfo reuseLookInfo;

private final MemoryManager memoryManager;

/**
* Used as a reused object when retrieve the map's value by key and iteration.
*/
Expand All @@ -120,7 +118,7 @@ public class BytesHashMap {
*/
private BinaryRow reusedKey;

private final List<MemorySegment> freeMemorySegments;
private final LazyMemorySegmentPool memoryPool;
private List<MemorySegment> bucketSegments;

private long numElements = 0;
Expand Down Expand Up @@ -159,12 +157,7 @@ public BytesHashMap(
boolean inferBucketMemory) {
this.segmentSize = memoryManager.getPageSize();
this.reservedNumBuffers = (int) (memorySize / segmentSize);
this.memoryManager = memoryManager;
try {
this.freeMemorySegments = memoryManager.allocatePages(owner, reservedNumBuffers);
} catch (MemoryAllocationException e) {
throw new IllegalArgumentException("BytesHashMap can't allocate " + reservedNumBuffers + " pages", e);
}
this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, reservedNumBuffers);
this.numBucketsPerSegment = segmentSize / BUCKET_SIZE;
this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
Expand Down Expand Up @@ -359,7 +352,7 @@ private void initBucketSegments(int numBucketSegments) {
}
this.bucketSegments = new ArrayList<>(numBucketSegments);
for (int i = 0; i < numBucketSegments; i++) {
bucketSegments.add(i, freeMemorySegments.remove(freeMemorySegments.size() - 1));
bucketSegments.add(i, memoryPool.nextSegment());
}

resetBucketSegments(this.bucketSegments);
Expand Down Expand Up @@ -391,14 +384,13 @@ private void growAndRehash() throws EOFException {
List<MemorySegment> newBucketSegments = new ArrayList<>(required);

try {
int freeNumSegments = freeMemorySegments.size();
int numAllocatedSegments = required - freeNumSegments;
int numAllocatedSegments = required - memoryPool.freePages();
if (numAllocatedSegments > 0) {
throw new MemoryAllocationException();
}
int needNumFromFreeSegments = required - newBucketSegments.size();
for (int end = needNumFromFreeSegments; end > 0; end--) {
newBucketSegments.add(freeMemorySegments.remove(freeMemorySegments.size() - 1));
newBucketSegments.add(memoryPool.nextSegment());
}

setBucketVariables(newBucketSegments);
Expand Down Expand Up @@ -437,7 +429,7 @@ private void growAndRehash() throws EOFException {
}
}
LOG.info("The rehash take {} ms for {} segments", (System.currentTimeMillis() - reHashStartTime), required);
this.freeMemorySegments.addAll(this.bucketSegments);
this.memoryPool.returnAll(this.bucketSegments);
this.bucketSegments = newBucketSegments;
}

Expand Down Expand Up @@ -491,7 +483,7 @@ public void free(boolean reservedRecordMemory) {
this.bucketSegments.clear();
recordArea.release();
if (!reservedRecordMemory) {
memoryManager.release(freeMemorySegments);
memoryPool.close();
}
numElements = 0;
destructiveIterator = null;
Expand All @@ -507,12 +499,15 @@ public void reset() {
resetBucketSegments(bucketSegments);
numElements = 0;
destructiveIterator = null;
LOG.info("reset BytesHashMap with record memory segments {}, {} in bytes, init allocating {} for bucket area.",
freeMemorySegments.size(), freeMemorySegments.size() * segmentSize, bucketSegments.size());
LOG.info(
"reset BytesHashMap with record memory segments {}, {} in bytes, init allocating {} for bucket area.",
memoryPool.freePages(),
memoryPool.freePages() * segmentSize,
bucketSegments.size());
}

private void returnSegments(List<MemorySegment> segments) {
freeMemorySegments.addAll(segments);
memoryPool.returnAll(segments);
}

// ----------------------- Record Area -----------------------
Expand All @@ -524,7 +519,7 @@ private final class RecordArea {
private final SimpleCollectingOutputView outView;

RecordArea() {
this.outView = new SimpleCollectingOutputView(segments, new RecordAreaMemorySource(), segmentSize);
this.outView = new SimpleCollectingOutputView(segments, memoryPool, segmentSize);
this.inView = new RandomAccessInputView(segments, segmentSize);
}

Expand All @@ -541,20 +536,6 @@ void reset() {
inView.setReadPosition(0);
}

// ----------------------- Memory Management -----------------------

private final class RecordAreaMemorySource implements MemorySegmentSource {
@Override
public MemorySegment nextSegment() {
int s = freeMemorySegments.size();
if (s > 0) {
return freeMemorySegments.remove(s - 1);
} else {
return null;
}
}
}

// ----------------------- Append -----------------------
private long appendRecord(BinaryRow key, BinaryRow value) throws IOException {
final long oldLastPosition = outView.getCurrentOffset();
Expand Down

0 comments on commit f144e13

Please sign in to comment.