Skip to content

Commit

Permalink
[FLINK-16535][table] rename BatchTableSink#emitDataSet to BatchTableS…
Browse files Browse the repository at this point in the history
…ink#consumeDataSet, and return DataSink
  • Loading branch information
godfreyhe authored and wuchong committed Apr 8, 2020
1 parent 3fd568a commit 980e31d
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
Expand Down Expand Up @@ -69,8 +70,8 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
dataSet.output(outputFormat);
public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
return dataSet.output(outputFormat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.OutputFormatTableSink;
Expand Down Expand Up @@ -68,8 +69,8 @@ public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[]
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
dataSet
public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
return dataSet
.output(new Utils.CollectHelper<>(accumulatorName, serializer))
.name("SQL Client Batch Collect Sink");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.flink.table.sinks;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.table.api.Table;

/** Defines an external {@link TableSink} to emit a batch {@link Table}.
/**
* Defines an external {@link TableSink} to emit a batch {@link Table}.
*
* @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
*
Expand All @@ -30,6 +33,9 @@
@Deprecated
public interface BatchTableSink<T> extends TableSink<T> {

/** Emits the DataSet. */
void emitDataSet(DataSet<T> dataSet);
/**
* Consumes the DataSet and return the {@link DataSink}.
* The returned {@link DataSink} will be used to generate {@link Plan}.
*/
DataSink<?> consumeDataSet(DataSet<T> dataSet);
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public CsvTableSink(String path, String fieldDelim) {
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
MapOperator<Row, String> csvRows =
dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));

Expand All @@ -128,7 +128,7 @@ public void emitDataSet(DataSet<Row> dataSet) {
sink.setParallelism(numFiles);
}

sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
return sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
Expand Down Expand Up @@ -165,7 +166,7 @@ object TestCollectionTableFactory {
class CollectionTableSink(val schema: TableSchema)
extends BatchTableSink[Row]
with AppendStreamTableSink[Row] {
override def emitDataSet(dataSet: DataSet[Row]): Unit = {
override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ abstract class BatchTableEnvImpl(
// translate the Table into a DataSet and provide the type that the TableSink expects.
val result: DataSet[T] = translate(table)(outputType)
// Give the DataSet to the TableSink to emit it.
batchSink.emitDataSet(result)
batchSink.consumeDataSet(result)
case boundedSink: OutputFormatTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
Expand Down Expand Up @@ -176,7 +177,7 @@ object TestCollectionTableFactory {
class CollectionTableSink(val outputType: RowTypeInfo)
extends BatchTableSink[Row]
with AppendStreamTableSink[Row] {
override def emitDataSet(dataSet: DataSet[Row]): Unit = {
override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.api.{DataTypes, SqlDialect, TableSchema}
import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.factories.utils.TestCollectionTableFactory.TestCollectionInputFormat
import org.apache.flink.table.runtime.batch.sql.PartitionableSinkITCase._
import org.apache.flink.table.sinks.{BatchTableSink, PartitionableTableSink, TableSink}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.rules.ExpectedException
import org.junit.{Before, Rule, Test}
Expand Down Expand Up @@ -154,7 +156,7 @@ class PartitionableSinkITCase extends AbstractTestBase {
staticPartitions
}

override def emitDataSet(dataSet: DataSet[Row]): Unit = {
override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
dataSet.map(new MapFunction[Row, String] {
override def map(value: Row): String = value.toString
}).output(new CollectionOutputFormat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.utils

import org.apache.flink.api.common.io.{OutputFormat, RichOutputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
Expand Down Expand Up @@ -111,7 +112,7 @@ object MemoryTableSourceSinkUtil {
new UnsafeMemoryAppendTableSink
}

override def emitDataSet(dataSet: DataSet[Row]): Unit = {
override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
dataSet
.output(new MemoryCollectionOutputFormat)
.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
Expand Down Expand Up @@ -51,8 +52,8 @@ public SpendReportTableSink() {
}

@Override
public void emitDataSet(DataSet<Row> dataSet) {
dataSet
public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
return dataSet
.map(SpendReportTableSink::format)
.output(new LoggerOutputFormat());
}
Expand Down

0 comments on commit 980e31d

Please sign in to comment.