
Revert "[FLINK-16535][table] BatchTableSink emitDataSet to consumeDat…
Browse files Browse the repository at this point in the history
…aSet"

This reverts commit 9ffa85a.
pnowojski committed Apr 3, 2020
1 parent 0629d16 commit 6a42bf6
Showing 10 changed files with 17 additions and 30 deletions.
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -70,8 +69,8 @@ public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
}

@Override
-public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
-return dataSet.output(outputFormat);
+public void emitDataSet(DataSet<Row> dataSet) {
+dataSet.output(outputFormat);
}

@Override
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.OutputFormatTableSink;
@@ -69,8 +68,8 @@ public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[]
}

@Override
-public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
-return dataSet
+public void emitDataSet(DataSet<Row> dataSet) {
+dataSet
.output(new Utils.CollectHelper<>(accumulatorName, serializer))
.name("SQL Client Batch Collect Sink");
}
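
For context, the sink above gathers rows into a serialized-list accumulator. The consumer side is sketched below; it is not part of this commit, the class name CollectedResults is hypothetical, and it assumes the same accumulatorName and serializer that were used when the sink was built.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.Row;

public final class CollectedResults {

    /** Deserializes the rows gathered by a Utils.CollectHelper sink. */
    public static List<Row> retrieve(
            JobExecutionResult result,
            String accumulatorName,
            TypeSerializer<Row> serializer) throws Exception {
        // The CollectHelper output format stores each Row as serialized bytes
        // in a list accumulator registered under accumulatorName.
        ArrayList<byte[]> accResult = result.getAccumulatorResult(accumulatorName);
        return SerializedListAccumulator.deserializeList(accResult, serializer);
    }
}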
@@ -18,13 +18,10 @@

package org.apache.flink.table.sinks;

-import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.table.api.Table;

-/**
- * Defines an external {@link TableSink} to emit a batch {@link Table}.
+/** Defines an external {@link TableSink} to emit a batch {@link Table}.
*
* @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
*
@@ -33,9 +30,6 @@
@Deprecated
public interface BatchTableSink<T> extends TableSink<T> {

-/**
- * Consumes the DataSet and return the {@link DataSink}.
- * The returned {@link DataSink} will be used to generate {@link Plan}.
- */
-DataSink<?> consumeDataSet(DataSet<T> dataSet);
+/** Emits the DataSet. */
+void emitDataSet(DataSet<T> dataSet);
}
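
The hunk above restores the deprecated BatchTableSink interface to its void-returning shape. For illustration, a sink written against the restored contract could look like the following. This is a sketch, not part of the commit: PrintBatchTableSink and its use of PrintingOutputFormat are made up for the example, and the remaining methods are the legacy TableSink ones.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/** Hypothetical sink illustrating the restored void-returning contract. */
public class PrintBatchTableSink implements BatchTableSink<Row> {

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public void emitDataSet(DataSet<Row> dataSet) {
        // After the revert, the sink only wires the DataSet to an output
        // format; it no longer returns the resulting DataSink to the planner.
        dataSet.output(new PrintingOutputFormat<>());
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        PrintBatchTableSink copy = new PrintBatchTableSink();
        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }
}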
@@ -112,7 +112,7 @@ public CsvTableSink(String path, String fieldDelim) {
}

@Override
-public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
+public void emitDataSet(DataSet<Row> dataSet) {
MapOperator<Row, String> csvRows =
dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));

@@ -128,7 +128,7 @@ public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
sink.setParallelism(numFiles);
}

-return sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
}

@Override
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
@@ -166,7 +165,7 @@ object TestCollectionTableFactory {
class CollectionTableSink(val schema: TableSchema)
extends BatchTableSink[Row]
with AppendStreamTableSink[Row] {
-override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
+override def emitDataSet(dataSet: DataSet[Row]): Unit = {
dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
}

@@ -130,7 +130,7 @@ abstract class BatchTableEnvImpl(
// translate the Table into a DataSet and provide the type that the TableSink expects.
val result: DataSet[T] = translate(table)(outputType)
// Give the DataSet to the TableSink to emit it.
-batchSink.consumeDataSet(result)
+batchSink.emitDataSet(result)
case boundedSink: OutputFormatTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
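
The BatchTableEnvImpl hunk above is the call site the rename flows through. A rough end-to-end sketch, assuming the hypothetical PrintBatchTableSink from earlier and the Flink 1.10-era (now deprecated) registerTableSink and insertInto APIs:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class EmitDataSetExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<Tuple2<String, Long>> data =
            env.fromElements(Tuple2.of("hello", 1L), Tuple2.of("world", 2L));
        Table table = tEnv.fromDataSet(data, "word, cnt");

        // Register the hypothetical sink; configure(...) supplies the schema.
        tEnv.registerTableSink(
            "printSink",
            new PrintBatchTableSink().configure(
                new String[] {"word", "cnt"},
                new TypeInformation<?>[] {Types.STRING, Types.LONG}));

        // insertInto routes through BatchTableEnvImpl, which after this revert
        // calls batchSink.emitDataSet(result) instead of consumeDataSet(result).
        table.insertInto("printSink");
        tEnv.execute("emitDataSet example");
    }
}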
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
@@ -177,7 +176,7 @@ object TestCollectionTableFactory {
class CollectionTableSink(val outputType: RowTypeInfo)
extends BatchTableSink[Row]
with AppendStreamTableSink[Row] {
-override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
+override def emitDataSet(dataSet: DataSet[Row]): Unit = {
dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
}

@@ -19,14 +19,14 @@
package org.apache.flink.table.runtime.batch.sql

import java.util.{LinkedList => JLinkedList, Map => JMap}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
@@ -39,7 +39,6 @@ import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.rules.ExpectedException
import org.junit.{Before, Rule, Test}
@@ -155,7 +154,7 @@ class PartitionableSinkITCase extends AbstractTestBase {
staticPartitions
}

-override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
+override def emitDataSet(dataSet: DataSet[Row]): Unit = {
dataSet.map(new MapFunction[Row, String] {
override def map(value: Row): String = value.toString
}).output(new CollectionOutputFormat)
@@ -20,7 +20,6 @@ package org.apache.flink.table.utils

import org.apache.flink.api.common.io.{OutputFormat, RichOutputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
@@ -112,7 +111,7 @@ object MemoryTableSourceSinkUtil {
new UnsafeMemoryAppendTableSink
}

-override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
+override def emitDataSet(dataSet: DataSet[Row]): Unit = {
dataSet
.output(new MemoryCollectionOutputFormat)
.name(TableConnectorUtils.generateRuntimeName(this.getClass, getTableSchema.getFieldNames))
@@ -22,7 +22,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
@@ -52,8 +51,8 @@ public SpendReportTableSink() {
}

@Override
-public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
-return dataSet
+public void emitDataSet(DataSet<Row> dataSet) {
+dataSet
.map(SpendReportTableSink::format)
.output(new LoggerOutputFormat());
}