Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9964] Setting --workerCacheMB for Streaming Pipeline #11710

Merged
merged 2 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public int getSize() {
private final ConcurrentMap<String, String> systemNameToComputationIdMap =
new ConcurrentHashMap<>();

private final WindmillStateCache stateCache = new WindmillStateCache();
private final WindmillStateCache stateCache;

private final ThreadFactory threadFactory;
private DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
Expand Down Expand Up @@ -577,6 +577,7 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
boolean publishCounters,
HotKeyLogger hotKeyLogger)
throws IOException {
this.stateCache = new WindmillStateCache(options.getWorkerCacheMb());
this.mapTaskExecutorFactory = mapTaskExecutorFactory;
this.workUnitClient = workUnitClient;
this.options = options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
* thread at a time, so this is safe.
*/
public class WindmillStateCache implements StatusDataProvider {
// Number of bytes in one megabyte; multiply a size in MB by this to convert it to bytes.
private static final long MEGABYTES = 1024 * 1024;
// Estimate of overhead per StateId.
private static final int PER_STATE_ID_OVERHEAD = 20;
// Initial size of hash tables per entry.
Expand All @@ -64,13 +66,14 @@ public class WindmillStateCache implements StatusDataProvider {
private HashMultimap<ComputationKey, StateId> keyIndex =
HashMultimap.<ComputationKey, StateId>create();
private int displayedWeight = 0; // Only used for status pages and unit tests.
private long workerCacheBytes; // Copy workerCacheMb and convert to bytes.

public WindmillStateCache() {
public WindmillStateCache(Integer workerCacheMb) {
final Weigher<Weighted, Weighted> weigher = Weighers.weightedKeysAndValues();

workerCacheBytes = workerCacheMb * MEGABYTES;
stateCache =
CacheBuilder.newBuilder()
.maximumWeight(100000000 /* 100 MB */)
.maximumWeight(workerCacheBytes)
.recordStats()
.weigher(weigher)
.removalListener(
Expand All @@ -94,6 +97,10 @@ public long getWeight() {
return displayedWeight;
}

/**
 * Returns the maximum weight of the state cache, in bytes.
 *
 * <p>This is the {@code workerCacheMb} value passed to the constructor multiplied by {@code
 * MEGABYTES}, i.e. the same value handed to the cache builder's {@code maximumWeight}.
 */
public long getMaxWeight() {
  return this.workerCacheBytes;
}

/** Per-computation view of the state cache. */
public class ForComputation {
private final String computation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState;
Expand Down Expand Up @@ -87,10 +88,12 @@ public class StreamingModeExecutionContextTest {
private StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry(null);
private StreamingModeExecutionContext executionContext;
DataflowWorkerHarnessOptions options;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
CounterSet counterSet = new CounterSet();
ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
Expand All @@ -100,7 +103,7 @@ public void setUp() {
"computationId",
new ReaderCache(),
stateNameMap,
new WindmillStateCache().forComputation("comp"),
new WindmillStateCache(options.getWorkerCacheMb()).forComputation("comp"),
StreamingStepMetricsContainer.createRegistry(),
new DataflowExecutionStateTracker(
ExecutionStateSampler.newForTest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand All @@ -41,6 +43,8 @@ public class WindmillStateCacheTest {
private static final String COMPUTATION = "computation";
private static final ByteString KEY = ByteString.copyFromUtf8("key");
private static final String STATE_FAMILY = "family";
private static final long MEGABYTES = 1024 * 1024;
DataflowWorkerHarnessOptions options;

private static class TestStateTag implements StateTag<TestState> {
final String id;
Expand Down Expand Up @@ -130,7 +134,8 @@ private static StateNamespace triggerNamespace(long start, int triggerIdx) {

@Before
public void setUp() {
cache = new WindmillStateCache();
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
cache = new WindmillStateCache(400);
assertEquals(0, cache.getWeight());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a check to this test to make sure that the maximumWeight of the cache is 100 MB? (Perhaps use a number different from 100 to be sure.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this by adding a new test method to the WindmillStateCacheTest class. I added a getter to WindmillStateCache that returns the maximum weight in bytes, and the test compares it to the value the cache was initially constructed with.

}

Expand Down Expand Up @@ -161,6 +166,12 @@ public void testBasic() throws Exception {
new TestState("t2"), keyCache.get(triggerNamespace(0, 0), new TestStateTag("tag2")));
}

/**
 * Verifies that the cache reports, in bytes, the maximum weight it was constructed with (400 MB
 * in {@code setUp}).
 */
@Test
public void testMaxWeight() throws Exception {
  final long expectedMaxWeightBytes = 400 * MEGABYTES;
  assertEquals(expectedMaxWeightBytes, cache.getMaxWeight());
}

/** Verifies that values are cached in the appropriate namespaces. */
@Test
public void testInvalidation() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagBag;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.TagValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
Expand Down Expand Up @@ -80,6 +82,8 @@ public class WindmillStateInternalsTest {
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of());
private long workToken = 0;

DataflowWorkerHarnessOptions options;

@Mock private WindmillStateReader mockReader;

private WindmillStateInternals<String> underTest;
Expand All @@ -99,7 +103,8 @@ private static ByteString systemKey(StateNamespace namespace, String addrId) {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
cache = new WindmillStateCache();
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
cache = new WindmillStateCache(options.getWorkerCacheMb());
resetUnderTest();
}

Expand Down