[BEAM-1514] change default timestamp in KafkaIO #2267

Closed
wants to merge 8 commits into apache:master from XuMingmin:BEAM-1705

Conversation


@mingmxu mingmxu commented Mar 17, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

xumingmin and others added 2 commits March 15, 2017 14:45
1). fix issue of NO_TIMESTAMP type in 10;
2). rename field to 'timestamp';
@coveralls

Coverage Status

Coverage increased (+0.003%) to 70.12% when pulling 03f8497 on XuMingmin:BEAM-1705 into b672cde on apache:master.

@asfbot

asfbot commented Mar 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8529/
--none--

@davorbonaci
Member

R: @rangadi

@@ -57,4 +74,31 @@ public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPa
mapContext.setVariable("tp", topicPartitions);
assignExpression.getValue(mapContext);
}

private boolean hasTimestamp(){
Contributor

Why do you need this method? Could you check the return type when you check 'timestamp' method?

Author

This function is used to check whether ConsumerRecord.class has a timestamp method that returns Long.TYPE. Kafka 0.9 doesn't have it; Kafka 0.10 does. The complete check should be:

hasEventTimestamp = timestampMethod != null
          && timestampMethod.getReturnType().equals(Long.TYPE);

Right?

@rangadi
Contributor

rangadi commented Mar 21, 2017

@xumingmin, did you check the review comments from the earlier PR for this? What do you think of them?

return hasEventTimestamp;
}

public long getEventTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
Contributor

Rename it to getTimestamp() or getRecordTimestamp()? The rest of KafkaIO refers to each message as a 'record' rather than an 'event'.

Author

surely, will do

@@ -986,11 +996,13 @@ public boolean advance() throws IOException {
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
consumerSpEL.getEventTimestamp(rawRecord),
decode(rawRecord.key(), source.spec.getKeyCoder()),
decode(rawRecord.value(), source.spec.getValueCoder()));

curTimestamp = (source.spec.getTimestampFn() == null)
Contributor

curTimestamp is the same as the record timestamp. Here, we apply the user function only to curTimestamp, why? To avoid this confusion, I suggest removing curTimestamp.

Author

curTimestamp differs from KafkaRecord.timestamp when setTimestampFn() is provided.
For example, with a topic of checkout entries, I can use setTimestampFn() to extract checkout_timestamp as curTimestamp, which is neither the Kafka record timestamp nor the current timestamp.

Contributor

ah, right. curTimestamp defaults to kafkaRecord timestamp and could be overridden by the user.

@rangadi
Contributor

rangadi commented Mar 21, 2017

Can you add a comment in getWatermark()?
What are the guarantees about Kafka timestamps? If they are server-side ingest timestamps, are they expected to be monotonically increasing? What happens if they are producer-side publish timestamps? Could they be out of order? getWatermark() should return a monotonically increasing timestamp.

@mingmxu
Author

mingmxu commented Mar 21, 2017

@rangadi all comments from the previous PR should be included here; kindly let me know if anything is missing.

getWatermark() is the one I didn't notice. With the message timestamp in Kafka 0.10, I don't think it can be guaranteed to be monotonically increasing:

  1. with LogAppendTime, timestamps may be out of order when reading from multiple partitions;
  2. with CreateTime, it's similar, very likely out of order.

My thought is, I should change the logic of curTimestamp to

curTimestamp = max(curTimestamp,  
      (source.spec.getTimestampFn() == null) ? new Instant(record.getTimestamp())
                  : source.spec.getTimestampFn().apply(record)
   );

Is that the right way to go?
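Roughly, in Java that monotone update might look like this (a sketch only, assuming curTimestamp was initialized to a minimum such as BoundedWindow.TIMESTAMP_MIN_VALUE; this is not the code that was eventually merged):

// Sketch: derive the candidate timestamp from the record (or the user function),
// then keep curTimestamp monotonically non-decreasing.
Instant candidate = (source.spec.getTimestampFn() == null)
    ? new Instant(record.getTimestamp())
    : source.spec.getTimestampFn().apply(record);
curTimestamp = candidate.isAfter(curTimestamp) ? candidate : curTimestamp;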

@coveralls

Coverage Status

Coverage decreased (-0.2%) to 69.919% when pulling c8eb75e on XuMingmin:BEAM-1705 into b672cde on apache:master.

@asfbot

asfbot commented Mar 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8639/
--none--

@@ -50,15 +50,15 @@
parser.parseExpression("#consumer.assign(#tp)");

private Method timestampMethod;
private boolean hasEventTimestamp = false;
private boolean hasRecordTimestamp = false;

public ConsumerSpEL() {
try {
timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
} catch (NoSuchMethodException | SecurityException e) {
Contributor

how about hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE) here?
That way you can remove lines 78-87.
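A minimal sketch of that constructor-time check, following the field names in the diff above (illustrative only):

// Sketch: detect at construction time whether the Kafka client on the classpath
// (0.10+) exposes ConsumerRecord#timestamp() returning a primitive long.
public ConsumerSpEL() {
  try {
    timestampMethod = ConsumerRecord.class.getMethod("timestamp");
    hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
  } catch (NoSuchMethodException | SecurityException e) {
    // Kafka 0.9 client: no per-record timestamp; hasRecordTimestamp stays false.
  }
}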

@rangadi
Contributor

rangadi commented Mar 22, 2017

getWatermark() handling becomes quite important and more tricky now. I think we need to handle it appropriately if the Kafka timestamp becomes the default.

  • LogAppendTime:
    • We could handle it by keeping the minimum timestamp we have seen across the partitions (and updating it only after we know we have tried to read from the partition with the lowest timestamp), i.e. up to 1 sec of latency.
  • with CreateTime:
    • ¯\_(ツ)_/¯
    • Maybe we need to do something like what PubsubIO does. Delay a minute or so.

The current watermark is OK only because it is processing time. If we make the new timestamps the default, it could be surprising to unsuspecting users. What do you think?

LogAppendTime might be the most common use case. It might be good enough for now to handle that. This would handle backlogged pipelines very nicely. The windows will fire correctly.
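For illustration, the per-partition minimum described above could be computed along these lines (a hedged sketch with made-up field and method names, not code from this PR):

// Sketch: LogAppendTime watermark = min over partitions of the latest timestamp
// seen in that partition; fully consumed (idle) partitions are advanced to
// roughly "now minus a small slack" so they don't hold the watermark back.
Instant computeLogAppendTimeWatermark(
    Map<TopicPartition, Instant> latestTimestampPerPartition,
    Set<TopicPartition> fullyConsumedPartitions) {
  Instant watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
  for (Map.Entry<TopicPartition, Instant> entry : latestTimestampPerPartition.entrySet()) {
    Instant candidate = fullyConsumedPartitions.contains(entry.getKey())
        ? Instant.now().minus(Duration.standardSeconds(5))
        : entry.getValue();
    if (candidate.isBefore(watermark)) {
      watermark = candidate;
    }
  }
  return watermark;
}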

@coveralls

Coverage Status

Coverage decreased (-0.2%) to 69.919% when pulling 55bb5ec on XuMingmin:BEAM-1705 into b672cde on apache:master.

@asfbot

asfbot commented Mar 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8641/
--none--

@mingmxu
Author

mingmxu commented Mar 22, 2017

Combining the two options you suggested, I think we can provide a default watermarkFn() that returns a watermark with the minimum of:

  1. latest_timestamp minus 1 minute;
  2. a timestamp less than 99% of records in the past minutes, for example. The percentage/statistics window is adjustable. (It may be costly.)

The watermark should cover as many records as possible, to avoid marking records as late.

@rangadi
Contributor

rangadi commented Mar 23, 2017

(1): What happens when the pipeline is behind by 5 minutes? We will end up marking 4 minutes of that as late data.

I think we can do a pretty good job with LogAppendTime. E.g. we can keep track of the latest timestamp from each of the partitions and return the least among them. The only tricky part is handling idle partitions where we might not see any records for some time. We know if we have completely consumed a partition, so we can advance the watermark to something like 'current time - 5 seconds' in that case.

(2): Not sure about that. Why not less than all of them? In the case of 'createTime' we can require the user to set the watermarkFn for now.

Implementation-wise, we can have multiple watermark policies: 'LogAppendTimePolicy', 'ProcessingTimePolicy', 'CreateTimePolicy' (the last one just requires the user to set watermarkFn).

For this PR, we can default to the old behavior of 'processing time' for 'curTimestamp' and follow up with proper support for LogAppendTime. The rest of the PR could remain the same, i.e. 'timestamp' in KafkaRecord would be the timestamp provided by Kafka.

In your Kafka 0.10.x deployment, did you enable per record timestamps?
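One possible shape for those policies (purely illustrative; this PR does not add such classes):

// Sketch: the three watermark policies mentioned above.
enum WatermarkPolicy {
  PROCESSING_TIME,   // watermark (and curTimestamp) follow processing time
  LOG_APPEND_TIME,   // watermark = min(latest log-append time seen per partition)
  CREATE_TIME        // user must supply a watermarkFn
}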

@rangadi
Contributor

rangadi commented Mar 23, 2017

cc: @amitsela

@mingmxu
Author

mingmxu commented Mar 23, 2017

I use createTime in my case; however, I provide a customized TimestampFn() and watermarkFn() to handle all known 'issues', like latency skew between partitions and restarting from an offset days ago. Meanwhile, late data needs to be handled properly.

I would agree to step back and keep the old behavior, like:

// event time
curTimestamp = (source.spec.getTimestampFn() == null)
    ? new Instant(record.getTimestamp())
    : source.spec.getTimestampFn().apply(record);
// watermark
return source.spec.getWatermarkFn() != null
    ? source.spec.getWatermarkFn().apply(curRecord) : Instant.now();

Developers would need to tune the watermarkFn. Any thoughts?

Member

@amitsela amitsela left a comment

I've added a bunch of minor comments.

@@ -57,4 +74,20 @@ public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPa
mapContext.setVariable("tp", topicPartitions);
assignExpression.getValue(mapContext);
}

public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
long timestamp = -1L;
Member

I guess this doesn't have to be initialized.

Author

+1

try {
//for Kafka 0.9, set to System.currentTimeMillis();
//for kafka 0.10, when NO_TIMESTAMP also set to System.currentTimeMillis();
if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) == -1L) {
Member

Unless I don't know of something specific that would cause a -1 return value, and since there is a return value if the code gets here, I'd go with < 0 instead of == -1L

Author

-1L is the timestamp returned for NO_TIMESTAMP in Kafka 0.10; I could change it to <= 0 to cover more unexpected cases.
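Putting that together with the diff above, the method could look roughly like this (a sketch; -1 is the sentinel Kafka 0.10 uses when no timestamp is set):

// Sketch: return the Kafka-provided record timestamp when available, falling
// back to processing time for Kafka 0.9 clients or when no timestamp was set.
public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
  long timestamp;
  try {
    if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) <= 0L) {
      timestamp = System.currentTimeMillis();
    }
  } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
    // Not expected: timestampMethod was already verified in the constructor.
    timestamp = System.currentTimeMillis();
  }
  return timestamp;
}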

@@ -31,6 +31,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;

Member

Nit: you have a bunch of newlines added in between imports that the IDE's "auto-indent" probably added; they are unnecessary.

Author

It's kind of a code standard, same for the added @Override in KafkaIO; I intend to keep it for better readability.

Contributor

I usually take care not to impose my code standards on older code I am modifying anyway. But ok :-).

Author

good point to follow, let me take a look at my format profile.

@amitsela
Member

I think @xumingmin has a point here - by adding the "Kafka timestamp" to the KafkaRecord and providing it to the WM function we give users more information, while still defaulting to a naive "now" WM.

@rangadi
Contributor

rangadi commented Mar 23, 2017

I think @xumingmin has a point here - by adding the "Kafka timestamp" to the KafkaRecord and providing it to the WM function we give users more information

Yes, that is what I am suggesting here short of proper handling of LogAppendTime. See the caveat below.

while still defaulting to a naive "now" WM.

You mean 'curTimestamp'? This policy requires that we should also default 'curTimestamp' to processing timestamp, not kafka record timestamp.

return source.spec.getWatermarkFn() != null
? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
Contributor

I am not sure if this is a good default behavior. Could you explain?

Author

I set the default watermark to current_timestamp if there's no user-specified WatermarkFn. This should be the same as before, since curTimestamp in Kafka 0.9 equals current_timestamp if no timestampFn is provided.

I guess in Kafka 0.9, if timestampFn is provided, this watermark cannot be guaranteed to be monotonically increasing.

Contributor

@rangadi rangadi Mar 23, 2017

I set the default watermark to current_timestamp if there's no user-specified WatermarkFn.

As I mentioned earlier, this works only when curTimestamp is also now. Do you agree? Otherwise, most if not all of the records could end up late, which would be a very surprising result for users. I don't think just asking the user to 'set the right watermark fn' is a good default; it places a lot of burden even on simple use cases.

I guess in Kafka 0.9, if timestampFn is provided, this watermark cannot be guaranteed to be monotonically increasing.

True, but that is not the default. That needed improvement anyway.

Author

Right, I assume it's part of the Kafka 0.10 client support, so users following this approach need to take care of the new scenario.
kafka_record_timestamp is a kind of customized event_timestamp for me, so I would ask developers to handle late data properly, mostly with Window.withAllowedLateness().

Contributor

Often the consumers of the streams may not be aware of these timestamps or might just want KafkaIO to do the most sensible thing. I am not sure having a default implementation that marks most records as late and expecting the user to handle it correctly is a great policy.

To summarize, my proposal (reasons are discussed earlier):

  • Default to processing timestamps. I.e. KafkaRecord will have the timestamp provided by Kafka (if any), and 'curTimestamp' and 'watermark' don't change from previous behavior. The simplest option.
  • OR handle LogAppendTime properly to return an appropriate watermark (essentially the min of log_append_time across the partitions read). We can do this in this PR or in another one.

I think this is an important aspect of this PR. Would love to hear yours and others thoughts too. cc @kennknowles @dhalperi @davorbonaci

Author

Agree with (1):

by default both event_timestamp and watermark are processing timestamps.

Developers can combine event_timestamp and watermark if required.

Contributor

Sounds good to me. Thanks.

Author

thanks @rangadi, updated as discussed.

@coveralls

Coverage Status

Coverage increased (+0.05%) to 70.162% when pulling 20a0e06 on XuMingmin:BEAM-1705 into b672cde on apache:master.

@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8727/
--none--

Contributor

@rangadi rangadi left a comment

LGTM.
A couple of suggestions for the comments above. Thanks @xumingmin.

timestamp = System.currentTimeMillis();
}
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
// should not go to here, as it's confirmed that method timestamp() is
Contributor

Rephrase? "Not expected. timestampMethod() is already checked."

@@ -203,6 +203,12 @@
* {@link ProducerConfig} for sink. E.g. if you would like to enable offset
* <em>auto commit</em> (for external monitoring or other purposes), you can set
* <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
*
* <h3>Event Timestamp and Watermark</h3>
* The default event_timestamp and watermark increases along as processing timestamp,
Contributor

minor: s/Event/Record/ ?
"By default record timestamp and watermark are based on processing time in KafkaIO reader. This can be overridden by providing {@code WatermarkFn} with {@link Read#withWatermarkFn(SerializableFunction)}, and {@code TimestampFn} with {@link Read#withTimestampFn(SerializableFunction)}. Note that {@link KafkaRecord#getTimestamp()} reflects timestamp provided by Kafka if any, otherwise it is set to processing time."

@coveralls

Coverage Status

Coverage increased (+0.06%) to 70.172% when pulling 6148ac6 on XuMingmin:BEAM-1705 into b672cde on apache:master.

@coveralls

Coverage Status

Coverage increased (+0.06%) to 70.178% when pulling 6148ac6 on XuMingmin:BEAM-1705 into b672cde on apache:master.

@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8739/
--none--

@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8738/
--none--

@rangadi
Contributor

rangadi commented Mar 23, 2017

👍


@asfbot

asfbot commented Mar 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8743/
--none--

Member

@davorbonaci davorbonaci left a comment

Thanks @xumingmin and @rangadi. Merging.

@asfgit asfgit closed this in 5c2da7d Mar 24, 2017
@mingmxu
Author

mingmxu commented Mar 24, 2017

thank you @davorbonaci
special thanks to @rangadi for the discussion, really helpful!
