
[FLINK-11068][table] Convert the API classes of *Table to interfaces #8006

Closed
wants to merge 4 commits into from

Conversation

hequn8128
Contributor

What is the purpose of the change

This pull request converts table-related classes into interfaces.

Brief change log

  • Convert Table, GroupedTable, GroupWindowedTable, OverWindowedTable, and WindowGroupedTable into interfaces.
  • Port TemporalTableFunction into flink-common. I added no methods to TemporalTableFunction; it seems we don't need any for now, and we can add them later if we find it necessary. Adding methods is easier and safer than removing them.
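The conversion above follows a standard interface-extraction pattern. A minimal self-contained sketch of that pattern (the names mirror the PR's Table, GroupedTable, and TableImpl, but the string-based "plan" bodies are illustrative stand-ins, not Flink's actual API):

```java
// Illustrative sketch of the interface-extraction pattern described above.
// Table and GroupedTable mirror the PR's class names; the bodies are
// stand-ins, not Flink's actual implementation.
interface GroupedTable {
    Table select(String fields);
}

interface Table {
    GroupedTable groupBy(String fields);
    Table where(String predicate);
}

// The concrete implementation lives behind the interface, so API consumers
// depend only on the interface module.
class TableImpl implements Table {
    private final String plan;

    TableImpl(String plan) {
        this.plan = plan;
    }

    @Override
    public GroupedTable groupBy(String fields) {
        String grouped = plan + ".groupBy(" + fields + ")";
        // GroupedTable has a single abstract method, so a lambda suffices here.
        return selected -> new TableImpl(grouped + ".select(" + selected + ")");
    }

    @Override
    public Table where(String predicate) {
        return new TableImpl(plan + ".where(" + predicate + ")");
    }

    @Override
    public String toString() {
        return plan;
    }
}
```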

Verifying this change

This change is already covered by existing tests, such as the tests for the Table API. The changes for the temporal table are covered by the TemporalTableJoin UT/IT test cases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@flinkbot
Collaborator

flinkbot commented Mar 18, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ✅ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@twalthr
Contributor

twalthr commented Mar 18, 2019

@flinkbot approve-until consensus

Contributor

@dawidwys dawidwys left a comment


Hi @hequn8128, thank you very much for the work! It looks really good; I had some minor comments:

  • Could we use the Java API by default in javadoc examples? I think the unspoken agreement is that the Java API is the main one (probably not true for Table, but we should fix that).
  • Could we be more specific in the examples that use Expression, noting that they work only for Scala as of now? It might be counterintuitive otherwise.

I haven't marked all of those cases.

* <p>Aggregations are performed per group and defined by a subsequent {@code select(...)}
* clause similar to SQL SELECT-GROUP-BY query.
*
* <p>Example:

How about we change it to Scala example:? This will not work in Java.

* Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
*
* <p>Example:

Scala example:?

* Performs a selection operation on a over windowed table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
*
* <p>Example:

Scala example:?

* <p>Operations such as {@code join}, {@code select}, {@code where} and {@code groupBy} either
* take arguments in a Scala DSL or as an expression String. Please refer to the documentation for
* the expression syntax.
*

nit: empty line

* Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions and aggregations.
*
* <p>Example:

Scala example:?

* <p>For batch tables of finite size, windowing essentially provides shortcuts for time-based
* groupBy.
*
* <p>__Note__: Computing windowed aggregates on a streaming table is only a parallel operation

Suggested change
* <p>__Note__: Computing windowed aggregates on a streaming table is only a parallel operation
* <p><b>Note</b>: Computing windowed aggregates on a streaming table is only a parallel operation

* }
* </pre>
*
* <p>__Note__: Computing over window aggregates on a streaming table is only a parallel

Suggested change
* <p>__Note__: Computing over window aggregates on a streaming table is only a parallel
* <p><b>Note</b>: Computing over window aggregates on a streaming table is only a parallel

* operation if the window is partitioned. Otherwise, the whole stream will be processed by a
* single task, i.e., with parallelism 1.
*
* <p>__Note__: Over-windows for batch tables are currently not supported.

Suggested change
* <p>__Note__: Over-windows for batch tables are currently not supported.
* <p><b>Note</b>: Over-windows for batch tables are currently not supported.

*
* @param fetch the number of records to return. Fetch must be >= 0.
*/
Table fetch(Integer fetch);

How about:

Suggested change
Table fetch(Integer fetch);
Table fetch(int fetch);
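The suggested switch from Integer to int also rules out accidental nulls: a boxed parameter admits null and fails only at runtime, during unboxing. A self-contained illustration (Limits and its methods are hypothetical stand-ins, not Flink code):

```java
// Hypothetical stand-in showing why the review prefers `int` over `Integer`
// for the fetch/offset parameters.
class Limits {
    static int fetchBoxed(Integer fetch) {
        return fetch; // unboxing: throws NullPointerException when fetch == null
    }

    static int fetchPrimitive(int fetch) {
        return fetch; // passing null here is rejected at compile time
    }
}
```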

*
* @param offset number of records to skip
*/
Table offset(Integer offset);

How about:

Suggested change
Table offset(Integer offset);
Table offset(int offset);

@dawidwys
Contributor

One more thing: could we also remove the javadocs from TableImpl? We shouldn't duplicate them.

@@ -61,7 +60,7 @@ public ResultStore(Configuration flinkConfig) {
*/
public <T> DynamicResult<T> createResult(Environment env, TableSchema schema, ExecutionConfig config) {

final TypeInformation<Row> outputType = Types.ROW_NAMED(schema.getFieldNames(), schema.getFieldTypes());
final RowTypeInfo outputType = (RowTypeInfo) Types.ROW_NAMED(schema.getFieldNames(), schema.getFieldTypes());

Maybe let's just replace it with the RowTypeInfo ctor, if we expect that type anyway?
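The point generalizes: when the concrete type is needed anyway, calling its constructor directly is safer than downcasting a factory's supertype result. A generic sketch of the difference (TypeInfo, RowInfo, and Types here are illustrative names, not Flink's RowTypeInfo API):

```java
// Illustrative names only; this mimics the kind of cast the review wants to
// avoid. The downcast compiles against any TypeInfo and is checked only at
// runtime, while the direct constructor call is checked at compile time.
class TypeInfo { }

class RowInfo extends TypeInfo {
    final String[] fieldNames;

    RowInfo(String... fieldNames) {
        this.fieldNames = fieldNames;
    }
}

class Types {
    // Factory declared with the general supertype.
    static TypeInfo rowNamed(String... fieldNames) {
        return new RowInfo(fieldNames);
    }
}
```

Preferring `new RowInfo("name", "count")` over `(RowInfo) Types.rowNamed("name", "count")` removes the runtime cast entirely.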

this.outputType = outputType;

accumulatorName = new AbstractID().toString();
tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config));
tableSink =

How about we change the return type of configure in CollectBatchTableSink to CollectBatchTableSink?
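This works because Java permits covariant return types: an override may narrow the declared return type, so callers keep the concrete type and the explicit cast becomes unnecessary. A self-contained sketch (Sink and CollectSink are stand-ins for TableSink and CollectBatchTableSink):

```java
// Stand-in names for TableSink/CollectBatchTableSink. The override narrows
// the return type from Sink to CollectSink, which Java allows, so no cast
// is needed at the call site.
interface Sink {
    Sink configure(String[] fieldNames);
}

class CollectSink implements Sink {
    private final String[] fieldNames;

    CollectSink(String[] fieldNames) {
        this.fieldNames = fieldNames;
    }

    @Override
    public CollectSink configure(String[] fieldNames) { // covariant return type
        return new CollectSink(fieldNames);
    }

    String[] getFieldNames() {
        return fieldNames;
    }
}
```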

@@ -72,7 +73,9 @@ public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig conf

// create table sink
// pass binding address and port such that sink knows where to send to
collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
collectTableSink =
(CollectStreamTableSink) new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer)

Same as in the batch case.

@@ -42,7 +42,7 @@

@Test
public void testSnapshot() throws UnknownHostException {
final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
final RowTypeInfo type = (RowTypeInfo) Types.ROW(Types.STRING, Types.LONG);

Let's go directly for the ctor.

@@ -90,7 +90,7 @@ public void testSnapshot() throws UnknownHostException {

@Test
public void testLimitedSnapshot() throws UnknownHostException {
final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
final RowTypeInfo type = (RowTypeInfo) Types.ROW(Types.STRING, Types.LONG);

Let's go directly for the ctor.

@hequn8128
Contributor Author

@dawidwys Thanks a lot for your review and valuable suggestions. I have addressed all your comments and updated the PR. It would be great if you could take another look. Thank you.

@dawidwys
Contributor

Thank you a lot @hequn8128 for the work. It looks good. One last minor thing: we should add @Override annotations to the methods of TableImpl, but I can do that during merging. Will do that as soon as Travis gives green.

@dawidwys
Contributor

@flinkbot approve all

@hequn8128
Contributor Author

@dawidwys I added the @Override annotations and fixed one checkstyle problem. Sorry for the trouble this brings you.

dawidwys pushed a commit to dawidwys/flink that referenced this pull request Mar 19, 2019
@dawidwys dawidwys closed this in 6468024 Mar 19, 2019
HuangZhenQiu pushed a commit to HuangZhenQiu/flink that referenced this pull request Mar 20, 2019
HuangZhenQiu pushed a commit to HuangZhenQiu/flink that referenced this pull request Apr 22, 2019
sunhaibotb pushed a commit to sunhaibotb/flink that referenced this pull request May 8, 2019