Skip to content

Commit

Permalink
This closes apache#3263
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed May 31, 2017
2 parents 19c33df + 5780fc5 commit 4884d48
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void mapPartition(
RuntimeContext runtimeContext = getRuntimeContext();

DoFnRunners.OutputManager outputManager;
if (outputMap == null) {
if (outputMap.size() == 1) {
outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
} else {
// it has some additional outputs
Expand Down Expand Up @@ -146,7 +146,9 @@ static class DoFnOutputManager
@Override
@SuppressWarnings("unchecked")
public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
collector.collect(output);
collector.collect(
WindowedValue.of(new RawUnionValue(0 /* single output */, output.getValue()),
output.getTimestamp(), output.getWindows(), output.getPane()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void reduce(
RuntimeContext runtimeContext = getRuntimeContext();

DoFnRunners.OutputManager outputManager;
if (outputMap == null) {
if (outputMap.size() == 1) {
outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
} else {
// it has some additional Outputs
Expand Down

0 comments on commit 4884d48

Please sign in to comment.