[BEAM-1514] change default timestamp in KafkaIO #2267
Conversation
sync-up code
1) fix issue of NO_TIMESTAMP type in Kafka 0.10; 2) rename field to 'timestamp';
Refer to this link for build results (access rights to CI server needed):
R: @rangadi
@@ -57,4 +74,31 @@ public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPa
    mapContext.setVariable("tp", topicPartitions);
    assignExpression.getValue(mapContext);
  }

  private boolean hasTimestamp() {
Why do you need this method? Could you check the return type when you look up the 'timestamp' method?
This function is used to check whether ConsumerRecord.class has a method timestamp which returns Long.TYPE. Kafka 0.9 doesn't have it; Kafka 0.10 does. The check should be complete as:
hasEventTimestamp = timestampMethod != null
    && timestampMethod.getReturnType().equals(Long.TYPE);
Right?
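The reflection check being discussed can be sketched in plain Java. This is an illustration, not the actual ConsumerSpEL code: the nested classes WithTimestamp and WithoutTimestamp are invented stand-ins for the Kafka 0.10 and 0.9 ConsumerRecord classes.

```java
import java.lang.reflect.Method;

public class TimestampCheckSketch {
    // Stand-in for Kafka 0.10's ConsumerRecord, which exposes long timestamp()
    static class WithTimestamp {
        public long timestamp() { return 1234L; }
    }

    // Stand-in for Kafka 0.9's ConsumerRecord, which has no timestamp() method
    static class WithoutTimestamp { }

    // True only if the class has a timestamp() method whose return type is long
    public static boolean hasLongTimestamp(Class<?> clazz) {
        Method timestampMethod = null;
        try {
            timestampMethod = clazz.getMethod("timestamp", (Class<?>[]) null);
        } catch (NoSuchMethodException | SecurityException e) {
            // method absent: a Kafka 0.9 style record
        }
        return timestampMethod != null
            && timestampMethod.getReturnType().equals(Long.TYPE);
    }

    public static void main(String[] args) {
        System.out.println(hasLongTimestamp(WithTimestamp.class));    // true
        System.out.println(hasLongTimestamp(WithoutTimestamp.class)); // false
    }
}
```

Checking the return type matters because getMethod would also match a hypothetical timestamp() with a different signature; the Long.TYPE comparison pins it to the primitive long accessor.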
@xumingmin, did you check the review comments from the earlier PR for this? What do you think of them?
    return hasEventTimestamp;
  }

  public long getEventTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
Rename it to getTimestamp() or getRecordTimestamp()? The rest of KafkaIO refers to each message as a 'record' rather than an 'event'.
surely, will do
@@ -986,11 +996,13 @@ public boolean advance() throws IOException {
        rawRecord.topic(),
        rawRecord.partition(),
        rawRecord.offset(),
        consumerSpEL.getEventTimestamp(rawRecord),
        decode(rawRecord.key(), source.spec.getKeyCoder()),
        decode(rawRecord.value(), source.spec.getValueCoder()));

    curTimestamp = (source.spec.getTimestampFn() == null)
curTimestamp is the same as the record timestamp. Here, we apply the user function only to curTimestamp; why? To avoid this confusion, I suggest removing curTimestamp.
curTimestamp is different from KafkaRecord.timestamp when setTimestampFn() is there. For example, for a topic of checkout entries, I can use setTimestampFn() to extract checkout_timestamp as curTimestamp, which is neither the Kafka record timestamp nor the current timestamp.
ah, right. curTimestamp defaults to the KafkaRecord timestamp and could be overridden by the user.
Can you add a comment in getWatermark()?
@rangadi, all comments from the previous PR should be included here; kindly let me know if anything is missing.
My thought is, I should change the logic to:
curTimestamp = max(curTimestamp,
    (source.spec.getTimestampFn() == null) ? new Instant(record.getTimestamp())
        : source.spec.getTimestampFn().apply(record));
Is that the right way to go?
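The max()-based idea above can be sketched as a small self-contained class. This is a hypothetical illustration of the monotonic-advance logic, not KafkaIO's actual reader; the Record class and advance() method are invented stand-ins for KafkaRecord and the reader's advance step.

```java
import java.util.function.Function;

public class MonotonicTimestampSketch {
    // Minimal stand-in for KafkaRecord; only the timestamp matters here
    static class Record {
        final long timestamp;
        Record(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    private long curTimestamp = Long.MIN_VALUE;

    // Advance curTimestamp monotonically: take the record timestamp (or the
    // user-supplied function's result), but never move backwards.
    public long advance(Record record, Function<Record, Long> timestampFn) {
        long recordTimestamp = (timestampFn == null)
            ? record.getTimestamp()
            : timestampFn.apply(record);
        curTimestamp = Math.max(curTimestamp, recordTimestamp);
        return curTimestamp;
    }

    public static void main(String[] args) {
        MonotonicTimestampSketch reader = new MonotonicTimestampSketch();
        System.out.println(reader.advance(new Record(100L), null)); // 100
        System.out.println(reader.advance(new Record(90L), null));  // still 100
    }
}
```

The max() keeps the emitted timestamp monotonically non-decreasing even when records arrive with out-of-order timestamps, which is the concern raised about the watermark in this thread.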
@@ -50,15 +50,15 @@
    parser.parseExpression("#consumer.assign(#tp)");

  private Method timestampMethod;
  private boolean hasEventTimestamp = false;
  private boolean hasRecordTimestamp = false;

  public ConsumerSpEL() {
    try {
      timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
    } catch (NoSuchMethodException | SecurityException e) {
How about hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE) here? That way you can remove lines 78-87.
getWatermark() handling becomes quite important and trickier now. I think we need to handle it appropriately if the Kafka timestamp becomes the default.
The current watermark is ok only because of processing time. If we make the new timestamps the default, it could be surprising to unsuspecting users. What do you think?
Adding up the two options you suggested, I think we can provide a default.
The watermark should cover as many records as possible, to avoid marking them as late records.
(1): What happens when the pipeline is behind by 5 minutes? We will end up marking 4 minutes of that as late data. I think we can do a pretty good job with (2). Not sure about that; why not less than all of them? In the case of 'createTime' we can require the user to set the watermarkFn for now. Implementation-wise, we can have multiple watermark policies: 'LogAppendTimePolicy', 'ProcessingTimePolicy', 'CreateTimePolicy' (the last one just requires the user to set watermarkFn). For this PR, we can default to the old behavior of 'processing time' for 'curTimestamp' and follow up with proper support for LogAppendTime. The rest of the PR could remain the same, i.e. 'timestamp' in KafkaRecord would be the timestamp provided by Kafka. In your Kafka 0.10.x deployment, did you enable per-record timestamps?
cc: @amitsela |
I would agree to step back to keep the old behavior. Developers need to tune the ...
I've added a bunch of minor comments.
@@ -57,4 +74,20 @@ public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPa
    mapContext.setVariable("tp", topicPartitions);
    assignExpression.getValue(mapContext);
  }

  public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
    long timestamp = -1L;
I guess this doesn't have to be initialized.
+1
    try {
      // for Kafka 0.9, set to System.currentTimeMillis();
      // for Kafka 0.10, when NO_TIMESTAMP also set to System.currentTimeMillis();
      if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) == -1L) {
Unless I don't know of something specific that would cause a -1 return value, and since there is a return value if the code gets here, I'd go with < 0 instead of == -1L.
-1L is the timestamp returned for NO_TIMESTAMP in Kafka 0.10; I could change it to <= 0 to cover more unexpected cases.
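The reflective invoke with its wall-clock fallback can be sketched end to end. This is an illustration under stated assumptions, not the real ConsumerSpEL: NoTimestampRecord is an invented stand-in for a Kafka 0.10 record whose topic carries no per-record timestamps (the broker returns the -1L NO_TIMESTAMP sentinel).

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class RecordTimestampSketch {
    // Stand-in for a Kafka 0.10 record whose timestamp() returns the
    // NO_TIMESTAMP sentinel (-1L).
    public static class NoTimestampRecord {
        public long timestamp() { return -1L; }
    }

    // Invoke timestamp() reflectively; fall back to wall-clock time when the
    // method is missing (Kafka 0.9) or it returns a non-positive sentinel.
    public static long getRecordTimestamp(Object rawRecord) {
        long timestamp = -1L;
        Method timestampMethod = null;
        try {
            timestampMethod = rawRecord.getClass().getMethod("timestamp", (Class<?>[]) null);
        } catch (NoSuchMethodException | SecurityException e) {
            // Kafka 0.9: no timestamp() method at all
        }
        try {
            if (timestampMethod == null
                || (timestamp = (long) timestampMethod.invoke(rawRecord)) <= 0) {
                timestamp = System.currentTimeMillis();
            }
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
            // not expected once the method's presence has been verified
            timestamp = System.currentTimeMillis();
        }
        return timestamp;
    }

    public static void main(String[] args) {
        // NO_TIMESTAMP record: falls back to a positive wall-clock timestamp
        System.out.println(getRecordTimestamp(new NoTimestampRecord()) > 0);
    }
}
```

Note the <= 0 comparison follows the suggestion in the thread, covering both the -1L sentinel and other unexpected non-positive values.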
@@ -31,6 +31,7 @@
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
Nit: you have a bunch of newlines added in between imports that the IDE's "auto-indent" probably added; they are unnecessary.
It's kind of a code standard, same for the added @Override in KafkaIO; I intend to keep it for better readability.
I usually take care not to impose my code standards on older code I am modifying anyway. But ok :-).
good point to follow, let me take a look at my format profile.
I think @xumingmin has a point here - by adding the "Kafka timestamp" to the ...
Yes, that is what I am suggesting here short of proper handling of LogAppendTime. See the caveat below.
You mean 'curTimestamp'? This policy requires that we also default 'curTimestamp' to the processing timestamp, not the Kafka record timestamp.
    return source.spec.getWatermarkFn() != null
        ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
I am not sure if this is a good default behavior. Could you explain?
I set the default watermark to current_timestamp if there is no user-specified WatermarkFn. This should be the same as before, since curTimestamp in Kafka 0.9 equals current_timestamp if no timestampFn is provided.
I guess in Kafka 0.9, if timestampFn is provided, this watermark cannot be guaranteed to be monotonically increasing.
"I set the default watermark to current_timestamp if no user-specified WatermarkFn."
As I mentioned earlier, this works only when curTimestamp is also 'now'. Do you agree? Otherwise, most if not all of the records could end up late, which would be a very surprising result for users. I don't think just asking the user to 'set the right watermark fn' is a good default requirement; it places a lot of burden even on simple use cases.
"I guess in Kafka 0.9, if timestampFn is provided, this watermark cannot be guaranteed to be monotonically increasing."
True, but that is not the default. That needed improvement anyway.
Right, I assume it's part of Kafka 0.10 client support, so users following this path need to take care of the new scenario. kafka_record_timestamp is kind of a customized event_timestamp for me, so I would ask developers to handle late data properly, mostly with Trigger.withAllowedLateness().
Often the consumers of the streams may not be aware of these timestamps or might just want KafkaIO to do the most sensible thing. I am not sure having a default implementation that marks most records as late and expecting the user to handle them correctly is a great policy.
To summarize, my proposal (reasons are discussed earlier):
- Default to processing timestamps, i.e. KafkaRecord will have the timestamp provided by Kafka (if any), and 'curTimestamp' and 'watermark' don't change from the previous behavior. The simplest option.
OR
- Handle LogAppendTimestamp properly to return an appropriate watermark (essentially min(log_append_time) over each partition read). We can do this in this PR or in another one.
I think this is an important aspect of this PR. Would love to hear your and others' thoughts too. cc @kennknowles @dhalperi @davorbonaci
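The min-over-partitions watermark proposed above can be sketched in a few lines. This is a hypothetical illustration of the idea, not KafkaIO's implementation; the class and method names are invented for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

public class LogAppendTimeWatermarkSketch {
    // Latest log-append timestamp observed per partition (partition id -> ts)
    private final Map<Integer, Long> lastTimestampPerPartition = new HashMap<>();

    // Record the newest log-append timestamp seen on a partition
    public void observe(int partition, long logAppendTimestamp) {
        lastTimestampPerPartition.put(partition, logAppendTimestamp);
    }

    // Watermark = min over all observed partitions, so records from a
    // partition the reader is still behind on are not marked late.
    public long watermark() {
        return lastTimestampPerPartition.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        LogAppendTimeWatermarkSketch wm = new LogAppendTimeWatermarkSketch();
        wm.observe(0, 1000L);
        wm.observe(1, 800L);
        System.out.println(wm.watermark()); // 800
    }
}
```

Taking the minimum rather than the maximum is the key design choice: the watermark may only advance once every partition has caught up past that point.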
Agree with 1): by default both event_timestamp and watermark are processing timestamps. Developers can combine event_timestamp and watermark if required.
Sounds good to me. Thanks.
thanks @rangadi, updated as discussed.
LGTM.
A couple of suggestions for the comments above. Thanks @xumingmin.
        timestamp = System.currentTimeMillis();
      }
    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
      // should not go to here, as it's confirmed that method timestamp() is
Rephrase? "Not expected. timestampMethod() is already checked."
@@ -203,6 +203,12 @@
 * {@link ProducerConfig} for sink. E.g. if you would like to enable offset
 * <em>auto commit</em> (for external monitoring or other purposes), you can set
 * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
 *
 * <h3>Event Timestamp and Watermark</h3>
 * The default event_timestamp and watermark increases along as processing timestamp,
minor: s/Event/Record/ ?
"By default record timestamp and watermark are based on processing time in KafkaIO reader. This can be overridden by providing {@code WatermarkFn} with {@link Read#withWatermarkFn(SerializableFunction)}, and {@code TimestampFn} with {@link Read#withTimestampFn(SerializableFunction)}. Note that {@link KafkaRecord#getTimestamp()} reflects timestamp provided by Kafka if any, otherwise it is set to processing time."
👍
Thanks @xumingmin and @rangadi. Merging.
thank you @davorbonaci
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- Title the pull request like: [BEAM-<Jira issue #>] Description of pull request
- Make sure tests pass via mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
- Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
- If this contribution is large, please file an Apache Individual Contributor License Agreement.