Skip to content

Commit

Permalink
[FLINK-10528][table] Remove methods that were deprecated in Flink 1.4.0.
Browse files Browse the repository at this point in the history
- remove TableEnvironment.sql() deprecated with FLINK-6442
- remove StreamTableEnvironment.toDataStream() and TableConversions.toDataStream() deprecated with FLINK-6543
- remove Table.limit() deprecated with FLINK-7821

This closes apache#6826.
  • Loading branch information
fhueske committed Oct 12, 2018
1 parent d1a03dd commit 5e90ed9
Show file tree
Hide file tree
Showing 11 changed files with 10 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,11 +457,11 @@ public void testCassandraTableSink() throws Exception {
DataStreamSource<Row> source = env.fromCollection(rowCollection);

tEnv.registerDataStreamInternal("testFlinkTable", source);
tEnv.registerTableSink("cassandraTable",
new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY))
.configure(
new String[]{"f0", "f1", "f2"},
new TypeInformation[]{Types.STRING, Types.INT, Types.INT}));
tEnv.registerTableSink(
"cassandraTable",
new String[]{"f0", "f1", "f2"},
new TypeInformation[]{Types.STRING, Types.INT, Types.INT},
new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));

tEnv.sqlQuery("select * from testFlinkTable").insertInto("cassandraTable");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* hSrc.addColumn("fam2", "col1", String.class);
*
* tableEnv.registerTableSource("hTable", hSrc);
* Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
* Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
* }
* </pre>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
* .build();
*
* tEnv.registerTableSource("orcTable", orcSrc);
* Table res = tableEnv.sql("SELECT * FROM orcTable");
* Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
* }
* </pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
def explain(table: Table): String

/**
* Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
*
* All tables referenced by the query must be registered in the TableEnvironment.
* A [[Table]] is automatically registered when its [[toString]] method is called, for example
* when it is embedded into a String.
* Hence, SQL queries can directly reference a [[Table]] as follows:
*
* {{{
* val table: Table = ...
* // the table is not registered to the table environment
* tEnv.sql(s"SELECT * FROM $table")
* }}}
*
* @deprecated Use sqlQuery() instead.
* @param query The SQL query to evaluate.
* @return The result of the query as Table.
*/
@Deprecated
@deprecated("Please use sqlQuery() instead.")
def sql(query: String): Table = {
sqlQuery(query)
}

/**
* Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,115 +133,6 @@ class StreamTableEnvironment(
registerDataStreamInternal(name, dataStream, exprs)
}

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param table The [[Table]] to convert.
* @param clazz The class of the type of the resulting [[DataStream]].
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
* @deprecated This method only supports conversion of append-only tables. In order to
* make this more explicit in the future, please use toAppendStream() instead.
*/
@Deprecated
def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param table The [[Table]] to convert.
* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
* @deprecated This method only supports conversion of append-only tables. In order to
* make this more explicit in the future, please use toAppendStream() instead.
*/
@Deprecated
def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] =
toAppendStream(table, typeInfo)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param table The [[Table]] to convert.
* @param clazz The class of the type of the resulting [[DataStream]].
* @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
* @deprecated This method only supports conversion of append-only tables. In order to
* make this more explicit in the future, please use toAppendStream() instead.
*/
@Deprecated
def toDataStream[T](
table: Table,
clazz: Class[T],
queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param table The [[Table]] to convert.
* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
* @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
* @deprecated This method only supports conversion of append-only tables. In order to
* make this more explicit in the future, please use toAppendStream() instead.
*/
@Deprecated
def toDataStream[T](
table: Table,
typeInfo: TypeInformation[T],
queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,55 +127,6 @@ class StreamTableEnvironment(
registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
}

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param table The [[Table]] to convert.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param table The [[Table]] to convert.
* @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](
table: Table,
queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,52 +77,6 @@ class TableConversions(table: Table) {
}
}

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
* The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
* by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
* types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* NOTE: This method only supports conversion of append-only tables. In order to make this
* more explicit in the future, please use [[toAppendStream()]] instead.
* If add and retract messages are required, use [[toRetractStream()]].
*
* @param queryConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] =
toAppendStream(queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,51 +782,6 @@ class Table(
orderBy(parsedFields: _*)
}

/**
* Limits a sorted result from an offset position.
* Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
* thus must be preceded by it.
*
* Example:
*
* {{{
* // skips the first 3 rows and returns all following rows.
* tab.orderBy('name.desc).limit(3)
* }}}
*
* @param offset number of records to skip
*
* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
@Deprecated
@deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int): Table = {
new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
}

/**
* Limits a sorted result to a specified number of records from an offset position.
* Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
* thus must be preceded by it.
*
* Example:
*
* {{{
* // skips the first 3 rows and returns the next 5 rows.
* tab.orderBy('name.desc).limit(3, 5)
* }}}
*
* @param offset number of records to skip
* @param fetch number of records to be returned
*
* @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
@Deprecated
@deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}

/**
* Limits a sorted result from an offset position.
* Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import org.apache.flink.util.Collector
*
* // for SQL users
* tEnv.registerFunction("split", new Split()) // register table function first
* tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
* tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
*
* }}}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class UpdatingPlanCheckerTest {
class UpdatePlanCheckerUtil extends StreamTableTestUtil {

def verifySqlUniqueKey(query: String, expected: Seq[String]): Unit = {
verifyTableUniqueKey(tableEnv.sql(query), expected)
verifyTableUniqueKey(tableEnv.sqlQuery(query), expected)
}

def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {

override protected def checkValidTableName(name: String): Unit = ???

override def sql(query: String): Table = ???
override def sqlQuery(query: String): Table = ???

override protected def getBuiltInNormRuleSet: RuleSet = ???

Expand Down

0 comments on commit 5e90ed9

Please sign in to comment.