Skip to content

Commit

Permalink
Merge pull request apache#10534: Beam-2535: Support timer output time…
Browse files Browse the repository at this point in the history
…stamps in the flink runner
  • Loading branch information
reuvenlax committed Jan 22, 2020
1 parent 17974cd commit dd0b001
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ public static Timer<Void> of(Instant time) {

/** Returns a timer for the given timestamp with a user specified payload. */
public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
return new AutoValue_Timer(timestamp, timestamp, payload);
}

/** Returns a timer for the given timestamp with a user specified payload and outputTimestamp. */
public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, @Nullable T payload) {
return new AutoValue_Timer(timestamp, outputTimestamp, payload);
return new AutoValue_Timer(timestamp, payload);
}

/**
Expand All @@ -63,9 +58,6 @@ public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, @Nulla
*/
public abstract Instant getTimestamp();

/* Returns the outputTimestamps */
public abstract Instant getOutputTimestamp();

/** A user supplied payload. */
@Nullable
public abstract T getPayload();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void onTimer(
}

OnTimerArgumentProvider argumentProvider =
new OnTimerArgumentProvider(timerId, window, effectiveTimestamp, timeDomain);
new OnTimerArgumentProvider(timerId, window, timestamp, effectiveTimestamp, timeDomain);
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
}

Expand Down Expand Up @@ -774,6 +774,7 @@ public TimerMap timerFamily(String timerFamilyId) {
private class OnTimerArgumentProvider extends DoFn<InputT, OutputT>.OnTimerContext
implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private final BoundedWindow window;
private final Instant fireTimestamp;
private final Instant timestamp;
private final TimeDomain timeDomain;
private final String timerId;
Expand All @@ -796,10 +797,15 @@ private StateNamespace getNamespace() {
}

private OnTimerArgumentProvider(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
String timerId,
BoundedWindow window,
Instant fireTimestamp,
Instant timestamp,
TimeDomain timeDomain) {
fn.super();
this.timerId = timerId;
this.window = window;
this.fireTimestamp = fireTimestamp;
this.timestamp = timestamp;
this.timeDomain = timeDomain;
}
Expand All @@ -809,6 +815,11 @@ public Instant timestamp() {
return timestamp;
}

@Override
public Instant fireTimestamp() {
return fireTimestamp;
}

@Override
public BoundedWindow window() {
return window;
Expand Down
1 change: 0 additions & 1 deletion runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def createValidatesRunnerTask(Map m) {
maxParallelForks 2
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,10 @@ public void processWatermark1(Watermark mark) throws Exception {
Instant watermarkHold = keyedStateInternals.watermarkHold();

long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());

if (timerInternals.getWatermarkHoldMs() < Long.MAX_VALUE) {
combinedWatermarkHold =
Math.min(combinedWatermarkHold, timerInternals.getWatermarkHoldMs());
}
long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);

if (potentialOutputWatermark > currentOutputWatermark) {
Expand Down Expand Up @@ -811,7 +814,7 @@ protected void fireTimer(InternalTimer<ByteBuffer, TimerData> timer) {
// This is a user timer, so namespace must be WindowNamespace
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.cleanupPendingTimer(timer.getNamespace());
timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
pushbackDoFnRunner.onTimer(
timerData.getTimerId(),
timerData.getTimerFamilyId(),
Expand Down Expand Up @@ -1084,13 +1087,31 @@ class FlinkTimerInternals implements TimerInternals {
*/
final MapState<String, TimerData> pendingTimersById;

long watermarkHoldMs = Long.MAX_VALUE;

private FlinkTimerInternals() {
MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
new MapStateDescriptor<>(
"pending-timers", new StringSerializer(), new CoderTypeSerializer<>(timerCoder));
this.pendingTimersById = getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
}

long getWatermarkHoldMs() {
return watermarkHoldMs;
}

void updateWatermarkHold() {
this.watermarkHoldMs = Long.MAX_VALUE;
try {
for (TimerData timerData : pendingTimersById.values()) {
this.watermarkHoldMs =
Math.min(timerData.getOutputTimestamp().getMillis(), this.watermarkHoldMs);
}
} catch (Exception e) {
throw new RuntimeException("Exception while reading set of timers", e);
}
}

@Override
public void setTimer(
StateNamespace namespace,
Expand Down Expand Up @@ -1118,6 +1139,7 @@ public void setTimer(TimerData timer) {
// before we set the new one.
cancelPendingTimerById(contextTimerId);
registerTimer(timer, contextTimerId);
updateWatermarkHold();
} catch (Exception e) {
throw new RuntimeException("Failed to set timer", e);
}
Expand All @@ -1142,13 +1164,16 @@ private void registerTimer(TimerData timer, String contextTimerId) throws Except
private void cancelPendingTimerById(String contextTimerId) throws Exception {
TimerData oldTimer = pendingTimersById.get(contextTimerId);
if (oldTimer != null) {
deleteTimer(oldTimer);
deleteTimerInternal(oldTimer, false);
}
}

void cleanupPendingTimer(TimerData timer) {
void cleanupPendingTimer(TimerData timer, boolean updateWatermark) {
try {
pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
if (updateWatermark) {
updateWatermarkHold();
}
} catch (Exception e) {
throw new RuntimeException("Failed to cleanup state with pending timers", e);
}
Expand All @@ -1170,16 +1195,21 @@ public void deleteTimer(StateNamespace namespace, String timerId, String timerFa
public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
try {
cancelPendingTimerById(getContextTimerId(timerId, namespace));
updateWatermarkHold();
} catch (Exception e) {
throw new RuntimeException("Failed to cancel timer", e);
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
@Deprecated
@Override
@Deprecated
public void deleteTimer(TimerData timerKey) {
cleanupPendingTimer(timerKey);
deleteTimerInternal(timerKey, true);
}

void deleteTimerInternal(TimerData timerKey, boolean updateWatermark) {
cleanupPendingTimer(timerKey, true);
long time = timerKey.getTimestamp().getMillis();
switch (timerKey.getDomain()) {
case EVENT_TIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ public void onTimer(
WindowedValue<KV<Object, Timer>> timerValue =
WindowedValue.of(
KV.of(timerKey, Timer.of(timestamp, new byte[0])),
timestamp,
outputTimestamp,
Collections.singleton(window),
PaneInfo.NO_FIRING);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public <AdditionalOutputT> void outputWindowedValue(

@Override
protected void fireTimer(InternalTimer<ByteBuffer, TimerInternals.TimerData> timer) {
timerInternals.cleanupPendingTimer(timer.getNamespace());
timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) {
// ignore this, it can only be a state cleanup timers from StatefulDoFnRunner and ProcessFn
// does its own state cleanup and should never set event-time timers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {

@Override
protected void fireTimer(InternalTimer<ByteBuffer, TimerData> timer) {
timerInternals.cleanupPendingTimer(timer.getNamespace());
timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
doFnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public boolean receive(String pCollectionId, Object receivedElement) {

TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals();
timerInternals.setTimer(
namespace, timerId, "", timer.getTimestamp(), timer.getOutputTimestamp(), timeDomain);
namespace, timerId, "", timer.getTimestamp(), windowedValue.getTimestamp(), timeDomain);

timerIdToKey.put(timerId, windowedValue.getValue().getKey());
timerIdToPayload.put(timerId, timer.getPayload());
Expand All @@ -144,7 +144,7 @@ private void fireTimers() throws Exception {
KV.of(
timerIdToKey.get(timerData.getTimerId()),
Timer.of(timerData.getTimestamp(), timerIdToPayload.get(timerData.getTimerId()))),
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
Collections.singleton(window),
PaneInfo.NO_FIRING);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ public void onTimer(
long testTimerOffset = 123456;
// Arbitrary key.
Object timer = timerBytes("X", testTimerOffset);
Object windowedTimer = WindowedValue.valueInGlobalWindow(timer);
Object windowedTimer =
WindowedValue.timestampedValueInGlobalWindow(
timer, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(testTimerOffset));

// Simulate the SDK Harness sending a timer element to the Runner Harness.
assertTrue(timerReceiver.receive(timerOutputPCollection, windowedTimer));
Expand Down Expand Up @@ -349,10 +351,14 @@ public void onTimer2(
long testTimerOffset = 123456;
// Arbitrary key.
Object timer1 = timerBytes("X", testTimerOffset);
Object windowedTimer1 = WindowedValue.valueInGlobalWindow(timer1);
Object windowedTimer1 =
WindowedValue.timestampedValueInGlobalWindow(
timer1, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(testTimerOffset));

Object timer2 = timerBytes("Y", testTimerOffset);
Object windowedTimer2 = WindowedValue.valueInGlobalWindow(timer2);
Object windowedTimer2 =
WindowedValue.timestampedValueInGlobalWindow(
timer2, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(testTimerOffset));

// Simulate the SDK Harness sending a timer element to the Runner Harness.
assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class JobInvocation {
private final List<JobMessage> messageHistory;
private final List<Consumer<JobStateEvent>> stateObservers;
private final List<Consumer<JobMessage>> messageObservers;

private JobApi.MetricResults metrics;
private PortablePipelineResult resultHandle;
@Nullable private ListenableFuture<PortablePipelineResult> invocationFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ private static void fireTimer(
Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
Instant timestamp = timer.getTimestamp();
Instant outputTimestamp = timer.getOutputTimestamp();
WindowedValue<KV<Object, Timer>> timerValue =
WindowedValue.of(
KV.of(currentTimerKey, Timer.of(timestamp, new byte[0])),
timestamp,
outputTimestamp,
Collections.singleton(window),
PaneInfo.NO_FIRING);
timerConsumer.accept(timer.getTimerId(), timerValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,9 @@ public void processElement(
@TimerId("event") Timer eventTimeTimer,
@TimerId("processing") Timer processingTimeTimer) {
context.output(KV.of("main" + context.element().getKey(), ""));
eventTimeTimer.set(context.timestamp().plus(1L));
eventTimeTimer
.withOutputTimestamp(context.timestamp())
.set(context.timestamp().plus(1L));
processingTimeTimer.offset(Duration.millis(2L));
processingTimeTimer.setRelative();
}
Expand All @@ -1027,7 +1029,9 @@ public void eventTimer(
@TimerId("event") Timer eventTimeTimer,
@TimerId("processing") Timer processingTimeTimer) {
context.output(KV.of("event", ""));
eventTimeTimer.set(context.timestamp().plus(11L));
eventTimeTimer
.withOutputTimestamp(context.timestamp())
.set(context.timestamp().plus(11L));
processingTimeTimer.offset(Duration.millis(12L));
processingTimeTimer.setRelative();
}
Expand All @@ -1038,7 +1042,9 @@ public void processingTimer(
@TimerId("event") Timer eventTimeTimer,
@TimerId("processing") Timer processingTimeTimer) {
context.output(KV.of("processing", ""));
eventTimeTimer.set(context.timestamp().plus(21L));
eventTimeTimer
.withOutputTimestamp(context.timestamp())
.set(context.timestamp().plus(21L));
processingTimeTimer.offset(Duration.millis(22L));
processingTimeTimer.setRelative();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ public abstract class OnTimerContext extends WindowedContext {
/** Returns the timestamp of the current timer. */
public abstract Instant timestamp();

/** Returns the output timestamp of the current timer. */
public abstract Instant fireTimestamp();

/** Returns the window in which the timer is firing. */
public abstract BoundedWindow window();

Expand Down
Loading

0 comments on commit dd0b001

Please sign in to comment.