-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Beam-2535] : Support outputTimestamp and watermark holds in timers. #9677
Conversation
…urrent 4700 pull request into my branch. And apply required changes
…urrent 4700 pull request into my branch. And apply required changes
…urrent 4700 pull request into my branch. And apply required changes
timerData.getTimerId(), | ||
window, | ||
timerData.getTimestamp(), | ||
timerData.getTimestamp(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this timerData.getTimestamp()
timer.getTimerId(), | ||
window, | ||
timer.getTimestamp(), | ||
timer.getTimestamp(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be timer.getOutputTimestamp()
StateNamespace namespace, | ||
String timerId, | ||
Instant timestamp, | ||
Instant OutputTimestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: case is wrong
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OutputTimestamp should be outputTimestamp
@@ -119,7 +119,8 @@ public boolean receive(String pCollectionId, Object receivedElement) { | |||
String timerId = timerSpec.timerId(); | |||
|
|||
TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals(); | |||
timerInternals.setTimer(namespace, timerId, timer.getTimestamp(), timeDomain); | |||
timerInternals.setTimer( | |||
namespace, timerId, timer.getTimestamp(), timer.getTimestamp(), timeDomain); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the second timer.getTimestamp() should be timer.getOutputTimestamp()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here timer is not belongs to TimerData.java so its don`t have getOutputTimestamp() method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we should add outputTimestamp to TimerInternals.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to make sure that the Timer object always gets constructed with an output timestamp so that we can plumb it through here.
timer.getTimerId(), | ||
window, | ||
timer.getTimestamp(), | ||
timer.getTimestamp(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timer.getOutputTimestamp
Instant target, | ||
Instant targetOutput, | ||
TimeDomain timeDomain) { | ||
getInternals().setTimer(namespace, timerId, target, target, timeDomain); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
second "target" should be "targetOutput"
timer.getTimerId(), | ||
window, | ||
timer.getTimestamp(), | ||
timer.getTimestamp(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timer.getOutputTimestamp()
@@ -92,7 +92,8 @@ private static void fireTimer( | |||
TimerInternals.TimerData timer, DoFnRunner<KV<?, ?>, ?> doFnRunner) { | |||
StateNamespace namespace = timer.getNamespace(); | |||
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); | |||
doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain()); | |||
doFnRunner.onTimer( | |||
timer.getTimerId(), window, timer.getTimestamp(), timer.getTimestamp(), timer.getDomain()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
second timer.getTimestamp() should be timer.getOutputTimestamp()
@@ -160,7 +160,7 @@ public void commit() throws IOException { | |||
|
|||
// Set a timer to continue processing this element. | |||
timerInternals.setTimer( | |||
stateNamespace, "sdfContinuation", wakeupTime, TimeDomain.PROCESSING_TIME); | |||
stateNamespace, "sdfContinuation", wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double check the second wakeupTime. Looks suspicious
String timerId, | ||
Instant target, | ||
Instant targetOutput, | ||
TimeDomain timeDomain) { | ||
TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are ignoring targetOutput
Any test results after commit 524d991? |
Some test cases still failing after changed |
So how many previously failing tests are now passing? How many failures
remain?
…On Tue, Oct 15, 2019 at 10:55 PM xubii ***@***.***> wrote:
Any test results after commit 524d991?
Some test cases still failing after changed
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#9677?email_source=notifications&email_token=AHYHLVWYBA7CUFAYDI7QWY3QO2UDPA5CNFSM4I3DCXKKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEBLF5JA#issuecomment-542531236>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AHYHLVV6H6PSQGP6VH74C5LQO2UDPANCNFSM4I3DCXKA>
.
--
*Shehzaad Nakhoda*
CTO
USA/WhatsApp: +1 6502085107
PAK: +92 3082654179
Skype: shehzaad.nakhoda
<http:https://venturedive.com/>
|
The JavaPortabilityApi test seems to have a compile error.
The two SQL failures are the same: they are ZetaSQL and Calcite SQL versions of testing LIMIT with OFFSET. This uses stateful DoFn here: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java#L248 |
The HBase tests are using SplittableDoFn. It is getting a null range to the key range tracker: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java#L54 Since SDF is probably implemented using state & timers under the hood, this is probably a good bug to catch. |
run dataflow validatesrunner |
I expect that test suite to uncover some more instances of state/timers having trouble. |
Notably, the thing that is null is never permitted to be null. So our static analysis has failed to detect a critical error somewhere. |
run dataflow validatesrunner |
8fb2714
to
b0eb6f8
Compare
b0eb6f8
to
7b67a92
Compare
retest this please |
I added a test to this branch that should verify this behavior. It might require running through spotless |
Retest this please |
@reuvenlax , can you please run the jobs? Seems like I cannot run it according to new Jenkins job execution policy. |
retest this please |
1 similar comment
retest this please |
retest this please |
Run Java PreCommit |
run spark validatesrunner |
run flink validatesrunner |
Run Flink ValidatesRunner |
Run Spark ValidatesRunner |
This is almost done, however you need to exclude the Flink runner from the new test, as it is failing. To do this you need to introduce a new testing tag (e.g. look at the UsesTestStream interface) and then add it to the excludeCategories list in flink_runner.gradle |
@reuvenlax , done |
Run Flink ValidatesRunner |
1 similar comment
Run Flink ValidatesRunner |
Seems like this PR is breaking: @xubii could you please help look into it? |
File JIAR here: https://issues.apache.org/jira/browse/BEAM-9083 |
This PR also breaks dataflow post commit: https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/ |
Flink tests should explicitly excluded, but maybe we didn't exclude the
portable flink tests. There's a pending PR to add support for Flink.
We should add exclusions for Samza, Spark, and the portableApi Dataflow
runners.
…On Mon, Jan 13, 2020 at 11:27 AM Boyuan Zhang ***@***.***> wrote:
This PR also breaks dataflow post commit:
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/
—
You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub
<#9677?email_source=notifications&email_token=AFAYJVKOLCBMPNP2RXYIEXTQ5S6BFA5CNFSM4I3DCXKKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEIZ7A3Q#issuecomment-573829230>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVOJASD45WJPK3X3YKTQ5S6BFANCNFSM4I3DCXKA>
.
|
Thank Reuven for the confirmation! Is there anyone actively working on excluding these tests from the mentioned runner? If not, I'm going to work on a PR to add exclusions. |
Nobody yet. The test is already marked with an test category (because we
currently exclude the regular Flink runner).
It also looks like the test should have been marked with UsesTestStream,
which I think would fix the Spark problem (probably by excluding Spark).
On Mon, Jan 13, 2020 at 12:04 PM Boyuan Zhang <[email protected]>
wrote:
… Flink tests should explicitly excluded, but maybe we didn't exclude the
portable flink tests. There's a pending PR to add support for Flink. We
should add exclusions for Samza, Spark, and the portableApi Dataflow
runners.
… <#m_3450155051590485810_>
On Mon, Jan 13, 2020 at 11:27 AM Boyuan Zhang *@*.***> wrote: This PR
also breaks dataflow post commit:
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/
— You are receiving this because you modified the open/close state. Reply
to this email directly, view it on GitHub <#9677
<#9677>?email_source=notifications&email_token=AFAYJVKOLCBMPNP2RXYIEXTQ5S6BFA5CNFSM4I3DCXKKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEIZ7A3Q#issuecomment-573829230>,
or unsubscribe
https://github.com/notifications/unsubscribe-auth/AFAYJVOJASD45WJPK3X3YKTQ5S6BFANCNFSM4I3DCXKA
.
Thank Reuven for the confirmation! Is there anyone actively working on
excluding these tests from the mentioned runner? If not, I'm going to work
on a PR to add exclusions.
—
You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub
<#9677?email_source=notifications&email_token=AFAYJVMJA3CJYO4CZTLCXF3Q5TCNVA5CNFSM4I3DCXKKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEI2DBAI#issuecomment-573845633>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVM62OSOGTX4GIVUAFDQ5TCNVANCNFSM4I3DCXKA>
.
|
It is the initial phase to make staled PR (#4700) compatible with the upstream master. The default implementation is done for Direct Runner.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.