
[FLINK-6563] [table] Add time indicator support to KafkaTableSource. #4638

Closed (1 commit)

Conversation

@fhueske (Contributor) commented Sep 4, 2017

What is the purpose of the change

Add support for time indicators (processing time, ingestion time, event time) for Kafka table sources that extend KafkaTableSource.

The PR adds the following methods:

  • addProcTimeAttribute(String proctime): adds a processing time attribute to the table.
  • addIngestionTimeAttribute(String ingestionTime): adds an ingestion time attribute to the table.
  • setAscendingRowTimeAttribute(String rowtime): sets an existing attribute as the row time attribute and automatically assigns watermarks for monotonically increasing attributes.
  • setBoundedOutOfOrderRowtimeAttribute(String rowtime, long watermarkDelay): sets an existing attribute as the row time attribute and automatically assigns watermarks with a specified fixed delay.

So, event time is currently only supported for ascending timestamps and for timestamps that are out-of-order by a fixed delay. A usage sketch follows below.
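For illustration, a concrete source could configure its time attribute like this. This is a minimal sketch, assuming a Kafka010JsonTableSource-style parent class and that the new methods are callable from a subclass; the class name, field names, constructor arguments, and delay value are hypothetical:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.types.Row;

// Hypothetical subclass; construction details are illustrative only.
public class SensorTableSource extends Kafka010JsonTableSource {

    public SensorTableSource(String topic, Properties props, TypeInformation<Row> typeInfo) {
        super(topic, props, typeInfo);
        // Use the existing "ts" field as the row time attribute and emit
        // watermarks that trail the maximum observed timestamp by 5 seconds.
        setBoundedOutOfOrderRowtimeAttribute("ts", 5000L);
        // Alternatively, append a processing-time attribute instead:
        // addProcTimeAttribute("proctime");
    }
}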

Brief change log

  • KafkaTableSource implements DefinedProcTimeAttribute and DefinedRowTimeAttribute interfaces.
  • Added methods to set different time attributes.
  • Added tests to validate the time attribute methods.

Verifying this change

  • Added a KafkaTableSource test.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? NOT YET

@haohui left a comment:

Looks good to me overall. I left a question below.

TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();

// check if the rowtime field exists and remember position
this.rowtimeFieldPos = -1;
@haohui commented:

I'm wondering why we need to remove the field here and add it back later on. Changing the order of the fields seems problematic and can potentially break serialization (in very hacky cases).

Another question: to what extent can a customized timestamp assigner reuse the code here? Is it possible to implement it as a decorator of the table source? That would open up the possibility of reusing the code for other table sources.

@fhueske (author) replied:

Yes, it's not very nice to move the time attribute into the StreamRecord, remove the Row field, and copy the StreamRecord timestamp back into the Row. That's just how the current interface is designed. However, this will be changed with FLINK-7446.

The issue of watermark generation (timestamps don't need to be generated as they are already expected to be in the Row) will be addressed by FLINK-7548.

We have to see how much of FLINK-7446 and FLINK-7548 we can solve before Flink 1.4.0. I did this PR to have at least some time attribute support for KafkaTableSources in 1.4.0. Do you need more specialized watermark generators for your use case?

@haohui replied:

What about accelerating the efforts on FLINK-7446 and FLINK-7548?

Unfortunately, our use cases are a bit more involved, so this PR will not solve the problem out of the box. In one use case we have a timestamp that is a double instead of a bigint; in another, the timestamp sits in a nested structure. That's the reason why I'm more inclined toward a decorator-based approach, which is easier to customize.

@fhueske (author) replied:

I think you are right. We should not ask each TableSource to assign watermarks.

A decorator approach, i.e., a watermark generation wrapper around a TableSource, would be one option. Alternatively, we could give the responsibility for watermark generation to the scan operator. All the TableSource would need to provide is the name of the field for which watermarks are generated and the watermark strategy (ascending, bounded, or custom). The generation would be handled completely internally. At that point, we could also access all fields of any type (Row, Pojo, Tuple, ...), nested or flat, via code generation. So, we would need to extend or replace the DefinedRowtimeAttribute interface to carry the information about the watermark strategy. What do you think @haohui?
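For illustration, the extended interface could look roughly like this. This is a hypothetical sketch of the idea, not the actual FLINK-7548 design; the interface, method, and enum names are invented here:

// Hypothetical successor of DefinedRowtimeAttribute that also carries the
// watermark strategy, so the scan operator can generate watermarks internally.
public interface DefinedRowtimeAttributeWithStrategy {

    // Name of the field for which watermarks are generated.
    String getRowtimeAttribute();

    // Strategy the scan operator should apply when generating watermarks.
    WatermarkStrategy getWatermarkStrategy();

    enum WatermarkStrategy { ASCENDING, BOUNDED_OUT_OF_ORDER, CUSTOM }
}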

I already started to work on FLINK-7446 but will not have much time in the next weeks due to conferences. I hope that we can address these issues in the next release.

@haohui replied:

Currently, in our internal system, we work around this problem by decorating the data stream returned by Kafka. Having the scan operator assign the watermark seems pretty neat. +1 for that.

It might be cleaner to put both the assignment of the timestamp (i.e., TimestampAssigner) and the naming of the timestamp (i.e., DefinedRowtimeAttribute and DefinedProctimeAttribute) together.

What do you think?
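For illustration, such a decoration of the Kafka stream might look like this. This is a sketch, assuming kafkaStream is a DataStream<Row> obtained from a Kafka consumer source; the field position and the 5-second delay are assumptions:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

// Decorate the DataStream produced by the Kafka source with timestamps and
// watermarks before registering it with the Table API.
DataStream<Row> withTimestamps = kafkaStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(Row row) {
            // Assumes the event timestamp is a Long (epoch millis) at position 2.
            return (Long) row.getField(2);
        }
    });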

@fhueske (author) replied:

Thanks for the clarification @haohui.

Would it be sufficient to apply default casting logic to the timestamp field, i.e., cast numeric fields to bigint, or do you need custom conversion logic (evaluating a custom expression, possibly involving scalar UDFs)?

@haohui replied:

Unfortunately, we do need custom logic -- other people might need it too, to take care of issues like time zone differences.

@fhueske (author) replied:

I assume you also need access to the full row, correct?

@haohui replied:

As of today the answer is no -- we only need a single, top-level field. But there might be use cases that have the rowtime in a nested field. TimestampAssigner does come in handy, as that flexibility is quite important to us.

Maybe it makes sense to provide a default implementation of TimestampAssigner that extracts a field from the row, to make it easier to use? (See the sketch below.)
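For illustration, such a default implementation might look like this. This is a hypothetical sketch; the class name and the position-based field lookup are assumptions:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

// Hypothetical reusable assigner that reads the rowtime from a fixed field
// position of a Row and emits watermarks with bounded out-of-orderness.
public class RowFieldTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Row> {

    private final int fieldPos;

    public RowFieldTimestampExtractor(int fieldPos, Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
        this.fieldPos = fieldPos;
    }

    @Override
    public long extractTimestamp(Row row) {
        // Assumes the field holds the event time as epoch milliseconds (Long).
        return (Long) row.getField(fieldPos);
    }
}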

@fhueske (author) replied:

I see. Let's collect the requirements for the watermark and timestamp generation in the respective JIRA issue: https://issues.apache.org/jira/browse/FLINK-7548.

@Override
public Row map(Row value) throws Exception {

Row out = new Row(value.getArity());
A contributor commented:

It should be Row out = new Row(value.getArity() - 1) because you are removing one field from the row.

@fhueske (author) replied:

Yes, you're right. Thanks!
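For illustration, the corrected mapper might look like this. This is a sketch; the copy loop and the rowtimeFieldPos field of the enclosing class are inferred from the quoted snippets, not taken from the actual patch:

@Override
public Row map(Row value) throws Exception {
    // One field (the rowtime attribute) is removed from the output row.
    Row out = new Row(value.getArity() - 1);
    int outIdx = 0;
    for (int i = 0; i < value.getArity(); i++) {
        if (i != rowtimeFieldPos) {
            out.setField(outIdx++, value.getField(i));
        }
    }
    return out;
}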

throw new ValidationException(
"You can only specify an ingestion time attribute OR a row time attribute.");
}
this.rowTimeAttribute = ingestionTime;
A contributor commented:

It should be this.ingestionTimeAttribute = ingestionTime; otherwise there is no need for the ingestionTimeAttribute variable.

@fhueske (author) replied:

Yes, that's handled inconsistently. Thanks for pointing it out. I removed this.ingestionTimeAttribute.

@fhueske force-pushed the tableKafkaTime branch 2 times, most recently from 17cabce to 44458ba on October 5, 2017.
@fhueske force-pushed the tableKafkaTime branch 5 times, most recently from f0c9b04 to 26d97f3 on October 31, 2017.
fhueske added a commit to fhueske/flink that referenced this pull request Oct 31, 2017
@fhueske (author) commented Oct 31, 2017:

Running some last tests before merging this PR.

fhueske added a commit to fhueske/flink that referenced this pull request Oct 31, 2017
@asfgit closed this in 0e92b66 on Nov 1, 2017.
@fhueske deleted the tableKafkaTime branch on November 2, 2017.