Skip to content

Commit

Permalink
[FLINK-20007] SinkTransformationTranslator connect SinkWriter to corr…
Browse files Browse the repository at this point in the history
…ect upstream node

Currently the translation logic does not take into account virtual nodes,
e.g. repartioning. This PR changes this.

This closes apache#13952.
  • Loading branch information
guoweiM authored and kl0u committed Nov 5, 2020
1 parent eb05e50 commit 747566d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private int addWriter(

return addOperatorToStreamGraph(
writer,
input.getId(),
context.getStreamNodeIds(input),
inputTypeInfo,
extractCommittableTypeInformation(sinkTransformation.getSink()),
"Sink Writer:",
Expand Down Expand Up @@ -226,7 +226,8 @@ private int addCommitter(
checkNotNull(committableTypeInfo);

return addOperatorToStreamGraph(
committerFactory, inputId,
committerFactory,
Collections.singletonList(inputId),
committableTypeInfo,
committableTypeInfo,
"Sink Committer:",
Expand Down Expand Up @@ -254,7 +255,8 @@ private void addGlobalCommitter(
}

addOperatorToStreamGraph(
globalCommitterFactory, inputId,
globalCommitterFactory,
Collections.singletonList(inputId),
checkNotNull(extractCommittableTypeInformation(sinkTransformation.getSink())),
null,
"Sink Global Committer:",
Expand All @@ -276,7 +278,7 @@ private int getParallelism(
* Add a operator to the {@link StreamGraph}.
*
* @param operatorFactory The operator factory
* @param inputId The upstream stream node id of the operator
* @param inputs A collection of upstream stream node ids.
* @param inTypeInfo The input type information of the operator
* @param outTypInfo The output type information of the operator
* @param prefix The prefix of the name and uid of the operator
Expand All @@ -287,7 +289,8 @@ private int getParallelism(
* @return The stream node id of the operator
*/
private <IN, OUT> int addOperatorToStreamGraph(
StreamOperatorFactory<OUT> operatorFactory, int inputId,
StreamOperatorFactory<OUT> operatorFactory,
Collection<Integer> inputs,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypInfo,
String prefix,
Expand Down Expand Up @@ -321,7 +324,10 @@ private <IN, OUT> int addOperatorToStreamGraph(
transformationId,
String.format("%s %s", prefix, sinkTransformation.getUid()));
}
streamGraph.addEdge(inputId, transformationId, 0);

for (int input : inputs) {
streamGraph.addEdge(input, transformationId, 0);
}

return transformationId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private StreamGraph buildGraph(
config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
env.configure(config, getClass().getClassLoader());
final DataStreamSource<Integer> src = env.fromElements(1, 2);
final DataStreamSink<Integer> dataStreamSink = src.sinkTo(sink);
final DataStreamSink<Integer> dataStreamSink = src.rebalance().sinkTo(sink);
setSinkProperty(dataStreamSink);
return env.getStreamGraph("test");
}
Expand Down

0 comments on commit 747566d

Please sign in to comment.