From 2b13a4155fd4284f6092decba867e71eea058043 Mon Sep 17 00:00:00 2001 From: godfrey he Date: Wed, 4 Mar 2020 15:52:37 +0800 Subject: [PATCH] [FLINK-16362][table] Remove deprecated `emitDataStream` method in StreamTableSink This closes #11279 --- docs/dev/table/sourceSinks.md | 12 ++++++------ docs/dev/table/sourceSinks.zh.md | 12 ++++++------ .../cassandra/CassandraAppendTableSink.java | 4 ---- .../ElasticsearchUpsertTableSinkBase.java | 5 ----- ...lasticsearch6UpsertTableSinkFactoryTest.java | 2 +- ...lasticsearch7UpsertTableSinkFactoryTest.java | 2 +- .../connectors/kafka/KafkaTableSinkBase.java | 5 ----- .../KafkaTableSourceSinkFactoryTestBase.java | 4 ++-- .../addons/hbase/HBaseUpsertTableSink.java | 5 ----- .../api/java/io/jdbc/JDBCAppendTableSink.java | 5 ----- .../api/java/io/jdbc/JDBCUpsertTableSink.java | 5 ----- .../java/io/jdbc/JDBCAppendTableSinkTest.java | 2 +- .../gateway/local/CollectStreamTableSink.java | 5 ----- .../gateway/utils/TestTableSinkFactoryBase.java | 5 +++-- .../apache/flink/table/sinks/CsvTableSink.java | 5 ----- .../table/sinks/OutputFormatTableSink.java | 5 ----- .../flink/table/sinks/StreamTableSink.java | 17 +---------------- .../org/apache/flink/table/api/TableUtils.java | 5 ----- .../table/planner/sinks/CollectTableSink.scala | 4 ---- .../utils/TestCollectionTableFactory.scala | 4 ---- .../batch/sql/PartitionableSinkITCase.scala | 5 ----- .../planner/runtime/utils/StreamTestSink.scala | 12 ------------ .../utils/MemoryTableSourceSinkUtil.scala | 5 ----- .../utils/TestCollectionTableFactory.scala | 4 ++-- .../runtime/stream/table/TableSinkITCase.scala | 15 ++++++--------- .../table/utils/MemoryTableSourceSinkUtil.scala | 4 ++-- .../common/table/SpendReportTableSink.java | 6 ++++-- 27 files changed, 35 insertions(+), 129 deletions(-) diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index dcd6829d5b6f8..7b3647f067366 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -452,7 +452,7 @@ The interface looks as follows: {% highlight java %} AppendStreamTableSink implements TableSink { - public void emitDataStream(DataStream dataStream); + public DataStreamSink consumeDataStream(DataStream dataStream); } {% endhighlight %} @@ -461,7 +461,7 @@ AppendStreamTableSink implements TableSink { {% highlight scala %} AppendStreamTableSink[T] extends TableSink[T] { - def emitDataStream(dataStream: DataStream[T]): Unit + def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_] } {% endhighlight %} @@ -484,7 +484,7 @@ RetractStreamTableSink implements TableSink> { public TypeInformation getRecordType(); - public void emitDataStream(DataStream> dataStream); + public DataStreamSink consumeDataStream(DataStream> dataStream); } {% endhighlight %} @@ -495,7 +495,7 @@ RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] - def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit + def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_] } {% endhighlight %} @@ -522,7 +522,7 @@ UpsertStreamTableSink implements TableSink> { public TypeInformation getRecordType(); - public void emitDataStream(DataStream> dataStream); + public DataStreamSink consumeDataStream(DataStream> dataStream); } {% endhighlight %} @@ -537,7 +537,7 @@ UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] - def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit + def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_] } {% endhighlight %} diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md index fab6930dbee41..e9f2c77595987 100644 --- a/docs/dev/table/sourceSinks.zh.md +++ b/docs/dev/table/sourceSinks.zh.md @@ -452,7 +452,7 @@ The interface looks as follows: {% highlight java %} AppendStreamTableSink implements TableSink { - public void emitDataStream(DataStream dataStream); + public DataStreamSink consumeDataStream(DataStream dataStream); } {% endhighlight %} @@ -461,7 +461,7 @@ AppendStreamTableSink implements TableSink { {% highlight scala %} AppendStreamTableSink[T] extends TableSink[T] { - def emitDataStream(dataStream: DataStream[T]): Unit + def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_] } {% endhighlight %} @@ -484,7 +484,7 @@ RetractStreamTableSink implements TableSink> { public TypeInformation getRecordType(); - public void emitDataStream(DataStream> dataStream); + public DataStreamSink consumeDataStream(DataStream> dataStream); } {% endhighlight %} @@ -495,7 +495,7 @@ RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] - def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit + def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_] } {% endhighlight %} @@ -522,7 +522,7 @@ UpsertStreamTableSink implements TableSink> { public TypeInformation getRecordType(); - public void emitDataStream(DataStream> dataStream); + public DataStreamSink consumeDataStream(DataStream> dataStream); } {% endhighlight %} @@ -537,7 +537,7 @@ UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] - def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit + def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_] } {% endhighlight %} diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java index 72afc3bc49e12..af152acccac13 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java @@ -97,8 +97,4 @@ public DataStreamSink consumeDataStream(DataStream dataStream) { } - @Override - public void emitDataStream(DataStream dataStream) { - consumeDataStream(dataStream); - } } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java index 3c1346f720035..7ee5d924010e2 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java @@ -191,11 +191,6 @@ public DataStreamSink consumeDataStream(DataStream> data .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); } - @Override - public void emitDataStream(DataStream> dataStream) { - consumeDataStream(dataStream); - } - @Override public TypeInformation> getOutputType() { return Types.TUPLE(Types.BOOLEAN, getRecordType()); diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java index 257634d1738a9..021bbdd08a845 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java @@ -79,7 +79,7 @@ public void testBuilder() { new StreamExecutionEnvironmentMock(), Types.TUPLE(Types.BOOLEAN, schema.toRowType())); - testSink.emitDataStream(dataStreamMock); + testSink.consumeDataStream(dataStreamMock); final ElasticsearchSink.Builder> expectedBuilder = new ElasticsearchSink.Builder<>( Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)), diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7UpsertTableSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7UpsertTableSinkFactoryTest.java index 5837733b852ee..f3493b34e1f13 100644 --- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7UpsertTableSinkFactoryTest.java +++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7UpsertTableSinkFactoryTest.java @@ -78,7 +78,7 @@ public void testBuilder() { new StreamExecutionEnvironmentMock(), Types.TUPLE(Types.BOOLEAN, schema.toRowType())); - testSink.emitDataStream(dataStreamMock); + testSink.consumeDataStream(dataStreamMock); final ElasticsearchSink.Builder> expectedBuilder = new ElasticsearchSink.Builder<>( Collections.singletonList(new HttpHost(ElasticsearchUpsertTableSinkFactoryTestBase.HOSTNAME, ElasticsearchUpsertTableSinkFactoryTestBase.PORT, ElasticsearchUpsertTableSinkFactoryTestBase.SCHEMA)), diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java index 3cf2fe07137c6..bf3471b783657 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java @@ -103,11 +103,6 @@ public DataStreamSink consumeDataStream(DataStream dataStream) { .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames())); } - @Override - public void emitDataStream(DataStream dataStream) { - consumeDataStream(dataStream); - } - @Override public TypeInformation getOutputType() { return schema.toRowType(); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index a313956986773..0875c28d4e079 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -304,7 +304,7 @@ public void testTableSink() { // test Kafka producer final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink; final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType()); - actualKafkaSink.emitDataStream(streamMock); + actualKafkaSink.consumeDataStream(streamMock); assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); } @@ -357,7 +357,7 @@ public void testTableSinkWithLegacyProperties() { // test Kafka producer final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink; final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType()); - actualKafkaSink.emitDataStream(streamMock); + actualKafkaSink.consumeDataStream(streamMock); assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass())); } diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java index 1dc7f97f2cb24..bbdf93336cabb 100644 --- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java @@ -100,11 +100,6 @@ public DataStreamSink consumeDataStream(DataStream> data .name(TableConnectorUtils.generateRuntimeName(this.getClass(), tableSchema.getFieldNames())); } - @Override - public void emitDataStream(DataStream> dataStream) { - consumeDataStream(dataStream); - } - @Override public TableSink> configure(String[] fieldNames, TypeInformation[] fieldTypes) { if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) { diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java index 7c7e7ba56e6f8..b5bc34cc47087 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java @@ -68,11 +68,6 @@ public DataStreamSink consumeDataStream(DataStream dataStream) { .name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames)); } - @Override - public void emitDataStream(DataStream dataStream) { - consumeDataStream(dataStream); - } - @Override public void emitDataSet(DataSet dataSet) { dataSet.output(outputFormat); diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java index 42e4a384f15a6..77dac7b400ef0 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java @@ -97,11 +97,6 @@ public DataStreamSink consumeDataStream(DataStream> data .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames())); } - @Override - public void emitDataStream(DataStream> dataStream) { - consumeDataStream(dataStream); - } - @Override public void setKeyFields(String[] keys) { this.keyFields = keys; diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java index c35dd395e1575..b3f2b249f5a60 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java @@ -59,7 +59,7 @@ public void testAppendTableSink() throws IOException { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE); - sink.emitDataStream(ds); + sink.consumeDataStream(ds); Collection sinkIds = env .getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, false) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java index 00f0cbed9f7cf..62cd39873eff6 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java @@ -76,11 +76,6 @@ public TypeInformation getRecordType() { return getTableSchema().toRowType(); } - @Override - public void emitDataStream(DataStream> stream) { - consumeDataStream(stream); - } - @Override public DataStreamSink consumeDataStream(DataStream> stream) { // add sink diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java index e934959e40f86..bc201c13aa453 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.factories.StreamTableSinkFactory; @@ -138,8 +139,8 @@ public TableSink configure(String[] fieldNames, TypeInformation[] fieldT } @Override - public void emitDataStream(DataStream dataStream) { - // do nothing + public DataStreamSink consumeDataStream(DataStream dataStream) { + return null; } } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java index 61b7ad32f7dec..dfe5e8763f47d 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java @@ -157,11 +157,6 @@ public DataStreamSink consumeDataStream(DataStream dataStream) { return sink; } - @Override - public void emitDataStream(DataStream dataStream) { - consumeDataStream(dataStream); - } - @Override public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { if (this.fieldNames != null || this.fieldTypes != null) { diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java index 692082fe9eb5d..e650859883d45 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java @@ -37,11 +37,6 @@ public abstract class OutputFormatTableSink implements StreamTableSink { */ public abstract OutputFormat getOutputFormat(); - @Override - public final void emitDataStream(DataStream dataStream) { - consumeDataStream(dataStream); - } - @Override public final DataStreamSink consumeDataStream(DataStream dataStream) { return dataStream diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java index 825d5bbbab7f3..09b72f189b943 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java @@ -28,24 +28,9 @@ */ public interface StreamTableSink extends TableSink { - /** - * Emits the DataStream. - * - * @deprecated This method will be removed in future versions as it returns nothing. - * It is recommended to use {@link #consumeDataStream(DataStream)} instead which - * returns the {@link DataStreamSink}. The returned {@link DataStreamSink} will be - * used to set resources for the sink operator. If the {@link #consumeDataStream(DataStream)} - * is implemented, this method can be empty implementation. - */ - @Deprecated - void emitDataStream(DataStream dataStream); - /** * Consumes the DataStream and return the sink transformation {@link DataStreamSink}. * The returned {@link DataStreamSink} will be used to set resources for the sink operator. */ - default DataStreamSink consumeDataStream(DataStream dataStream) { - emitDataStream(dataStream); - return null; - } + DataStreamSink consumeDataStream(DataStream dataStream); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java index a06cb199b592a..8aaf5c519c175 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/TableUtils.java @@ -153,11 +153,6 @@ public TableSink configure(String[] fieldNames, TypeInformation[] fieldT "This sink is configured by passing a static schema when initiating"); } - @Override - public void emitDataStream(DataStream dataStream) { - throw new UnsupportedOperationException("Deprecated method, use consumeDataStream instead"); - } - @Override public DataStreamSink consumeDataStream(DataStream dataStream) { return dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult"); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala index 7535baac0e6bf..a4c92280ccd24 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala @@ -42,10 +42,6 @@ class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeI .name("collect") } - override def emitDataStream(dataStream: DataStream[T]): Unit = { - consumeDataStream(dataStream) - } - override protected def copy: TableSinkBase[T] = { new CollectTableSink(produceOutputType) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala index 170f1188f644b..228ea3c5f669d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala @@ -173,10 +173,6 @@ object TestCollectionTableFactory { override def getTableSchema: TableSchema = schema - override def emitDataStream(dataStream: DataStream[Row]): Unit = { - dataStream.addSink(new UnsafeMemorySinkFunction(schema.toRowType)).setParallelism(1) - } - override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { dataStream.addSink(new UnsafeMemorySinkFunction(schema.toRowType)).setParallelism(1) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala index 12579dc8e0cb1..add6f2ac67b62 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala @@ -306,11 +306,6 @@ private class TestSink( override def getOutputType: RowTypeInfo = rowType - override def emitDataStream(dataStream: DataStream[Row]): Unit = { - dataStream.addSink(new UnsafeMemorySinkFunction(rowType)) - .setParallelism(dataStream.getParallelism) - } - override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { dataStream.addSink(new UnsafeMemorySinkFunction(rowType)) .setParallelism(dataStream.getParallelism) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala index ac54ecab71b0a..24fb71b85e724 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala @@ -310,10 +310,6 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone) .setParallelism(dataStream.getParallelism) } - override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, BaseRow]]): Unit = { - consumeDataStream(dataStream) - } - override def configure( fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TestingUpsertTableSink = { @@ -345,10 +341,6 @@ final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[R .setParallelism(dataStream.getParallelism) } - override def emitDataStream(dataStream: DataStream[Row]): Unit = { - consumeDataStream(dataStream) - } - override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) override def configure( @@ -511,10 +503,6 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink .setParallelism(dataStream.getParallelism) } - override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, Row]]): Unit = { - consumeDataStream(dataStream) - } - override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala index 0d263f6f9cf9d..062a7dab81673 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/MemoryTableSourceSinkUtil.scala @@ -110,10 +110,6 @@ object MemoryTableSourceSinkUtil { .setParallelism(dataStream.getParallelism) .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames)) } - - override def emitDataStream(dataStream: DataStream[Row]): Unit = { - consumeDataStream(dataStream) - } } final class UnsafeMemoryOutputFormatTableSink extends OutputFormatTableSink[Row] { @@ -192,6 +188,5 @@ object MemoryTableSourceSinkUtil { dataStream.writeUsingOutputFormat(new MemoryCollectionOutputFormat) } - override def emitDataStream(dataStream: DataStream[Row]): Unit = ??? } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala index 3b179862021b0..bd31d46419641 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutpu import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource} +import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.table.api.TableSchema @@ -188,7 +188,7 @@ object TestCollectionTableFactory { outputType.getFieldTypes } - override def emitDataStream(dataStream: DataStream[Row]): Unit = { + override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index e6f671d9f8c68..3e6b6375f55dc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -18,9 +18,6 @@ package org.apache.flink.table.runtime.stream.table -import java.io.File -import java.lang.{Boolean => JBool} - import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} @@ -39,9 +36,13 @@ import org.apache.flink.table.utils.MemoryTableSourceSinkUtil import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils} import org.apache.flink.types.Row import org.apache.flink.util.Collector + import org.junit.Assert._ import org.junit.Test +import java.io.File +import java.lang.{Boolean => JBool} + import scala.collection.JavaConverters._ import scala.collection.mutable @@ -623,10 +624,6 @@ private[flink] class TestAppendSink extends AppendStreamTableSink[Row] { var fNames: Array[String] = _ var fTypes: Array[TypeInformation[_]] = _ - override def emitDataStream(s: DataStream[Row]): Unit = { - consumeDataStream(s) - } - override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { dataStream.map( new MapFunction[Row, JTuple2[JBool, Row]] { @@ -656,7 +653,7 @@ private[flink] class TestRetractSink extends RetractStreamTableSink[Row] { var fNames: Array[String] = _ var fTypes: Array[TypeInformation[_]] = _ - override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = { + override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] = { s.addSink(new RowSink) } @@ -703,7 +700,7 @@ private[flink] class TestUpsertSink( override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) - override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = { + override def consumeDataStream(s: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] = { s.addSink(new RowSink) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala index 5559c629efb57..5b2e9d2e36fb0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -116,7 +116,7 @@ object MemoryTableSourceSinkUtil { .name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames)) } - override def emitDataStream(dataStream: DataStream[Row]): Unit = { + override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = { val inputParallelism = dataStream.getParallelism dataStream .addSink(new MemoryAppendSink) diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java index ae167fd6c2420..a143306d07fa7 100644 --- a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java +++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.BatchTableSink; @@ -57,8 +58,8 @@ public void emitDataSet(DataSet dataSet) { } @Override - public void emitDataStream(DataStream dataStream) { - dataStream + public DataStreamSink consumeDataStream(DataStream dataStream) { + return dataStream .map(SpendReportTableSink::format) .writeUsingOutputFormat(new LoggerOutputFormat()) .setParallelism(dataStream.getParallelism()); @@ -93,4 +94,5 @@ private static String format(Row row) { //noinspection MalformedFormatString return String.format("%s, %s, $%.2f", row.getField(0), row.getField(1), row.getField(2)); } + }