Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7571] [table] Fix translation of TableSource with time indicators #4635

Closed
wants to merge 1 commit into from

Conversation

fhueske
Copy link
Contributor

@fhueske fhueske commented Sep 1, 2017

What is the purpose of the change

This PR fixes the translation / code generation for table scans against table sources that include tiem indicator fields. The behavior was broken during the recent time indicator refactoring.
I added a test to ensure the feature remains working.

Brief change log

  • Add an ITCase to ensure that table sources with time indicators are correctly executed
  • Fix the registration of table sources with time indicators

Verifying this change

This change added tests and can be verified as follows:
Run TimeAttributesITCase.testTableSourceWithTimeIndicators()

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? n/a


// append rowtime marker
val withRowtime = if (rowtime.isDefined) {
original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For rowtime we need to replace the index of specific field, but here you are adding index at the end of row. Same thing applies for FieldNames and FieldTypes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how rowtime fields are handled at the moment. They are always appended at the end of the row.

There is a JIRA to use existing fields as rowtime fields (https://issues.apache.org/jira/browse/FLINK-7446) but this has not been implemented yet. I'm currently working on a PR for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Actually we can define existing field as the rowtime field, when we convert DataStream to Table. That's why I was little bit confused, but we have PR for TableSource then it's good.

@fhueske
Copy link
Contributor Author

fhueske commented Sep 20, 2017

@wuchong can you have a look at this PR?
It fixes the code generation for table sources with time attributes.
Thanks!

@wuchong
Copy link
Member

wuchong commented Sep 21, 2017

Hi @fhueske , thanks for the work! It is a very important fix! I'm fine with the changes.

+1 to merge

fhueske added a commit to fhueske/flink that referenced this pull request Sep 21, 2017
@fhueske
Copy link
Contributor Author

fhueske commented Sep 21, 2017

Thanks for the review @wuchong!

Merging

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants