Skip to content

Commit

Permalink
Merge pull request apache#11725: [BEAM-10015] Fix output timestamp on…
Browse files Browse the repository at this point in the history
… dataflow runner
  • Loading branch information
reuvenlax committed May 16, 2020
1 parent d299877 commit c395c84
Show file tree
Hide file tree
Showing 31 changed files with 439 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ public void testSerialization() {
TimerDataCoderV2 timerDataCoder = TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
TimerData timerData =
TimerData.of(
"arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME);
"arbitrary-id",
StateNamespaces.global(),
new Instant(0),
new Instant(0),
TimeDomain.EVENT_TIME);
String key = "key";
ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ public TimersImpl(StateNamespace namespace) {

@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}

@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,8 @@ public String getErrorContext() {
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
timerInternals.setTimer(
TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
TimerInternals.TimerData.of(
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
}

private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsStartBundle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,27 +213,6 @@ public static TimerData of(
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain);
}

/**
* Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
* generated. Construct a {@link TimerData} for the given parameters except for {@code
* outputTimestamp}. {@code outputTimestamp} is set to timer {@code timestamp}.
*/
public static TimerData of(
String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
return new AutoValue_TimerInternals_TimerData(
timerId, "", namespace, timestamp, timestamp, domain);
}

public static TimerData of(
String timerId,
String timerFamilyId,
StateNamespace namespace,
Instant timestamp,
TimeDomain domain) {
return new AutoValue_TimerInternals_TimerData(
timerId, timerFamilyId, namespace, timestamp, timestamp, domain);
}

/**
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}.
Expand All @@ -249,15 +228,6 @@ public static TimerData of(
return of(timerId, namespace, timestamp, outputTimestamp, domain);
}

/**
* Construct a {@link TimerData} for the given parameters, where the timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}. Also, output
* timestamp is set to the timer timestamp by default.
*/
public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
return of(namespace, timestamp, timestamp, domain);
}

/**
* {@inheritDoc}.
*
Expand Down Expand Up @@ -364,7 +334,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, namespace, timestamp, domain);
return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public class InMemoryTimerInternalsTest {
@Test
public void testFiringEventTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME);
TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME);
TimerData eventTimer1 =
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData eventTimer2 =
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);

underTest.setTimer(eventTimer1);
underTest.setTimer(eventTimer2);
Expand Down Expand Up @@ -79,7 +81,7 @@ public void testResetById() throws Exception {
underTest.advanceInputWatermark(laterTimestamp.plus(1L));
assertThat(
underTest.removeNextEventTimer(),
equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, TimeDomain.EVENT_TIME)));
equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, laterTimestamp, TimeDomain.EVENT_TIME)));
}

@Test
Expand Down Expand Up @@ -107,8 +109,10 @@ public void testDeletionById() throws Exception {
@Test
public void testFiringProcessingTimeTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData processingTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData processingTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);

underTest.setTimer(processingTime1);
underTest.setTimer(processingTime2);
Expand Down Expand Up @@ -136,14 +140,20 @@ public void testFiringProcessingTimeTimers() throws Exception {
@Test
public void testTimerOrdering() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData eventTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData processingTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTime1 =
TimerData.of(NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData.of(
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData eventTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
TimerData processingTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTime2 =
TimerData.of(NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData.of(
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);

underTest.setTimer(processingTime1);
underTest.setTimer(eventTime1);
Expand Down Expand Up @@ -176,8 +186,10 @@ public void testTimerOrdering() throws Exception {
@Test
public void testDeduplicate() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData eventTime =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData processingTime =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
underTest.setTimer(eventTime);
underTest.setTimer(eventTime);
underTest.setTimer(processingTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public void testCoderProperties() throws Exception {
public void testEncodeDecodeEqual() throws Exception {
Iterable<TimerData> timers =
ImmutableList.of(
TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
TimerData.of(
StateNamespaces.global(),
new Instant(500L),
new Instant(500L),
TimeDomain.EVENT_TIME));
Iterable<WindowedValue<Integer>> elements =
ImmutableList.of(
WindowedValue.valueInGlobalWindow(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
ArrayList<TimerData> timers = new ArrayList<>(1);
timers.add(
TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
TimerData.of(
StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
runner.onTimers(timers);
runner.persist();
}
Expand All @@ -583,6 +584,7 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
TimerData.of(
StateNamespaces.window(windowFn.windowCoder(), window),
timer.getTimestamp(),
timer.getTimestamp(),
timer.getValue()));
}
runner.onTimers(timerData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ public void onTimer(OnTimerContext context) {
TimerData.of(
DoFnWithTimers.TIMER_ID,
StateNamespaces.window(windowCoder, (W) context.window()),
context.fireTimestamp(),
context.timestamp(),
context.timeDomain()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public void testOnTimerCalled() {
timerId,
StateNamespaces.window(IntervalWindow.getCoder(), window),
timestamp,
timestamp,
TimeDomain.EVENT_TIME)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public void testTimerDataCoder() throws Exception {
CoderProperties.coderDecodeEncodeEqual(
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE),
TimerData.of(
"arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
"arbitrary-id",
StateNamespaces.global(),
new Instant(0),
new Instant(0),
TimeDomain.EVENT_TIME));

Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
CoderProperties.coderDecodeEncodeEqual(
Expand All @@ -52,6 +56,7 @@ public void testTimerDataCoder() throws Exception {
StateNamespaces.window(
windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
new Instant(99),
new Instant(99),
TimeDomain.PROCESSING_TIME));
}

Expand All @@ -64,10 +69,12 @@ public void testCoderIsSerializableWithWellKnownCoderType() {
public void testCompareEqual() {
Instant timestamp = new Instant(100);
StateNamespace namespace = StateNamespaces.global();
TimerData timer = TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME);
TimerData timer = TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);

assertThat(
timer, comparesEqualTo(TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME)));
timer,
comparesEqualTo(
TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME)));
}

@Test
Expand All @@ -76,8 +83,10 @@ public void testCompareByTimestamp() {
Instant secondTimestamp = new Instant(200);
StateNamespace namespace = StateNamespaces.global();

TimerData firstTimer = TimerData.of(namespace, firstTimestamp, TimeDomain.EVENT_TIME);
TimerData secondTimer = TimerData.of(namespace, secondTimestamp, TimeDomain.EVENT_TIME);
TimerData firstTimer =
TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME);
TimerData secondTimer =
TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME);

assertThat(firstTimer, lessThan(secondTimer));
}
Expand All @@ -87,10 +96,10 @@ public void testCompareByDomain() {
Instant timestamp = new Instant(100);
StateNamespace namespace = StateNamespaces.global();

TimerData eventTimer = TimerData.of(namespace, timestamp, TimeDomain.EVENT_TIME);
TimerData procTimer = TimerData.of(namespace, timestamp, TimeDomain.PROCESSING_TIME);
TimerData eventTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
TimerData procTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcTimer =
TimerData.of(namespace, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData.of(namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);

assertThat(eventTimer, lessThan(procTimer));
assertThat(eventTimer, lessThan(synchronizedProcTimer));
Expand All @@ -107,8 +116,10 @@ public void testCompareByNamespace() {
StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow);
StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow);

TimerData secondEventTime = TimerData.of(firstWindowNs, timestamp, TimeDomain.EVENT_TIME);
TimerData thirdEventTime = TimerData.of(secondWindowNs, timestamp, TimeDomain.EVENT_TIME);
TimerData secondEventTime =
TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
TimerData thirdEventTime =
TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);

assertThat(secondEventTime, lessThan(thirdEventTime));
}
Expand All @@ -118,8 +129,10 @@ public void testCompareByTimerId() {
Instant timestamp = new Instant(100);
StateNamespace namespace = StateNamespaces.global();

TimerData id0Timer = TimerData.of("id0", namespace, timestamp, TimeDomain.EVENT_TIME);
TimerData id1Timer = TimerData.of("id1", namespace, timestamp, TimeDomain.EVENT_TIME);
TimerData id0Timer =
TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
TimerData id1Timer =
TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);

assertThat(id0Timer, lessThan(id1Timer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,12 @@ public TestTimers(StateNamespace namespace) {

@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}

@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,22 @@ public void setup() {
@Test
public void setTimerAddsToBuilder() {
TimerData eventTimer =
TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
TimerData.of(
StateNamespaces.global(),
new Instant(20145L),
new Instant(20145L),
TimeDomain.EVENT_TIME);
TimerData processingTimer =
TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
TimerData.of(
StateNamespaces.global(),
new Instant(125555555L),
new Instant(125555555L),
TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTimer =
TimerData.of(
StateNamespaces.global(),
new Instant(98745632189L),
new Instant(98745632189L),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
internals.setTimer(eventTimer);
internals.setTimer(processingTimer);
Expand All @@ -82,13 +91,22 @@ public void setTimerAddsToBuilder() {
@Test
public void deleteTimerDeletesOnBuilder() {
TimerData eventTimer =
TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
TimerData.of(
StateNamespaces.global(),
new Instant(20145L),
new Instant(20145L),
TimeDomain.EVENT_TIME);
TimerData processingTimer =
TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
TimerData.of(
StateNamespaces.global(),
new Instant(125555555L),
new Instant(125555555L),
TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTimer =
TimerData.of(
StateNamespaces.global(),
new Instant(98745632189L),
new Instant(98745632189L),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
internals.deleteTimer(eventTimer);
internals.deleteTimer(processingTimer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ public void removesOnExceptionInOnTimer() throws Exception {

try {
evaluator.onTimer(
TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME),
TimerData.of(
"foo",
StateNamespaces.global(),
new Instant(0),
new Instant(0),
TimeDomain.EVENT_TIME),
"",
GlobalWindow.INSTANCE);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ public void extractFiredTimersExtractsTimers() {

StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
TimerData toFire =
TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
TimerData.of(
StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
TransformResult<?> timerResult =
StepTransformResult.withoutHold(downstreamProducer)
.withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
Expand Down
Loading

0 comments on commit c395c84

Please sign in to comment.