Commit

[FLINK-19511] Rename the 'SinkTransformation' to 'LegacySinkTransformation'

We use the name 'SinkTransformation' for the new sink API introduced in FLIP-143. This commit therefore renames the existing 'SinkTransformation' to 'LegacySinkTransformation'.
guoweiM authored and kl0u committed Oct 13, 2020
1 parent 401f56f commit f6f9cb7
Showing 8 changed files with 29 additions and 29 deletions.
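For a rough sense of what the rename means for code built on top of the DataStream API, here is a minimal Java sketch; the pipeline itself (the environment, the fromElements source, and the PrintSinkFunction sink) is an assumed example and is not part of this commit. Only the type name changes; behaviour stays the same.

import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;

public class RenameSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // An assumed example pipeline ending in a pre-FLIP-143, SinkFunction-based sink.
        DataStreamSink<Long> sink = env.fromElements(1L, 2L, 3L)
            .addSink(new PrintSinkFunction<>());

        // Before this commit the call below returned a SinkTransformation<Long>;
        // after the rename the same (internal) call returns a LegacySinkTransformation<Long>.
        LegacySinkTransformation<Long> transformation = sink.getTransformation();
        transformation.setParallelism(1);
    }
}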
@@ -30,7 +30,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 import org.apache.flink.types.Row;

@@ -60,7 +60,7 @@ private CassandraSink(SingleOutputStreamOperator<IN> sink) {
         useDataStreamSink = false;
     }

-    private SinkTransformation<IN> getSinkTransformation() {
+    private LegacySinkTransformation<IN> getSinkTransformation() {
         return sink1.getTransformation();
     }

@@ -33,7 +33,7 @@
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.apache.flink.util.Preconditions;
@@ -364,7 +364,7 @@ private static <T, K> void addKafkaShuffle(
             inputStream.getTransformation().getOutputType();

         StreamKafkaShuffleSink<T> shuffleSinkOperator = new StreamKafkaShuffleSink<>(kafkaShuffleProducer);
-        SinkTransformation<T> transformation = new SinkTransformation<>(
+        LegacySinkTransformation<T> transformation = new LegacySinkTransformation<>(
             inputStream.getTransformation(),
             "kafka_shuffle",
             shuffleSinkOperator,
@@ -23,7 +23,7 @@
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;

 /**
  * A Stream Sink. This is used for emitting elements from a streaming topology.
@@ -33,18 +33,18 @@
 @Public
 public class DataStreamSink<T> {

-    private final SinkTransformation<T> transformation;
+    private final LegacySinkTransformation<T> transformation;

     @SuppressWarnings("unchecked")
     protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
-        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
+        this.transformation = new LegacySinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
     }

     /**
      * Returns the transformation that contains the actual sink operator of this sink.
      */
     @Internal
-    public SinkTransformation<T> getTransformation() {
+    public LegacySinkTransformation<T> getTransformation() {
         return transformation;
     }
@@ -37,12 +37,12 @@
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
@@ -321,8 +321,8 @@ private Collection<Integer> legacyTransform(Transformation<?> transform) {
             transformedIds = transformSource((SourceTransformation<?>) transform);
         } else if (transform instanceof LegacySourceTransformation<?>) {
             transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
-        } else if (transform instanceof SinkTransformation<?>) {
-            transformedIds = transformSink((SinkTransformation<?>) transform);
+        } else if (transform instanceof LegacySinkTransformation<?>) {
+            transformedIds = transformLegacySink((LegacySinkTransformation<?>) transform);
         } else if (transform instanceof UnionTransformation<?>) {
             transformedIds = transformUnion((UnionTransformation<?>) transform);
         } else if (transform instanceof FeedbackTransformation<?>) {
@@ -633,9 +633,9 @@ private <T> Collection<Integer> transformLegacySource(LegacySourceTransformation
     }

     /**
-     * Transforms a {@code SinkTransformation}.
+     * Transforms a {@code LegacySinkTransformation}.
     */
-    private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
+    private <T> Collection<Integer> transformLegacySink(LegacySinkTransformation<T> sink) {
         List<Transformation<?>> inputs = sink.getInputs();
         checkState(inputs.size() == 1);
         Transformation<?> input = inputs.get(0);
@@ -21,7 +21,7 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;

 /**
  * A {@link DataStreamSink} which is used to collect results of a data stream.
@@ -30,19 +30,19 @@
 @Internal
 public class CollectStreamSink<T> extends DataStreamSink<T> {

-    private final SinkTransformation<T> transformation;
+    private final LegacySinkTransformation<T> transformation;

     public CollectStreamSink(DataStream<T> inputStream, CollectSinkOperatorFactory<T> factory) {
         super(inputStream, (CollectSinkOperator<T>) factory.getOperator());
-        this.transformation = new SinkTransformation<>(
+        this.transformation = new LegacySinkTransformation<>(
             inputStream.getTransformation(),
             "Collect Stream Sink",
             factory,
             1);
     }

     @Override
-    public SinkTransformation<T> getTransformation() {
+    public LegacySinkTransformation<T> getTransformation() {
         return transformation;
     }
@@ -35,12 +35,12 @@
 import java.util.List;

 /**
- * This Transformation represents a Sink.
+ * This Transformation represents a stream Sink.
  *
- * @param <T> The type of the elements in the input {@code SinkTransformation}
+ * @param <T> The type of the elements in the input {@code LegacySinkTransformation}
  */
 @Internal
-public class SinkTransformation<T> extends PhysicalTransformation<Object> {
+public class LegacySinkTransformation<T> extends PhysicalTransformation<Object> {

     private final Transformation<T> input;

@@ -52,22 +52,22 @@ public class SinkTransformation<T> extends PhysicalTransformation<Object> {
     private TypeInformation<?> stateKeyType;

     /**
-     * Creates a new {@code SinkTransformation} from the given input {@code Transformation}.
+     * Creates a new {@code LegacySinkTransformation} from the given input {@code Transformation}.
      *
      * @param input The input {@code Transformation}
      * @param name The name of the {@code Transformation}, this will be shown in Visualizations and the Log
      * @param operator The sink operator
-     * @param parallelism The parallelism of this {@code SinkTransformation}
+     * @param parallelism The parallelism of this {@code LegacySinkTransformation}
      */
-    public SinkTransformation(
+    public LegacySinkTransformation(
             Transformation<T> input,
             String name,
             StreamSink<T> operator,
             int parallelism) {
         this(input, name, SimpleOperatorFactory.of(operator), parallelism);
     }

-    public SinkTransformation(
+    public LegacySinkTransformation(
             Transformation<T> input,
             String name,
             StreamOperatorFactory<Object> operatorFactory,
Expand All @@ -83,7 +83,7 @@ public StreamSink<T> getOperator() {
}

/**
* Returns the {@link StreamOperatorFactory} of this {@code SinkTransformation}.
* Returns the {@link StreamOperatorFactory} of this {@code LegacySinkTransformation}.
*/
public StreamOperatorFactory<Object> getOperatorFactory() {
return operatorFactory;
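As a side note, here is a hedged Java sketch of constructing the renamed transformation directly, the way DataStreamSink's constructor does in the diffs above; the input stream, the PrintSinkFunction, and the name "example-sink" are assumed example values, not part of this commit.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;

public class DirectConstructionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        // Wrap a plain SinkFunction in the legacy StreamSink operator, then build the renamed
        // transformation with the (Transformation, name, StreamSink, parallelism) constructor.
        StreamSink<String> operator = new StreamSink<>(new PrintSinkFunction<>());
        LegacySinkTransformation<String> sinkTransformation = new LegacySinkTransformation<>(
            input.getTransformation(),
            "example-sink",
            operator,
            env.getParallelism());

        // Register it with the environment, as DataStream#addSink would do internally.
        env.addOperator(sinkTransformation);
    }
}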
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.typeutils.InputTypeConfigurable
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory
-import org.apache.flink.streaming.api.transformations.SinkTransformation
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
@@ -96,7 +96,7 @@ class CommonPhysicalSink (
       fieldNames
     )

-    new SinkTransformation(
+    new LegacySinkTransformation(
       inputTransformation,
       getRelDetailedDescription,
       SimpleOperatorFactory.of(operator),
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
-import org.apache.flink.streaming.api.transformations.{OneInputTransformation, SinkTransformation, TwoInputTransformation}
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, LegacySinkTransformation, TwoInputTransformation}
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.expressions.utils.FuncWithOpen
 import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
@@ -114,7 +114,7 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     @scala.annotation.tailrec
     def findTwoInputTransform(t: Transformation[_]): TwoInputTransformation[_, _, _] = {
       t match {
-        case sink: SinkTransformation[_] => findTwoInputTransform(sink.getInputs.get(0))
+        case sink: LegacySinkTransformation[_] => findTwoInputTransform(sink.getInputs.get(0))
         case one: OneInputTransformation[_, _] => findTwoInputTransform(one.getInputs.get(0))
         case two: TwoInputTransformation[_, _, _] => two
       }
