[FLINK-18668][table-runtime-blink] BytesHashMap#growAndRehash should release newly allocated segments before throwing the exception

This closes apache#12968
tsreaper committed Jul 29, 2020
1 parent 35fed94 commit 2658510
Showing 2 changed files with 30 additions and 15 deletions.
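
For context before the diff: when growing the bucket area, the hash map pulls the required number of memory segments out of its pool one by one. If that fails partway through, the segments already taken have to be handed back to the pool before the exception propagates; otherwise the pool's free-page accounting is left short. The sketch below shows that pattern in isolation — a toy pool with hypothetical names (ToyPool, allocateAll, RollbackDemo), not Flink's API — assuming the backing allocator can refuse a page even when the pool's own free count looked sufficient.

import java.util.ArrayList;
import java.util.List;

// Illustration only: a toy pool demonstrating the "return partial allocations before
// rethrowing" pattern. Names and behaviour are hypothetical, not Flink's API.
class ToyPool {
	private final List<Integer> free = new ArrayList<>();
	private final int failOnGrab; // simulate the backing allocator refusing a page mid-way
	private int grabs = 0;

	ToyPool(int pages, int failOnGrab) {
		for (int i = 0; i < pages; i++) {
			free.add(i);
		}
		this.failOnGrab = failOnGrab;
	}

	int freePages() {
		return free.size();
	}

	Integer nextSegment() {
		if (++grabs == failOnGrab) {
			throw new RuntimeException("backing allocator refused a page");
		}
		return free.remove(free.size() - 1);
	}

	void returnAll(List<Integer> segments) {
		free.addAll(segments);
		segments.clear();
	}

	// Acquire `required` pages all-or-nothing, rolling back on failure.
	List<Integer> allocateAll(int required) {
		if (freePages() < required) {
			return null; // not enough pages; nothing has been taken yet
		}
		List<Integer> taken = new ArrayList<>(required);
		for (int i = 0; i < required; i++) {
			try {
				taken.add(nextSegment());
			} catch (Throwable t) {
				returnAll(taken); // hand the partial allocation back before rethrowing
				throw t;
			}
		}
		return taken;
	}
}

public class RollbackDemo {
	public static void main(String[] args) {
		ToyPool pool = new ToyPool(8, 3); // the third grab will fail
		System.out.println("free before: " + pool.freePages()); // 8
		try {
			pool.allocateAll(5);
		} catch (RuntimeException e) {
			// Without the rollback, the two pages grabbed before the failure would be lost.
			System.out.println("free after failed allocation: " + pool.freePages()); // still 8
		}
	}
}

In the commit itself this logic lives in the pool's new allocateSegments method (second file below); growAndRehash checks freePages() up front and throws EOFException itself, so the null return never fires on that path.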
BytesHashMap.java

@@ -24,7 +24,6 @@
 import org.apache.flink.runtime.io.disk.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;

@@ -381,24 +380,17 @@ private void growAndRehash() throws EOFException {
 			LOG.warn("We can't handle more than Integer.MAX_VALUE buckets (eg. because hash functions return int)");
 			throw new EOFException();
 		}
-		List<MemorySegment> newBucketSegments = new ArrayList<>(required);
-
-		try {
-			int numAllocatedSegments = required - memoryPool.freePages();
-			if (numAllocatedSegments > 0) {
-				throw new MemoryAllocationException();
-			}
-			int needNumFromFreeSegments = required - newBucketSegments.size();
-			for (int end = needNumFromFreeSegments; end > 0; end--) {
-				newBucketSegments.add(memoryPool.nextSegment());
-			}
 
-			setBucketVariables(newBucketSegments);
-		} catch (MemoryAllocationException e) {
+		int numAllocatedSegments = required - memoryPool.freePages();
+		if (numAllocatedSegments > 0) {
 			LOG.warn("BytesHashMap can't allocate {} pages, and now used {} pages",
-					required, reservedNumBuffers);
+				required, reservedNumBuffers);
 			throw new EOFException();
 		}
+
+		List<MemorySegment> newBucketSegments = memoryPool.allocateSegments(required);
+		setBucketVariables(newBucketSegments);
+
 		long reHashStartTime = System.currentTimeMillis();
 		resetBucketSegments(newBucketSegments);
 		// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
The segment pool behind BytesHashMap's memoryPool field

@@ -21,6 +21,7 @@
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.util.ArrayList;

@@ -88,6 +89,28 @@ public MemorySegment nextSegment() {
 		return this.cachePages.remove(this.cachePages.size() - 1);
 	}
 
+	public List<MemorySegment> allocateSegments(int required) {
+		int freePages = freePages();
+		if (freePages < required) {
+			return null;
+		}
+
+		List<MemorySegment> ret = new ArrayList<>(required);
+		for (int i = 0; i < required; i++) {
+			MemorySegment segment;
+			try {
+				segment = nextSegment();
+				Preconditions.checkNotNull(segment);
+			} catch (Throwable t) {
+				// unexpected, we should first return all temporary segments
+				returnAll(ret);
+				throw t;
+			}
+			ret.add(segment);
+		}
+		return ret;
+	}
+
 	@Override
 	public int freePages() {
 		return this.maxPages - this.pageUsage;
