[FLINK-11068][table] Convert the API classes of *Table to interfaces #8006
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review progress.
Please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands — the @flinkbot bot supports the following commands:
@flinkbot approve-until consensus
Hi @hequn8128, thank you very much for the work! It looks really good; I had some minor comments:
- could we use the Java API by default in javadoc examples? I think the unspoken agreement is that the Java API is the main one (probably it is not true for Table, but we should fix that)
- could we be more specific with the examples using Expression and note that they work only for Scala as of now? It might be counterintuitive otherwise.
I haven't marked all of those cases.
* <p>Aggregations are performed per group and defined by a subsequent {@code select(...)}
* clause similar to SQL SELECT-GROUP-BY query.
*
* <p>Example:
How about we change it to "Scala example:"? This will not work in Java.
* Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
*
* <p>Example:
Scala example:?
* Performs a selection operation on a over windowed table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
*
* <p>Example:
Scala example:?
* <p>Operations such as {@code join}, {@code select}, {@code where} and {@code groupBy} either
* take arguments in a Scala DSL or as an expression String. Please refer to the documentation for
* the expression syntax.
*
nit: empty line
* Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions and aggregations.
*
* <p>Example:
Scala example:?
* <p>For batch tables of finite size, windowing essentially provides shortcuts for time-based
* groupBy.
*
* <p>__Note__: Computing windowed aggregates on a streaming table is only a parallel operation
Suggested change:
- * <p>__Note__: Computing windowed aggregates on a streaming table is only a parallel operation
+ * <p><b>Note</b>: Computing windowed aggregates on a streaming table is only a parallel operation
* }
* </pre>
*
* <p>__Note__: Computing over window aggregates on a streaming table is only a parallel
Suggested change:
- * <p>__Note__: Computing over window aggregates on a streaming table is only a parallel
+ * <p><b>Note</b>: Computing over window aggregates on a streaming table is only a parallel
* operation if the window is partitioned. Otherwise, the whole stream will be processed by a
* single task, i.e., with parallelism 1.
*
* <p>__Note__: Over-windows for batch tables are currently not supported.
Suggested change:
- * <p>__Note__: Over-windows for batch tables are currently not supported.
+ * <p><b>Note</b>: Over-windows for batch tables are currently not supported.
*
* @param fetch the number of records to return. Fetch must be >= 0.
*/
Table fetch(Integer fetch);
How about:
- Table fetch(Integer fetch);
+ Table fetch(int fetch);
*
* @param offset number of records to skip
*/
Table offset(Integer offset);
How about:
- Table offset(Integer offset);
+ Table offset(int offset);
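The point behind both suggestions is that a primitive `int` avoids boxing and rules out `null` at compile time. A minimal, self-contained sketch of the difference (the `FetchableTable`/`SimpleTable` names below are hypothetical stand-ins, not Flink's actual `Table`):

```java
// Hypothetical stand-in for the API under discussion; not Flink code.
interface FetchableTable {
    // Primitive parameter: callers cannot pass null, and no unboxing happens.
    FetchableTable fetch(int fetch);
}

class SimpleTable implements FetchableTable {
    private final int limit;

    SimpleTable(int limit) {
        this.limit = limit;
    }

    @Override
    public FetchableTable fetch(int fetch) {
        if (fetch < 0) {
            throw new IllegalArgumentException("fetch must be >= 0");
        }
        return new SimpleTable(fetch);
    }

    int limit() {
        return limit;
    }
}
```

With an `Integer` parameter, a caller could pass `null` and only discover the problem via a `NullPointerException` at unboxing time; with `int`, that mistake cannot compile.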
One more thing: could we also already remove the javadocs from
@@ -61,7 +60,7 @@ public ResultStore(Configuration flinkConfig) {
 */
public <T> DynamicResult<T> createResult(Environment env, TableSchema schema, ExecutionConfig config) {

- final TypeInformation<Row> outputType = Types.ROW_NAMED(schema.getFieldNames(), schema.getFieldTypes());
+ final RowTypeInfo outputType = (RowTypeInfo) Types.ROW_NAMED(schema.getFieldNames(), schema.getFieldTypes());
Maybe let's just replace it with the RowTypeInfo ctor, if we expect that type anyway?
this.outputType = outputType;

accumulatorName = new AbstractID().toString();
- tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config));
+ tableSink =
How about we change the return type of configure in CollectBatchTableSink to CollectBatchTableSink?
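Narrowing the declared return type of `configure` this way relies on Java's covariant return types; a self-contained sketch of the idea (the class names only mimic the real sinks and are not Flink's actual hierarchy):

```java
// Stand-ins for a TableSink hierarchy; not Flink's actual classes.
class BaseSink {
    BaseSink configure(String[] fieldNames) {
        return this;
    }
}

class CollectBatchSink extends BaseSink {
    // Covariant return type: an override may narrow the declared return,
    // which lets callers drop the (CollectBatchSink) cast entirely.
    @Override
    CollectBatchSink configure(String[] fieldNames) {
        return this;
    }
}
```

Callers can then write `CollectBatchSink sink = new CollectBatchSink().configure(names);` with no cast.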
@@ -72,7 +73,9 @@ public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig conf

 // create table sink
 // pass binding address and port such that sink knows where to send to
- collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
+ collectTableSink =
+     (CollectStreamTableSink) new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer)
Same as in the batch case.
@@ -42,7 +42,7 @@

 @Test
 public void testSnapshot() throws UnknownHostException {
- final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+ final RowTypeInfo type = (RowTypeInfo) Types.ROW(Types.STRING, Types.LONG);
Let's go directly for the ctor.
@@ -90,7 +90,7 @@ public void testSnapshot() throws UnknownHostException {

 @Test
 public void testLimitedSnapshot() throws UnknownHostException {
- final TypeInformation<Row> type = Types.ROW(Types.STRING, Types.LONG);
+ final RowTypeInfo type = (RowTypeInfo) Types.ROW(Types.STRING, Types.LONG);
Let's go directly for the ctor.
@dawidwys Thanks a lot for your review and valuable suggestions. I have addressed all your comments and updated the PR. It would be great if you could take another look. Thank you.
Thank you a lot @hequn8128 for the work. It looks good. The last minor thing is that we should add
@flinkbot approve all
@dawidwys I added the override methods and fixed one checkstyle problem. Sorry for the trouble this brings you.
What is the purpose of the change
This pull request converts table-related API classes into interfaces.
Brief change log
- TemporalTableFunction is left out for now. We can add it later if we find it necessary. It's easier and safer to add methods than to remove them.

Verifying this change
This change is already covered by existing tests, such as the tests for the Table API. The changes for temporal tables are covered by the TemporalTableJoin UT/IT test cases.
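The conversion itself follows a standard shape: the public class becomes an interface and the concrete behavior moves into an implementation behind it. A minimal illustration (the `Table`/`TableImpl` names here are a sketch of the pattern, not the PR's actual code):

```java
// Illustrative sketch of converting an API class into an interface.
interface Table {
    Table select(String fields);
}

class TableImpl implements Table {
    private final String fields;

    TableImpl(String fields) {
        this.fields = fields;
    }

    @Override
    public Table select(String fields) {
        // Real planner logic elided; this sketch only tracks the projection.
        return new TableImpl(fields);
    }

    String fields() {
        return fields;
    }
}
```

Client code keeps programming against `Table`, so the implementation class can evolve (or be swapped per planner) without breaking the public API.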
Does this pull request potentially affect one of the following parts:
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)

Documentation