Skip to content

Commit

Permalink
Merge pull request apache#9677 from xubii/time-output-timestamp
Browse files Browse the repository at this point in the history
[Beam-2535] : Support outputTimestamp and watermark holds in timers.
  • Loading branch information
reuvenlax committed Jan 8, 2020
2 parents a666187 + 6fdb1e1 commit 4b77225
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public void testEventTimeTimers() {
timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null);

TimerData timerData0 =
TimerData.of("timerData0", StateNamespaces.global(), instant0, TimeDomain.EVENT_TIME);
TimerData.of(
"timerData0", StateNamespaces.global(), instant0, instant0, TimeDomain.EVENT_TIME);
timerInternals.setTimer(timerData0);

TimerData timerData1 =
TimerData.of("timerData1", StateNamespaces.global(), instant1, TimeDomain.EVENT_TIME);
TimerData.of(
"timerData1", StateNamespaces.global(), instant1, instant1, TimeDomain.EVENT_TIME);
timerInternals.setTimer(timerData1);

timerInternals.fireReadyTimers(instant0.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
Expand Down Expand Up @@ -94,11 +96,13 @@ public void testDeleteTimer() {
timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null);

TimerData timerData0 =
TimerData.of("timerData0", StateNamespaces.global(), instant0, TimeDomain.EVENT_TIME);
TimerData.of(
"timerData0", StateNamespaces.global(), instant0, instant0, TimeDomain.EVENT_TIME);
timerInternals.setTimer(timerData0);

TimerData timerData1 =
TimerData.of("timerData1", StateNamespaces.global(), instant1, TimeDomain.EVENT_TIME);
TimerData.of(
"timerData1", StateNamespaces.global(), instant1, instant1, TimeDomain.EVENT_TIME);
timerInternals.setTimer(timerData1);

Map<?, Set<Slice>> timerMap = timerInternals.getTimerSet(TimeDomain.EVENT_TIME).getMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, @Nullable T payload) {
  // Instantiate with the diamond operator rather than the raw generated type, so the returned
  // value is a properly parameterized Timer<T> and no unchecked conversion is needed.
  // The payload is annotated @Nullable and is passed through unchanged.
  return new AutoValue_Timer<>(timestamp, outputTimestamp, payload);
}

/**
* Returns the timestamp of when the timer is scheduled to fire.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,14 @@ public void onTimer(
TimeDomain timeDomain) {

// The effective timestamp is when derived elements will have their timestamp set, if not
// otherwise specified. If this is an event time timer, then they have the timestamp of the
// timer itself. Otherwise, they are set to the input timestamp, which is by definition
// otherwise specified. If this is an event time timer, then they have the timer's output
// timestamp. Otherwise, they are set to the input timestamp, which is by definition
// non-late.
Instant effectiveTimestamp;
switch (timeDomain) {
case EVENT_TIME:
effectiveTimestamp = timestamp;
effectiveTimestamp = outputTimestamp;
break;

case PROCESSING_TIME:
case SYNCHRONIZED_PROCESSING_TIME:
effectiveTimestamp = stepContext.timerInternals().currentInputWatermarkTime();
Expand Down Expand Up @@ -912,6 +911,8 @@ private class TimerInternalsTimer implements Timer {
private final StateNamespace namespace;
private final String timerId;
private final TimerSpec spec;
private Instant target;
private Instant outputTimestamp;
private Duration period = Duration.ZERO;
private Duration offset = Duration.ZERO;

Expand All @@ -930,30 +931,14 @@ public TimerInternalsTimer(

@Override
public void set(Instant target) {
// Verifies that the time domain of this timer is acceptable for absolute timers.
if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
throw new IllegalStateException(
"Can only set relative timers in processing time domain. Use #setRelative()");
}

// Ensures that the target time is reasonable. For event time timers this means that the time
// should be prior to window GC time.
if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
checkArgument(
!target.isAfter(windowExpiry),
"Attempted to set event time timer for %s but that is after"
+ " the expiration of window %s",
target,
windowExpiry);
}

setUnderlyingTimer(target);
this.target = target;
verifyAbsoluteTimeDomain();
setAndVerifyOutputTimestamp();
setUnderlyingTimer();
}

@Override
public void setRelative() {
Instant target;
Instant now = getCurrentTime();
if (period.equals(Duration.ZERO)) {
target = now.plus(offset);
Expand All @@ -962,7 +947,9 @@ public void setRelative() {
target = millisSinceStart == 0 ? now : now.plus(period).minus(millisSinceStart);
}
target = minTargetAndGcTime(target);
setUnderlyingTimer(target);

setAndVerifyOutputTimestamp();
setUnderlyingTimer();
}

@Override
Expand Down Expand Up @@ -991,13 +978,58 @@ private Instant minTargetAndGcTime(Instant target) {
return target;
}

/**
 * Stores the given output timestamp on this timer and returns this same timer instance, so the
 * call can be chained with a subsequent call on the timer.
 */
@Override
public Timer withOutputTimestamp(Instant outputTimestamp) {
// Only recorded here; validation happens later in setAndVerifyOutputTimestamp().
this.outputTimestamp = outputTimestamp;
return this;
}

/**
 * Verifies that the time domain of this timer is acceptable for absolute timers.
 *
 * @throws IllegalStateException if the timer's spec is not in the event time domain
 */
private void verifyAbsoluteTimeDomain() {
  if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
    // Absolute targets are only accepted for event time timers; every other domain must go
    // through setRelative().
    throw new IllegalStateException(
        "Can only set relative timers in processing time domain. Use #setRelative()");
  }
}

/**
 * Defaults the timer's output timestamp and validates it before the timer is set.
 *
 * <p>Ensures that:
 *
 * <ul>
 *   <li>Users can't set {@code outputTimestamp} for processing time timers.
 *   <li>Event time timers' firing time and {@code outputTimestamp} are not after window
 *       expiration.
 * </ul>
 *
 * @throws IllegalStateException if an output timestamp was supplied for a timer that is not in
 *     the event time domain
 * @throws IllegalArgumentException if, for an event time timer, the firing time or the output
 *     timestamp is after the window's maximum timestamp plus the allowed lateness
 */
private void setAndVerifyOutputTimestamp() {
  boolean isEventTime = TimeDomain.EVENT_TIME.equals(spec.getTimeDomain());

  // Output timestamp is currently not supported in processing time timers.
  if (outputTimestamp != null && !isEventTime) {
    throw new IllegalStateException("Cannot set outputTimestamp in processing time domain.");
  }

  // Output timestamp is set to the delivery time if not initialized by a user.
  if (outputTimestamp == null) {
    outputTimestamp = target;
  }

  if (isEventTime) {
    Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
    checkArgument(
        !target.isAfter(windowExpiry),
        "Attempted to set event time timer that outputs for %s but that is"
            + " after the expiration of window %s",
        target,
        windowExpiry);
    // The output timestamp can be chosen independently of the firing time, so it needs its own
    // check against window expiration.
    checkArgument(
        !outputTimestamp.isAfter(windowExpiry),
        "Attempted to set event time timer with output timestamp %s but that is"
            + " after the expiration of window %s",
        outputTimestamp,
        windowExpiry);
  }
}

/**
* Sets the timer for the target time without checking anything about whether it is a reasonable
* thing to do. For example, absolute processing time timers are not really sensible since the
* user has no way to compute a good choice of time.
*/
private void setUnderlyingTimer(Instant target) {
timerInternals.setTimer(namespace, timerId, "", target, target, spec.getTimeDomain());
private void setUnderlyingTimer() {
timerInternals.setTimer(
namespace, timerId, "", target, outputTimestamp, spec.getTimeDomain());
}

private Instant getCurrentTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* A customized {@link DoFnRunner} that handles late data dropping and garbage collection for
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}
* and does cleanup in {@link #onTimer(String, BoundedWindow, Instant, TimeDomain)}
* and does cleanup in {@link #onTimer(String, BoundedWindow, Instant, Instant, TimeDomain)}
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,20 @@ abstract class TimerData implements Comparable<TimerData> {

// When adding a new field, make sure to add it to the compareTo() method.

/**
 * Construct a {@link TimerData} for the given parameters.
 *
 * <p>All five arguments are forwarded unchanged to the generated AutoValue constructor; both the
 * firing {@code timestamp} and the {@code outputTimestamp} are supplied explicitly by the caller.
 */
public static TimerData of(
String timerId,
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
// NOTE(review): timerId is passed for both of the first two constructor arguments — confirm
// that reusing it for the second argument is intended.
return new AutoValue_TimerInternals_TimerData(
timerId, timerId, namespace, timestamp, outputTimestamp, domain);
}

/**
* Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
* generated.
* Construct a {@link TimerData} for the given parameters except for {@code outputTimestamp}.
* {@code outputTimestamp} is set to timer {@code timestamp}.
*/
public static TimerData of(
String timerId,
Expand All @@ -214,17 +225,27 @@ public static TimerData of(
}

/**
* Construct a {@link TimerData} for the given parameters, where the timer ID is
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}.
*/
public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
public static TimerData of(
StateNamespace namespace, Instant timestamp, Instant outputTimestamp, TimeDomain domain) {
String timerId =
new StringBuilder()
.append(domain.ordinal())
.append(':')
.append(timestamp.getMillis())
.toString();
return of(timerId, namespace, timestamp, domain);
return of(timerId, namespace, timestamp, outputTimestamp, domain);
}

/**
 * Construct a {@link TimerData} for the given parameters, where the timer ID is
 * deterministically generated from the {@code timestamp} and {@code domain}. Also, output
 * timestamp is set to the timer timestamp by default.
 */
public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
// Delegates to the overload that takes an explicit output timestamp, passing the firing
// timestamp for both values.
return of(namespace, timestamp, timestamp, domain);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -71,6 +73,8 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo

private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;

private final EvaluationContext evaluationContext;

StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
this.delegateFactory =
new ParDoEvaluatorFactory<>(
Expand All @@ -92,6 +96,8 @@ public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedStatefulParDo
CacheBuilder.newBuilder()
.weakValues()
.build(new CleanupSchedulingLoader(evaluationContext));

this.evaluationContext = evaluationContext;
}

@Override
Expand Down Expand Up @@ -146,7 +152,13 @@ private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(
application.getTransform().getSchemaInformation(),
application.getTransform().getSideInputMapping());

return new StatefulParDoEvaluator<>(delegateEvaluator);
DirectStepContext stepContext =
evaluationContext
.getExecutionContext(application, inputBundle.getKey())
.getStepContext(evaluationContext.getStepName(application));

stepContext.stateInternals().commit();
return new StatefulParDoEvaluator<>(delegateEvaluator, stepContext);
}

private class CleanupSchedulingLoader
Expand Down Expand Up @@ -241,10 +253,14 @@ private static class StatefulParDoEvaluator<K, InputT>
private final List<TimerData> pushedBackTimers = new ArrayList<>();
private final DirectTimerInternals timerInternals;

DirectStepContext stepContext;

public StatefulParDoEvaluator(
DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator,
DirectStepContext stepContext) {
this.delegateEvaluator = delegateEvaluator;
this.timerInternals = delegateEvaluator.getParDoEvaluator().getStepContext().timerInternals();
this.stepContext = stepContext;
}

@Override
Expand All @@ -269,6 +285,12 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
BoundedWindow timerWindow = windowNamespace.getWindow();
delegateEvaluator.onTimer(timer, timerWindow);

StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timer);

stepContext.stateInternals().state(timer.getNamespace(), timerWatermarkHoldTag).clear();
stepContext.stateInternals().commit();

if (timerInternals.containsUpdateForTimeBefore(currentInputWatermark)) {
break;
}
Expand All @@ -278,15 +300,41 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes

@Override
public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {

TransformResult<KV<K, InputT>> delegateResult = delegateEvaluator.finishBundle();
boolean isTimerDeclared = false;
for (TimerData timerData : delegateResult.getTimerUpdate().getSetTimers()) {
StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timerData);

stepContext
.stateInternals()
.state(timerData.getNamespace(), timerWatermarkHoldTag)
.add(timerData.getOutputTimestamp());
isTimerDeclared = true;
}

CopyOnAccessInMemoryStateInternals state;
Instant watermarkHold;

if (isTimerDeclared && delegateResult.getState() != null) { // For both State and Timer Holds
state = delegateResult.getState();
watermarkHold = stepContext.commitState().getEarliestWatermarkHold();
} else if (isTimerDeclared) { // For only Timer holds
state = stepContext.commitState();
watermarkHold = state.getEarliestWatermarkHold();
} else { // For only State ( non Timer ) holds
state = delegateResult.getState();
watermarkHold = delegateResult.getWatermarkHold();
}

TimerUpdate timerUpdate =
delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers);
pushedBackTimers.clear();
StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> regroupedResult =
StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
delegateResult.getTransform(), delegateResult.getWatermarkHold())
delegateResult.getTransform(), watermarkHold)
.withTimerUpdate(timerUpdate)
.withState(delegateResult.getState())
.withState(state)
.withMetricUpdates(delegateResult.getLogicalMetricUpdates())
.addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));

Expand All @@ -306,4 +354,10 @@ public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Ex
return regroupedResult.build();
}
}

/**
 * Builds the watermark hold state tag used for the given timer.
 *
 * <p>The tag id is {@code "timer-"} followed by the timer's id, the hold uses {@link
 * TimestampCombiner#EARLIEST}, and the result is wrapped with {@code
 * StateTags.makeSystemTagInternal}.
 */
private static StateTag<WatermarkHoldState> setTimerTag(TimerData timerData) {
  String holdId = "timer-" + timerData.getTimerId();
  return StateTags.makeSystemTagInternal(
      StateTags.watermarkStateInternal(holdId, TimestampCombiner.EARLIEST));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ synchronized Instant getEarliestTimerTimestamp() {
if (pendingTimers.isEmpty()) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
return pendingTimers.firstEntry().getElement().getTimestamp();
return pendingTimers.firstEntry().getElement().getOutputTimestamp();
}
}

Expand Down Expand Up @@ -465,7 +465,8 @@ public synchronized WatermarkUpdate refresh() {
Instant oldWatermark = currentWatermark.get();
Instant newWatermark =
INSTANT_ORDERING.min(
inputWatermark.get(), inputWatermark.getEarliestTimerTimestamp(), holds.getMinHold());
inputWatermark.get(), holds.getMinHold(), inputWatermark.getEarliestTimerTimestamp());

newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
currentWatermark.set(newWatermark);
return updateAndTrace(getName(), oldWatermark, newWatermark);
Expand Down
1 change: 1 addition & 0 deletions runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def createValidatesRunnerTask(Map m) {
maxParallelForks 2
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
Expand Down
Loading

0 comments on commit 4b77225

Please sign in to comment.