
Commit ac190b8

Merge pull request apache#11710: [BEAM-9964] Setting --workerCacheMB for Streaming Pipeline

pabloem committed May 15, 2020
2 parents a93e24d + e5dfe5f commit ac190b8
Showing 5 changed files with 34 additions and 7 deletions.
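For orientation before the diff: the change routes the existing --workerCacheMb pipeline option (the PR title spells it --workerCacheMB) into the Windmill state cache in place of a hard-coded 100 MB budget. The sketch below is illustrative only and not part of the commit; the class name WorkerCacheMbSketch and the flag value 400 are made up, while DataflowWorkerHarnessOptions.getWorkerCacheMb(), the WindmillStateCache(Integer) constructor, and getMaxWeight() come from the diff itself.

import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.WindmillStateCache;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WorkerCacheMbSketch {
  public static void main(String[] args) {
    // Parse the worker option as a harness would; --workerCacheMb=400 is a made-up value.
    DataflowWorkerHarnessOptions options =
        PipelineOptionsFactory.fromArgs("--workerCacheMb=400")
            .as(DataflowWorkerHarnessOptions.class);

    // Size the Windmill state cache from the option, mirroring the worker change below.
    WindmillStateCache cache = new WindmillStateCache(options.getWorkerCacheMb());

    // 400 MB corresponds to 400 * 1024 * 1024 = 419,430,400 bytes of maximum cache weight.
    System.out.println(cache.getMaxWeight());
  }
}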

StreamingDataflowWorker.java

@@ -394,7 +394,7 @@ public int getSize() {
   private final ConcurrentMap<String, String> systemNameToComputationIdMap =
       new ConcurrentHashMap<>();
 
-  private final WindmillStateCache stateCache = new WindmillStateCache();
+  private final WindmillStateCache stateCache;
 
   private final ThreadFactory threadFactory;
   private DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
@@ -577,6 +577,7 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
       boolean publishCounters,
       HotKeyLogger hotKeyLogger)
       throws IOException {
+    this.stateCache = new WindmillStateCache(options.getWorkerCacheMb());
     this.mapTaskExecutorFactory = mapTaskExecutorFactory;
     this.workUnitClient = workUnitClient;
     this.options = options;

WindmillStateCache.java

@@ -50,6 +50,8 @@
  * thread at a time, so this is safe.
  */
 public class WindmillStateCache implements StatusDataProvider {
+  // Convert Megabytes to bytes
+  private static final long MEGABYTES = 1024 * 1024;
   // Estimate of overhead per StateId.
   private static final int PER_STATE_ID_OVERHEAD = 20;
   // Initial size of hash tables per entry.
@@ -64,13 +66,14 @@ public class WindmillStateCache implements StatusDataProvider {
   private HashMultimap<ComputationKey, StateId> keyIndex =
       HashMultimap.<ComputationKey, StateId>create();
   private int displayedWeight = 0; // Only used for status pages and unit tests.
+  private long workerCacheBytes; // Copy workerCacheMb and convert to bytes.
 
-  public WindmillStateCache() {
+  public WindmillStateCache(Integer workerCacheMb) {
     final Weigher<Weighted, Weighted> weigher = Weighers.weightedKeysAndValues();
-
+    workerCacheBytes = workerCacheMb * MEGABYTES;
     stateCache =
         CacheBuilder.newBuilder()
-            .maximumWeight(100000000 /* 100 MB */)
+            .maximumWeight(workerCacheBytes)
             .recordStats()
             .weigher(weigher)
             .removalListener(
@@ -94,6 +97,10 @@ public long getWeight() {
     return displayedWeight;
   }
 
+  public long getMaxWeight() {
+    return workerCacheBytes;
+  }
+
   /** Per-computation view of the state cache. */
   public class ForComputation {
     private final String computation;
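Aside (not part of the commit): the budget above is enforced by Guava's size-based eviction, where maximumWeight caps the sum of per-entry weights reported by the weigher, so --workerCacheMb=400 yields 400 * 1024 * 1024 = 419,430,400 weight units, treated as approximate bytes. The standalone sketch below uses plain com.google.common.cache to show that mechanism; the worker itself uses Beam's vendored Guava and its own Weighers.weightedKeysAndValues() weigher, and the key/value types here are made up.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public class WeightBudgetSketch {
  public static void main(String[] args) {
    long workerCacheMb = 400;                       // hypothetical flag value
    long budgetBytes = workerCacheMb * 1024 * 1024; // same conversion as MEGABYTES above

    // Entries are evicted once their summed weight exceeds maximumWeight, which is
    // what bounds the Windmill state cache to the configured number of bytes.
    Cache<String, byte[]> cache =
        CacheBuilder.newBuilder()
            .maximumWeight(budgetBytes)
            .weigher((Weigher<String, byte[]>) (key, value) -> key.length() + value.length)
            .build();

    cache.put("someStateId", new byte[1024]);
    System.out.println("cache budget in bytes: " + budgetBytes); // 419430400
  }
}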

StreamingModeExecutionContextTest.java

@@ -46,6 +46,7 @@
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState;
@@ -87,10 +88,12 @@ public class StreamingModeExecutionContextTest {
   private StreamingModeExecutionStateRegistry executionStateRegistry =
       new StreamingModeExecutionStateRegistry(null);
   private StreamingModeExecutionContext executionContext;
+  DataflowWorkerHarnessOptions options;
 
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
+    options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
     CounterSet counterSet = new CounterSet();
     ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
     stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
@@ -100,7 +103,7 @@ public void setUp() {
"computationId",
new ReaderCache(),
stateNameMap,
new WindmillStateCache().forComputation("comp"),
new WindmillStateCache(options.getWorkerCacheMb()).forComputation("comp"),
StreamingStepMetricsContainer.createRegistry(),
new DataflowExecutionStateTracker(
ExecutionStateSampler.newForTest(),

WindmillStateCacheTest.java

@@ -25,6 +25,8 @@
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -41,6 +43,8 @@ public class WindmillStateCacheTest {
   private static final String COMPUTATION = "computation";
   private static final ByteString KEY = ByteString.copyFromUtf8("key");
   private static final String STATE_FAMILY = "family";
+  private static final long MEGABYTES = 1024 * 1024;
+  DataflowWorkerHarnessOptions options;
 
   private static class TestStateTag implements StateTag<TestState> {
     final String id;
@@ -130,7 +134,8 @@ private static StateNamespace triggerNamespace(long start, int triggerIdx) {

   @Before
   public void setUp() {
-    cache = new WindmillStateCache();
+    options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+    cache = new WindmillStateCache(400);
     assertEquals(0, cache.getWeight());
   }

@@ -161,6 +166,12 @@ public void testBasic() throws Exception {
new TestState("t2"), keyCache.get(triggerNamespace(0, 0), new TestStateTag("tag2")));
}

/** Verifies that max weight is set */
@Test
public void testMaxWeight() throws Exception {
assertEquals(400 * MEGABYTES, cache.getMaxWeight());
}

/** Verifies that values are cached in the appropriate namespaces. */
@Test
public void testInvalidation() throws Exception {
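Side note, not part of the commit: setUp() above builds DataflowWorkerHarnessOptions but still sizes the cache with the literal 400, so the options path is not exercised in this test. A hypothetical variant that also covers flag parsing could look like the sketch below; the test name and the --workerCacheMb=400 argument are made up, and it assumes the flag parses into DataflowWorkerHarnessOptions the same way it does on a real worker.

  // Hypothetical addition to WindmillStateCacheTest, reusing the imports added above.
  @Test
  public void testMaxWeightFromOptions() throws Exception {
    DataflowWorkerHarnessOptions flagOptions =
        PipelineOptionsFactory.fromArgs("--workerCacheMb=400")
            .as(DataflowWorkerHarnessOptions.class);
    WindmillStateCache flagCache = new WindmillStateCache(flagOptions.getWorkerCacheMb());
    assertEquals(400 * MEGABYTES, flagCache.getMaxWeight());
  }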

WindmillStateInternalsTest.java

@@ -35,12 +35,14 @@
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagBag;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagValue;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.GroupingState;
@@ -80,6 +82,8 @@ public class WindmillStateInternalsTest {
       Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of());
   private long workToken = 0;
 
+  DataflowWorkerHarnessOptions options;
+
   @Mock private WindmillStateReader mockReader;
 
   private WindmillStateInternals<String> underTest;
@@ -99,7 +103,8 @@ private static ByteString systemKey(StateNamespace namespace, String addrId) {
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
-    cache = new WindmillStateCache();
+    options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+    cache = new WindmillStateCache(options.getWorkerCacheMb());
     resetUnderTest();
   }

