Merge branch 'max_commit_bytes' of https://github.com/scwhittle/beam into max_commit_bytes
drieber committed Feb 11, 2019
2 parents 67bf8b9 + bef135b commit 1fed9bd
Showing 3 changed files with 145 additions and 30 deletions.
@@ -35,6 +35,7 @@ public enum StreamingSystemCounterNames {
WINDMILL_SHUFFLE_BYTES_READ("WindmillShuffleBytesRead"),
WINDMILL_STATE_BYTES_READ("WindmillStateBytesRead"),
WINDMILL_STATE_BYTES_WRITTEN("WindmillStateBytesWritten"),
WINDMILL_MAX_WORK_ITEM_COMMIT_BYTES("WindmillMaxWorkItemCommitBytes"),
JAVA_HARNESS_USED_MEMORY("dataflow_java_harness_used_memory"),
JAVA_HARNESS_MAX_MEMORY("dataflow_java_harness_max_memory"),
JAVA_HARNESS_RESTARTS("dataflow_java_harness_restarts"),
@@ -109,6 +109,7 @@
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.CommitWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWorkStream;
@@ -174,7 +175,7 @@ public class StreamingDataflowWorker {
// retrieving extra work from Windmill without working on it, leading to better
// prioritization / utilization.
static final int MAX_WORK_UNITS_QUEUED = 100;
static final long MAX_COMMIT_BYTES = 32 << 20;
static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;
static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
static final int NUM_COMMIT_STREAMS = 1;
static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
@@ -205,6 +206,35 @@ private static boolean isOutOfMemoryError(Throwable t) {
return false;
}

private static class KeyCommitTooLargeException extends Exception {
public static KeyCommitTooLargeException causedBy(
String computationId, long byteLimit, WorkItemCommitRequest request) {
StringBuilder message = new StringBuilder();
message.append("Commit request for stage ");
message.append(computationId);
message.append(" and key ");
message.append(request.getKey().toStringUtf8());
if (request.getSerializedSize() > 0) {
message.append(
" has size "
+ request.getSerializedSize()
+ " which is more than the limit of "
+ byteLimit);
} else {
message.append(" is larger than 2GB and cannot be processed");
}
message.append(
". This may be caused by grouping a very "
+ "large amount of data in a single window without using Combine,"
+ " or by producing a large amount of data from a single input element.");
return new KeyCommitTooLargeException(message.toString());
}

private KeyCommitTooLargeException(String message) {
super(message);
}
}

private static MapTask parseMapTask(String input) throws IOException {
return Transport.getJsonFactory().fromString(input, MapTask.class);
}
@@ -312,6 +342,7 @@ static class Commit {
public Commit(
Windmill.WorkItemCommitRequest request, ComputationState computationState, Work work) {
this.request = request;
assert request.getSerializedSize() > 0;
this.computationState = computationState;
this.work = work;
}
@@ -329,20 +360,7 @@ public Work getWork() {
}

public int getSize() {
int commitSize = request.getSerializedSize();
if (commitSize < 0) {
throw new IllegalStateException(
"Commit request for stage "
+ computationState.getComputationId()
+ " and key "
+ request.getKey().toStringUtf8()
+ " is larger than 2GB and cannot be processed."
+ " This may be caused by grouping a very "
+ "large amount of data in a single window without using Combine,"
+ " or by producing a "
+ "large amount of data from a single input element.");
}
return commitSize;
return request.getSerializedSize();
}
}

@@ -388,6 +406,7 @@ public int getSize() {
// Built-in cumulative counters.
private final Counter<Long, Long> javaHarnessUsedMemory;
private final Counter<Long, Long> javaHarnessMaxMemory;
private final Counter<Integer, Integer> windmillMaxObservedWorkItemCommitBytes;
private Timer refreshActiveWorkTimer;
private Timer statusPageTimer;

@@ -408,6 +427,8 @@ public int getSize() {

// Limit on bytes sinked (committed) in a work item.
private final long maxSinkBytes; // = MAX_SINK_BYTES unless disabled in options.
// Possibly overridden by streaming engine config.
private int maxWorkItemCommitBytes = Integer.MAX_VALUE;

private final EvictingQueue<String> pendingFailuresToReport =
EvictingQueue.<String>create(MAX_FAILURES_TO_REPORT_IN_UPDATE);
@@ -559,6 +580,9 @@ public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
this.javaHarnessMaxMemory =
pendingCumulativeCounters.longSum(
StreamingSystemCounterNames.JAVA_HARNESS_MAX_MEMORY.counterName());
this.windmillMaxObservedWorkItemCommitBytes =
pendingCumulativeCounters.intMax(
StreamingSystemCounterNames.WINDMILL_MAX_WORK_ITEM_COMMIT_BYTES.counterName());
this.isDoneFuture = new CompletableFuture<>();

this.threadFactory =
@@ -697,6 +721,11 @@ public void setRetryLocallyDelayMs(int retryLocallyDelayMs) {
this.retryLocallyDelayMs = retryLocallyDelayMs;
}

@VisibleForTesting
public void setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes) {
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
}

@VisibleForTesting
public boolean workExecutorIsEmpty() {
return workUnitExecutor.getQueue().isEmpty();
@@ -1247,7 +1276,25 @@ private void process(

// Add the output to the commit queue.
work.setState(State.COMMIT_QUEUED);
commitQueue.put(new Commit(outputBuilder.build(), computationState, work));

WorkItemCommitRequest commitRequest = outputBuilder.build();
int byteLimit = maxWorkItemCommitBytes;
int commitSize = commitRequest.getSerializedSize();
// Detect overflow of the integer serialized size, or whether the byte limit was exceeded.
windmillMaxObservedWorkItemCommitBytes.addValue(
commitSize < 0 ? Integer.MAX_VALUE : commitSize);
if (commitSize < 0) {
throw KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
} else if (commitSize > byteLimit) {
// Once supported, we should communicate the desired truncation for the commit to the
// streaming engine. For now we report the error but attempt the commit so that it will be
// truncated by the streaming engine backend.
KeyCommitTooLargeException e =
KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
reportFailure(computationId, workItem, e);
LOG.error(e.toString());
}
commitQueue.put(new Commit(commitRequest, computationState, work));

// Compute shuffle and state byte statistics; these will be flushed asynchronously.
long stateBytesWritten = outputBuilder.clearOutputMessages().build().getSerializedSize();
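
A note on the check above: protobuf's getSerializedSize() returns an int, so a WorkItemCommitRequest whose encoded size exceeds 2GB surfaces as a negative value. The block distinguishes three outcomes; a minimal standalone sketch follows (the CommitSizeCheckSketch class and its names are illustrative, not part of this commit):

// Illustrative sketch only: the three outcomes of the commit-size check in process().
class CommitSizeCheckSketch {
  enum Outcome { OK, REPORT_AND_COMMIT, REJECT }

  static Outcome classify(int serializedSize, int byteLimit) {
    if (serializedSize < 0) {
      // getSerializedSize() overflowed: the encoded request is larger than 2GB; reject it.
      return Outcome.REJECT;
    }
    if (serializedSize > byteLimit) {
      // Over the configured limit: report a KeyCommitTooLargeException but still attempt the
      // commit so that the streaming engine backend can truncate it.
      return Outcome.REPORT_AND_COMMIT;
    }
    return Outcome.OK;
  }
}
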
@@ -1278,25 +1325,23 @@ private void process(

t = t instanceof UserCodeException ? t.getCause() : t;

boolean retryLocally = false;
if (KeyTokenInvalidException.isKeyTokenInvalidException(t)) {
LOG.debug(
"Execution of work for {} for key {} failed due to token expiration. "
+ "Will not retry locally.",
computationId,
key.toStringUtf8());
computationState.completeWork(key, workItem.getWorkToken());
} else {
LOG.error("Uncaught exception: ", t);
LastExceptionDataProvider.reportException(t);
LOG.debug("Failed work: {}", work);
boolean retryLocally;
if (!reportFailure(computationId, workItem, t)) {
LOG.error(
"Execution of work for {} for key {} failed, and Windmill "
+ "indicated not to retry locally.",
computationId,
key.toStringUtf8());
retryLocally = false;
} else if (isOutOfMemoryError(t)) {
File heapDump = memoryMonitor.tryToDumpHeap();
LOG.error(
@@ -1305,23 +1350,22 @@ private void process(
computationId,
key.toStringUtf8(),
heapDump == null ? "not written" : ("written to '" + heapDump + "'"));
retryLocally = false;
} else {
LOG.error(
"Execution of work for {} for key {} failed. Will retry locally.",
computationId,
key.toStringUtf8());
retryLocally = true;
}

if (retryLocally) {
// Try again after some delay and at the end of the queue to avoid a tight loop.
sleep(retryLocallyDelayMs);
workUnitExecutor.forceExecute(work);
} else {
// Consider the item invalid. It will eventually be retried by Windmill.
computationState.completeWork(key, workItem.getWorkToken());
}
}
if (retryLocally) {
// Try again after some delay and at the end of the queue to avoid a tight loop.
sleep(retryLocallyDelayMs);
workUnitExecutor.forceExecute(work);
} else {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs
// to be processed.
computationState.completeWork(key, workItem.getWorkToken());
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
@@ -1374,7 +1418,7 @@ private void commitLoop() {
// Send the request if we've exceeded the bytes or there is no more
// pending work. commitBytes is a long, so this cannot overflow.
commitBytes += commit.getSize();
if (commitBytes >= MAX_COMMIT_BYTES) {
if (commitBytes >= TARGET_COMMIT_BUNDLE_BYTES) {
break;
}
commit = commitQueue.poll();
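
The loop above accumulates queued commits until roughly TARGET_COMMIT_BUNDLE_BYTES before flushing a request. A simplified standalone sketch of that batching pattern, assuming a plain Queue and a size function (drainUpToTargetBytes and CommitBatchingSketch are illustrative names, not part of the worker):

// Illustrative sketch only: batch items by accumulated size, as commitLoop does for commits.
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.ToLongFunction;

class CommitBatchingSketch {
  static <T> List<T> drainUpToTargetBytes(
      Queue<T> queue, ToLongFunction<T> sizeOf, long targetBytes) {
    List<T> batch = new ArrayList<>();
    long bytes = 0; // long accumulator, so summing int-sized commits cannot overflow.
    T item;
    while ((item = queue.poll()) != null) {
      batch.add(item);
      bytes += sizeOf.applyAsLong(item);
      if (bytes >= targetBytes) {
        break; // Target bundle size reached; send what we have.
      }
    }
    return batch;
  }
}
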
@@ -1473,6 +1517,9 @@ private void getConfigFromWindmill(String computation) {
Windmill.GetConfigRequest.newBuilder().addComputations(computation).build();

Windmill.GetConfigResponse response = windmillServer.getConfig(request);
// The max work item commit bytes should be modified to be dynamic once it is available in
// the request.
setMaxWorkItemCommitBytes(180 << 20);
for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry entry :
response.getSystemNameToComputationIdMapList()) {
systemNameToComputationIdMap.put(entry.getSystemName(), entry.getComputationId());
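
The 180MB value above is a hard-coded placeholder until the streaming engine config carries the limit. A hypothetical sketch of how the setter might then be driven from the GetConfig response (getMaxWorkItemCommitBytes() is an assumed accessor that does not exist in the current proto):

// Hypothetical only: wire the limit from the config response once such a field exists.
long configuredLimit = response.getMaxWorkItemCommitBytes(); // assumed accessor, not in the current proto
if (configuredLimit > 0 && configuredLimit <= Integer.MAX_VALUE) {
  setMaxWorkItemCommitBytes((int) configuredLimit);
} else {
  setMaxWorkItemCommitBytes(180 << 20); // fall back to the current hard-coded default
}
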
@@ -912,6 +912,73 @@ public void testKeyTokenInvalidException() throws Exception {
assertEquals(1, result.size());
}

static class LargeCommitFn extends DoFn<KV<String, String>, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().getKey().equals("large_key")) {
StringBuilder s = new StringBuilder();
for (int i = 0; i < 100; ++i) s.append("large_commit");
c.output(KV.of(c.element().getKey(), s.toString()));
} else {
c.output(c.element());
}
}
}

@Test
public void testKeyCommitTooLargeException() throws Exception {
KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

List<ParallelInstruction> instructions =
Arrays.asList(
makeSourceInstruction(kvCoder),
makeDoFnInstruction(new LargeCommitFn(), 0, kvCoder),
makeSinkInstruction(kvCoder, 1));

FakeWindmillServer server = new FakeWindmillServer(errorCollector);
server.setExpectedExceptionCount(1);

StreamingDataflowWorker worker =
makeWorker(instructions, createTestingPipelineOptions(server), true /* publishCounters */);
worker.setMaxWorkItemCommitBytes(1000);
worker.start();

server.addWorkToOffer(makeInput(1, 0, "large_key"));
server.addWorkToOffer(makeInput(2, 0, "key"));
server.waitForEmptyWorkQueue();

Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1);

assertEquals(2, result.size());
assertEquals(makeExpectedOutput(2, 0, "key", "key").build(), result.get(2L));
assertTrue(result.containsKey(1L));
assertEquals("large_key", result.get(1L).getKey().toStringUtf8());
assertTrue(result.get(1L).getSerializedSize() > 1000);

// Spam worker updates a few times.
int maxTries = 10;
while (--maxTries > 0) {
worker.reportPeriodicWorkerUpdates();
Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
}

// We should see an exception reported for the large commit but not the small one.
ArgumentCaptor<WorkItemStatus> workItemStatusCaptor =
ArgumentCaptor.forClass(WorkItemStatus.class);
verify(mockWorkUnitClient, atLeast(2)).reportWorkItemStatus(workItemStatusCaptor.capture());
List<WorkItemStatus> capturedStatuses = workItemStatusCaptor.getAllValues();
boolean foundErrors = false;
for (WorkItemStatus status : capturedStatuses) {
if (!status.getErrors().isEmpty()) {
assertFalse(foundErrors);
foundErrors = true;
String errorMessage = status.getErrors().get(0).getMessage();
assertThat(errorMessage, Matchers.containsString("KeyCommitTooLargeException"));
}
}
assertTrue(foundErrors);
}

static class ChangeKeysFn extends DoFn<KV<String, String>, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
