Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0 #6826

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0.

- remove TableEnvironment.sql() deprecated with FLINK-6442
- remove StreamTableEnvironment.toDataStream() and TableConversions.toDataStream() deprecated with FLINK-6543
- remove Table.limit() deprecated with FLINK-7821
  • Loading branch information
fhueske committed Oct 11, 2018
commit c7bffc0cb240031633b74c7bacc4fea3c40ac4eb
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public void testCassandraTableSink() throws Exception {

tEnv.registerDataStreamInternal("testFlinkTable", source);

tEnv.sql("select * from testFlinkTable").writeToSink(
tEnv.sqlQuery("select * from testFlinkTable").writeToSink(
new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));

env.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* hSrc.addColumn("fam2", "col1", String.class);
*
* tableEnv.registerTableSource("hTable", hSrc);
* Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
* Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
* }
* </pre>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
* .build();
*
* tEnv.registerTableSource("orcTable", orcSrc);
* Table res = tableEnv.sql("SELECT * FROM orcTable");
* Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
* }
* </pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
def explain(table: Table): String

/**
 * Evaluates a SQL query on registered tables and returns the result as a [[Table]].
 *
 * All tables referenced by the query must be registered in the TableEnvironment.
 * Because a [[Table]] is automatically registered when its [[toString]] method is
 * called (for example when it is embedded into a String), SQL queries can reference
 * a [[Table]] directly:
 *
 * {{{
 *   val table: Table = ...
 *   // the table is not registered to the table environment
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
 * @deprecated Use sqlQuery() instead.
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table.
 */
@Deprecated
@deprecated("Please use sqlQuery() instead.")
def sql(query: String): Table = sqlQuery(query) // deprecated alias; pure delegation

/**
* Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,115 +133,6 @@ class StreamTableEnvironment(
registerDataStreamInternal(name, dataStream, exprs)
}

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param table The [[Table]] to convert.
 * @param clazz The class of the type of the resulting [[DataStream]].
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated This method only supports conversion of append-only tables. In order to
 * make this more explicit in the future, please use toAppendStream() instead.
 */
@Deprecated
def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz)

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param table The [[Table]] to convert.
 * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated This method only supports conversion of append-only tables. In order to
 * make this more explicit in the future, please use toAppendStream() instead.
 */
@Deprecated
def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] =
toAppendStream(table, typeInfo)

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type,
 * applying the given [[StreamQueryConfig]].
 *
 * The [[Table]] must only have insert (append) changes; if it is also modified by
 * update or delete changes, the conversion will fail.
 *
 * Field mapping:
 * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
 *   types: fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param table The [[Table]] to convert.
 * @param clazz The class of the type of the resulting [[DataStream]].
 * @param queryConfig The configuration of the query to generate.
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated This method only supports conversion of append-only tables. In order to
 * make this more explicit in the future, please use toAppendStream() instead.
 */
@Deprecated
def toDataStream[T](
    table: Table,
    clazz: Class[T],
    queryConfig: StreamQueryConfig): DataStream[T] = {
  // Deprecated alias; pure delegation to the explicitly-named method.
  toAppendStream(table, clazz, queryConfig)
}

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
 * types: Fields are mapped by position, field types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param table The [[Table]] to convert.
 * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
 * @param queryConfig The configuration of the query to generate.
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated This method only supports conversion of append-only tables. In order to
 * make this more explicit in the future, please use toAppendStream() instead.
 */
@Deprecated
def toDataStream[T](
table: Table,
typeInfo: TypeInformation[T],
queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,55 +127,6 @@ class StreamTableEnvironment(
registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
}

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
 * types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param table The [[Table]] to convert.
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated Use toAppendStream() instead.
 */
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table)

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
 * types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param table The [[Table]] to convert.
 * @param queryConfig The configuration of the query to generate.
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated Use toAppendStream() instead.
 */
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](
table: Table,
queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,52 +77,6 @@ class TableConversions(table: Table) {
}
}

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
 * types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated Use toAppendStream() instead.
 */
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream

/**
 * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
 *
 * Deprecated alias: delegates directly to [[toAppendStream()]] and is kept only for
 * backwards compatibility.
 *
 * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
 * by update or delete changes, the conversion will fail.
 *
 * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
 * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
 * types must match.
 * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
 *
 * NOTE: This method only supports conversion of append-only tables. In order to make this
 * more explicit in the future, please use [[toAppendStream()]] instead.
 * If add and retract messages are required, use [[toRetractStream()]].
 *
 * @param queryConfig The configuration of the query to generate.
 * @tparam T The type of the resulting [[DataStream]].
 * @return The converted [[DataStream]].
 * @deprecated Use toAppendStream() instead.
 */
@deprecated("This method only supports conversion of append-only tables. In order to make this" +
" more explicit in the future, please use toAppendStream() instead.")
def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] =
toAppendStream(queryConfig)

/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,51 +782,6 @@ class Table(
orderBy(parsedFields: _*)
}

/**
 * Limits a sorted result from an offset position.
 * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
 * thus must be preceded by it.
 *
 * Builds a new [[Table]] wrapping a validated Limit logical node over this table's plan.
 *
 * Example:
 *
 * {{{
 * // skips the first 3 rows and returns all following rows.
 * tab.orderBy('name.desc).limit(3)
 * }}}
 *
 * @param offset number of records to skip
 *
 * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
 */
@Deprecated
@deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int): Table = {
new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
}

/**
 * Limits a sorted result to a specified number of records from an offset position.
 * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
 * thus must be preceded by it.
 *
 * Builds a new [[Table]] wrapping a validated Limit logical node over this table's plan.
 *
 * Example:
 *
 * {{{
 * // skips the first 3 rows and returns the next 5 rows.
 * tab.orderBy('name.desc).limit(3, 5)
 * }}}
 *
 * @param offset number of records to skip
 * @param fetch number of records to be returned
 *
 * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
 */
@Deprecated
// Message capitalized to match the single-argument limit(offset) overload's message.
@deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}

/**
* Limits a sorted result from an offset position.
* Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import org.apache.flink.util.Collector
*
* // for SQL users
* tEnv.registerFunction("split", new Split()) // register table function first
* tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
* tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
*
* }}}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class UpdatingPlanCheckerTest {
class UpdatePlanCheckerUtil extends StreamTableTestUtil {

def verifySqlUniqueKey(query: String, expected: Seq[String]): Unit = {
verifyTableUniqueKey(tableEnv.sql(query), expected)
verifyTableUniqueKey(tableEnv.sqlQuery(query), expected)
}

def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {

override protected def checkValidTableName(name: String): Unit = ???

override def sql(query: String): Table = ???
override def sqlQuery(query: String): Table = ???

override protected def getBuiltInNormRuleSet: RuleSet = ???

Expand Down