diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java index d299749491342..55f003862f477 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java @@ -24,7 +24,6 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils; import org.apache.flink.table.sinks.BatchTableSink; -import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; /** @@ -59,7 +58,7 @@ public TypeInformation[] getFieldTypes() { } @Override - public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + public CollectBatchTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { final CollectBatchTableSink copy = new CollectBatchTableSink(accumulatorName, serializer); copy.fieldNames = fieldNames; copy.fieldTypes = fieldTypes; diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java index c71be18e095a4..e36eb35c43aa0 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.experimental.CollectSink; import org.apache.flink.table.sinks.RetractStreamTableSink; -import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import java.net.InetAddress; @@ -60,7 +59,7 @@ public TypeInformation[] getFieldTypes() { } @Override - public TableSink> configure(String[] fieldNames, TypeInformation[] fieldTypes) { + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { final CollectStreamTableSink copy = new CollectStreamTableSink(targetAddress, targetPort, serializer); copy.fieldNames = fieldNames; copy.fieldTypes = fieldTypes; diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 8670dcb020146..e9ee0008cb0f4 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -419,7 +419,9 @@ private ResultDescriptor executeQueryInternal(ExecutionContext context, S try { // writing to a sink requires an optimization step that might reference UDFs during code compilation context.wrapClassLoader(() -> { - table.writeToSink(result.getTableSink(), envInst.getQueryConfig()); + envInst + .getTableEnvironment() + .writeToSink(table, result.getTableSink(), envInst.getQueryConfig()); return null; }); jobGraph = envInst.createJobGraph(jobName); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java index 7a2432593f17f..dba7ed6752748 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java @@ -19,8 +19,7 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.net.ConnectionUtils; @@ -33,7 +32,6 @@ import org.apache.flink.table.client.gateway.local.result.DynamicResult; import org.apache.flink.table.client.gateway.local.result.MaterializedCollectBatchResult; import org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult; -import org.apache.flink.types.Row; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -61,7 +59,7 @@ public ResultStore(Configuration flinkConfig) { */ public DynamicResult createResult(Environment env, TableSchema schema, ExecutionConfig config) { - final TypeInformation outputType = Types.ROW_NAMED(schema.getFieldNames(), schema.getFieldTypes()); + final RowTypeInfo outputType = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()); if (env.getExecution().isStreamingExecution()) { // determine gateway address (and port if possible) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java index 4e768887c0804..38e9126d71d75 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java @@ -19,8 +19,8 @@ package org.apache.flink.table.client.gateway.local.result; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; @@ -38,7 +38,7 @@ public class ChangelogCollectStreamResult extends CollectStreamResult impl private List> changeRecordBuffer; private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000; - public ChangelogCollectStreamResult(TypeInformation outputType, ExecutionConfig config, + public ChangelogCollectStreamResult(RowTypeInfo outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort) { super(outputType, config, gatewayAddress, gatewayPort); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java index 656057b000717..6cdebca133991 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java +++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.experimental.SocketStreamIterator; @@ -54,7 +55,7 @@ public abstract class CollectStreamResult extends BasicResult implements D protected final Object resultLock; protected SqlExecutionException executionException; - public CollectStreamResult(TypeInformation outputType, ExecutionConfig config, + public CollectStreamResult(RowTypeInfo outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort) { this.outputType = outputType; @@ -72,7 +73,8 @@ public CollectStreamResult(TypeInformation outputType, ExecutionConfig conf // create table sink // pass binding address and port such that sink knows where to send to - collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); + collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer) + .configure(outputType.getFieldNames(), outputType.getFieldTypes()); retrievalThread = new ResultRetrievalThread(); monitoringThread = new JobMonitoringThread(); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java index 84d54ec60c219..dc482d035b257 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.client.gateway.local.CollectBatchTableSink; @@ -53,11 +54,12 @@ public class MaterializedCollectBatchResult extends BasicResult implements private volatile boolean snapshotted = false; - public MaterializedCollectBatchResult(TypeInformation outputType, ExecutionConfig config) { + public MaterializedCollectBatchResult(RowTypeInfo outputType, ExecutionConfig config) { this.outputType = outputType; accumulatorName = new AbstractID().toString(); - tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config)); + tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config)) + .configure(outputType.getFieldNames(), outputType.getFieldTypes()); resultLock = new Object(); retrievalThread = new ResultRetrievalThread(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java index 0beabe94d2cf4..2becfdac1ac31 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java @@ -20,8 +20,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; @@ -90,7 +90,7 @@ public class MaterializedCollectStreamResult extends CollectStreamResult i @VisibleForTesting public MaterializedCollectStreamResult( - TypeInformation outputType, + RowTypeInfo outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, @@ -117,7 +117,7 @@ public MaterializedCollectStreamResult( } public MaterializedCollectStreamResult( - TypeInformation outputType, + RowTypeInfo outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java index ba8c9245cc922..c7636cd793b1a 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java @@ -19,9 +19,9 @@ package org.apache.flink.table.client.gateway.local.result; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; @@ -42,7 +42,7 @@ public class MaterializedCollectStreamResultTest { @Test public void testSnapshot() throws UnknownHostException { - final TypeInformation type = Types.ROW(Types.STRING, Types.LONG); + final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG); TestMaterializedCollectStreamResult result = null; try { @@ -90,7 +90,7 @@ public void testSnapshot() throws UnknownHostException { @Test public void testLimitedSnapshot() throws UnknownHostException { - final TypeInformation type = Types.ROW(Types.STRING, Types.LONG); + final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG); TestMaterializedCollectStreamResult result = null; try { @@ -145,7 +145,7 @@ private static class TestMaterializedCollectStreamResult extends Materialized public boolean isRetrieving; public TestMaterializedCollectStreamResult( - TypeInformation outputType, + RowTypeInfo outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, @@ -162,7 +162,7 @@ public TestMaterializedCollectStreamResult( } public TestMaterializedCollectStreamResult( - TypeInformation 
outputType, + RowTypeInfo outputType, ExecutionConfig config, InetAddress gatewayAddress, int gatewayPort, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindowedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindowedTable.java new file mode 100644 index 0000000000000..5193b72eafdbb --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindowedTable.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.Expression; + +/** + * A table that has been windowed for {@link GroupWindow}s. + */ +@PublicEvolving +public interface GroupWindowedTable { + + /** + * Groups the elements by a mandatory window and one or more optional grouping attributes. + * The window is specified by referring to its alias. + * + *

If no additional grouping attribute is specified and if the input is a streaming table, + * the aggregation will be performed by a single task, i.e., with parallelism 1. + * + *

Aggregations are performed per group and defined by a subsequent {@code select(...)} + * clause similar to a SQL SELECT-GROUP-BY query. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.window([groupWindow].as("w")).groupBy("w, key").select("key, value.avg")
+	 * }
+	 * 
+ */ + WindowGroupedTable groupBy(String fields); + + /** + * Groups the elements by a mandatory window and one or more optional grouping attributes. + * The window is specified by referring to its alias. + * + *

If no additional grouping attribute is specified and if the input is a streaming table, + * the aggregation will be performed by a single task, i.e., with parallelism 1. + * + *

Aggregations are performed per group and defined by a subsequent {@code select(...)} + * clause similar to a SQL SELECT-GROUP-BY query. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.window([groupWindow] as 'w).groupBy('w, 'key).select('key, 'value.avg)
+	 * }
+	 * 
+ */ + WindowGroupedTable groupBy(Expression... fields); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java new file mode 100644 index 0000000000000..c535462ccf596 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.Expression; + +/** + * A table that has been grouped on a set of grouping keys. + */ +@PublicEvolving +public interface GroupedTable { + + /** + * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. + * The field expressions can contain complex expressions and aggregations. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.groupBy("key").select("key, value.avg + ' The average' as average")
+	 * }
+	 * 
+ */ + Table select(String fields); + + /** + * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. + * The field expressions can contain complex expressions and aggregations. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.groupBy('key).select('key, 'value.avg + " The average" as 'average)
+	 * }
+	 * 
+ */ + Table select(Expression... fields); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowedTable.java new file mode 100644 index 0000000000000..04705970cb13c --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowedTable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.Expression; + +/** + * A table that has been windowed for {@link OverWindow}s. + * + *

Unlike group windows, which are specified in the + * {@code GROUP BY} clause, over windows do not collapse rows. Instead, over window aggregates + * compute an aggregate for each input row over a range of its neighboring rows. + */ +@PublicEvolving +public interface OverWindowedTable { + + /** + * Performs a selection operation on an over windowed table. Similar to an SQL SELECT statement. + * The field expressions can contain complex expressions and aggregations. + * + *

Example: + * + *

+	 * {@code
+	 *   overWindowedTable.select("c, b.count over ow, e.sum over ow")
+	 * }
+	 * 
+ */ + Table select(String fields); + + /** + * Performs a selection operation on an over windowed table. Similar to an SQL SELECT statement. + * The field expressions can contain complex expressions and aggregations. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   overWindowedTable.select('c, 'b.count over 'ow, 'e.sum over 'ow)
+	 * }
+	 * 
+ */ + Table select(Expression... fields); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java new file mode 100644 index 0000000000000..5434bf979f5e6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.functions.TemporalTableFunction; +import org.apache.flink.table.sinks.TableSink; + +/** + * A Table is the core component of the Table API. + * Similar to how the batch and streaming APIs have DataSet and DataStream, + * the Table API is built around {@link Table}. + * + *

Use the methods of {@link Table} to transform data. Use {@code TableEnvironment} to convert a + * {@link Table} back to a {@code DataSet} or {@code DataStream}. + * + *

When using Scala a {@link Table} can also be converted using implicit conversions. + * + *

Java Example: + * + *

+ * {@code
+ *   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *   BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+ *
+ *   DataSet<Tuple2<String, Integer>> set = ...
+ *   tEnv.registerDataSet("MyTable", set, "a, b");
+ *
+ *   Table table = tEnv.scan("MyTable").select(...);
+ *   ...
+ *   Table table2 = ...
+ *   DataSet<MyType> set2 = tEnv.toDataSet(table2, MyType.class);
+ * }
+ * 
+ * + *

Scala Example: + * + *

+ * {@code
+ *   val env = ExecutionEnvironment.getExecutionEnvironment
+ *   val tEnv = BatchTableEnvironment.create(env)
+ *
+ *   val set: DataSet[(String, Int)] = ...
+ *   val table = set.toTable(tEnv, 'a, 'b)
+ *   ...
+ *   val table2 = ...
+ *   val set2: DataSet[MyType] = table2.toDataSet[MyType]
+ * }
+ * 
+ * + *

Operations such as {@code join}, {@code select}, {@code where} and {@code groupBy} either + * take arguments in a Scala DSL or as an expression String. Please refer to the documentation for + * the expression syntax. + */ +@PublicEvolving +public interface Table { + + /** + * Returns the schema of this table. + */ + TableSchema getSchema(); + + /** + * Prints the schema of this table to the console in a tree format. + */ + void printSchema(); + + /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.select("key, value.avg + ' The average' as average")
+	 * }
+	 * 
+ */ + Table select(String fields); + + /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.select('key, 'value.avg + " The average" as 'average)
+	 * }
+	 * 
+ */ + Table select(Expression... fields); + + /** + * Creates a {@link TemporalTableFunction} backed by this table as a history table. + * Temporal Tables represent a concept of a table that changes over time and for which + * Flink keeps track of those changes. {@link TemporalTableFunction} provides a way to + * access this data. + * + *

For more information please check Flink's documentation on Temporal Tables. + * + *
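+	 * Illustrative usage (the table {@code ratesHistory} and its attributes {@code r_rowtime}
+	 * and {@code r_currency} are assumed example names, not part of this API):
+	 *
+	 * {@code
+	 *   TemporalTableFunction rates =
+	 *       ratesHistory.createTemporalTableFunction("r_rowtime", "r_currency");
+	 * }
+	 *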

Currently {@link TemporalTableFunction}s are only supported in streaming. + * + * @param timeAttribute Must point to a time attribute. Provides a way to compare which + * records are a newer or older version. + * @param primaryKey Defines the primary key. With a primary key it is possible to update + * a row or to delete it. + * @return {@link TemporalTableFunction} which is an instance of {@link TableFunction}. + * It takes one single argument, the {@code timeAttribute}, for which it returns + * the matching version of the {@link Table} from which the {@link TemporalTableFunction} + * was created. + */ + TemporalTableFunction createTemporalTableFunction(String timeAttribute, String primaryKey); + + /** + * Creates a {@link TemporalTableFunction} backed by this table as a history table. + * Temporal Tables represent a concept of a table that changes over time and for which + * Flink keeps track of those changes. {@link TemporalTableFunction} provides a way to + * access this data. + * + *

For more information please check Flink's documentation on Temporal Tables. + * + *
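+	 * Illustrative Scala usage (the table {@code ratesHistory} and its attributes are assumed
+	 * example names, not part of this API):
+	 *
+	 * {@code
+	 *   val rates = ratesHistory.createTemporalTableFunction('r_rowtime, 'r_currency)
+	 * }
+	 *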

Currently {@link TemporalTableFunction}s are only supported in streaming. + * + * @param timeAttribute Must point to a time indicator. Provides a way to compare which + * records are a newer or older version. + * @param primaryKey Defines the primary key. With a primary key it is possible to update + * a row or to delete it. + * @return {@link TemporalTableFunction} which is an instance of {@link TableFunction}. + * It takes one single argument, the {@code timeAttribute}, for which it returns + * the matching version of the {@link Table} from which the {@link TemporalTableFunction} + * was created. + */ + TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey); + + /** + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining two operations. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.as("a, b")
+	 * }
+	 * 
+ */ + Table as(String fields); + + /** + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining two operations. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.as('a, 'b)
+	 * }
+	 * 
+ */ + Table as(Expression... fields); + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.filter("name = 'Fred'")
+	 * }
+	 * 
+ */ + Table filter(String predicate); + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.filter('name === "Fred")
+	 * }
+	 * 
+ */ + Table filter(Expression predicate); + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.where("name = 'Fred'")
+	 * }
+	 * 
+ */ + Table where(String predicate); + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.where('name === "Fred")
+	 * }
+	 * 
+ */ + Table where(Expression predicate); + + /** + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.groupBy("key").select("key, value.avg")
+	 * }
+	 * 
+ */ + GroupedTable groupBy(String fields); + + /** + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.groupBy('key).select('key, 'value.avg)
+	 * }
+	 * 
+ */ + GroupedTable groupBy(Expression... fields); + + /** + * Removes duplicate values and returns only distinct (different) values. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.select("key, value").distinct()
+	 * }
+	 * 
+ */ + Table distinct(); + + /** + * Joins two {@link Table}s. Similar to an SQL join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. You can use + * where and select clauses after a join to further specify the behaviour of the join. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.join(right).where("a = b && c > 3").select("a, b, d")
+	 * }
+	 * 
+ */ + Table join(Table right); + + /** + * Joins two {@link Table}s. Similar to an SQL join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.join(right, "a = b")
+	 * }
+	 * 
+ */ + Table join(Table right, String joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   left.join(right, 'a === 'b).select('a, 'b, 'd)
+	 * }
+	 * 
+ */ + Table join(Table right, Expression joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL left outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Example: + * + *

+	 * {@code
+	 *   left.leftOuterJoin(right).select("a, b, d")
+	 * }
+	 * 
+ */ + Table leftOuterJoin(Table right); + + /** + * Joins two {@link Table}s. Similar to an SQL left outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Example: + * + *

+	 * {@code
+	 *   left.leftOuterJoin(right, "a = b").select("a, b, d")
+	 * }
+	 * 
+ */ + Table leftOuterJoin(Table right, String joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL left outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Scala Example: + * + *

+	 * {@code
+	 *   left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+	 * }
+	 * 
+ */ + Table leftOuterJoin(Table right, Expression joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL right outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Example: + * + *

+	 * {@code
+	 *   left.rightOuterJoin(right, "a = b").select("a, b, d")
+	 * }
+	 * 
+ */ + Table rightOuterJoin(Table right, String joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL right outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Scala Example: + * + *

+	 * {@code
+	 *   left.rightOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+	 * }
+	 * 
+ */ + Table rightOuterJoin(Table right, Expression joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL full outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Example: + * + *

+	 * {@code
+	 *   left.fullOuterJoin(right, "a = b").select("a, b, d")
+	 * }
+	 * 
+ */ + Table fullOuterJoin(Table right, String joinPredicate); + + /** + * Joins two {@link Table}s. Similar to an SQL full outer join. The fields of the two joined + * operations must not overlap, use {@code as} to rename fields if necessary. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment} and its + * {@code TableConfig} must have null check enabled (default). + * + *

Scala Example: + * + *

+	 * {@code
+	 *   left.fullOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
+	 * }
+	 * 
+ */ + Table fullOuterJoin(Table right, Expression joinPredicate); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL inner join with ON TRUE predicate but works with a table function. Each row of the + * table is joined with all rows produced by the table function. + * + *

Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction<String> {
+	 *     public void eval(String str) {
+	 *       str.split("#").forEach(this::collect);
+	 *     }
+	 *   }
+	 *
+	 *   TableFunction<String> split = new MySplitUDTF();
+	 *   tableEnv.registerFunction("split", split);
+	 *   table.joinLateral("split(c) as (s)").select("a, b, c, s");
+	 * }
+	 * 
+ */ + Table joinLateral(String tableFunctionCall); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL inner join with ON TRUE predicate but works with a table function. Each row of the + * table is joined with all rows produced by the table function. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction[String] {
+	 *     def eval(str: String): Unit = {
+	 *       str.split("#").foreach(collect)
+	 *     }
+	 *   }
+	 *
+	 *   val split = new MySplitUDTF()
+	 *   table.joinLateral(split('c) as ('s)).select('a, 'b, 'c, 's)
+	 * }
+	 * 
+ */ + Table joinLateral(Expression tableFunctionCall); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL inner join with ON TRUE predicate but works with a table function. Each row of the + * table is joined with all rows produced by the table function. + * + *

Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction<String> {
+	 *     public void eval(String str) {
+	 *       str.split("#").forEach(this::collect);
+	 *     }
+	 *   }
+	 *
+	 *   TableFunction<String> split = new MySplitUDTF();
+	 *   tableEnv.registerFunction("split", split);
+	 *   table.joinLateral("split(c) as (s)", "a = s").select("a, b, c, s");
+	 * }
+	 * 
+ */ + Table joinLateral(String tableFunctionCall, String joinPredicate); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL inner join with ON TRUE predicate but works with a table function. Each row of the + * table is joined with all rows produced by the table function. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction[String] {
+	 *     def eval(str: String): Unit = {
+	 *       str.split("#").foreach(collect)
+	 *     }
+	 *   }
+	 *
+	 *   val split = new MySplitUDTF()
+	 *   table.joinLateral(split('c) as ('s), 'a === 's).select('a, 'b, 'c, 's)
+	 * }
+	 * 
+ */ + Table joinLateral(Expression tableFunctionCall, Expression joinPredicate); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL left outer join with ON TRUE predicate but works with a table function. Each row of + * the table is joined with all rows produced by the table function. If the table function does + * not produce any row, the outer row is padded with nulls. + * + *

Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction<String> {
+	 *     public void eval(String str) {
+	 *       str.split("#").forEach(this::collect);
+	 *     }
+	 *   }
+	 *
+	 *   TableFunction<String> split = new MySplitUDTF();
+	 *   tableEnv.registerFunction("split", split);
+	 *   table.leftOuterJoinLateral("split(c) as (s)").select("a, b, c, s");
+	 * }
+	 * 
+ */ + Table leftOuterJoinLateral(String tableFunctionCall); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL left outer join with ON TRUE predicate but works with a table function. Each row of + * the table is joined with all rows produced by the table function. If the table function does + * not produce any row, the outer row is padded with nulls. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction[String] {
+	 *     def eval(str: String): Unit = {
+	 *       str.split("#").foreach(collect)
+	 *     }
+	 *   }
+	 *
+	 *   val split = new MySplitUDTF()
+	 *   table.leftOuterJoinLateral(split('c) as ('s)).select('a, 'b, 'c, 's)
+	 * }
+	 * 
+ */ + Table leftOuterJoinLateral(Expression tableFunctionCall); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL left outer join with ON TRUE predicate but works with a table function. Each row of + * the table is joined with all rows produced by the table function. If the table function does + * not produce any row, the outer row is padded with nulls. + * + *

Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction<String> {
+	 *     public void eval(String str) {
+	 *       str.split("#").forEach(this::collect);
+	 *     }
+	 *   }
+	 *
+	 *   TableFunction<String> split = new MySplitUDTF();
+	 *   tableEnv.registerFunction("split", split);
+	 *   table.leftOuterJoinLateral("split(c) as (s)", "a = s").select("a, b, c, s");
+	 * }
+	 * 
+ */ + Table leftOuterJoinLateral(String tableFunctionCall, String joinPredicate); + + /** + * Joins this {@link Table} with an user-defined {@link TableFunction}. This join is similar to + * a SQL left outer join with ON TRUE predicate but works with a table function. Each row of + * the table is joined with all rows produced by the table function. If the table function does + * not produce any row, the outer row is padded with nulls. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   class MySplitUDTF extends TableFunction[String] {
+	 *     def eval(str: String): Unit = {
+	 *       str.split("#").foreach(collect)
+	 *     }
+	 *   }
+	 *
+	 *   val split = new MySplitUDTF()
+	 *   table.leftOuterJoinLateral(split('c) as ('s), 'a === 's).select('a, 'b, 'c, 's)
+	 * }
+	 * 
+ */ + Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate); + + /** + * Minus of two {@link Table}s with duplicate records removed. + * Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not + * exist in the right table. Duplicate records in the left table are returned + * exactly once, i.e., duplicates are removed. Both tables must have identical field types. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.minus(right)
+	 * }
+	 * 
+ */ + Table minus(Table right); + + /** + * Minus of two {@link Table}s. Similar to a SQL EXCEPT ALL clause. + * MinusAll returns the records that do not exist in + * the right table. A record that is present n times in the left table and m times + * in the right table is returned (n - m) times, i.e., as many duplicates as are present + * in the right table are removed. Both tables must have identical field types. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.minusAll(right)
+	 * }
+	 * 
+ */ + Table minusAll(Table right); + + /** + * Unions two {@link Table}s with duplicate records removed. + * Similar to an SQL UNION. The fields of the two union operations must fully overlap. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.union(right)
+	 * }
+	 * 
+ */ + Table union(Table right); + + /** + * Unions two {@link Table}s. Similar to an SQL UNION ALL. The fields of the two union + * operations must fully overlap. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.unionAll(right)
+	 * }
+	 * 
+ */ + Table unionAll(Table right); + + /** + * Intersects two {@link Table}s with duplicate records removed. Intersect returns records that + * exist in both tables. If a record is present in one or both tables more than once, it is + * returned just once, i.e., the resulting table has no duplicate records. Similar to an + * SQL INTERSECT. The fields of the two intersect operations must fully overlap. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.intersect(right)
+	 * }
+	 * 
+ */ + Table intersect(Table right); + + /** + * Intersects two {@link Table}s. IntersectAll returns records that exist in both tables. + * If a record is present in both tables more than once, it is returned as many times as it + * is present in both tables, i.e., the resulting table might have duplicate records. Similar + * to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap. + * + *

Note: Both tables must be bound to the same {@code TableEnvironment}. + * + *

Example: + * + *

+	 * {@code
+	 *   left.intersectAll(right)
+	 * }
+	 * 
+ */ + Table intersectAll(Table right); + + /** + * Sorts the given {@link Table}. Similar to SQL ORDER BY. + * The resulting Table is globally sorted across all parallel partitions. + * + *

Example: + * + *

+	 * {@code
+	 *   tab.orderBy("name.desc")
+	 * }
+	 * 
+ */ + Table orderBy(String fields); + + /** + * Sorts the given {@link Table}. Similar to SQL ORDER BY. + * The resulting Table is globally sorted across all parallel partitions. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   tab.orderBy('name.desc)
+	 * }
+	 * 
+ */ + Table orderBy(Expression... fields); + + /** + * Limits a sorted result from an offset position. + * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and + * thus must be preceded by it. + * + * {@link Table#offset(int offset)} can be combined with a subsequent + * {@link Table#fetch(int fetch)} call to return n rows after skipping the first o rows. + * + *
+	 * {@code
+	 *   // skips the first 3 rows and returns all following rows.
+	 *   tab.orderBy("name.desc").offset(3)
+	 *   // skips the first 10 rows and returns the next 5 rows.
+	 *   tab.orderBy("name.desc").offset(10).fetch(5)
+	 * }
+	 * 
+ * + * @param offset number of records to skip + */ + Table offset(int offset); + + /** + * Limits a sorted result to the first n rows. + * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and + * thus must be preceded by it. + * + * {@link Table#fetch(int fetch)} can be combined with a preceding + * {@link Table#offset(int offset)} call to return n rows after skipping the first o rows. + * + *
+	 * {@code
+	 *   // returns the first 3 records.
+	 *   tab.orderBy("name.desc").fetch(3)
+	 *   // skips the first 10 rows and returns the next 5 rows.
+	 *   tab.orderBy("name.desc").offset(10).fetch(5)
+	 * }
+	 * 
+ * + * @param fetch the number of records to return. Fetch must be >= 0. + */ + Table fetch(int fetch); + + /** + * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name. + * + *
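+	 * Illustrative usage (assumes a sink has been registered under the name "OutputTable"):
+	 *
+	 * {@code
+	 *   tab.insertInto("OutputTable")
+	 * }
+	 *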

A batch {@link Table} can only be written to a + * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a + * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a + * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an + * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}. + * + * @param tableName Name of the registered {@link TableSink} to which the {@link Table} is + * written. + */ + void insertInto(String tableName); + + /** + * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name. + * + *
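+	 * Illustrative usage (assumes {@code qConfig} is a previously created {@link QueryConfig}
+	 * and a sink has been registered under the name "OutputTable"):
+	 *
+	 * {@code
+	 *   tab.insertInto("OutputTable", qConfig)
+	 * }
+	 *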

A batch {@link Table} can only be written to a + * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a + * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a + * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an + * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}. + * + * @param tableName Name of the {@link TableSink} to which the {@link Table} is written. + * @param conf The {@link QueryConfig} to use. + */ + void insertInto(String tableName, QueryConfig conf); + + /** + * Groups the records of a table by assigning them to windows defined by a time or row interval. + * + *

For streaming tables of infinite size, grouping into windows is required to define finite + * groups on which group-based aggregates can be computed. + * + *

For batch tables of finite size, windowing essentially provides shortcuts for time-based + * groupBy. + * + *
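+	 * Illustrative usage (assumes attributes {@code key}, {@code value}, and a time attribute
+	 * {@code rowtime}; the tumbling window is just one possible {@link GroupWindow}):
+	 *
+	 * {@code
+	 *   tab.window(Tumble.over("10.minutes").on("rowtime").as("w"))
+	 *      .groupBy("w, key")
+	 *      .select("key, value.avg")
+	 * }
+	 *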

Note: Computing windowed aggregates on a streaming table is only a parallel operation + * if additional grouping attributes are added to the {@code groupBy(...)} clause. + * If the {@code groupBy(...)} only references a GroupWindow alias, the streamed table will be + * processed by a single task, i.e., with parallelism 1. + * + * @param groupWindow groupWindow that specifies how elements are grouped. + * @return A windowed table. + */ + GroupWindowedTable window(GroupWindow groupWindow); + + /** + * Defines over-windows on the records of a table. + * + *

An over-window defines for each record an interval of records over which aggregation + * functions can be computed. + * + *

Example: + * + *

+	 * {@code
+	 *   table
+	 *     .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow)
+	 *     .select('c, 'b.count over 'ow, 'e.sum over 'ow)
+	 * }
+	 * 
+ * + *

Note: Computing over window aggregates on a streaming table is only a parallel + * operation if the window is partitioned. Otherwise, the whole stream will be processed by a + * single task, i.e., with parallelism 1. + * + *

Note: Over-windows for batch tables are currently not supported. + * + * @param overWindows windows that specify the record interval over which aggregations are + * computed. + * @return An OverWindowedTable to specify the aggregations. + */ + OverWindowedTable window(OverWindow... overWindows); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java new file mode 100644 index 0000000000000..94893fc9c57d4 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.Expression; + +/** + * A table that has been windowed and grouped for {@link GroupWindow}s. + */ +@PublicEvolving +public interface WindowGroupedTable { + + /** + * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. + * The field expressions can contain complex expressions and aggregations. + * + *

Example: + * + *

+	 * {@code
+	 *   windowGroupedTable.select("key, window.start, value.avg as valavg")
+	 * }
+	 * 
+ */ + Table select(String fields); + + /** + * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. + * The field expressions can contain complex expressions and aggregations. + * + *

Scala Example: + * + *

+	 * {@code
+	 *   windowGroupedTable.select('key, 'window.start, 'value.avg as 'valavg)
+	 * }
+	 * 
+ */ + Table select(Expression... fields); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TemporalTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TemporalTableFunction.java new file mode 100644 index 0000000000000..6de9c2398e6cb --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TemporalTableFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.types.Row; + +/** + * Class representing temporal table function over some history table. A + * {@link TemporalTableFunction} is also an instance of {@link TableFunction}. + * + *

Currently {@link TemporalTableFunction}s are only supported in streaming. + */ +@PublicEvolving +public abstract class TemporalTableFunction extends TableFunction { + +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 312d4d8da0d9f..fe0568b2712ea 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -355,7 +355,7 @@ abstract class BatchTableEnvironment( * @param extended Flag to include detailed optimizer estimates. */ private[flink] def explain(table: Table, extended: Boolean): String = { - val ast = table.getRelNode + val ast = table.asInstanceOf[TableImpl].getRelNode val optimizedPlan = optimize(ast) val dataSet = translate[Row](optimizedPlan, ast.getRowType, queryConfig) ( new GenericTypeInfo (classOf[Row])) @@ -471,7 +471,7 @@ abstract class BatchTableEnvironment( protected def translate[A]( table: Table, queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = { - val relNode = table.getRelNode + val relNode = table.asInstanceOf[TableImpl].getRelNode val dataSetPlan = optimize(relNode) translate(dataSetPlan, relNode.getRowType, queryConfig) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 03774a0634375..bbe9def3ebe9c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -319,10 +319,11 @@ abstract class StreamTableEnvironment( * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. */ override private[flink] def writeToSink[T]( - table: Table, + inputTable: Table, sink: TableSink[T], queryConfig: QueryConfig): Unit = { + val table = inputTable.asInstanceOf[TableImpl] // Check query configuration val streamQueryConfig = queryConfig match { case streamConfig: StreamQueryConfig => streamConfig @@ -859,7 +860,7 @@ abstract class StreamTableEnvironment( queryConfig: StreamQueryConfig, updatesAsRetraction: Boolean, withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = { - val relNode = table.getRelNode + val relNode = table.asInstanceOf[TableImpl].getRelNode val dataStreamPlan = optimize(relNode, updatesAsRetraction) val rowType = getResultType(relNode, dataStreamPlan) @@ -1000,7 +1001,7 @@ abstract class StreamTableEnvironment( * @param table The table for which the AST and execution plan will be returned. 
*/ def explain(table: Table): String = { - val ast = table.getRelNode + val ast = table.asInstanceOf[TableImpl].getRelNode val optimizedPlan = optimize(ast, updatesAsRetraction = false) val dataStream = translateToCRow(optimizedPlan, queryConfig) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 4915d7d83b274..db66bfc7bc814 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -512,13 +512,13 @@ abstract class TableEnvironment(val config: TableConfig) { def registerTable(name: String, table: Table): Unit = { // check that table belongs to this table environment - if (table.tableEnv != this) { + if (table.asInstanceOf[TableImpl].tableEnv != this) { throw new TableException( "Only tables that belong to this TableEnvironment can be registered.") } checkValidTableName(name) - val tableTable = new RelTable(table.getRelNode) + val tableTable = new RelTable(table.asInstanceOf[TableImpl].getRelNode) registerTableInternal(name, tableTable) } @@ -656,7 +656,7 @@ abstract class TableEnvironment(val config: TableConfig) { val tableName = tablePath(tablePath.length - 1) val table = schema.getTable(tableName) if (table != null) { - return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))) + return Some(new TableImpl(this, CatalogNode(tablePath, table.getRowType(typeFactory)))) } } None @@ -739,7 +739,7 @@ abstract class TableEnvironment(val config: TableConfig) { val validated = planner.validate(parsed) // transform to a relational tree val relational = planner.rel(validated) - new Table(this, LogicalRelNode(relational.rel)) + new TableImpl(this, LogicalRelNode(relational.rel)) } else { throw new TableException( "Unsupported SQL query! 
sqlQuery() only accepts SQL queries of type " + @@ -801,7 +801,7 @@ abstract class TableEnvironment(val config: TableConfig) { val validatedQuery = planner.validate(query) // get query result as Table - val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) + val queryResult = new TableImpl(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 9812a1a4e2e56..3f44019bfd44a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.DataStream -import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException} +import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException, TableImpl} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv} import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv} @@ -30,7 +30,7 @@ import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTa * * @param table The table to convert. */ -class TableConversions(table: Table) { +class TableConversions(table: TableImpl) { /** * Converts the given [[Table]] into a [[DataSet]] of a specified type. 
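The environment and conversion hunks above all make the same mechanical change: public-facing code keeps accepting the `Table` interface, while the planner downcasts to `TableImpl` before touching planner-only members such as `getRelNode`, `tableEnv` or `logicalPlan`. A minimal sketch of that bridging pattern (the helper object `RelNodeBridge` is illustrative and not part of this patch):

```scala
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.{Table, TableException, TableImpl}

// Hypothetical planner-side helper (not part of this patch): bridges from the
// public Table interface to the internal TableImpl before asking for the
// Calcite RelNode that the optimizer and translators work on.
object RelNodeBridge {
  def relNodeOf(table: Table): RelNode = table match {
    case impl: TableImpl => impl.getRelNode
    case other => throw new TableException(
      s"Only TableImpl is supported by this planner, got: ${other.getClass}")
  }
}
```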
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala index 67eef8cf45558..6737e084b9662 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala @@ -67,7 +67,7 @@ import _root_.scala.language.implicitConversions package object scala extends ImplicitExpressionConversions { implicit def table2TableConversions(table: Table): TableConversions = { - new TableConversions(table) + new TableConversions(table.asInstanceOf[TableImpl]) } implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = { @@ -75,7 +75,7 @@ package object scala extends ImplicitExpressionConversions { } implicit def table2RowDataSet(table: Table): DataSet[Row] = { - val tableEnv = table.tableEnv.asInstanceOf[ScalaBatchTableEnv] + val tableEnv = table.asInstanceOf[TableImpl].tableEnv.asInstanceOf[ScalaBatchTableEnv] tableEnv.toDataSet[Row](table) } @@ -84,7 +84,7 @@ package object scala extends ImplicitExpressionConversions { } implicit def table2RowDataStream(table: Table): DataStream[Row] = { - val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv] + val tableEnv = table.asInstanceOf[TableImpl].tableEnv.asInstanceOf[ScalaStreamTableEnv] tableEnv.toAppendStream[Row](table) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala deleted file mode 100644 index e66d14b6736d5..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala +++ /dev/null @@ -1,1426 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.api - -import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionBridge, ExpressionParser, Ordering, PlannerExpression, ResolvedFieldReference, UnresolvedAlias, WindowProperty} -import org.apache.flink.table.functions.TemporalTableFunction -import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils -import org.apache.flink.table.plan.ProjectionTranslator._ -import org.apache.flink.table.plan.logical.{Minus, _} -import org.apache.flink.table.sinks.TableSink -import org.apache.flink.table.util.JavaScalaConversionUtil - -import _root_.scala.annotation.varargs -import _root_.scala.collection.JavaConverters._ -import _root_.scala.collection.JavaConversions._ - -/** - * A Table is the core component of the Table API. - * Similar to how the batch and streaming APIs have DataSet and DataStream, - * the Table API is built around [[Table]]. - * - * Use the methods of [[Table]] to transform data. Use [[TableEnvironment]] to convert a [[Table]] - * back to a DataSet or DataStream. - * - * When using Scala a [[Table]] can also be converted using implicit conversions. - * - * Example: - * - * {{{ - * val env = ExecutionEnvironment.getExecutionEnvironment - * val tEnv = TableEnvironment.getTableEnvironment(env) - * - * val set: DataSet[(String, Int)] = ... - * val table = set.toTable(tEnv, 'a, 'b) - * ... - * val table2 = ... - * val set2: DataSet[MyType] = table2.toDataSet[MyType] - * }}} - * - * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments - * in a Scala DSL or as an expression String. Please refer to the documentation for the expression - * syntax. - * - * @param tableEnv The [[TableEnvironment]] to which the table is bound. - * @param logicalPlan logical representation - */ -class Table( - private[flink] val tableEnv: TableEnvironment, - private[flink] val logicalPlan: LogicalNode) { - - private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] = - tableEnv.expressionBridge - - private lazy val tableSchema: TableSchema = new TableSchema( - logicalPlan.output.map(_.name).toArray, - logicalPlan.output.map(_.resultType).toArray) - - var tableName: String = _ - - def relBuilder: FlinkRelBuilder = tableEnv.getRelBuilder - - def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder) - - /** - * Returns the schema of this table. - */ - def getSchema: TableSchema = tableSchema - - /** - * Prints the schema of this table to the console in a tree format. - */ - def printSchema(): Unit = print(tableSchema.toString) - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * tab.select("key, value.avg + ' The average' as average") - * }}} - */ - def select(fields: String): Table = { - select(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. 
- * - * Example: - * - * {{{ - * tab.select('key, 'value.avg + " The average" as 'average) - * }}} - */ - def select(fields: Expression*): Table = { - selectInternal(fields.map(expressionBridge.bridge)) - } - - private def selectInternal(fields: Seq[PlannerExpression]): Table = { - val expandedFields = expandProjectList(fields, logicalPlan, tableEnv) - val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv) - if (propNames.nonEmpty) { - throw new ValidationException("Window properties can only be used on windowed tables.") - } - - if (aggNames.nonEmpty) { - val projectsOnAgg = replaceAggregationsAndProperties( - expandedFields, tableEnv, aggNames, propNames) - val projectFields = extractFieldReferences(expandedFields) - - new Table(tableEnv, - Project(projectsOnAgg, - Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq, - Project(projectFields, logicalPlan).validate(tableEnv) - ).validate(tableEnv) - ).validate(tableEnv) - ) - } else { - new Table(tableEnv, - Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv)) - } - } - - /** - * Creates [[TemporalTableFunction]] backed up by this table as a history table. - * Temporal Tables represent a concept of a table that changes over time and for which - * Flink keeps track of those changes. [[TemporalTableFunction]] provides a way how to access - * those data. - * - * For more information please check Flink's documentation on Temporal Tables. - * - * Currently [[TemporalTableFunction]]s are only supported in streaming. - * - * @param timeAttribute Must points to a time attribute. Provides a way to compare which records - * are a newer or older version. - * @param primaryKey Defines the primary key. With primary key it is possible to update - * a row or to delete it. - * @return [[TemporalTableFunction]] which is an instance of - * [[org.apache.flink.table.functions.TableFunction]]. It takes one single argument, - * the `timeAttribute`, for which it returns matching version of the [[Table]], from which - * [[TemporalTableFunction]] was created. - */ - def createTemporalTableFunction( - timeAttribute: String, - primaryKey: String) - : TemporalTableFunction = { - createTemporalTableFunction( - ExpressionParser.parseExpression(timeAttribute), - ExpressionParser.parseExpression(primaryKey)) - } - - /** - * Creates [[TemporalTableFunction]] backed up by this table as a history table. - * Temporal Tables represent a concept of a table that changes over time and for which - * Flink keeps track of those changes. [[TemporalTableFunction]] provides a way how to access - * those data. - * - * For more information please check Flink's documentation on Temporal Tables. - * - * Currently [[TemporalTableFunction]]s are only supported in streaming. - * - * @param timeAttribute Must points to a time indicator. Provides a way to compare which records - * are a newer or older version. - * @param primaryKey Defines the primary key. With primary key it is possible to update - * a row or to delete it. - * @return [[TemporalTableFunction]] which is an instance of - * [[org.apache.flink.table.functions.TableFunction]]. It takes one single argument, - * the `timeAttribute`, for which it returns matching version of the [[Table]], from which - * [[TemporalTableFunction]] was created. 
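To make the temporal-table description above concrete, here is a minimal usage sketch; the environment `tEnv`, the table `ratesHistory` and its fields `currency`, `rate` and `rowtime` are illustrative assumptions, not part of this patch:

```scala
import org.apache.flink.api.scala._ // provides implicit TypeInformation for registration
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TemporalTableFunction

// Sketch only: ratesHistory is assumed to have fields (currency, rate) and an
// event-time attribute named rowtime; all names here are illustrative.
def registerRates(tEnv: StreamTableEnvironment, ratesHistory: Table): TemporalTableFunction = {
  val rates = ratesHistory.createTemporalTableFunction("rowtime", "currency")
  tEnv.registerFunction("Rates", rates) // usable from the Table API and SQL afterwards
  rates
}
```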
- */ - def createTemporalTableFunction( - timeAttribute: Expression, - primaryKey: Expression) - : TemporalTableFunction = { - createTemporalTableFunctionInternal( - expressionBridge.bridge(timeAttribute), - expressionBridge.bridge(primaryKey)) - } - - private def createTemporalTableFunctionInternal( - timeAttribute: PlannerExpression, - primaryKey: PlannerExpression) - : TemporalTableFunction = { - val temporalTable = TemporalTable(timeAttribute, primaryKey, logicalPlan) - .validate(tableEnv) - .asInstanceOf[TemporalTable] - - TemporalTableFunction.create( - this, - temporalTable.timeAttribute, - validatePrimaryKeyExpression(temporalTable.primaryKey)) - } - - private def validatePrimaryKeyExpression(expression: Expression): String = { - expression match { - case fieldReference: ResolvedFieldReference => - fieldReference.name - case _ => throw new ValidationException( - s"Unsupported expression [$expression] as primary key. " + - s"Only top-level (not nested) field references are supported.") - } - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * tab.as("a, b") - * }}} - */ - def as(fields: String): Table = { - as(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * tab.as('a, 'b) - * }}} - */ - def as(fields: Expression*): Table = { - asInternal(fields.map(tableEnv.expressionBridge.bridge)) - } - - private def asInternal(fields: Seq[PlannerExpression]): Table = { - new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv)) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * tab.filter("name = 'Fred'") - * }}} - */ - def filter(predicate: String): Table = { - filter(ExpressionParser.parseExpression(predicate)) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * tab.filter('name === "Fred") - * }}} - */ - def filter(predicate: Expression): Table = { - filterInternal(expressionBridge.bridge(predicate)) - } - - private def filterInternal(predicate: PlannerExpression): Table = { - new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * tab.where("name = 'Fred'") - * }}} - */ - def where(predicate: String): Table = { - filter(predicate) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * tab.where('name === "Fred") - * }}} - */ - def where(predicate: Expression): Table = { - filter(predicate) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * tab.groupBy("key").select("key, value.avg") - * }}} - */ - def groupBy(fields: String): GroupedTable = { - groupBy(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. 
- * - * Example: - * - * {{{ - * tab.groupBy('key).select('key, 'value.avg) - * }}} - */ - def groupBy(fields: Expression*): GroupedTable = { - groupByInternal(fields.map(expressionBridge.bridge)) - } - - private def groupByInternal(fields: Seq[PlannerExpression]): GroupedTable = { - new GroupedTable(this, fields) - } - - /** - * Removes duplicate values and returns only distinct (different) values. - * - * Example: - * - * {{{ - * tab.select("key, value").distinct() - * }}} - */ - def distinct(): Table = { - new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv)) - } - - /** - * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. You can use - * where and select clauses after a join to further specify the behaviour of the join. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) - * }}} - */ - def join(right: Table): Table = { - joinInternal(right, None, JoinType.INNER) - } - - /** - * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.join(right, "a = b") - * }}} - */ - def join(right: Table, joinPredicate: String): Table = { - join(right, ExpressionParser.parseExpression(joinPredicate)) - } - - /** - * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.join(right, 'a === 'b).select('a, 'b, 'd) - * }}} - */ - def join(right: Table, joinPredicate: Expression): Table = { - joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.INNER) - } - - /** - * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). - * - * Example: - * - * {{{ - * left.leftOuterJoin(right).select('a, 'b, 'd) - * }}} - */ - def leftOuterJoin(right: Table): Table = { - joinInternal(right, None, JoinType.LEFT_OUTER) - } - - /** - * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). - * - * Example: - * - * {{{ - * left.leftOuterJoin(right, "a = b").select("a, b, d") - * }}} - */ - def leftOuterJoin(right: Table, joinPredicate: String): Table = { - leftOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)) - } - - /** - * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). 
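The outer-join notes above require null checking to be enabled on the `TableConfig` (it is on by default); a small sketch that makes the requirement explicit, assuming tables `left`/`right` with fields `a`, `b` and `d` (all illustrative):

```scala
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._ // enables the 'symbol expression syntax

// Sketch: left is assumed to expose fields a and d, right a field b;
// the field names are illustrative.
def outerJoinWithNullCheck(tEnv: TableEnvironment, left: Table, right: Table): Table = {
  // Null checking is on by default; outer joins rely on it, so set it explicitly here.
  tEnv.getConfig.setNullCheck(true)
  left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
}
```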
- * - * Example: - * - * {{{ - * left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd) - * }}} - */ - def leftOuterJoin(right: Table, joinPredicate: Expression): Table = { - joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.LEFT_OUTER) - } - - /** - * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). - * - * Example: - * - * {{{ - * left.rightOuterJoin(right, "a = b").select('a, 'b, 'd) - * }}} - */ - def rightOuterJoin(right: Table, joinPredicate: String): Table = { - rightOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)) - } - - /** - * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). - * - * Example: - * - * {{{ - * left.rightOuterJoin(right, 'a === 'b).select('a, 'b, 'd) - * }}} - */ - def rightOuterJoin(right: Table, joinPredicate: Expression): Table = { - joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.RIGHT_OUTER) - } - - /** - * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). - * - * Example: - * - * {{{ - * left.fullOuterJoin(right, "a = b").select('a, 'b, 'd) - * }}} - */ - def fullOuterJoin(right: Table, joinPredicate: String): Table = { - fullOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)) - } - - /** - * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. - * - * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must - * have null check enabled (default). - * - * Example: - * - * {{{ - * left.fullOuterJoin(right, 'a === 'b).select('a, 'b, 'd) - * }}} - */ - def fullOuterJoin(right: Table, joinPredicate: Expression): Table = { - joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.FULL_OUTER) - } - - private def joinInternal( - right: Table, - joinPredicate: Option[PlannerExpression], - joinType: JoinType) - : Table = { - // check that the TableEnvironment of right table is not null - // and right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException("Only tables from the same TableEnvironment can be joined.") - } - - new Table( - tableEnv, - Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false) - .validate(tableEnv)) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL inner join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table function. 
- * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction { - * public void eval(String str) { - * str.split("#").forEach(this::collect); - * } - * } - * - * TableFunction split = new MySplitUDTF(); - * tableEnv.registerFunction("split", split); - * table.joinLateral("split(c) as (s)").select("a, b, c, s"); - * }}} - */ - def joinLateral(tableFunctionCall: String): Table = { - joinLateral(ExpressionParser.parseExpression(tableFunctionCall)) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL inner join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table function. - * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction[String] { - * def eval(str: String): Unit = { - * str.split("#").foreach(collect) - * } - * } - * - * val split = new MySplitUDTF() - * table.joinLateral(split('c) as ('s)).select('a, 'b, 'c, 's) - * }}} - */ - def joinLateral(tableFunctionCall: Expression): Table = { - joinLateralInternal(expressionBridge.bridge(tableFunctionCall), None, JoinType.INNER) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL inner join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table function. - * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction { - * public void eval(String str) { - * str.split("#").forEach(this::collect); - * } - * } - * - * TableFunction split = new MySplitUDTF(); - * tableEnv.registerFunction("split", split); - * table.joinLateral("split(c) as (s)", "a = s").select("a, b, c, s"); - * }}} - */ - def joinLateral(tableFunctionCall: String, joinPredicate: String): Table = { - joinLateral( - ExpressionParser.parseExpression(tableFunctionCall), - ExpressionParser.parseExpression(joinPredicate)) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL inner join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table function. - * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction[String] { - * def eval(str: String): Unit = { - * str.split("#").foreach(collect) - * } - * } - * - * val split = new MySplitUDTF() - * table.joinLateral(split('c) as ('s), 'a === 's).select('a, 'b, 'c, 's) - * }}} - */ - def joinLateral(tableFunctionCall: Expression, joinPredicate: Expression): Table = { - joinLateralInternal( - expressionBridge.bridge(tableFunctionCall), - Some(expressionBridge.bridge(joinPredicate)), - JoinType.INNER) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL left outer join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table - * function. If the table function does not produce any row, the outer row is padded with nulls. 
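Mirroring the Scala examples in the scaladoc above, a self-contained sketch of a lateral join with a user-defined table function; the class `SplitFunction` and the field names are illustrative assumptions:

```scala
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._ // expression DSL for 'field references
import org.apache.flink.table.functions.TableFunction

// Sketch: every input row is joined with all values the table function emits for it.
class SplitFunction extends TableFunction[String] {
  def eval(str: String): Unit = str.split("#").foreach(collect)
}

// Assumes the input table has fields a, b and a string field c (illustrative).
def splitColumn(table: Table): Table = {
  val split = new SplitFunction
  table
    .joinLateral(split('c) as ('s))
    .select('a, 'b, 'c, 's)
}
```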
- * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction { - * public void eval(String str) { - * str.split("#").forEach(this::collect); - * } - * } - * - * TableFunction split = new MySplitUDTF(); - * tableEnv.registerFunction("split", split); - * table.leftOuterJoinLateral("split(c) as (s)").select("a, b, c, s"); - * }}} - */ - def leftOuterJoinLateral(tableFunctionCall: String): Table = { - leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall)) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL left outer join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table - * function. If the table function does not produce any row, the outer row is padded with nulls. - * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction[String] { - * def eval(str: String): Unit = { - * str.split("#").foreach(collect) - * } - * } - * - * val split = new MySplitUDTF() - * table.leftOuterJoinLateral(split('c) as ('s)).select('a, 'b, 'c, 's) - * }}} - */ - def leftOuterJoinLateral(tableFunctionCall: Expression): Table = { - joinLateralInternal(expressionBridge.bridge(tableFunctionCall), None, JoinType.LEFT_OUTER) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL left outer join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table - * function. If the table function does not produce any row, the outer row is padded with nulls. - * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction { - * public void eval(String str) { - * str.split("#").forEach(this::collect); - * } - * } - * - * TableFunction split = new MySplitUDTF(); - * tableEnv.registerFunction("split", split); - * table.leftOuterJoinLateral("split(c) as (s)", "a = s").select("a, b, c, s"); - * }}} - */ - def leftOuterJoinLateral(tableFunctionCall: String, joinPredicate: String): Table = { - leftOuterJoinLateral( - ExpressionParser.parseExpression(tableFunctionCall), - ExpressionParser.parseExpression(joinPredicate)) - } - - /** - * Joins this [[Table]] with an user-defined [[org.apache.flink.table.functions.TableFunction]]. - * This join is similar to a SQL left outer join with ON TRUE predicate but works with a - * table function. Each row of the table is joined with all rows produced by the table - * function. If the table function does not produce any row, the outer row is padded with nulls. 
- * - * Example: - * {{{ - * class MySplitUDTF extends TableFunction[String] { - * def eval(str: String): Unit = { - * str.split("#").foreach(collect) - * } - * } - * - * val split = new MySplitUDTF() - * table.leftOuterJoinLateral(split('c) as ('s), 'a === 's).select('a, 'b, 'c, 's) - * }}} - */ - def leftOuterJoinLateral(tableFunctionCall: Expression, joinPredicate: Expression): Table = { - joinLateralInternal( - expressionBridge.bridge(tableFunctionCall), - Some(expressionBridge.bridge(joinPredicate)), - JoinType.LEFT_OUTER) - } - - private def joinLateralInternal( - callExpr: PlannerExpression, - joinPredicate: Option[PlannerExpression], - joinType: JoinType): Table = { - - // check join type - if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) { - throw new ValidationException( - "Table functions are currently only supported for inner and left outer lateral joins.") - } - - val logicalCall = UserDefinedFunctionUtils.createLogicalFunctionCall( - callExpr, - logicalPlan) - val validatedLogicalCall = logicalCall.validate(tableEnv) - - new Table( - tableEnv, - Join( - logicalPlan, - validatedLogicalCall, - joinType, - joinPredicate, - correlated = true - ).validate(tableEnv)) - } - - /** - * Minus of two [[Table]]s with duplicate records removed. - * Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not - * exist in the right table. Duplicate records in the left table are returned - * exactly once, i.e., duplicates are removed. Both tables must have identical field types. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.minus(right) - * }}} - */ - def minus(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException("Only tables from the same TableEnvironment can be " + - "subtracted.") - } - new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false) - .validate(tableEnv)) - } - - /** - * Minus of two [[Table]]s. Similar to an SQL EXCEPT ALL. - * Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in - * the right table. A record that is present n times in the left table and m times - * in the right table is returned (n - m) times, i.e., as many duplicates as are present - * in the right table are removed. Both tables must have identical field types. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.minusAll(right) - * }}} - */ - def minusAll(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException("Only tables from the same TableEnvironment can be " + - "subtracted.") - } - new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true) - .validate(tableEnv)) - } - - /** - * Unions two [[Table]]s with duplicate records removed. - * Similar to an SQL UNION. The fields of the two union operations must fully overlap. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. 
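The duplicate-handling rules of the set operations above are easiest to see with concrete multiplicities; in this sketch the single-column tables and their values are purely illustrative:

```scala
import org.apache.flink.table.api.Table

// Sketch: left and right are single-column tables; the example values in the
// comments are illustrative only.
def setOperations(left: Table, right: Table): Unit = {
  // left  contains 1, 1, 2, 3      right contains 2, 2, 3
  val distinctDiff = left.minus(right)     // 1                   (duplicates removed)
  val bagDiff      = left.minusAll(right)  // 1, 1                (n - m copies per record)
  val unionDedup   = left.union(right)     // 1, 2, 3
  val unionBag     = left.unionAll(right)  // 1, 1, 2, 3, 2, 2, 3
}
```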
- * - * Example: - * - * {{{ - * left.union(right) - * }}} - */ - def union(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") - } - new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv)) - } - - /** - * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations - * must fully overlap. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.unionAll(right) - * }}} - */ - def unionAll(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") - } - new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv)) - } - - /** - * Intersects two [[Table]]s with duplicate records removed. Intersect returns records that - * exist in both tables. If a record is present in one or both tables more than once, it is - * returned just once, i.e., the resulting table has no duplicate records. Similar to an - * SQL INTERSECT. The fields of the two intersect operations must fully overlap. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.intersect(right) - * }}} - */ - def intersect(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException( - "Only tables from the same TableEnvironment can be intersected.") - } - new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv)) - } - - /** - * Intersects two [[Table]]s. IntersectAll returns records that exist in both tables. - * If a record is present in both tables more than once, it is returned as many times as it - * is present in both tables, i.e., the resulting table might have duplicate records. Similar - * to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap. - * - * Note: Both tables must be bound to the same [[TableEnvironment]]. - * - * Example: - * - * {{{ - * left.intersectAll(right) - * }}} - */ - def intersectAll(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new ValidationException( - "Only tables from the same TableEnvironment can be intersected.") - } - new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv)) - } - - /** - * Sorts the given [[Table]]. Similar to SQL ORDER BY. - * The resulting Table is globally sorted across all parallel partitions. - * - * Example: - * - * {{{ - * tab.orderBy("name.desc") - * }}} - */ - def orderBy(fields: String): Table = { - orderBy(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Sorts the given [[Table]]. Similar to SQL ORDER BY. - * The resulting Table is globally sorted across all parallel partitions. 
- * - * Example: - * - * {{{ - * tab.orderBy('name.desc) - * }}} - */ - def orderBy(fields: Expression*): Table = { - orderByInternal(fields.map(expressionBridge.bridge)) - } - - private def orderByInternal(fields: Seq[PlannerExpression]): Table = { - val order: Seq[Ordering] = fields.map { - case o: Ordering => o - case e => Asc(e) - } - new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv)) - } - - /** - * Limits a sorted result from an offset position. - * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and - * thus must be preceded by it. - * - * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return n rows - * after skipping the first o rows. - * - * {{{ - * // skips the first 3 rows and returns all following rows. - * tab.orderBy('name.desc).offset(3) - * // skips the first 10 rows and returns the next 5 rows. - * tab.orderBy('name.desc).offset(10).fetch(5) - * }}} - * - * @param offset number of records to skip - */ - def offset(offset: Int): Table = { - new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv)) - } - - /** - * Limits a sorted result to the first n rows. - * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and - * thus must be preceded by it. - * - * [[Table.fetch(n)]] can be combined with a preceding [[Table.offset(o)]] call to return n rows - * after skipping the first o rows. - * - * {{{ - * // returns the first 3 records. - * tab.orderBy('name.desc).fetch(3) - * // skips the first 10 rows and returns the next 5 rows. - * tab.orderBy('name.desc).offset(10).fetch(5) - * }}} - * - * @param fetch the number of records to return. Fetch must be >= 0. - */ - def fetch(fetch: Int): Table = { - if (fetch < 0) { - throw new ValidationException("FETCH count must be equal or larger than 0.") - } - this.logicalPlan match { - case Limit(o, -1, c) => - // replace LIMIT without FETCH by LIMIT with FETCH - new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv)) - case Limit(_, _, _) => - throw new ValidationException("FETCH is already defined.") - case _ => - new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv)) - } - } - - /** - * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. - * - * A batch [[Table]] can only be written to a - * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a - * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a - * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an - * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. - * - * @param sink The [[TableSink]] to which the [[Table]] is written. - * @tparam T The data type that the [[TableSink]] expects. - * - * @deprecated Will be removed in a future release. Please register the TableSink and use - * Table.insertInto(). - */ - @deprecated("This method will be removed. Please register the TableSink and use " + - "Table.insertInto().", "1.7.0") - @Deprecated - def writeToSink[T](sink: TableSink[T]): Unit = { - val queryConfig = Option(this.tableEnv) match { - case None => null - case _ => this.tableEnv.queryConfig - } - writeToSink(sink, queryConfig) - } - - /** - * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. 
- * - * A batch [[Table]] can only be written to a - * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a - * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a - * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an - * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. - * - * @param sink The [[TableSink]] to which the [[Table]] is written. - * @param conf The configuration for the query that writes to the sink. - * @tparam T The data type that the [[TableSink]] expects. - * - * @deprecated Will be removed in a future release. Please register the TableSink and use - * Table.insertInto(). - */ - @deprecated("This method will be removed. Please register the TableSink and use " + - "Table.insertInto().", "1.7.0") - @Deprecated - def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = { - // get schema information of table - val rowType = getRelNode.getRowType - val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray - val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala - .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) - .map { - // replace time indicator types by SQL_TIMESTAMP - case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP - case t: TypeInformation[_] => t - }.toArray - - // configure the table sink - val configuredSink = sink.configure(fieldNames, fieldTypes) - - // emit the table to the configured table sink - tableEnv.writeToSink(this, configuredSink, conf) - } - - /** - * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name. - * - * A batch [[Table]] can only be written to a - * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a - * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a - * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an - * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. - * - * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written. - */ - def insertInto(tableName: String): Unit = { - insertInto(tableName, tableEnv.queryConfig) - } - - /** - * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name. - * - * A batch [[Table]] can only be written to a - * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a - * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a - * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an - * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. - * - * @param tableName Name of the [[TableSink]] to which the [[Table]] is written. - * @param conf The [[QueryConfig]] to use. - */ - def insertInto(tableName: String, conf: QueryConfig): Unit = { - tableEnv.insertInto(this, tableName, conf) - } - - /** - * Groups the records of a table by assigning them to windows defined by a time or row interval. - * - * For streaming tables of infinite size, grouping into windows is required to define finite - * groups on which group-based aggregates can be computed. - * - * For batch tables of finite size, windowing essentially provides shortcuts for time-based - * groupBy. - * - * __Note__: Computing windowed aggregates on a streaming table is only a parallel operation - * if additional grouping attributes are added to the `groupBy(...)` clause. - * If the `groupBy(...)` only references a window alias, the streamed table will be processed - * by a single task, i.e., with parallelism 1. 
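As the deprecation notes above recommend, writing a table now goes through a registered `TableSink` plus `insertInto`; a minimal sketch in which the sink name, output path and field layout are illustrative assumptions:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

// Sketch of the recommended replacement for the deprecated writeToSink:
// register the sink under a name and let insertInto resolve it.
def writeOut(tEnv: TableEnvironment, result: Table): Unit = {
  tEnv.registerTableSink(
    "csvSink",                                           // illustrative sink name
    Array("name", "cnt"),                                // illustrative field names
    Array[TypeInformation[_]](Types.STRING, Types.LONG),
    new CsvTableSink("/tmp/result.csv", "|"))            // illustrative path and delimiter

  result.insertInto("csvSink")
}
```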
- * - * @param window groupWindow that specifies how elements are grouped. - * @return A group windowed table. - */ - def window(window: GroupWindow): GroupWindowedTable = { - new GroupWindowedTable(this, window) - } - - /** - * Defines over-windows on the records of a table. - * - * An over-window defines for each record an interval of records over which aggregation - * functions can be computed. - * - * Example: - * - * {{{ - * table - * .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow) - * .select('c, 'b.count over 'ow, 'e.sum over 'ow) - * }}} - * - * __Note__: Computing over window aggregates on a streaming table is only a parallel operation - * if the window is partitioned. Otherwise, the whole stream will be processed by a single - * task, i.e., with parallelism 1. - * - * __Note__: Over-windows for batch tables are currently not supported. - * - * @param overWindows windows that specify the record interval over which aggregations are - * computed. - * @return An OverWindowedTable to specify the aggregations. - */ - @varargs - def window(overWindows: OverWindow*): OverWindowedTable = { - - if (tableEnv.isInstanceOf[BatchTableEnvironment]) { - throw new TableException("Over-windows for batch tables are currently not supported.") - } - - if (overWindows.size != 1) { - throw new TableException("Over-Windows are currently only supported single window.") - } - - new OverWindowedTable(this, overWindows) - } - - /** - * Registers an unique table name under the table environment - * and return the registered table name. - */ - override def toString: String = { - if (tableName == null) { - tableName = "UnnamedTable$" + tableEnv.attrNameCntr.getAndIncrement() - tableEnv.registerTable(tableName, this) - } - tableName - } -} - -/** - * A table that has been grouped on a set of grouping keys. - */ -class GroupedTable( - private[flink] val table: Table, - private[flink] val groupKey: Seq[PlannerExpression]) { - - /** - * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. - * The field expressions can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * tab.groupBy("key").select("key, value.avg + ' The average' as average") - * }}} - */ - def select(fields: String): Table = { - select(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. - * The field expressions can contain complex expressions and aggregations. 
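The group-window path described above combines `window(...)`, `groupBy(...)` and `select(...)`; a concrete sketch using a tumbling window and the string expression syntax, assuming an event-time attribute named `rowtime` and fields `key`/`value` (all illustrative):

```scala
import org.apache.flink.table.api.{Table, Tumble}

// Sketch: tab is assumed to have fields (key, value) plus an event-time
// attribute named rowtime; all names are illustrative.
def windowedSum(tab: Table): Table = {
  tab
    .window(Tumble.over("10.minutes").on("rowtime").as("w")) // define the group window
    .groupBy("w, key")                                       // window alias plus grouping key
    .select("key, w.start, value.sum")                       // window property and aggregate
}
```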
- * - * Example: - * - * {{{ - * tab.groupBy('key).select('key, 'value.avg + " The average" as 'average) - * }}} - */ - def select(fields: Expression*): Table = { - selectInternal(fields.map(table.expressionBridge.bridge)) - } - - private def selectInternal(fields: Seq[PlannerExpression]): Table = { - val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv) - val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv) - if (propNames.nonEmpty) { - throw new ValidationException("Window properties can only be used on windowed tables.") - } - - val projectsOnAgg = replaceAggregationsAndProperties( - expandedFields, table.tableEnv, aggNames, propNames) - val projectFields = extractFieldReferences(expandedFields ++ groupKey) - - new Table(table.tableEnv, - Project(projectsOnAgg, - Aggregate(groupKey, aggNames.map(a => Alias(a._1, a._2)).toSeq, - Project(projectFields, table.logicalPlan).validate(table.tableEnv) - ).validate(table.tableEnv) - ).validate(table.tableEnv)) - } -} - -/** - * A table that has been windowed for [[GroupWindow]]s. - */ -class GroupWindowedTable( - private[flink] val table: Table, - private[flink] val window: GroupWindow) { - - /** - * Groups the elements by a mandatory window and one or more optional grouping attributes. - * The window is specified by referring to its alias. - * - * If no additional grouping attribute is specified and if the input is a streaming table, - * the aggregation will be performed by a single task, i.e., with parallelism 1. - * - * Aggregations are performed per group and defined by a subsequent `select(...)` clause similar - * to SQL SELECT-GROUP-BY query. - * - * Example: - * - * {{{ - * tab.window([window].as("w")).groupBy("w, key").select("key, value.avg") - * }}} - */ - def groupBy(fields: String): WindowGroupedTable = { - groupBy(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Groups the elements by a mandatory window and one or more optional grouping attributes. - * The window is specified by referring to its alias. - * - * If no additional grouping attribute is specified and if the input is a streaming table, - * the aggregation will be performed by a single task, i.e., with parallelism 1. - * - * Aggregations are performed per group and defined by a subsequent `select(...)` clause similar - * to SQL SELECT-GROUP-BY query. - * - * Example: - * - * {{{ - * tab.window([window] as 'w)).groupBy('w, 'key).select('key, 'value.avg) - * }}} - */ - def groupBy(fields: Expression*): WindowGroupedTable = { - val fieldsWithoutWindow = fields.filterNot(window.getAlias.equals(_)) - if (fields.size != fieldsWithoutWindow.size + 1) { - throw new ValidationException("GroupBy must contain exactly one window alias.") - } - - new WindowGroupedTable(table, fieldsWithoutWindow, window) - } -} - -/** - * A table that has been windowed and grouped for [[GroupWindow]]s. - */ -class WindowGroupedTable( - private[flink] val table: Table, - private[flink] val groupKeys: Seq[Expression], - private[flink] val window: GroupWindow) { - - /** - * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. - * The field expressions can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * windowGroupedTable.select("key, window.start, value.avg as valavg") - * }}} - */ - def select(fields: String): Table = { - select(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Performs a selection operation on a window grouped table. 
Similar to an SQL SELECT statement. - * The field expressions can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * windowGroupedTable.select('key, 'window.start, 'value.avg as 'valavg) - * }}} - */ - def select(fields: Expression*): Table = { - selectInternal( - groupKeys.map(table.expressionBridge.bridge), - createLogicalWindow(), - fields.map(table.expressionBridge.bridge)) - } - - private def selectInternal( - groupKeys: Seq[PlannerExpression], - window: LogicalWindow, - fields: Seq[PlannerExpression]): Table = { - val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv) - val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv) - - val projectsOnAgg = replaceAggregationsAndProperties( - expandedFields, table.tableEnv, aggNames, propNames) - - val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeAttribute) - - new Table(table.tableEnv, - Project( - projectsOnAgg, - WindowAggregate( - groupKeys, - window, - propNames.map(a => Alias(a._1, a._2)).toSeq, - aggNames.map(a => Alias(a._1, a._2)).toSeq, - Project(projectFields, table.logicalPlan).validate(table.tableEnv) - ).validate(table.tableEnv), - // required for proper resolution of the time attribute in multi-windows - explicitAlias = true - ).validate(table.tableEnv)) - } - - /** - * Converts an API class to a logical window for planning. - */ - private def createLogicalWindow(): LogicalWindow = window match { - case tw: TumbleWithSizeOnTimeWithAlias => - TumblingGroupWindow( - table.expressionBridge.bridge(tw.getAlias), - table.expressionBridge.bridge(tw.getTimeField), - table.expressionBridge.bridge(tw.getSize)) - case sw: SlideWithSizeAndSlideOnTimeWithAlias => - SlidingGroupWindow( - table.expressionBridge.bridge(sw.getAlias), - table.expressionBridge.bridge(sw.getTimeField), - table.expressionBridge.bridge(sw.getSize), - table.expressionBridge.bridge(sw.getSlide)) - case sw: SessionWithGapOnTimeWithAlias => - SessionGroupWindow( - table.expressionBridge.bridge(sw.getAlias), - table.expressionBridge.bridge(sw.getTimeField), - table.expressionBridge.bridge(sw.getGap)) - } -} - -/** - * A table that has been windowed for [[OverWindow]]s. - */ -class OverWindowedTable( - private[flink] val table: Table, - private[flink] val overWindows: Seq[OverWindow]) { - - /** - * Performs a selection operation on an over-windowed table. Similar to an SQL SELECT statement. - * The field expressions can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * overWindowedTable.select("c, b.count over ow, e.sum over ow") - * }}} - */ - def select(fields: String): Table = { - select(ExpressionParser.parseExpressionList(fields): _*) - } - - /** - * Performs a selection operation on an over-windowed table. Similar to an SQL SELECT statement. - * The field expressions can contain complex expressions and aggregations. 
- * - * Example: - * - * {{{ - * overWindowedTable.select('c, 'b.count over 'ow, 'e.sum over 'ow) - * }}} - */ - def select(fields: Expression*): Table = { - selectInternal( - fields.map(table.expressionBridge.bridge), - overWindows.map(createLogicalWindow)) - } - - private def selectInternal( - fields: Seq[PlannerExpression], - logicalOverWindows: Seq[LogicalOverWindow]) - : Table = { - - val expandedFields = expandProjectList( - fields, - table.logicalPlan, - table.tableEnv) - - if (fields.exists(_.isInstanceOf[WindowProperty])){ - throw new ValidationException( - "Window start and end properties are not available for Over windows.") - } - - val expandedOverFields = resolveOverWindows(expandedFields, logicalOverWindows, table.tableEnv) - - new Table( - table.tableEnv, - Project( - expandedOverFields.map(UnresolvedAlias), - table.logicalPlan, - // required for proper projection push down - explicitAlias = true) - .validate(table.tableEnv) - ) - } - - /** - * Converts an API class to a logical window for planning. - */ - private def createLogicalWindow(overWindow: OverWindow): LogicalOverWindow = { - LogicalOverWindow( - table.expressionBridge.bridge(overWindow.getAlias), - overWindow.getPartitioning.map(table.expressionBridge.bridge), - table.expressionBridge.bridge(overWindow.getOrder), - table.expressionBridge.bridge(overWindow.getPreceding), - JavaScalaConversionUtil.toScala(overWindow.getFollowing).map(table.expressionBridge.bridge) - ) - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala new file mode 100644 index 0000000000000..148aecd4ff172 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala @@ -0,0 +1,651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api + +import org.apache.calcite.rel.RelNode +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.table.calcite.FlinkRelBuilder +import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionBridge, + ExpressionParser, Ordering, PlannerExpression, ResolvedFieldReference, UnresolvedAlias, + WindowProperty} +import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.plan.ProjectionTranslator._ +import org.apache.flink.table.plan.logical.{Minus, _} +import org.apache.flink.table.util.JavaScalaConversionUtil + +import _root_.scala.collection.JavaConversions._ + +/** + * The implementation of the [[Table]]. 
+ * + * In [[TableImpl]], string expressions are parsed by [[ExpressionParser]] into [[Expression]]s. + * + * __NOTE__: Currently, the implementation depends on Calcite. + * + * @param tableEnv The [[TableEnvironment]] to which the table is bound. + * @param logicalPlan logical representation + */ +class TableImpl( + private[flink] val tableEnv: TableEnvironment, + private[flink] val logicalPlan: LogicalNode) extends Table { + + private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] = + tableEnv.expressionBridge + + private lazy val tableSchema: TableSchema = new TableSchema( + logicalPlan.output.map(_.name).toArray, + logicalPlan.output.map(_.resultType).toArray) + + var tableName: String = _ + + def relBuilder: FlinkRelBuilder = tableEnv.getRelBuilder + + def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder) + + override def getSchema: TableSchema = tableSchema + + override def printSchema(): Unit = print(tableSchema.toString) + + override def select(fields: String): Table = { + select(ExpressionParser.parseExpressionList(fields): _*) + } + + override def select(fields: Expression*): Table = { + selectInternal(fields.map(expressionBridge.bridge)) + } + + private def selectInternal(fields: Seq[PlannerExpression]): Table = { + val expandedFields = expandProjectList(fields, logicalPlan, tableEnv) + val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv) + if (propNames.nonEmpty) { + throw new ValidationException("Window properties can only be used on windowed tables.") + } + + if (aggNames.nonEmpty) { + val projectsOnAgg = replaceAggregationsAndProperties( + expandedFields, tableEnv, aggNames, propNames) + val projectFields = extractFieldReferences(expandedFields) + + new TableImpl(tableEnv, + Project(projectsOnAgg, + Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq, + Project(projectFields, logicalPlan).validate(tableEnv) + ).validate(tableEnv) + ).validate(tableEnv) + ) + } else { + new TableImpl(tableEnv, + Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv)) + } + } + + override def createTemporalTableFunction( + timeAttribute: String, + primaryKey: String) + : TemporalTableFunction = { + createTemporalTableFunction( + ExpressionParser.parseExpression(timeAttribute), + ExpressionParser.parseExpression(primaryKey)) + } + + override def createTemporalTableFunction( + timeAttribute: Expression, + primaryKey: Expression) + : TemporalTableFunction = { + createTemporalTableFunctionInternal( + expressionBridge.bridge(timeAttribute), + expressionBridge.bridge(primaryKey)) + } + + private def createTemporalTableFunctionInternal( + timeAttribute: PlannerExpression, + primaryKey: PlannerExpression) + : TemporalTableFunction = { + val temporalTable = TemporalTable(timeAttribute, primaryKey, logicalPlan) + .validate(tableEnv) + .asInstanceOf[TemporalTable] + + TemporalTableFunctionImpl.create( + this, + temporalTable.timeAttribute, + validatePrimaryKeyExpression(temporalTable.primaryKey)) + } + + private def validatePrimaryKeyExpression(expression: Expression): String = { + expression match { + case fieldReference: ResolvedFieldReference => + fieldReference.name + case _ => throw new ValidationException( + s"Unsupported expression [$expression] as primary key. 
" + + s"Only top-level (not nested) field references are supported.") + } + } + + override def as(fields: String): Table = { + as(ExpressionParser.parseExpressionList(fields): _*) + } + + override def as(fields: Expression*): Table = { + asInternal(fields.map(tableEnv.expressionBridge.bridge)) + } + + private def asInternal(fields: Seq[PlannerExpression]): Table = { + new TableImpl(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv)) + } + + override def filter(predicate: String): Table = { + filter(ExpressionParser.parseExpression(predicate)) + } + + override def filter(predicate: Expression): Table = { + filterInternal(expressionBridge.bridge(predicate)) + } + + private def filterInternal(predicate: PlannerExpression): Table = { + new TableImpl(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + } + + override def where(predicate: String): Table = { + filter(predicate) + } + + override def where(predicate: Expression): Table = { + filter(predicate) + } + + override def groupBy(fields: String): GroupedTable = { + groupBy(ExpressionParser.parseExpressionList(fields): _*) + } + + override def groupBy(fields: Expression*): GroupedTable = { + groupByInternal(fields.map(expressionBridge.bridge)) + } + + private def groupByInternal(fields: Seq[PlannerExpression]): GroupedTable = { + new GroupedTableImpl(this, fields) + } + + override def distinct(): Table = { + new TableImpl(tableEnv, Distinct(logicalPlan).validate(tableEnv)) + } + + override def join(right: Table): Table = { + joinInternal(right, None, JoinType.INNER) + } + + override def join(right: Table, joinPredicate: String): Table = { + join(right, ExpressionParser.parseExpression(joinPredicate)) + } + + override def join(right: Table, joinPredicate: Expression): Table = { + joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.INNER) + } + + override def leftOuterJoin(right: Table): Table = { + joinInternal(right, None, JoinType.LEFT_OUTER) + } + + override def leftOuterJoin(right: Table, joinPredicate: String): Table = { + leftOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)) + } + + override def leftOuterJoin(right: Table, joinPredicate: Expression): Table = { + joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.LEFT_OUTER) + } + + override def rightOuterJoin(right: Table, joinPredicate: String): Table = { + rightOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)) + } + + override def rightOuterJoin(right: Table, joinPredicate: Expression): Table = { + joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.RIGHT_OUTER) + } + + override def fullOuterJoin(right: Table, joinPredicate: String): Table = { + fullOuterJoin(right, ExpressionParser.parseExpression(joinPredicate)) + } + + override def fullOuterJoin(right: Table, joinPredicate: Expression): Table = { + joinInternal(right, Some(expressionBridge.bridge(joinPredicate)), JoinType.FULL_OUTER) + } + + private def joinInternal( + right: Table, + joinPredicate: Option[PlannerExpression], + joinType: JoinType) + : Table = { + // check that the TableEnvironment of right table is not null + // and right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be joined.") + } + + new TableImpl( + tableEnv, + Join( + this.logicalPlan, + right.asInstanceOf[TableImpl].logicalPlan, + joinType, + joinPredicate, + correlated = false).validate(tableEnv)) + 
} + + override def joinLateral(tableFunctionCall: String): Table = { + joinLateral(ExpressionParser.parseExpression(tableFunctionCall)) + } + + override def joinLateral(tableFunctionCall: Expression): Table = { + joinLateralInternal(expressionBridge.bridge(tableFunctionCall), None, JoinType.INNER) + } + + override def joinLateral(tableFunctionCall: String, joinPredicate: String): Table = { + joinLateral( + ExpressionParser.parseExpression(tableFunctionCall), + ExpressionParser.parseExpression(joinPredicate)) + } + + override def joinLateral(tableFunctionCall: Expression, joinPredicate: Expression): Table = { + joinLateralInternal( + expressionBridge.bridge(tableFunctionCall), + Some(expressionBridge.bridge(joinPredicate)), + JoinType.INNER) + } + + override def leftOuterJoinLateral(tableFunctionCall: String): Table = { + leftOuterJoinLateral(ExpressionParser.parseExpression(tableFunctionCall)) + } + + override def leftOuterJoinLateral(tableFunctionCall: Expression): Table = { + joinLateralInternal(expressionBridge.bridge(tableFunctionCall), None, JoinType.LEFT_OUTER) + } + + override def leftOuterJoinLateral(tableFunctionCall: String, joinPredicate: String): Table = { + leftOuterJoinLateral( + ExpressionParser.parseExpression(tableFunctionCall), + ExpressionParser.parseExpression(joinPredicate)) + } + + override def leftOuterJoinLateral( + tableFunctionCall: Expression, joinPredicate: Expression): Table = { + joinLateralInternal( + expressionBridge.bridge(tableFunctionCall), + Some(expressionBridge.bridge(joinPredicate)), + JoinType.LEFT_OUTER) + } + + private def joinLateralInternal( + callExpr: PlannerExpression, + joinPredicate: Option[PlannerExpression], + joinType: JoinType): Table = { + + // check join type + if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) { + throw new ValidationException( + "Table functions are currently only supported for inner and left outer lateral joins.") + } + + val logicalCall = UserDefinedFunctionUtils.createLogicalFunctionCall( + callExpr, + logicalPlan) + val validatedLogicalCall = logicalCall.validate(tableEnv) + + new TableImpl( + tableEnv, + Join( + logicalPlan, + validatedLogicalCall, + joinType, + joinPredicate, + correlated = true + ).validate(tableEnv)) + } + + override def minus(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be " + + "subtracted.") + } + new TableImpl( + tableEnv, Minus(logicalPlan, right.asInstanceOf[TableImpl].logicalPlan, all = false) + .validate(tableEnv)) + } + + override def minusAll(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be " + + "subtracted.") + } + new TableImpl( + tableEnv, Minus(logicalPlan, right.asInstanceOf[TableImpl].logicalPlan, all = true) + .validate(tableEnv)) + } + + override def union(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") + } + new TableImpl( + tableEnv, Union(logicalPlan, right.asInstanceOf[TableImpl].logicalPlan, all = false) + .validate(tableEnv)) + } + + override def unionAll(right: Table): Table = { + 
// check that right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") + } + new TableImpl( + tableEnv, Union(logicalPlan, right.asInstanceOf[TableImpl].logicalPlan, all = true) + .validate(tableEnv)) + } + + override def intersect(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException( + "Only tables from the same TableEnvironment can be intersected.") + } + new TableImpl( + tableEnv, Intersect(logicalPlan, right.asInstanceOf[TableImpl].logicalPlan, all = false) + .validate(tableEnv)) + } + + override def intersectAll(right: Table): Table = { + // check that right table belongs to the same TableEnvironment + if (right.asInstanceOf[TableImpl].tableEnv != this.tableEnv) { + throw new ValidationException( + "Only tables from the same TableEnvironment can be intersected.") + } + new TableImpl( + tableEnv, Intersect(logicalPlan, right.asInstanceOf[TableImpl].logicalPlan, all = true) + .validate(tableEnv)) + } + + override def orderBy(fields: String): Table = { + orderBy(ExpressionParser.parseExpressionList(fields): _*) + } + + override def orderBy(fields: Expression*): Table = { + orderByInternal(fields.map(expressionBridge.bridge)) + } + + private def orderByInternal(fields: Seq[PlannerExpression]): Table = { + val order: Seq[Ordering] = fields.map { + case o: Ordering => o + case e => Asc(e) + } + new TableImpl(tableEnv, Sort(order, logicalPlan).validate(tableEnv)) + } + + override def offset(offset: Int): Table = { + new TableImpl(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv)) + } + + override def fetch(fetch: Int): Table = { + if (fetch < 0) { + throw new ValidationException("FETCH count must be equal to or larger than 0.") + } + this.logicalPlan match { + case Limit(o, -1, c) => + // replace LIMIT without FETCH by LIMIT with FETCH + new TableImpl(tableEnv, Limit(o, fetch, c).validate(tableEnv)) + case Limit(_, _, _) => + throw new ValidationException("FETCH is already defined.") + case _ => + new TableImpl(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv)) + } + } + + override def insertInto(tableName: String): Unit = { + insertInto(tableName, tableEnv.queryConfig) + } + + override def insertInto(tableName: String, conf: QueryConfig): Unit = { + tableEnv.insertInto(this, tableName, conf) + } + + override def window(window: GroupWindow): GroupWindowedTable = { + new GroupWindowedTableImpl(this, window) + } + + override def window(overWindows: OverWindow*): OverWindowedTable = { + + if (tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw new TableException("Over-windows for batch tables are currently not supported.") + } + + if (overWindows.size != 1) { + throw new TableException("Over-windows currently only support a single window.") + } + + new OverWindowedTableImpl(this, overWindows) + } + + /** + * Registers a unique table name under the table environment + * and returns the registered table name. + */ + override def toString: String = { + if (tableName == null) { + tableName = "UnnamedTable$" + tableEnv.attrNameCntr.getAndIncrement() + tableEnv.registerTable(tableName, this) + } + tableName + } +} + +/** + * The implementation of a [[GroupedTable]] that has been grouped on a set of grouping keys.
+ */ +class GroupedTableImpl( + private[flink] val table: Table, + private[flink] val groupKey: Seq[PlannerExpression]) + extends GroupedTable { + + private val tableImpl = table.asInstanceOf[TableImpl] + + override def select(fields: String): Table = { + select(ExpressionParser.parseExpressionList(fields): _*) + } + + override def select(fields: Expression*): Table = { + selectInternal(fields.map(tableImpl.expressionBridge.bridge)) + } + + private def selectInternal(fields: Seq[PlannerExpression]): Table = { + val expandedFields = expandProjectList(fields, tableImpl.logicalPlan, tableImpl.tableEnv) + val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableImpl.tableEnv) + if (propNames.nonEmpty) { + throw new ValidationException("Window properties can only be used on windowed tables.") + } + + val projectsOnAgg = replaceAggregationsAndProperties( + expandedFields, tableImpl.tableEnv, aggNames, propNames) + val projectFields = extractFieldReferences(expandedFields ++ groupKey) + + new TableImpl(tableImpl.tableEnv, + Project(projectsOnAgg, + Aggregate(groupKey, aggNames.map(a => Alias(a._1, a._2)).toSeq, + Project(projectFields, tableImpl.logicalPlan).validate(tableImpl.tableEnv) + ).validate(tableImpl.tableEnv) + ).validate(tableImpl.tableEnv)) + } +} + +/** + * The implementation of a [[GroupWindowedTable]] that has been windowed for [[GroupWindow]]s. + */ +class GroupWindowedTableImpl( + private[flink] val table: Table, + private[flink] val window: GroupWindow) + extends GroupWindowedTable { + + override def groupBy(fields: String): WindowGroupedTable = { + groupBy(ExpressionParser.parseExpressionList(fields): _*) + } + + override def groupBy(fields: Expression*): WindowGroupedTable = { + val fieldsWithoutWindow = fields.filterNot(window.getAlias.equals(_)) + if (fields.size != fieldsWithoutWindow.size + 1) { + throw new ValidationException("GroupBy must contain exactly one window alias.") + } + + new WindowGroupedTableImpl(table, fieldsWithoutWindow, window) + } +} + +/** + * The implementation of a [[WindowGroupedTable]] that has been windowed and grouped for + * [[GroupWindow]]s. 
+ */ +class WindowGroupedTableImpl( + private[flink] val table: Table, + private[flink] val groupKeys: Seq[Expression], + private[flink] val window: GroupWindow) + extends WindowGroupedTable { + + private val tableImpl = table.asInstanceOf[TableImpl] + + override def select(fields: String): Table = { + select(ExpressionParser.parseExpressionList(fields): _*) + } + + override def select(fields: Expression*): Table = { + selectInternal( + groupKeys.map(tableImpl.expressionBridge.bridge), + createLogicalWindow(), + fields.map(tableImpl.expressionBridge.bridge)) + } + + private def selectInternal( + groupKeys: Seq[PlannerExpression], + window: LogicalWindow, + fields: Seq[PlannerExpression]): Table = { + val expandedFields = expandProjectList(fields, tableImpl.logicalPlan, tableImpl.tableEnv) + val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableImpl.tableEnv) + + val projectsOnAgg = replaceAggregationsAndProperties( + expandedFields, tableImpl.tableEnv, aggNames, propNames) + + val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeAttribute) + + new TableImpl(tableImpl.tableEnv, + Project( + projectsOnAgg, + WindowAggregate( + groupKeys, + window, + propNames.map(a => Alias(a._1, a._2)).toSeq, + aggNames.map(a => Alias(a._1, a._2)).toSeq, + Project(projectFields, tableImpl.logicalPlan).validate(tableImpl.tableEnv) + ).validate(tableImpl.tableEnv), + // required for proper resolution of the time attribute in multi-windows + explicitAlias = true + ).validate(tableImpl.tableEnv)) + } + + /** + * Converts an API class to a logical window for planning. + */ + private def createLogicalWindow(): LogicalWindow = window match { + case tw: TumbleWithSizeOnTimeWithAlias => + TumblingGroupWindow( + tableImpl.expressionBridge.bridge(tw.getAlias), + tableImpl.expressionBridge.bridge(tw.getTimeField), + tableImpl.expressionBridge.bridge(tw.getSize)) + case sw: SlideWithSizeAndSlideOnTimeWithAlias => + SlidingGroupWindow( + tableImpl.expressionBridge.bridge(sw.getAlias), + tableImpl.expressionBridge.bridge(sw.getTimeField), + tableImpl.expressionBridge.bridge(sw.getSize), + tableImpl.expressionBridge.bridge(sw.getSlide)) + case sw: SessionWithGapOnTimeWithAlias => + SessionGroupWindow( + tableImpl.expressionBridge.bridge(sw.getAlias), + tableImpl.expressionBridge.bridge(sw.getTimeField), + tableImpl.expressionBridge.bridge(sw.getGap)) + } +} + +/** + * The implementation of an [[OverWindowedTable]] that has been windowed for [[OverWindow]]s. 
+ */ +class OverWindowedTableImpl( + private[flink] val table: Table, + private[flink] val overWindows: Seq[OverWindow]) + extends OverWindowedTable{ + + private val tableImpl = table.asInstanceOf[TableImpl] + + override def select(fields: String): Table = { + select(ExpressionParser.parseExpressionList(fields): _*) + } + + override def select(fields: Expression*): Table = { + selectInternal( + fields.map(tableImpl.expressionBridge.bridge), + overWindows.map(createLogicalWindow)) + } + + private def selectInternal( + fields: Seq[PlannerExpression], + logicalOverWindows: Seq[LogicalOverWindow]) + : Table = { + + val expandedFields = expandProjectList( + fields, + tableImpl.logicalPlan, + tableImpl.tableEnv) + + if (fields.exists(_.isInstanceOf[WindowProperty])){ + throw new ValidationException( + "Window start and end properties are not available for Over windows.") + } + + val expandedOverFields = + resolveOverWindows(expandedFields, logicalOverWindows, tableImpl.tableEnv) + + new TableImpl( + tableImpl.tableEnv, + Project( + expandedOverFields.map(UnresolvedAlias), + tableImpl.logicalPlan, + // required for proper projection push down + explicitAlias = true) + .validate(tableImpl.tableEnv) + ) + } + + /** + * Converts an API class to a logical window for planning. + */ + private def createLogicalWindow(overWindow: OverWindow): LogicalOverWindow = { + LogicalOverWindow( + tableImpl.expressionBridge.bridge(overWindow.getAlias), + overWindow.getPartitioning.map(tableImpl.expressionBridge.bridge), + tableImpl.expressionBridge.bridge(overWindow.getOrder), + tableImpl.expressionBridge.bridge(overWindow.getPreceding), + JavaScalaConversionUtil + .toScala(overWindow.getFollowing).map(tableImpl.expressionBridge.bridge) + ) + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala index 6f846abc36fda..28d5b141f06f8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala @@ -24,7 +24,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.TableImpl import org.apache.flink.table.typeutils.TypeCheckUtils._ import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} @@ -39,7 +39,7 @@ case class In(expression: PlannerExpression, elements: Seq[PlannerExpression]) // check if this is a sub-query expression or an element list elements.head match { - case TableReference(_, table: Table) => + case TableReference(_, table: TableImpl) => RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode)) case _ => @@ -51,7 +51,7 @@ case class In(expression: PlannerExpression, elements: Seq[PlannerExpression]) // check if this is a sub-query expression or an element list elements.head match { - case TableReference(name, table: Table) => + case TableReference(name, table: TableImpl) => if (elements.length != 1) { return ValidationFailure("IN operator supports only one table reference.") } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/TemporalTableFunctionImpl.scala similarity index 93% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala rename to flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/TemporalTableFunctionImpl.scala index 508dd2543d00f..2212475960739 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/TemporalTableFunctionImpl.scala @@ -23,7 +23,6 @@ import java.sql.Timestamp import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.Table import org.apache.flink.table.expressions.PlannerExpression -import org.apache.flink.types.Row /** * Class representing temporal table function over some history table. @@ -33,12 +32,12 @@ import org.apache.flink.types.Row * This function shouldn't be evaluated. Instead calls to it should be rewritten by the optimiser * into other operators (like Temporal Table Join). */ -class TemporalTableFunction private( +class TemporalTableFunctionImpl private( @transient private val underlyingHistoryTable: Table, private val timeAttribute: PlannerExpression, private val primaryKey: String, private val resultType: RowTypeInfo) - extends TableFunction[Row] { + extends TemporalTableFunction { def eval(row: Timestamp): Unit = { throw new IllegalStateException("This should never be called") @@ -64,12 +63,12 @@ class TemporalTableFunction private( } } -object TemporalTableFunction { +object TemporalTableFunctionImpl { private[flink] def create( table: Table, timeAttribute: PlannerExpression, primaryKey: String): TemporalTableFunction = { - new TemporalTableFunction( + new TemporalTableFunctionImpl( table, timeAttribute, primaryKey, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala index 3e8f0368d2f85..232addeced9f8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala @@ -24,10 +24,10 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.TableFunctionScan import org.apache.calcite.rel.logical.LogicalCorrelate import org.apache.calcite.rex._ -import org.apache.flink.table.api.{Table, Types, ValidationException} +import org.apache.flink.table.api.{TableImpl, Types, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory.{isProctimeIndicatorType, isTimeIndicatorType} import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.TemporalTableFunction +import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl} import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin import org.apache.flink.table.plan.util.RexDefaultVisitor @@ -64,9 +64,12 @@ class LogicalCorrelateToTemporalTableJoinRule .visit(rightTableFunctionScan.getCall) match { case None => // Do nothing and handle standard TableFunction - case 
Some(TemporalTableFunctionCall(rightTemporalTableFunction, leftTimeAttribute)) => + case Some(TemporalTableFunctionCall( + rightTemporalTableFunction: TemporalTableFunctionImpl, leftTimeAttribute)) => + // If TemporalTableFunction was found, rewrite LogicalCorrelate to TemporalJoin - val underlyingHistoryTable: Table = rightTemporalTableFunction.getUnderlyingHistoryTable + val underlyingHistoryTable: TableImpl = + rightTemporalTableFunction.getUnderlyingHistoryTable.asInstanceOf[TableImpl] val relBuilder = this.relBuilderFactory.create( cluster, underlyingHistoryTable.relBuilder.getRelOptSchema) @@ -162,7 +165,8 @@ class GetTemporalTableFunctionCall( if (!tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]) { return null } - val temporalTableFunction = tableFunction.getTableFunction.asInstanceOf[TemporalTableFunction] + val temporalTableFunction = + tableFunction.getTableFunction.asInstanceOf[TemporalTableFunctionImpl] checkState( rexCall.getOperands.size().equals(1), @@ -181,7 +185,7 @@ class GetTemporalTableFunctionCall( * for join condition context without `$cor` reference. */ class CorrelatedFieldAccessRemoval( - var temporalTableFunction: TemporalTableFunction, + var temporalTableFunction: TemporalTableFunctionImpl, var rexBuilder: RexBuilder, var leftSide: RelNode) extends RexDefaultVisitor[RexNode] { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 5c6c7ff5dad07..be3da6d55b213 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -19,10 +19,9 @@ package org.apache.flink.table.api.stream.sql import org.apache.calcite.rel.logical.LogicalJoin import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Types +import org.apache.flink.table.api.{TableImpl, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.calcite.RelTimeIndicatorConverter -import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.runtime.join.WindowJoinUtil import org.apache.flink.table.utils.TableTestUtil.{term, _} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} @@ -989,7 +988,7 @@ class JoinTest extends TableTestBase { val resultTable = streamUtil.tableEnv.sqlQuery(query) val relNode = RelTimeIndicatorConverter.convert( - resultTable.getRelNode, + resultTable.asInstanceOf[TableImpl].getRelNode, streamUtil.tableEnv.getRelBuilder.getRexBuilder) val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] val (windowBounds, _) = @@ -1014,7 +1013,7 @@ class JoinTest extends TableTestBase { val resultTable = streamUtil.tableEnv.sqlQuery(query) val relNode = RelTimeIndicatorConverter.convert( - resultTable.getRelNode, + resultTable.asInstanceOf[TableImpl].getRelNode, streamUtil.tableEnv.getRelBuilder.getRexBuilder) val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] val (_, remainCondition) = diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala index d98248dc1de89..165f6d0c047bb 100644 --- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.api.{TableSchema, ValidationException} import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.stream.table.TemporalTableJoinTest._ import org.apache.flink.table.expressions.ResolvedFieldReference -import org.apache.flink.table.functions.TemporalTableFunction +import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl} import org.apache.flink.table.plan.logical.rel.LogicalTemporalTableJoin._ import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils._ @@ -153,8 +153,9 @@ class TemporalTableJoinTest extends TableTestBase { private def assertRatesFunction( expectedSchema: TableSchema, - rates: TemporalTableFunction, + inputRates: TemporalTableFunction, proctime: Boolean = false): Unit = { + val rates = inputRates.asInstanceOf[TemporalTableFunctionImpl] assertEquals("currency", rates.getPrimaryKey) assertTrue(rates.getTimeAttribute.isInstanceOf[ResolvedFieldReference]) assertEquals( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala index 350ad06dd8c16..aa74dcc776976 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table.validation import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{Over, Table, Tumble, ValidationException} +import org.apache.flink.table.api._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithRetract} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test @@ -30,6 +31,14 @@ class OverWindowValidationTest extends TableTestBase { val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) + /** + * Performs optimization for the input Table. + */ + def optimizeTable(table: Table, updatesAsRetraction: Boolean): Unit = { + streamUtil.tableEnv + .optimize(table.asInstanceOf[TableImpl].getRelNode, updatesAsRetraction = updatesAsRetraction) + } + + /** * OVER clause is necessary for [[OverAgg0]] window function.
*/ @@ -61,7 +70,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w) .select('c, 'b.count over 'x) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -69,7 +78,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -77,7 +86,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -85,7 +94,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -93,7 +102,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -103,7 +112,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table2 .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -111,7 +120,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over orderBy 'rowtime preceding -1.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -119,7 +128,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -129,7 +138,7 @@ class OverWindowValidationTest extends TableTestBase { val result = table .window(Over orderBy 'rowtime preceding 1.minutes as 'w) .select('c, weightedAvg('b, 'a) over 'w) - streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true) + optimizeTable(result, updatesAsRetraction = true) } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala index edf56a2c9fb1b..861bb37e04a71 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala @@ -38,7 +38,7 @@ import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.Path import org.apache.flink.table.api.scala.BatchTableEnvironment -import org.apache.flink.table.api.{TableConfig, TableEnvironment} +import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableImpl} import org.apache.flink.table.calcite.FlinkPlannerImpl import org.apache.flink.table.codegen.{Compiler, FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.expressions.{Expression, ExpressionParser} @@ -201,6 +201,7 @@ abstract class ExpressionTestBase { val converted = env .scan(tableName) .select(tableApiExpr) + .asInstanceOf[TableImpl] .getRelNode val optimized = env.optimize(converted) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala index 5b4f36480e7b2..b59e2396af137 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan import org.apache.calcite.rel.RelNode import org.apache.flink.api.scala._ -import org.apache.flink.table.api.{Table, Tumble} +import org.apache.flink.table.api.{Table, TableImpl, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.nodes.datastream._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.CountDistinct @@ -506,7 +506,7 @@ class StreamTableTestForRetractionUtil extends StreamTableTestUtil { } def verifyTableTrait(resultTable: Table, expected: String): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false) val actual = TraitUtil.toString(optimized) assertEquals( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala index a57cabde69c91..b90f94d598da8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala @@ -18,12 +18,12 @@ package org.apache.flink.table.plan -import org.apache.flink.table.api.{Table, Tumble} +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Table, TableImpl, Tumble} import org.apache.flink.table.plan.util.UpdatingPlanChecker import org.apache.flink.table.utils.StreamTableTestUtil import org.junit.Assert._ -import org.apache.flink.table.api.scala._ -import org.apache.flink.api.scala._ import org.junit.Test class UpdatingPlanCheckerTest { @@ -316,7 +316,7 @@ class UpdatePlanCheckerUtil extends 
StreamTableTestUtil { } def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false) UpdatingPlanChecker.getUniqueKeyGroups(optimized) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index 1fd2c38dff554..ec3cbbac868b9 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -24,13 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet} import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment} +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{Table, TableSchema} +import org.apache.flink.table.api.{Table, TableImpl, TableSchema} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import org.junit.Assert.assertEquals @@ -62,8 +62,10 @@ class TableTestBase { def verifyTableEquals(expected: Table, actual: Table): Unit = { assertEquals( "Logical plans do not match", - LogicalPlanFormatUtils.formatTempTableId(RelOptUtil.toString(expected.getRelNode)), - LogicalPlanFormatUtils.formatTempTableId(RelOptUtil.toString(actual.getRelNode))) + LogicalPlanFormatUtils.formatTempTableId(RelOptUtil.toString( + expected.asInstanceOf[TableImpl].getRelNode)), + LogicalPlanFormatUtils.formatTempTableId(RelOptUtil.toString( + actual.asInstanceOf[TableImpl].getRelNode))) } } @@ -87,7 +89,7 @@ abstract class TableTestUtil { def verifyTable(resultTable: Table, expected: String): Unit def verifySchema(resultTable: Table, fields: Seq[(String, TypeInformation[_])]): Unit = { - val actual = resultTable.getSchema + val actual = resultTable.asInstanceOf[TableImpl].getSchema val expected = new TableSchema(fields.map(_._1).toArray, fields.map(_._2).toArray) assertEquals(expected, actual) } @@ -240,7 +242,7 @@ case class BatchTableTestUtil() extends TableTestUtil { } def verifyTable(resultTable: Table, expected: String): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = tableEnv.optimize(relNode) verifyString(expected, optimized) } @@ -250,13 +252,13 @@ case class BatchTableTestUtil() extends TableTestUtil { } def verifyJavaTable(resultTable: Table, expected: String): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = javaTableEnv.optimize(relNode) verifyString(expected, 
optimized) } def printTable(resultTable: Table): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = tableEnv.optimize(relNode) println(RelOptUtil.toString(optimized)) } @@ -323,15 +325,15 @@ case class StreamTableTestUtil() extends TableTestUtil { } def verifyTable(resultTable: Table, expected: String): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false) verifyString(expected, optimized) } def verify2Tables(resultTable1: Table, resultTable2: Table): Unit = { - val relNode1 = resultTable1.getRelNode + val relNode1 = resultTable1.asInstanceOf[TableImpl].getRelNode val optimized1 = tableEnv.optimize(relNode1, updatesAsRetraction = false) - val relNode2 = resultTable2.getRelNode + val relNode2 = resultTable2.asInstanceOf[TableImpl].getRelNode val optimized2 = tableEnv.optimize(relNode2, updatesAsRetraction = false) assertEquals(RelOptUtil.toString(optimized1), RelOptUtil.toString(optimized2)) } @@ -341,14 +343,14 @@ case class StreamTableTestUtil() extends TableTestUtil { } def verifyJavaTable(resultTable: Table, expected: String): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = javaTableEnv.optimize(relNode, updatesAsRetraction = false) verifyString(expected, optimized) } // the print methods are for debugging purposes only def printTable(resultTable: Table): Unit = { - val relNode = resultTable.getRelNode + val relNode = resultTable.asInstanceOf[TableImpl].getRelNode val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false) println(RelOptUtil.toString(optimized)) }
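All of the test and utility changes above reduce to one pattern: the public API now returns the Table interface, and any code that needs the underlying Calcite plan has to go through the planner-internal TableImpl explicitly. A minimal sketch of that helper pattern follows; the object and method names are illustrative only, and it assumes getRelNode stays accessible from Flink's own table packages, as in the test utilities above.

package org.apache.flink.table.utils

import org.apache.calcite.rel.RelNode
import org.apache.flink.table.api.{Table, TableImpl}

// Sketch of the cast pattern used by the updated test utilities: the public
// Table interface no longer exposes planner internals, so the relational
// plan is obtained by casting to TableImpl first.
object TableImplAccess {

  /** Returns the Calcite RelNode behind a Table produced by the Table API. */
  def toRelNode(resultTable: Table): RelNode =
    resultTable.asInstanceOf[TableImpl].getRelNode
}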