[Beam-2535] : Support outputTimestamp and watermark holds in timers. #9677

Merged
merged 59 commits into from
Jan 8, 2020

Contributor

@xubii xubii commented Sep 27, 2019

This is the initial phase of making the stale PR (#4700) compatible with upstream master. The default implementation is done for the Direct Runner.


…urrent 4700 pull request into my branch. And apply required changes
timerData.getTimerId(),
window,
timerData.getTimestamp(),
timerData.getTimestamp(),
Contributor

Why is this timerData.getTimestamp()?

timer.getTimerId(),
window,
timer.getTimestamp(),
timer.getTimestamp(),
Contributor

this should be timer.getOutputTimestamp()

StateNamespace namespace,
String timerId,
Instant timestamp,
Instant OutputTimestamp,
Contributor

typo: case is wrong

Contributor

OutputTimestamp should be outputTimestamp

@@ -119,7 +119,8 @@ public boolean receive(String pCollectionId, Object receivedElement) {
 String timerId = timerSpec.timerId();

 TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals();
-timerInternals.setTimer(namespace, timerId, timer.getTimestamp(), timeDomain);
+timerInternals.setTimer(
+    namespace, timerId, timer.getTimestamp(), timer.getTimestamp(), timeDomain);
Contributor

the second timer.getTimestamp() should be timer.getOutputTimestamp()

Contributor Author

xubii commented Oct 16, 2019

Here, timer is not a TimerData instance, so it doesn't have a getOutputTimestamp() method.

Contributor

So we should add outputTimestamp to TimerInternals.

Contributor

We need to make sure that the Timer object always gets constructed with an output timestamp so that we can plumb it through here.
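
One way to guarantee this could look like the sketch below. It uses a hypothetical `SimpleTimer` value class (an illustration only, not Beam's actual `Timer` or `TimerData`), and `java.time.Instant` stands in for Joda's `Instant` to keep the example dependency-free. The idea is that every construction path sets an output timestamp: either the caller supplies one, or the factory defaults it to the firing timestamp.

```java
import java.time.Instant;
import java.util.Objects;

// Hypothetical stand-in for a timer value object; not Beam's actual class.
final class SimpleTimer {
  private final String timerId;
  private final Instant timestamp;
  private final Instant outputTimestamp;

  private SimpleTimer(String timerId, Instant timestamp, Instant outputTimestamp) {
    // Reject nulls up front so downstream code never sees a missing output timestamp.
    this.timerId = Objects.requireNonNull(timerId, "timerId");
    this.timestamp = Objects.requireNonNull(timestamp, "timestamp");
    this.outputTimestamp = Objects.requireNonNull(outputTimestamp, "outputTimestamp");
  }

  // Caller supplies an explicit output timestamp.
  static SimpleTimer of(String timerId, Instant timestamp, Instant outputTimestamp) {
    return new SimpleTimer(timerId, timestamp, outputTimestamp);
  }

  // Convenience overload: the output timestamp defaults to the firing timestamp.
  static SimpleTimer of(String timerId, Instant timestamp) {
    return new SimpleTimer(timerId, timestamp, timestamp);
  }

  String getTimerId() {
    return timerId;
  }

  Instant getTimestamp() {
    return timestamp;
  }

  Instant getOutputTimestamp() {
    return outputTimestamp;
  }
}

class SimpleTimerDemo {
  public static void main(String[] args) {
    SimpleTimer timer =
        SimpleTimer.of("flush", Instant.ofEpochMilli(2000), Instant.ofEpochMilli(1000));
    // A call site can now pass both timestamps instead of repeating getTimestamp().
    System.out.println(timer.getTimestamp() + " / " + timer.getOutputTimestamp());
  }
}
```

With a class shaped like this, a call site such as the one above would pass `getOutputTimestamp()` as the second timestamp argument rather than `getTimestamp()` twice.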

timer.getTimerId(),
window,
timer.getTimestamp(),
timer.getTimestamp(),
Contributor

timer.getOutputTimestamp

Instant target,
Instant targetOutput,
TimeDomain timeDomain) {
getInternals().setTimer(namespace, timerId, target, target, timeDomain);
Contributor

second "target" should be "targetOutput"

timer.getTimerId(),
window,
timer.getTimestamp(),
timer.getTimestamp(),
Contributor

timer.getOutputTimestamp()

@@ -92,7 +92,8 @@ private static void fireTimer(
 TimerInternals.TimerData timer, DoFnRunner<KV<?, ?>, ?> doFnRunner) {
 StateNamespace namespace = timer.getNamespace();
 BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+doFnRunner.onTimer(
+    timer.getTimerId(), window, timer.getTimestamp(), timer.getTimestamp(), timer.getDomain());
Contributor

second timer.getTimestamp() should be timer.getOutputTimestamp()

@@ -160,7 +160,7 @@ public void commit() throws IOException {

 // Set a timer to continue processing this element.
 timerInternals.setTimer(
-    stateNamespace, "sdfContinuation", wakeupTime, TimeDomain.PROCESSING_TIME);
+    stateNamespace, "sdfContinuation", wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME);
Contributor

double check the second wakeupTime. Looks suspicious

String timerId,
Instant target,
Instant targetOutput,
TimeDomain timeDomain) {
TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
Contributor

you are ignoring targetOutput
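
To illustrate why dropping `targetOutput` matters, here is a rough sketch. The `InMemoryTimers` class is hypothetical (it is not Beam's `TimerInternals`), `java.time.Instant` replaces Joda's `Instant`, and the hold is assumed to be simply the minimum output timestamp over pending timers, which is a simplification of what a real runner does. If `setTimer` stored `target` in both slots, the hold would silently move to the firing time.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory timer store; a simplified stand-in, not Beam's TimerInternals.
final class InMemoryTimers {
  // A pending timer: when it fires, and the timestamp at which it holds the output watermark.
  static final class Pending {
    final Instant target;
    final Instant targetOutput;

    Pending(Instant target, Instant targetOutput) {
      this.target = target;
      this.targetOutput = targetOutput;
    }
  }

  private final Map<String, Pending> pending = new HashMap<>();

  void setTimer(String timerId, Instant target, Instant targetOutput) {
    // Forward both timestamps; storing `target` twice would discard the requested hold.
    pending.put(timerId, new Pending(target, targetOutput));
  }

  void deleteTimer(String timerId) {
    pending.remove(timerId);
  }

  // Assumed hold rule for this sketch: the earliest output timestamp of any pending timer.
  Optional<Instant> earliestOutputHold() {
    return pending.values().stream()
        .map(p -> p.targetOutput)
        .min(Comparator.naturalOrder());
  }
}

class InMemoryTimersDemo {
  public static void main(String[] args) {
    InMemoryTimers timers = new InMemoryTimers();
    // Fires at t=5000 but holds the output watermark at t=3000.
    timers.setTimer("a", Instant.ofEpochMilli(5000), Instant.ofEpochMilli(3000));
    System.out.println(timers.earliestOutputHold());
  }
}
```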

@shehzaadn-vd
Contributor

Any test results after commit 524d991?

@xubii
Contributor Author

xubii commented Oct 16, 2019

Any test results after commit 524d991?

Some test cases are still failing after the change.

@shehzaadn-vd
Contributor

shehzaadn-vd commented Oct 16, 2019 via email

@kennknowles
Member

kennknowles commented Oct 17, 2019

The JavaPortabilityApi test seems to have a compile error.

Test Result (4 failures / +4)

 - org.apache.beam.sdk.extensions.sql.impl.rel.BeamSortRelTest.testOrderBy_with_offset2
 - org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLDialectSpecTest.testZetaSQLSelectFromTableLimitOffset
 - org.apache.beam.sdk.io.hbase.HBaseIOTest.testReadingKeyRangeMiddleSDF
 - org.apache.beam.sdk.io.hbase.HBaseIOTest.testReadingSDF

The two SQL failures are the same: they are ZetaSQL and Calcite SQL versions of testing LIMIT with OFFSET. This uses stateful DoFn here: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java#L248

@kennknowles
Member

The HBase tests are using SplittableDoFn. It is getting a null range to the key range tracker: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java#L54

Since SDF is probably implemented using state & timers under the hood, this is probably a good bug to catch.

@kennknowles
Member

run dataflow validatesrunner

@kennknowles
Member

I expect that test suite to uncover some more instances of state/timers having trouble.

@kennknowles
Member

Notably, the thing that is null is never permitted to be null. So our static analysis has failed to detect a critical error somewhere.

@xubii
Contributor Author

xubii commented Oct 18, 2019

run dataflow validatesrunner

@reuvenlax
Contributor

retest this please

@reuvenlax
Contributor

I added a test to this branch that should verify this behavior. It might require running through spotless

@rehmanmuradali
Contributor

Retest this please

@rehmanmuradali
Contributor

@reuvenlax, can you please run the jobs? It seems I cannot run them under the new Jenkins job execution policy.

@aromanenko-dev
Contributor

retest this please

1 similar comment
@iemejia
Member

iemejia commented Jan 8, 2020

retest this please

@reuvenlax
Contributor

retest this please

@reuvenlax
Contributor

Run Java PreCommit

@reuvenlax
Contributor

run spark validatesrunner

@reuvenlax
Contributor

run flink validatesrunner

@reuvenlax
Contributor

Run Flink ValidatesRunner

@reuvenlax
Contributor

Run Spark ValidatesRunner

@reuvenlax
Contributor

This is almost done, however you need to exclude the Flink runner from the new test, as it is failing. To do this you need to introduce a new testing tag (e.g. look at the UsesTestStream interface) and then add it to the excludeCategories list in flink_runner.gradle
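
The mechanism behind such a testing tag can be sketched roughly as follows. This is a dependency-free illustration, not Beam's or JUnit's code: the local `@Category` annotation is a stand-in for JUnit 4's annotation of the same name, `UsesTimerOutputTimestamp` is a hypothetical marker interface in the spirit of `UsesTestStream`, and `CategoryFilter` only imitates what an exclude-categories list does. The actual exclusion would be configured in `flink_runner.gradle`, which is not shown here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for JUnit 4's @Category, defined here to avoid external dependencies.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Category {
  Class<?>[] value();
}

// Hypothetical marker interface used purely as a tag; it has no methods.
interface UsesTimerOutputTimestamp {}

// A test class tagged with the new category.
@Category(UsesTimerOutputTimestamp.class)
class OutputTimestampTimerTest {}

// An untagged test class that every runner should still execute.
class PlainParDoTest {}

final class CategoryFilter {
  // Returns the test classes that carry none of the excluded categories.
  static List<Class<?>> exclude(List<Class<?>> tests, List<Class<?>> excluded) {
    List<Class<?>> kept = new ArrayList<>();
    for (Class<?> test : tests) {
      Category category = test.getAnnotation(Category.class);
      boolean skip = false;
      if (category != null) {
        for (Class<?> tag : category.value()) {
          if (excluded.contains(tag)) {
            skip = true;
          }
        }
      }
      if (!skip) {
        kept.add(test);
      }
    }
    return kept;
  }
}

class CategoryFilterDemo {
  public static void main(String[] args) {
    List<Class<?>> tests = Arrays.asList(OutputTimestampTimerTest.class, PlainParDoTest.class);
    List<Class<?>> excluded = Arrays.asList(UsesTimerOutputTimestamp.class);
    // Only the untagged test remains for a runner that excludes the category.
    System.out.println(CategoryFilter.exclude(tests, excluded));
  }
}
```

Tagging the test rather than disabling it keeps it running on runners that already support the feature, and lets each unsupported runner opt out in its own build file.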

@rehmanmuradali
Contributor

This is almost done, however you need to exclude the Flink runner from the new test, as it is failing. To do this you need to introduce a new testing tag (e.g. look at the UsesTestStream interface) and then add it to the excludeCategories list in flink_runner.gradle

@reuvenlax , done

@reuvenlax
Contributor

Run Flink ValidatesRunner

1 similar comment
@reuvenlax
Contributor

Run Flink ValidatesRunner

@reuvenlax changed the title from "[Beam-2535] : Apply changes from stale PR 4700 on latest code" to "[Beam-2535] : Support outputTimestamp and watermark holds in timers." on Jan 8, 2020
@reuvenlax merged commit 4b77225 into apache:master on Jan 8, 2020
@boyuanzz
Contributor

boyuanzz commented Jan 9, 2020

Seems like this PR is breaking:
Java_ValidatesRunner_Samza
Java_ValidatesRunner_Spark
Java_PVR_Flink_Streaming
Java_PVR_Flink_Batch

@xubii could you please help look into it?

@boyuanzz
Contributor

Filed a JIRA here: https://issues.apache.org/jira/browse/BEAM-9083

@reuvenlax
Contributor

reuvenlax commented Jan 13, 2020 via email

@boyuanzz
Contributor

Flink tests should be explicitly excluded, but maybe we didn't exclude the portable Flink tests. There's a pending PR to add support for Flink. We should add exclusions for the Samza, Spark, and portableApi Dataflow runners.

On Mon, Jan 13, 2020 at 11:27 AM Boyuan Zhang @.***> wrote:
This PR also breaks dataflow post commit:
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/

Thanks, Reuven, for the confirmation! Is anyone actively working on excluding these tests from the mentioned runners? If not, I'm going to work on a PR to add the exclusions.

@reuvenlax
Contributor

reuvenlax commented Jan 13, 2020 via email

8 participants