[FLINK-6563] [table] Add time indicator support to KafkaTableSource. #4638
Looks good to me overall. I left one question.
TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();

// check if the rowtime field exists and remember position
this.rowtimeFieldPos = -1;
I'm wondering why we need to remove the field here and add it back later on. Changing the order of the fields seems problematic and can potentially break serialization (in very hacky cases).
Another question is to what extent a customized timestamp assigner can reuse the code here. Is it possible to implement it as a decorator of the table source? That way it opens up the possibility of reusing the code for other table sources.
Yes, it's not very nice to move the time attribute into the StreamRecord, remove the Row field, and copy the StreamRecord timestamp back into the Row. That's just how the current interface is designed. However, this will be changed with FLINK-7446.
The issue of watermark generation (timestamps don't need to be generated as they are already expected to be in the Row) will be addressed by FLINK-7548.
We have to see how much of FLINK-7446 and FLINK-7548 we can solve before Flink 1.4.0. I did this PR to have at least some time attribute support for KafkaTableSources in 1.4.0. Do you need more specialized watermark generators for your use case?
What about accelerating the efforts on FLINK-7446 and FLINK-7548?
Unfortunately our use cases go a little beyond that, so this PR will not solve the problem out of the box. In one use case we have a timestamp that is a double instead of a bigint; in another use case the timestamp sits in a nested structure. That's the reason why I'm more inclined toward a decorator-based approach, which is easier to customize.
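To make the decorator idea concrete, here is a minimal sketch under stated assumptions. `RecordSource`, `TimestampedRecord`, `TimestampingSource`, and `Extractors` are hypothetical names invented for illustration and are not Flink interfaces; records are modeled as plain maps, and the double timestamp is assumed to hold epoch seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for a table source; not a Flink interface. */
interface RecordSource {
    List<Map<String, Object>> read();
}

/** A record paired with the event-time timestamp extracted from it. */
final class TimestampedRecord {
    final Map<String, Object> record;
    final long timestampMillis;

    TimestampedRecord(Map<String, Object> record, long timestampMillis) {
        this.record = record;
        this.timestampMillis = timestampMillis;
    }
}

/** Decorator: wraps any source and attaches timestamps without reordering fields. */
final class TimestampingSource {
    private final RecordSource inner;
    private final ToLongFunction<Map<String, Object>> extractor;

    TimestampingSource(RecordSource inner, ToLongFunction<Map<String, Object>> extractor) {
        this.inner = inner;
        this.extractor = extractor;
    }

    List<TimestampedRecord> read() {
        List<TimestampedRecord> out = new ArrayList<>();
        for (Map<String, Object> r : inner.read()) {
            // The record itself is passed through untouched.
            out.add(new TimestampedRecord(r, extractor.applyAsLong(r)));
        }
        return out;
    }
}

/** Example extractors for the two use cases mentioned in the comment above. */
final class Extractors {
    private Extractors() {}

    /** Timestamp stored as a double; epoch seconds is an assumed unit. */
    static ToLongFunction<Map<String, Object>> fromDoubleSeconds(String field) {
        return r -> Math.round(((Number) r.get(field)).doubleValue() * 1000.0);
    }

    /** Timestamp stored as epoch millis one level down in a nested map. */
    static ToLongFunction<Map<String, Object>> fromNested(String outer, String inner) {
        return r -> {
            @SuppressWarnings("unchecked")
            Map<String, Object> nested = (Map<String, Object>) r.get(outer);
            return ((Number) nested.get(inner)).longValue();
        };
    }
}
```

Because the extractor is a plain function, the same wrapper could be reused for any source, which is the reuse argument made above.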
I think you are right. We should not ask each TableSource to assign watermarks.
A decorator approach, i.e., a watermark generation wrapper around a TableSource, would be one option. Alternatively, we could also give the responsibility for watermark generation to the scan operator. All the TableSource would need to provide is the name of the field for which watermarks are generated and the watermark strategy (ascending, bounded, or custom). The generation would be handled completely internally. At that point, we can also access all fields of any type (Row, Pojo, Tuple, ...), nested or flat, via code generation. So, we would need to extend or replace the DefinedRowtimeAttribute interface because we need information about the watermark strategy (ascending, bounded, or custom). What do you think @haohui?
I already started to work on FLINK-7446 but will also not have too much time in the next weeks due to conferences. I hope that we can address these issues with the next release.
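As an illustration of the three strategies named above, the following sketch shows how ascending and bounded watermarks could be derived from observed timestamps. The interface and class names are hypothetical and not part of Flink; the arithmetic (one millisecond behind the maximum for ascending timestamps, a fixed delay behind the maximum for bounded out-of-orderness) is the usual convention, stated here as an assumption about what the scan operator would do.

```java
/** Hypothetical strategy interface; the names are illustrative, not Flink API. */
interface WatermarkStrategySketch {
    /** Observes a row's timestamp and returns the watermark to emit afterwards. */
    long onTimestamp(long timestampMillis);
}

/** For monotonically increasing timestamps. */
final class AscendingWatermarks implements WatermarkStrategySketch {
    private long maxSeen = Long.MIN_VALUE;

    @Override
    public long onTimestamp(long timestampMillis) {
        maxSeen = Math.max(maxSeen, timestampMillis);
        // A later row may carry the same timestamp, so stay one millisecond behind.
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - 1;
    }
}

/** For timestamps that are out of order by at most a fixed delay. */
final class BoundedOutOfOrderWatermarks implements WatermarkStrategySketch {
    private final long delayMillis;
    private long maxSeen = Long.MIN_VALUE;

    BoundedOutOfOrderWatermarks(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        this.delayMillis = delayMillis;
    }

    @Override
    public long onTimestamp(long timestampMillis) {
        maxSeen = Math.max(maxSeen, timestampMillis);
        // Guard against underflow for very small timestamps.
        return maxSeen < Long.MIN_VALUE + delayMillis ? Long.MIN_VALUE : maxSeen - delayMillis;
    }
}
```

A custom strategy would simply be another implementation of the same interface, for example a lambda such as `ts -> ts - 1000`.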
Currently in our internal system we worked around this problem by decorating the data stream returned by Kafka. Having the scan operator assign the watermark seems pretty neat. +1 for that.
It might be cleaner to put both the assignment of the timestamp (i.e., TimestampAssigner) and the naming of the timestamp (i.e., DefinedRowtimeAttribute and DefinedProctimeAttribute) together.
What do you think?
Thanks for the clarification @haohui.
Would it be sufficient to apply default casting logic to the timestamp field, i.e., cast numeric fields to bigint, or do you need custom conversion logic (evaluate a custom expression, possibly involving scalar UDFs)?
Unfortunately we do need custom logic -- other people might need it too, to take care of issues like time zone differences.
I assume you also need access to the full row, correct?
As of today the answer is no -- we only need a single, top-level field. There might be use cases that have the rowtime in nested fields. TimestampAssigner does come in handy, as the flexibility is quite important to us.
Maybe it makes sense to provide some default implementation of TimestampAssigner that extracts a field from the row, in order to make it easier to use?
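One possible shape for such a default implementation is sketched below. `FieldTimestampAssigner` is a hypothetical class, not the Flink `TimestampAssigner` interface; rows are modeled as `Object[]`, and the two factory methods (numeric cast, zone-aware parsing of a local date-time string) are assumptions chosen to mirror the default-cast and time-zone cases discussed in this thread.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.function.ToLongFunction;

/** Hypothetical default assigner: reads one top-level field and converts it to epoch millis. */
final class FieldTimestampAssigner {
    private final int fieldIndex;
    private final ToLongFunction<Object> converter;

    FieldTimestampAssigner(int fieldIndex, ToLongFunction<Object> converter) {
        this.fieldIndex = fieldIndex;
        this.converter = converter;
    }

    /** Default cast: any numeric field is read as epoch millis (fractions are truncated). */
    static FieldTimestampAssigner numeric(int fieldIndex) {
        return new FieldTimestampAssigner(fieldIndex, v -> ((Number) v).longValue());
    }

    /** Custom logic: an ISO local date-time string interpreted in the given zone. */
    static FieldTimestampAssigner localDateTime(int fieldIndex, ZoneId zone) {
        return new FieldTimestampAssigner(
            fieldIndex,
            v -> LocalDateTime.parse((String) v).atZone(zone).toInstant().toEpochMilli());
    }

    long extractTimestamp(Object[] row) {
        return converter.applyAsLong(row[fieldIndex]);
    }
}
```

Users with more specialized needs could still pass their own converter to the constructor, so the default covers the common case without removing flexibility.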
I see. Let's collect the requirements for the watermark and timestamp generation in the respective JIRA issue: https://issues.apache.org/jira/browse/FLINK-7548.
@Override
public Row map(Row value) throws Exception {

    Row out = new Row(value.getArity());
It should be Row out = new Row(value.getArity() - 1) because you are removing one field from a row.
Yes, you're right. Thanks!
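The arity point can be illustrated with a plain-array sketch. `RowProjection.dropField` is a hypothetical helper, not code from this PR, and it models a row as `Object[]` rather than Flink's `Row`: the output has one slot fewer than the input, and fields after the dropped position shift left by one.

```java
/** Hypothetical helper illustrating the arity fix; rows are modeled as Object[]. */
final class RowProjection {
    private RowProjection() {}

    /** Copies every field except the one at dropPos into a row of arity - 1. */
    static Object[] dropField(Object[] row, int dropPos) {
        if (dropPos < 0 || dropPos >= row.length) {
            throw new IndexOutOfBoundsException("no field at position " + dropPos);
        }
        Object[] out = new Object[row.length - 1];
        // Fields before the dropped position keep their index.
        System.arraycopy(row, 0, out, 0, dropPos);
        // Fields after it shift left by one.
        System.arraycopy(row, dropPos + 1, out, dropPos, row.length - dropPos - 1);
        return out;
    }
}
```

Allocating the output with the full input arity would instead leave a trailing null field in every emitted row.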
    throw new ValidationException(
        "You can only specify an ingestion time attribute OR a row time attribute.");
}
this.rowTimeAttribute = ingestionTime;
It should be this.ingestionTimeAttribute = ingestionTime; otherwise there is no need for the ingestionTimeAttribute variable.
Yes, that's inconsistently handled. Thanks for pointing this out.
I removed this.ingestionTimeAttribute
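One way to keep a single attribute field while still enforcing the either/or rule is sketched below. This is an assumption about a possible design, not the code merged in this PR: `TimeAttributeConfig` and its `Kind` enum are hypothetical, and `IllegalStateException` stands in for Flink's `ValidationException`.

```java
/** Hypothetical holder for at most one time attribute (ingestion time OR row time). */
final class TimeAttributeConfig {
    enum Kind { NONE, INGESTION_TIME, ROW_TIME }

    private Kind kind = Kind.NONE;
    private String attribute;

    void addIngestionTimeAttribute(String name) {
        set(Kind.INGESTION_TIME, name);
    }

    void setRowTimeAttribute(String name) {
        set(Kind.ROW_TIME, name);
    }

    Kind kind() {
        return kind;
    }

    String attribute() {
        return attribute;
    }

    private void set(Kind newKind, String name) {
        // Reject a second attribute of the other kind; re-setting the same kind is allowed.
        if (kind != Kind.NONE && kind != newKind) {
            throw new IllegalStateException(
                "You can only specify an ingestion time attribute OR a row time attribute.");
        }
        this.kind = newKind;
        this.attribute = name;
    }
}
```

Recording the kind next to the name makes it explicit which meaning the single stored attribute currently has.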
…kaTableSources. This closes apache#4638.
Running some last tests before merging this PR.
What is the purpose of the change
Add support for time indicators (processing time, ingestion time, event time) for Kafka table sources that extend KafkaTableSource.
The PR adds the following methods:
addProcTimeAttribute(String proctime): adds a processing time attribute to the table
addIngestionTimeAttribute(String ingestionTime): adds an ingestion time attribute to the table
setAscendingRowTimeAttribute(String rowtime): sets an existing attribute to be the row time attribute and automatically assigns watermarks for monotonically increasing attributes.
setBoundedOutOfOrderRowtimeAttribute(String rowtime, long watermarkDelay): sets an existing attribute to be the row time attribute and automatically assigns watermarks with a specified fixed delay.
So, event-time is currently only supported for ascending timestamps and timestamps which are out-of-order by a fixed delay.
Brief change log
KafkaTableSource implements DefinedProcTimeAttribute and DefinedRowTimeAttribute interfaces.
Verifying this change
KafkaTableSource test.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation