Skip to content

Commit

Permalink
Merge pull request apache#11483 from apache/revert-10418-cachesizebytes
Browse files Browse the repository at this point in the history
Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size."
  • Loading branch information
lukecwik committed Apr 21, 2020
2 parents 6e6ff61 + 6663da8 commit 623c5ed
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public ApplianceShuffleEntryReader(
new ChunkingShuffleBatchReader(executionContext, operationContext, applianceShuffleReader);

if (cache) {
// Limit the size of the cache to ~32 full shuffle batches.
final long maxBytes = 128L * 1024 * 1024;
batchReader = new CachingShuffleBatchReader(batchReader, maxBytes);
// Limit the size of the cache.
final int maxBatches = 32;
batchReader = new CachingShuffleBatchReader(batchReader, maxBatches);
}
entryReader = new BatchingShuffleEntryReader(batchReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,14 @@ public ShuffleBatchReader.Batch read(
}
DataInputStream input = new DataInputStream(new ByteArrayInputStream(result.chunk));
ArrayList<ShuffleEntry> entries = new ArrayList<>();
long batchSize = 0;
while (input.available() > 0) {
ShuffleEntry entry = getShuffleEntry(input);
batchSize += entry.length();
entries.add(entry);
entries.add(getShuffleEntry(input));
}
return new Batch(
entries,
result.nextStartPosition == null
? null
: ByteArrayShufflePosition.of(result.nextStartPosition),
batchSize);
: ByteArrayShufflePosition.of(result.nextStartPosition));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,35 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Weigher;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;

/** A {@link ShuffleBatchReader} that caches batches as they're read. */
public class CachingShuffleBatchReader implements ShuffleBatchReader {
private final ShuffleBatchReader reader;
@VisibleForTesting final LoadingCache<BatchRange, Batch> cache;

/**
* Limit the size of the cache to 1GiB of batches.
*
* <p>If this increases beyond Integer.MAX_VALUE then {@link BatchWeigher} must be updated.
* Because a batch may be larger than 1GiB, the actual in-memory batch size may exceed this value.
*/
private static final int MAXIMUM_WEIGHT = 1024 * 1024 * 1024;
/** Limit the size of the cache to 1000 batches. */
private static final int MAXIMUM_BATCHES = 1000;

// Ensure that batches in the cache are expired quickly
// for improved GC performance.
private static final Duration EXPIRE_AFTER = Duration.ofMillis(250);
private static final long EXPIRE_AFTER_MS = 250;

/**
* Creates the caching reader.
*
* @param shuffleReader wrapped reader.
* @param maximumWeightBytes maximum bytes for the cache.
* @param expireAfterAccess cache items may be evicted after the elapsed duration.
*/
public CachingShuffleBatchReader(
ShuffleBatchReader shuffleReader, long maximumWeightBytes, Duration expireAfterAccess) {
ShuffleBatchReader shuffleReader, int maximumBatches, long expireAfterAccessMillis) {
this.reader = shuffleReader;
this.cache =
CacheBuilder.newBuilder()
.maximumWeight(maximumWeightBytes)
.weigher(new BatchWeigher())
.expireAfterAccess(expireAfterAccess)
.maximumSize(maximumBatches)
.expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS)
.<BatchRange, Batch>build(
new CacheLoader<BatchRange, Batch>() {
@Override
Expand All @@ -73,24 +58,12 @@ public Batch load(BatchRange batchRange) throws Exception {
});
}

/**
* Creates the caching reader with a maximum weight of {@link #MAXIMUM_WEIGHT} bytes and an
* element expiry duration of {@link #EXPIRE_AFTER}.
*
* @param shuffleReader wrapped reader.
*/
public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader) {
this(shuffleReader, MAXIMUM_WEIGHT, EXPIRE_AFTER);
this(shuffleReader, MAXIMUM_BATCHES, EXPIRE_AFTER_MS);
}

/**
* Creates the caching reader with an element expiry duration of {@link #EXPIRE_AFTER}.
*
* @param shuffleReader wrapped reader.
* @param maximumWeightBytes maximum bytes for the cache.
*/
public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, long maximumWeightBytes) {
this(shuffleReader, maximumWeightBytes, EXPIRE_AFTER);
public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, int maximumBatches) {
this(shuffleReader, maximumBatches, EXPIRE_AFTER_MS);
}

@Override
Expand Down Expand Up @@ -129,18 +102,4 @@ public int hashCode() {
return Objects.hashCode(startPosition, endPosition);
}
}

/**
* Weighs a cached {@link Batch} by its recorded size in bytes, clamped to the {@code int} range
* that {@link Weigher#weigh} must return.
*
* <p>By default the cache holds up to {@link CachingShuffleBatchReader#MAXIMUM_WEIGHT} bytes. If
* that limit is increased beyond Integer.MAX_VALUE bytes, a new weighing heuristic will be
* required to avoid under-representing the number of bytes in memory.
*/
static final class BatchWeigher implements Weigher<BatchRange, Batch> {
@Override
public int weigh(BatchRange key, Batch value) {
// value.bytes is a long; saturatedCast clamps it to the int range instead of overflowing, so a
// batch larger than Integer.MAX_VALUE bytes is weighed as Integer.MAX_VALUE.
return Ints.saturatedCast(value.bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ public interface ShuffleBatchReader {
public static class Batch {
public final List<ShuffleEntry> entries;
@Nullable public final ShufflePosition nextStartPosition;
public final long bytes;

public Batch(
List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition, long bytes) {
public Batch(List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition) {
this.entries = entries;
this.nextStartPosition = nextStartPosition;
this.bytes = bytes;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import static com.google.api.client.util.Lists.newArrayList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -68,9 +68,8 @@ public void readerCanRead() throws Exception {
ArrayList<ShuffleEntry> entries = new ArrayList<>();
entries.add(e1);
entries.add(e2);
long batchSize = (long) e1.length() + e2.length();
when(batchReader.read(START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize));
.thenReturn(new ShuffleBatchReader.Batch(entries, null));
List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
assertThat(results, contains(e1, e2));
}
Expand All @@ -82,9 +81,8 @@ public void readerIteratorCanBeCopied() throws Exception {
ArrayList<ShuffleEntry> entries = new ArrayList<>();
entries.add(e1);
entries.add(e2);
long batchSize = (long) e1.length() + e2.length();
when(batchReader.read(START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize));
.thenReturn(new ShuffleBatchReader.Batch(entries, null));
Reiterator<ShuffleEntry> it = reader.read(START_POSITION, END_POSITION);
assertThat(it.hasNext(), equalTo(Boolean.TRUE));
assertThat(it.next(), equalTo(e1));
Expand All @@ -104,9 +102,9 @@ public void readerShouldMergeMultipleBatchResults() throws Exception {
ShuffleEntry e2 = new ShuffleEntry(KEY, SKEY, VALUE);
List<ShuffleEntry> e2s = Collections.singletonList(e2);
when(batchReader.read(START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, e1.length()));
.thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION));
when(batchReader.read(NEXT_START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(e2s, null, e2.length()));
.thenReturn(new ShuffleBatchReader.Batch(e2s, null));
List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
assertThat(results, contains(e1, e2));

Expand All @@ -122,11 +120,11 @@ public void readerShouldMergeMultipleBatchResultsIncludingEmptyShards() throws E
ShuffleEntry e3 = new ShuffleEntry(KEY, SKEY, VALUE);
List<ShuffleEntry> e3s = Collections.singletonList(e3);
when(batchReader.read(START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, 0));
.thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION));
when(batchReader.read(NEXT_START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION, 0));
.thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION));
when(batchReader.read(SECOND_NEXT_START_POSITION, END_POSITION))
.thenReturn(new ShuffleBatchReader.Batch(e3s, null, e3.length()));
.thenReturn(new ShuffleBatchReader.Batch(e3s, null));
List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION));
assertThat(results, contains(e3));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -37,7 +37,7 @@
@RunWith(JUnit4.class)
public final class CachingShuffleBatchReaderTest {
private final ShuffleBatchReader.Batch testBatch =
new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null, 0);
new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null);

@Test
public void readerShouldCacheReads() throws IOException {
Expand Down

0 comments on commit 623c5ed

Please sign in to comment.