Skip to content

Commit

Permalink
[FLINK-19736] Add SinkTransformation along with Translator
Browse files Browse the repository at this point in the history
This patch does three things:

1. Introduce 'SinkTransformation' to represent the new sink API.

2. Introduce 'SinkTransformationTranslator' that translates the
'SinkTransformation' to the corresponding runtime operators.

3. Make the DataStream SDK support the new sink API.
  • Loading branch information
guoweiM authored and aljoscha committed Nov 2, 2020
1 parent c6043a5 commit 34d143f
Show file tree
Hide file tree
Showing 17 changed files with 1,428 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@

package org.apache.flink.api.common.typeutils;

import java.io.IOException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/**
* Abstract test base for type information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
Expand All @@ -41,6 +42,7 @@
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
Expand Down Expand Up @@ -1303,6 +1305,22 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
return sink;
}

/**
 * Attaches the given {@link Sink} to this DataStream. Only streams that have a sink
 * attached are executed when {@link StreamExecutionEnvironment#execute()} is called.
 *
 * @param sink The user defined sink.
 *
 * @return The closed DataStream.
 */
@Experimental
public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
	// Touch the output type of the input transformation first so that errors about
	// MissingTypeInfo surface here, before the sink is created.
	transformation.getOutputType();

	final DataStreamSink<T> streamSink = new DataStreamSink<>(this, sink);
	return streamSink;
}

/**
* Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
* of the given DataStream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;

/**
* A Stream Sink. This is used for emitting elements from a streaming topology.
Expand All @@ -33,19 +36,37 @@
@Public
public class DataStreamSink<T> {

private final LegacySinkTransformation<T> transformation;
private final PhysicalTransformation<T> transformation;

@SuppressWarnings("unchecked")
protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
this.transformation = new LegacySinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
this.transformation = (PhysicalTransformation<T>) new LegacySinkTransformation<>(
inputStream.getTransformation(),
"Unnamed",
operator,
inputStream.getExecutionEnvironment().getParallelism());
}

/**
* Creates a sink for the given {@link Sink} by wrapping it in a {@link SinkTransformation}.
*
* <p>The transformation is named "Unnamed", takes the transformation of {@code inputStream}
* as its input, and uses the parallelism of the input stream's execution environment. It is
* also handed to that environment through {@code addOperator} as part of construction.
*
* @param inputStream The stream whose elements are sent to the sink.
* @param sink The sink to attach to the stream.
*/
@SuppressWarnings("unchecked")
protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) {
// SinkTransformation is declared as a PhysicalTransformation<Object>, so narrowing it to
// PhysicalTransformation<T> is an unchecked cast; that is what the suppression above covers.
transformation = (PhysicalTransformation<T>) new SinkTransformation<>(
inputStream.getTransformation(),
sink,
"Unnamed",
inputStream.getExecutionEnvironment().getParallelism());
// Hand the new transformation to the execution environment of the input stream.
inputStream.getExecutionEnvironment().addOperator(transformation);
}

/**
* Returns the transformation that contains the actual sink operator of this sink.
*/
@Internal
public LegacySinkTransformation<T> getTransformation() {
return transformation;
if (transformation instanceof LegacySinkTransformation) {
return (LegacySinkTransformation<T>) transformation;
} else {
throw new IllegalStateException("There is no the LegacySinkTransformation.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.util.graph.StreamGraphUtils;

import java.util.Collection;

Expand Down Expand Up @@ -88,27 +88,22 @@ private void configure(final T transformation, final Context context) {
final StreamGraph streamGraph = context.getStreamGraph();
final int transformationId = transformation.getId();

if (transformation.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transformationId, transformation.getBufferTimeout());
} else {
streamGraph.setBufferTimeout(transformationId, context.getDefaultBufferTimeout());
}
StreamGraphUtils.configureBufferTimeout(
streamGraph,
transformationId,
transformation,
context.getDefaultBufferTimeout());

if (transformation.getUid() != null) {
streamGraph.setTransformationUID(transformationId, transformation.getUid());
}
if (transformation.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transformationId, transformation.getUserProvidedNodeHash());
streamGraph.setTransformationUserHash(
transformationId,
transformation.getUserProvidedNodeHash());
}

if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
if (transformation instanceof PhysicalTransformation
&& transformation.getUserProvidedNodeHash() == null
&& transformation.getUid() == null) {
throw new IllegalStateException("Auto generated UIDs have been disabled " +
"but no UID or hash has been assigned to operator " + transformation.getName());
}
}
StreamGraphUtils.validateTransformationUid(streamGraph, transformation);

if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
streamGraph.setResources(transformationId, transformation.getMinResources(), transformation.getPreferredResources());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ void setSortedInputs(int vertexId, boolean shouldSort) {
getStreamNode(vertexId).setSortedInputs(shouldSort);
}

void setTransformationUID(Integer nodeId, String transformationId) {
public void setTransformationUID(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
node.setTransformationUID(transformationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
Expand All @@ -56,6 +57,7 @@
import org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.PartitionTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SideOutputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.UnionTransformationTranslator;
Expand Down Expand Up @@ -153,6 +155,7 @@ public class StreamGraphGenerator {
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package org.apache.flink.streaming.api.transformations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.operators.ChainingStrategy;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link Transformation} that represents a {@link Sink} in the transformation graph.
 *
 * <p>The transformation has exactly one input and declares {@code Object} as its output
 * type. Setting a uid hash, setting resources, and declaring managed memory use cases are
 * all rejected with an {@link UnsupportedOperationException}.
 *
 * @param <InputT> The input type of the {@link org.apache.flink.api.connector.sink.Writer}
 * @param <CommT> The committable type of the {@link org.apache.flink.api.connector.sink.Writer}
 * @param <WriterStateT> The state type of the {@link org.apache.flink.api.connector.sink.Writer}
 * @param <GlobalCommT> The global committable type of the {@link org.apache.flink.api.connector.sink.GlobalCommitter}
 */
@Internal
public class SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> extends PhysicalTransformation<Object> {

	/** Message shared by both managed memory declaration methods. */
	private static final String MANAGED_MEMORY_UNSUPPORTED =
		"Declaring managed memory use cases is not supported for SinkTransformation.";

	/** The upstream transformation whose elements are handed to the sink. */
	private final Transformation<InputT> input;

	/** The sink wrapped by this transformation. */
	private final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink;

	/** Only assigned through {@link #setChainingStrategy(ChainingStrategy)}; this class sets no default. */
	private ChainingStrategy chainingStrategy;

	/**
	 * Creates a new sink transformation.
	 *
	 * @param input The upstream transformation; must not be {@code null}.
	 * @param sink The sink to wrap; must not be {@code null}.
	 * @param name The name of this transformation.
	 * @param parallelism The parallelism of this transformation.
	 */
	public SinkTransformation(
			Transformation<InputT> input,
			Sink<InputT, CommT, WriterStateT, GlobalCommT> sink,
			String name,
			int parallelism) {
		super(name, TypeExtractor.getForClass(Object.class), parallelism);
		this.input = checkNotNull(input);
		this.sink = checkNotNull(sink);
	}

	/**
	 * Returns the sink wrapped by this transformation.
	 */
	public Sink<InputT, CommT, WriterStateT, GlobalCommT> getSink() {
		return this.sink;
	}

	/**
	 * Returns the chaining strategy most recently passed to
	 * {@link #setChainingStrategy(ChainingStrategy)}.
	 */
	public ChainingStrategy getChainingStrategy() {
		return this.chainingStrategy;
	}

	@Override
	public void setChainingStrategy(ChainingStrategy strategy) {
		this.chainingStrategy = checkNotNull(strategy);
	}

	@Override
	public List<Transformation<?>> getInputs() {
		return Collections.singletonList(this.input);
	}

	@Override
	public List<Transformation<?>> getTransitivePredecessors() {
		// This transformation comes first, followed by everything upstream of its input.
		final List<Transformation<?>> predecessors = Lists.newArrayList();
		predecessors.add(this);
		predecessors.addAll(this.input.getTransitivePredecessors());
		return predecessors;
	}

	@Override
	public void setUidHash(String uidHash) {
		throw new UnsupportedOperationException("Setting a UidHash is not supported for SinkTransformation.");
	}

	@Override
	public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
		throw new UnsupportedOperationException("Do not support set resources for SinkTransformation.");
	}

	@Override
	public Optional<Integer> declareManagedMemoryUseCaseAtOperatorScope(
			ManagedMemoryUseCase managedMemoryUseCase,
			int weight) {
		throw new UnsupportedOperationException(MANAGED_MEMORY_UNSUPPORTED);
	}

	@Override
	public void declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase managedMemoryUseCase) {
		throw new UnsupportedOperationException(MANAGED_MEMORY_UNSUPPORTED);
	}
}
Loading

0 comments on commit 34d143f

Please sign in to comment.