Skip to content

Commit

Permalink
[FLINK-16362][table] Remove deprecated emitDataStream method in StreamTableSink
Browse files Browse the repository at this point in the history

This closes apache#11279
  • Loading branch information
godfreyhe committed Mar 4, 2020
1 parent b3f8e33 commit 2b13a41
Show file tree
Hide file tree
Showing 27 changed files with 35 additions and 129 deletions.
12 changes: 6 additions & 6 deletions docs/dev/table/sourceSinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ The interface looks as follows:
{% highlight java %}
AppendStreamTableSink<T> implements TableSink<T> {

public void emitDataStream(DataStream<T> dataStream);
public DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}
{% endhighlight %}
</div>
Expand All @@ -461,7 +461,7 @@ AppendStreamTableSink<T> implements TableSink<T> {
{% highlight scala %}
AppendStreamTableSink[T] extends TableSink[T] {

def emitDataStream(dataStream: DataStream[T]): Unit
def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_]
}
{% endhighlight %}
</div>
Expand All @@ -484,7 +484,7 @@ RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {

public TypeInformation<T> getRecordType();

public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
{% endhighlight %}
</div>
Expand All @@ -495,7 +495,7 @@ RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

def getRecordType: TypeInformation[T]

def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_]
}
{% endhighlight %}
</div>
Expand All @@ -522,7 +522,7 @@ UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {

public TypeInformation<T> getRecordType();

public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
{% endhighlight %}
</div>
Expand All @@ -537,7 +537,7 @@ UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

def getRecordType: TypeInformation[T]

def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_]
}
{% endhighlight %}
</div>
Expand Down
12 changes: 6 additions & 6 deletions docs/dev/table/sourceSinks.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ The interface looks as follows:
{% highlight java %}
AppendStreamTableSink<T> implements TableSink<T> {

public void emitDataStream(DataStream<T> dataStream);
public DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}
{% endhighlight %}
</div>
Expand All @@ -461,7 +461,7 @@ AppendStreamTableSink<T> implements TableSink<T> {
{% highlight scala %}
AppendStreamTableSink[T] extends TableSink[T] {

def emitDataStream(dataStream: DataStream[T]): Unit
def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_]
}
{% endhighlight %}
</div>
Expand All @@ -484,7 +484,7 @@ RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {

public TypeInformation<T> getRecordType();

public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
{% endhighlight %}
</div>
Expand All @@ -495,7 +495,7 @@ RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

def getRecordType: TypeInformation[T]

def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_]
}
{% endhighlight %}
</div>
Expand All @@ -522,7 +522,7 @@ UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {

public TypeInformation<T> getRecordType();

public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
{% endhighlight %}
</div>
Expand All @@ -537,7 +537,7 @@ UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

def getRecordType: TypeInformation[T]

def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
def consumeDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): DataStreamSink[_]
}
{% endhighlight %}
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,4 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {

}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
consumeDataStream(dataStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,6 @@ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> data
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
consumeDataStream(dataStream);
}

@Override
public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
return Types.TUPLE(Types.BOOLEAN, getRecordType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testBuilder() {
new StreamExecutionEnvironmentMock(),
Types.TUPLE(Types.BOOLEAN, schema.toRowType()));

testSink.emitDataStream(dataStreamMock);
testSink.consumeDataStream(dataStreamMock);

final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
Collections.singletonList(new HttpHost(HOSTNAME, PORT, SCHEMA)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testBuilder() {
new StreamExecutionEnvironmentMock(),
Types.TUPLE(Types.BOOLEAN, schema.toRowType()));

testSink.emitDataStream(dataStreamMock);
testSink.consumeDataStream(dataStreamMock);

final ElasticsearchSink.Builder<Tuple2<Boolean, Row>> expectedBuilder = new ElasticsearchSink.Builder<>(
Collections.singletonList(new HttpHost(ElasticsearchUpsertTableSinkFactoryTestBase.HOSTNAME, ElasticsearchUpsertTableSinkFactoryTestBase.PORT, ElasticsearchUpsertTableSinkFactoryTestBase.SCHEMA)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
consumeDataStream(dataStream);
}

@Override
public TypeInformation<Row> getOutputType() {
return schema.toRowType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void testTableSink() {
// test Kafka producer
final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
actualKafkaSink.emitDataStream(streamMock);
actualKafkaSink.consumeDataStream(streamMock);
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}

Expand Down Expand Up @@ -357,7 +357,7 @@ public void testTableSinkWithLegacyProperties() {
// test Kafka producer
final KafkaTableSinkBase actualKafkaSink = (KafkaTableSinkBase) actualSink;
final DataStreamMock streamMock = new DataStreamMock(new StreamExecutionEnvironmentMock(), schema.toRowType());
actualKafkaSink.emitDataStream(streamMock);
actualKafkaSink.consumeDataStream(streamMock);
assertTrue(getExpectedFlinkKafkaProducer().isAssignableFrom(streamMock.sinkFunction.getClass()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> data
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), tableSchema.getFieldNames()));
}

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
consumeDataStream(dataStream);
}

@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
consumeDataStream(dataStream);
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
dataSet.output(outputFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> data
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
}

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
consumeDataStream(dataStream);
}

@Override
public void setKeyFields(String[] keys) {
this.keyFields = keys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testAppendTableSink() throws IOException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
sink.emitDataStream(ds);
sink.consumeDataStream(ds);

Collection<Integer> sinkIds = env
.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ public TypeInformation<Row> getRecordType() {
return getTableSchema().toRowType();
}

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
consumeDataStream(stream);
}

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
// add sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.factories.StreamTableSinkFactory;
Expand Down Expand Up @@ -138,8 +139,8 @@ public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldT
}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
// do nothing
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return sink;
}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
consumeDataStream(dataStream);
}

@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
if (this.fieldNames != null || this.fieldTypes != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
*/
public abstract OutputFormat<T> getOutputFormat();

@Override
public final void emitDataStream(DataStream<T> dataStream) {
consumeDataStream(dataStream);
}

@Override
public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
return dataStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,9 @@
*/
public interface StreamTableSink<T> extends TableSink<T> {

/**
* Emits the DataStream.
*
* @deprecated This method will be removed in future versions as it returns nothing.
* It is recommended to use {@link #consumeDataStream(DataStream)} instead which
* returns the {@link DataStreamSink}. The returned {@link DataStreamSink} will be
* used to set resources for the sink operator. If the {@link #consumeDataStream(DataStream)}
* is implemented, this method can be empty implementation.
*/
@Deprecated
void emitDataStream(DataStream<T> dataStream);

/**
* Consumes the DataStream and return the sink transformation {@link DataStreamSink}.
* The returned {@link DataStreamSink} will be used to set resources for the sink operator.
*/
default DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) {
emitDataStream(dataStream);
return null;
}
DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldT
"This sink is configured by passing a static schema when initiating");
}

@Override
public void emitDataStream(DataStream<Row> dataStream) {
throw new UnsupportedOperationException("Deprecated method, use consumeDataStream instead");
}

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.writeUsingOutputFormat(outputFormat).setParallelism(1).name("tableResult");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeI
.name("collect")
}

override def emitDataStream(dataStream: DataStream[T]): Unit = {
consumeDataStream(dataStream)
}

override protected def copy: TableSinkBase[T] = {
new CollectTableSink(produceOutputType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@ object TestCollectionTableFactory {

override def getTableSchema: TableSchema = schema

override def emitDataStream(dataStream: DataStream[Row]): Unit = {
dataStream.addSink(new UnsafeMemorySinkFunction(schema.toRowType)).setParallelism(1)
}

override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
dataStream.addSink(new UnsafeMemorySinkFunction(schema.toRowType)).setParallelism(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,6 @@ private class TestSink(

override def getOutputType: RowTypeInfo = rowType

override def emitDataStream(dataStream: DataStream[Row]): Unit = {
dataStream.addSink(new UnsafeMemorySinkFunction(rowType))
.setParallelism(dataStream.getParallelism)
}

override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
dataStream.addSink(new UnsafeMemorySinkFunction(rowType))
.setParallelism(dataStream.getParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,6 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
.setParallelism(dataStream.getParallelism)
}

override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, BaseRow]]): Unit = {
consumeDataStream(dataStream)
}

override def configure(
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]]): TestingUpsertTableSink = {
Expand Down Expand Up @@ -345,10 +341,6 @@ final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[R
.setParallelism(dataStream.getParallelism)
}

override def emitDataStream(dataStream: DataStream[Row]): Unit = {
consumeDataStream(dataStream)
}

override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

override def configure(
Expand Down Expand Up @@ -511,10 +503,6 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink
.setParallelism(dataStream.getParallelism)
}

override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, Row]]): Unit = {
consumeDataStream(dataStream)
}

override def getRecordType: TypeInformation[Row] =
new RowTypeInfo(fTypes, fNames)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ object MemoryTableSourceSinkUtil {
.setParallelism(dataStream.getParallelism)
.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
}

override def emitDataStream(dataStream: DataStream[Row]): Unit = {
consumeDataStream(dataStream)
}
}

final class UnsafeMemoryOutputFormatTableSink extends OutputFormatTableSink[Row] {
Expand Down Expand Up @@ -192,6 +188,5 @@ object MemoryTableSourceSinkUtil {
dataStream.writeUsingOutputFormat(new MemoryCollectionOutputFormat)
}

override def emitDataStream(dataStream: DataStream[Row]): Unit = ???
}
}
Loading

0 comments on commit 2b13a41

Please sign in to comment.