From 5e90ed95a580aefd84b72f593954d01f4eb67f68 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Thu, 11 Oct 2018 14:06:46 +0200 Subject: [PATCH] [FLINK-10528][table] Remove methods that were deprecated in Flink 1.4.0. - remove TableEnvironment.sql() deprecated with FLINK-6442 - remove StreamTableEnvironment.toDataStream() and TableConversions.toDataStream() deprecated with FLINK-6543 - remove Table.limit() deprecated with FLINK-7821 This closes #6826. --- .../cassandra/CassandraConnectorITCase.java | 10 +- .../flink/addons/hbase/HBaseTableSource.java | 2 +- .../org/apache/flink/orc/OrcTableSource.java | 2 +- .../flink/table/api/TableEnvironment.scala | 24 ---- .../api/java/StreamTableEnvironment.scala | 109 ------------------ .../api/scala/StreamTableEnvironment.scala | 49 -------- .../table/api/scala/TableConversions.scala | 46 -------- .../org/apache/flink/table/api/table.scala | 45 -------- .../flink/table/functions/TableFunction.scala | 2 +- .../table/plan/UpdatingPlanCheckerTest.scala | 2 +- .../table/utils/MockTableEnvironment.scala | 2 +- 11 files changed, 10 insertions(+), 283 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 56a256c8db572..4ecfa8913d4c1 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -457,11 +457,11 @@ public void testCassandraTableSink() throws Exception { DataStreamSource source = env.fromCollection(rowCollection); tEnv.registerDataStreamInternal("testFlinkTable", source); - tEnv.registerTableSink("cassandraTable", - new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)) - .configure( - new String[]{"f0", "f1", "f2"}, - new TypeInformation[]{Types.STRING, Types.INT, Types.INT})); + tEnv.registerTableSink( + "cassandraTable", + new String[]{"f0", "f1", "f2"}, + new TypeInformation[]{Types.STRING, Types.INT, Types.INT}, + new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY))); tEnv.sqlQuery("select * from testFlinkTable").insertInto("cassandraTable"); diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java index 27b75d49ac919..4bb0f31978c25 100644 --- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java @@ -51,7 +51,7 @@ * hSrc.addColumn("fam2", "col1", String.class); * * tableEnv.registerTableSource("hTable", hSrc); - * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1"); + * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1"); * } * * diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java index 860999c80dba0..6e3ada4c493dc 100644 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java @@ -75,7 +75,7 @@ * .build(); * * tEnv.registerTableSource("orcTable", orcSrc); - * Table res = tableEnv.sql("SELECT * FROM orcTable"); + * Table res = tableEnv.sqlQuery("SELECT * FROM orcTable"); * } * */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index d740c3f1f9997..6fe0a9e3a6450 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) { */ def explain(table: Table): String - /** - * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. - * - * All tables referenced by the query must be registered in the TableEnvironment. - * A [[Table]] is automatically registered when its [[toString]] method is called, for example - * when it is embedded into a String. - * Hence, SQL queries can directly reference a [[Table]] as follows: - * - * {{{ - * val table: Table = ... - * // the table is not registered to the table environment - * tEnv.sql(s"SELECT * FROM $table") - * }}} - * - * @deprecated Use sqlQuery() instead. - * @param query The SQL query to evaluate. - * @return The result of the query as Table. - */ - @Deprecated - @deprecated("Please use sqlQuery() instead.") - def sql(query: String): Table = { - sqlQuery(query) - } - /** * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index 2da08faa12e94..53fb7a08d474b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -133,115 +133,6 @@ class StreamTableEnvironment( registerDataStreamInternal(name, dataStream, exprs) } - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param table The [[Table]] to convert. - * @param clazz The class of the type of the resulting [[DataStream]]. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - * @deprecated This method only supports conversion of append-only tables. In order to - * make this more explicit in the future, please use toAppendStream() instead. - */ - @Deprecated - def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) - - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param table The [[Table]] to convert. - * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - * @deprecated This method only supports conversion of append-only tables. In order to - * make this more explicit in the future, please use toAppendStream() instead. - */ - @Deprecated - def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = - toAppendStream(table, typeInfo) - - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param table The [[Table]] to convert. - * @param clazz The class of the type of the resulting [[DataStream]]. - * @param queryConfig The configuration of the query to generate. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - * @deprecated This method only supports conversion of append-only tables. In order to - * make this more explicit in the future, please use toAppendStream() instead. - */ - @Deprecated - def toDataStream[T]( - table: Table, - clazz: Class[T], - queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig) - - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param table The [[Table]] to convert. - * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. - * @param queryConfig The configuration of the query to generate. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - * @deprecated This method only supports conversion of append-only tables. In order to - * make this more explicit in the future, please use toAppendStream() instead. - */ - @Deprecated - def toDataStream[T]( - table: Table, - typeInfo: TypeInformation[T], - queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig) - /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 14ea84ae20266..c8d520c6bc0f8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -127,55 +127,6 @@ class StreamTableEnvironment( registerDataStreamInternal(name, dataStream.javaStream, fields.toArray) } - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field - * types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param table The [[Table]] to convert. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - */ - @deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") - def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table) - - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field - * types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param table The [[Table]] to convert. - * @param queryConfig The configuration of the query to generate. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - */ - @deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") - def toDataStream[T: TypeInformation]( - table: Table, - queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig) - /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 908bb66beff6b..9812a1a4e2e56 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -77,52 +77,6 @@ class TableConversions(table: Table) { } } - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field - * types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - */ - @deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") - def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream - - /** - * Converts the given [[Table]] into an append [[DataStream]] of a specified type. - * - * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified - * by update or delete changes, the conversion will fail. - * - * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field - * types must match. - * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. - * - * NOTE: This method only supports conversion of append-only tables. In order to make this - * more explicit in the future, please use [[toAppendStream()]] instead. - * If add and retract messages are required, use [[toRetractStream()]]. - * - * @param queryConfig The configuration of the query to generate. - * @tparam T The type of the resulting [[DataStream]]. - * @return The converted [[DataStream]]. - */ - @deprecated("This method only supports conversion of append-only tables. In order to make this" + - " more explicit in the future, please use toAppendStream() instead.") - def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = - toAppendStream(queryConfig) - /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 00374246fbe76..2b9570cdda57a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -782,51 +782,6 @@ class Table( orderBy(parsedFields: _*) } - /** - * Limits a sorted result from an offset position. - * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and - * thus must be preceded by it. - * - * Example: - * - * {{{ - * // skips the first 3 rows and returns all following rows. - * tab.orderBy('name.desc).limit(3) - * }}} - * - * @param offset number of records to skip - * - * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead. - */ - @Deprecated - @deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0") - def limit(offset: Int): Table = { - new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv)) - } - - /** - * Limits a sorted result to a specified number of records from an offset position. - * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and - * thus must be preceded by it. - * - * Example: - * - * {{{ - * // skips the first 3 rows and returns the next 5 rows. - * tab.orderBy('name.desc).limit(3, 5) - * }}} - * - * @param offset number of records to skip - * @param fetch number of records to be returned - * - * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead. - */ - @Deprecated - @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0") - def limit(offset: Int, fetch: Int): Table = { - new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv)) - } - /** * Limits a sorted result from an offset position. * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala index 489b0e66a18eb..e892a4cddcc7e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala @@ -73,7 +73,7 @@ import org.apache.flink.util.Collector * * // for SQL users * tEnv.registerFunction("split", new Split()) // register table function first - * tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)") + * tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)") * * }}} * diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala index e0cbba574b84f..bd2a86836064f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala @@ -312,7 +312,7 @@ class UpdatingPlanCheckerTest { class UpdatePlanCheckerUtil extends StreamTableTestUtil { def verifySqlUniqueKey(query: String, expected: Seq[String]): Unit = { - verifyTableUniqueKey(tableEnv.sql(query), expected) + verifyTableUniqueKey(tableEnv.sqlQuery(query), expected) } def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = { diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 87dbc91b37bc9..f35e0d473d7b6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -34,7 +34,7 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override protected def checkValidTableName(name: String): Unit = ??? - override def sql(query: String): Table = ??? + override def sqlQuery(query: String): Table = ??? override protected def getBuiltInNormRuleSet: RuleSet = ???