Skip to content

Commit

Permalink
support flink runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuven Lax committed Jan 14, 2020
1 parent f2d19fd commit fb686f2
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ public static Timer<Void> of(Instant time) {

/** Returns a timer for the given timestamp with a user specified payload. */
public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
return new AutoValue_Timer(timestamp, timestamp, payload);
}

/** Returns a timer for the given timestamp with a user specified payload and outputTimestamp. */
public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, @Nullable T payload) {
return new AutoValue_Timer(timestamp, outputTimestamp, payload);
return new AutoValue_Timer(timestamp, payload);
}

/**
Expand All @@ -63,9 +58,6 @@ public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, @Nulla
*/
public abstract Instant getTimestamp();

/* Returns the outputTimestamps */
public abstract Instant getOutputTimestamp();

/** A user supplied payload. */
@Nullable
public abstract T getPayload();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void onTimer(
}

OnTimerArgumentProvider argumentProvider =
new OnTimerArgumentProvider(window, effectiveTimestamp, timeDomain);
new OnTimerArgumentProvider(window, timestamp, effectiveTimestamp, timeDomain);
invoker.invokeOnTimer(timerId, argumentProvider);
}

Expand Down Expand Up @@ -732,6 +732,7 @@ private class OnTimerArgumentProvider extends DoFn<InputT, OutputT>.OnTimerConte
implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
private final BoundedWindow window;
private final Instant timestamp;
private final Instant outputTimestamp;
private final TimeDomain timeDomain;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
Expand All @@ -752,10 +753,11 @@ private StateNamespace getNamespace() {
}

private OnTimerArgumentProvider(
BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
BoundedWindow window, Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
fn.super();
this.window = window;
this.timestamp = timestamp;
this.outputTimestamp = outputTimestamp;
this.timeDomain = timeDomain;
}

Expand All @@ -764,6 +766,11 @@ public Instant timestamp() {
return timestamp;
}

@Override
public Instant outputTimestamp() {
return outputTimestamp;
}

@Override
public BoundedWindow window() {
return window;
Expand Down Expand Up @@ -892,7 +899,8 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {

@Override
public <T> void output(TupleTag<T> tag, T output) {
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWindowedValue(
tag, WindowedValue.of(output, outputTimestamp, window(), PaneInfo.NO_FIRING));
}

@Override
Expand Down
1 change: 0 additions & 1 deletion runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def createValidatesRunnerTask(Map m) {
maxParallelForks 2
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,10 @@ public void processWatermark1(Watermark mark) throws Exception {
Instant watermarkHold = keyedStateInternals.watermarkHold();

long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());

if (timerInternals.getWatermarkHoldMs() < Long.MAX_VALUE) {
combinedWatermarkHold =
Math.min(combinedWatermarkHold, timerInternals.getWatermarkHoldMs());
}
long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);

if (potentialOutputWatermark > currentOutputWatermark) {
Expand Down Expand Up @@ -811,7 +814,7 @@ protected void fireTimer(InternalTimer<ByteBuffer, TimerData> timer) {
// This is a user timer, so namespace must be WindowNamespace
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.cleanupPendingTimer(timer.getNamespace());
timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
pushbackDoFnRunner.onTimer(
timerData.getTimerId(),
timerData.getTimerFamilyId(),
Expand Down Expand Up @@ -1084,13 +1087,31 @@ class FlinkTimerInternals implements TimerInternals {
*/
final MapState<String, TimerData> pendingTimersById;

long watermarkHoldMs = Long.MAX_VALUE;

private FlinkTimerInternals() {
MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
new MapStateDescriptor<>(
"pending-timers", new StringSerializer(), new CoderTypeSerializer<>(timerCoder));
this.pendingTimersById = getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
}

long getWatermarkHoldMs() {
return watermarkHoldMs;
}

void updateWatermarkHold() {
this.watermarkHoldMs = Long.MAX_VALUE;
try {
for (TimerData timerData : pendingTimersById.values()) {
this.watermarkHoldMs =
Math.min(timerData.getOutputTimestamp().getMillis(), this.watermarkHoldMs);
}
} catch (Exception e) {
throw new RuntimeException("Exception while reading set of timers", e);
}
}

@Override
public void setTimer(
StateNamespace namespace,
Expand Down Expand Up @@ -1118,6 +1139,7 @@ public void setTimer(TimerData timer) {
// before we set the new one.
cancelPendingTimerById(contextTimerId);
registerTimer(timer, contextTimerId);
updateWatermarkHold();
} catch (Exception e) {
throw new RuntimeException("Failed to set timer", e);
}
Expand All @@ -1142,13 +1164,16 @@ private void registerTimer(TimerData timer, String contextTimerId) throws Except
private void cancelPendingTimerById(String contextTimerId) throws Exception {
TimerData oldTimer = pendingTimersById.get(contextTimerId);
if (oldTimer != null) {
deleteTimer(oldTimer);
deleteTimerInternal(oldTimer, false);
}
}

void cleanupPendingTimer(TimerData timer) {
void cleanupPendingTimer(TimerData timer, boolean updateWatermark) {
try {
pendingTimersById.remove(getContextTimerId(timer.getTimerId(), timer.getNamespace()));
if (updateWatermark) {
updateWatermarkHold();
}
} catch (Exception e) {
throw new RuntimeException("Failed to cleanup state with pending timers", e);
}
Expand All @@ -1170,16 +1195,21 @@ public void deleteTimer(StateNamespace namespace, String timerId) {
public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
try {
cancelPendingTimerById(getContextTimerId(timerId, namespace));
updateWatermarkHold();
} catch (Exception e) {
throw new RuntimeException("Failed to cancel timer", e);
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
@Deprecated
@Override
@Deprecated
public void deleteTimer(TimerData timerKey) {
cleanupPendingTimer(timerKey);
deleteTimerInternal(timerKey, true);
}

void deleteTimerInternal(TimerData timerKey, boolean updateWatermark) {
cleanupPendingTimer(timerKey, true);
long time = timerKey.getTimestamp().getMillis();
switch (timerKey.getDomain()) {
case EVENT_TIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ public void onTimer(
WindowedValue<KV<Object, Timer>> timerValue =
WindowedValue.of(
KV.of(timerKey, Timer.of(timestamp, new byte[0])),
timestamp,
outputTimestamp,
Collections.singleton(window),
PaneInfo.NO_FIRING);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public <AdditionalOutputT> void outputWindowedValue(

@Override
protected void fireTimer(InternalTimer<ByteBuffer, TimerInternals.TimerData> timer) {
timerInternals.cleanupPendingTimer(timer.getNamespace());
timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
if (timer.getNamespace().getDomain().equals(TimeDomain.EVENT_TIME)) {
// ignore this, it can only be a state cleanup timers from StatefulDoFnRunner and ProcessFn
// does its own state cleanup and should never set event-time timers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {

@Override
protected void fireTimer(InternalTimer<ByteBuffer, TimerData> timer) {
timerInternals.cleanupPendingTimer(timer.getNamespace());
timerInternals.cleanupPendingTimer(timer.getNamespace(), true);
doFnRunner.processElement(
WindowedValue.valueInGlobalWindow(
KeyedWorkItems.timersWorkItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public boolean receive(String pCollectionId, Object receivedElement) {

TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals();
timerInternals.setTimer(
namespace, timerId, "", timer.getTimestamp(), timer.getOutputTimestamp(), timeDomain);
namespace, timerId, "", timer.getTimestamp(), windowedValue.getTimestamp(), timeDomain);

timerIdToKey.put(timerId, windowedValue.getValue().getKey());
timerIdToPayload.put(timerId, timer.getPayload());
Expand All @@ -144,7 +144,7 @@ private void fireTimers() throws Exception {
KV.of(
timerIdToKey.get(timerData.getTimerId()),
Timer.of(timerData.getTimestamp(), timerIdToPayload.get(timerData.getTimerId()))),
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
Collections.singleton(window),
PaneInfo.NO_FIRING);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class JobInvocation {
private final List<JobMessage> messageHistory;
private final List<Consumer<JobStateEvent>> stateObservers;
private final List<Consumer<JobMessage>> messageObservers;

private JobApi.MetricResults metrics;
private PortablePipelineResult resultHandle;
@Nullable private ListenableFuture<PortablePipelineResult> invocationFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ private static void fireTimer(
Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
Instant timestamp = timer.getTimestamp();
Instant outputTimestamp = timer.getOutputTimestamp();
WindowedValue<KV<Object, Timer>> timerValue =
WindowedValue.of(
KV.of(currentTimerKey, Timer.of(timestamp, new byte[0])),
timestamp,
outputTimestamp,
Collections.singleton(window),
PaneInfo.NO_FIRING);
timerConsumer.accept(timer.getTimerId(), timerValue);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ public abstract class OnTimerContext extends WindowedContext {
/** Returns the timestamp of the current timer. */
public abstract Instant timestamp();

/** Returns the output timestamp of the current timer. */
public abstract Instant outputTimestamp();

/** Returns the window in which the timer is firing. */
public abstract BoundedWindow window();

Expand Down
Loading

0 comments on commit fb686f2

Please sign in to comment.