Skip to content

Commit

Permalink
[FLINK-14230][task] Change the endInput call of the downstream operat…
Browse files Browse the repository at this point in the history
…or to after the upstream operator closes

This change fixes a bug where "endInput" was propagated down the whole operator chain as soon as the input of
the head operator finished. The correct behavior is to invoke "endInput" on a downstream operator only after
its upstream operator has been closed.
  • Loading branch information
sunhaibotb authored and zhijiangW committed Oct 28, 2019
1 parent 27307b4 commit ecc8020
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ public void run(final Object lockingObject,
if (!isCanceledOrStopped()) {
advanceToEndOfEventTime();

// in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
// so we still need the following call to end the input
synchronized (lockingObject) {
operatorChain.endInput(1);
operatorChain.endHeadOperatorInput(1);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public InputStatus processInput() throws Exception {

if (status == InputStatus.END_OF_INPUT) {
synchronized (lock) {
operatorChain.endInput(1);
operatorChain.endHeadOperatorInput(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private int selectFirstReadingInputIndex() throws IOException {
private void checkFinished(InputStatus status, int inputIndex) throws Exception {
if (status == InputStatus.END_OF_INPUT) {
synchronized (lock) {
operatorChain.endInput(getInputId(inputIndex));
operatorChain.endHeadOperatorInput(getInputId(inputIndex));
inputSelectionHandler.nextSelection();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,11 @@
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
Expand All @@ -73,6 +71,7 @@
import java.util.Random;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* The {@code OperatorChain} contains all operators that are executed as one chain within a single
Expand Down Expand Up @@ -105,9 +104,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
*/
private StreamStatus streamStatus = StreamStatus.ACTIVE;

/** The flag that tracks finished inputs. */
private InputSelection finishedInputs = new InputSelection.Builder().build();

public OperatorChain(
StreamTask<OUT, OP> containingTask,
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {
Expand Down Expand Up @@ -244,46 +240,30 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
}

/**
* Ends an input (specified by {@code inputId}) of the {@link StreamTask}. The {@code inputId}
* is numbered starting from 1, and `1` indicates the first input.
* Ends the head operator input specified by {@code inputId}.
*
* @param inputId The ID of the input.
* @throws Exception if some exception happens in the endInput function of an operator.
* @param inputId the ID of the input to end; IDs start from 1, which indicates the first input.
*/
public void endInput(int inputId) throws Exception {
if (finishedInputs.areAllInputsSelected()) {
return;
}

if (headOperator instanceof TwoInputStreamOperator) {
if (finishedInputs.isInputSelected(inputId)) {
return;
}
public void endHeadOperatorInput(int inputId) throws Exception {
// Delegates to the shared helper, with the head operator of the chain as the target.
endOperatorInput(headOperator, inputId);
}

if (headOperator instanceof BoundedMultiInput) {
((BoundedMultiInput) headOperator).endInput(inputId);
}
/**
 * Ends the input of the non-head operator specified by {@code streamOperator}
 * (currently each non-head operator has exactly one input).
 *
 * @param streamOperator the non-head operator whose only input is ended.
 * @throws Exception if the {@code endInput} call of the operator fails.
 */
public void endNonHeadOperatorInput(StreamOperator<?> streamOperator) throws Exception {
    // the inputs of the head operator are ended through endHeadOperatorInput(int) instead
    checkState(
        streamOperator != headOperator,
        "The input of the head operator must be ended via endHeadOperatorInput(int).");
    // a non-head operator has a single input, so the input ID is always 1
    endOperatorInput(streamOperator, 1);
}

finishedInputs = InputSelection.Builder
.from(finishedInputs)
.select(finishedInputs.getInputMask() == 0 ? inputId : -1)
.build();
} else {
// here, the head operator is a stream source or an one-input stream operator,
// so all inputs are finished
finishedInputs = new InputSelection.Builder()
.select(-1)
.build();
}

if (finishedInputs.areAllInputsSelected()) {
// executing #endInput() happens from head to tail operator in the chain
for (int i = allOperators.length - 1; i >= 0; i--) {
StreamOperator<?> operator = allOperators[i];
if (operator instanceof BoundedOneInput) {
((BoundedOneInput) operator).endInput();
}
}
/**
 * Notifies the given operator that its input has ended, if it implements one of the bounded-input
 * interfaces.
 *
 * <p>A {@link BoundedOneInput} operator is notified through its no-argument {@code endInput()};
 * otherwise a {@link BoundedMultiInput} operator is notified for the input {@code inputId}.
 * An operator that implements neither interface is left untouched.
 *
 * @param streamOperator the operator to notify.
 * @param inputId the ID of the ended input; only passed on to {@link BoundedMultiInput} operators.
 * @throws Exception if the {@code endInput} call of the operator fails.
 */
private void endOperatorInput(StreamOperator<?> streamOperator, int inputId) throws Exception {
    if (streamOperator instanceof BoundedOneInput) {
        final BoundedOneInput oneInputOperator = (BoundedOneInput) streamOperator;
        oneInputOperator.endInput();
        return;
    }

    if (streamOperator instanceof BoundedMultiInput) {
        final BoundedMultiInput multiInputOperator = (BoundedMultiInput) streamOperator;
        multiInputOperator.endInput(inputId);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ private void closeAllOperators() throws Exception {
if (operator != null) {
operator.close();
}

// The operators on the chain, except for the head operator, must be one-input operators.
// So after the upstream operator on the chain is closed, the input of its downstream operator
// reaches the end.
if (i > 0) {
operatorChain.endNonHeadOperatorInput(allOperators[i - 1]);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,9 @@ public void testHandlingEndOfInput() throws Exception {
testHarness.waitForTaskCompletion();

expectedOutput.add(new StreamRecord<>("Hello"));
expectedOutput.add(new StreamRecord<>("[Operator0]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator0]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator1]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));

TestHarnessUtil.assertOutputEquals("Output was not correct.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
Expand All @@ -34,6 +35,7 @@
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
Expand Down Expand Up @@ -172,8 +174,8 @@ public void testMarkingEndOfInput() throws Exception {
testHarness
.setupOperatorChain(
new OperatorID(),
new StreamSource<>(new FromElementsFunction<>(
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "Hello")))
new OutputRecordInCloseTestSource<>(
"Source0", new FromElementsFunction<>(StringSerializer.INSTANCE, "Hello")))
.chain(
new OperatorID(),
new TestBoundedOneInputStreamOperator("Operator1"),
Expand All @@ -189,6 +191,9 @@ public void testMarkingEndOfInput() throws Exception {
testHarness.waitForTaskCompletion();

expectedOutput.add(new StreamRecord<>("Hello"));
expectedOutput.add(new StreamRecord<>("[Source0]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Source0]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator1]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));

TestHarnessUtil.assertOutputEquals("Output was not correct.",
Expand Down Expand Up @@ -479,5 +484,26 @@ public void cancel() {
running = false;
}
}

/**
 * A test {@link StreamSource} that also implements {@link BoundedOneInput} and emits a marker
 * record from both {@link #endInput()} and {@link #close()}, so that a test can verify the order
 * in which the two callbacks are invoked.
 *
 * @param <SRC> the type of the wrapped source function.
 */
private static final class OutputRecordInCloseTestSource<SRC extends SourceFunction<String>>
        extends StreamSource<String, SRC> implements BoundedOneInput {

    /** Label placed in front of every marker record emitted by this source. */
    private final String name;

    public OutputRecordInCloseTestSource(String name, SRC sourceFunction) {
        super(sourceFunction);
        this.name = name;
    }

    /** Emits the {@code "[name]: EndOfInput"} marker record. */
    @Override
    public void endInput() {
        output.collect(new StreamRecord<>("[" + name + "]: EndOfInput"));
    }

    /**
     * Emits the {@code "[name]: Bye"} marker record and then delegates to the superclass, in the
     * same way as the other test operators, so that the close logic of the superclass is not skipped.
     *
     * @throws Exception if the close logic of the superclass fails.
     */
    @Override
    public void close() throws Exception {
        output.collect(new StreamRecord<>("[" + name + "]: Bye"));
        super.close();
    }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public void processElement(StreamRecord<String> element) {

@Override
public void endInput() {
    // emit a marker record so that tests can observe when this callback fires
    final String marker = "[" + name + "]: EndOfInput";
    output.collect(new StreamRecord<>(marker));
}

@Override
public void close() throws Exception {
    // the farewell marker is emitted first, then the superclass close logic runs
    final String marker = "[" + name + "]: Bye";
    output.collect(new StreamRecord<>(marker));
    super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,11 @@ public void testHandlingEndOfInput() throws Exception {
testHarness.waitForTaskCompletion();

expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1"));
expectedOutput.add(new StreamRecord<>("[Operator0-1]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator0-1]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-2"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator0]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator1]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));

TestHarnessUtil.assertOutputEquals("Output was not correct.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public void processElement2(StreamRecord<String> element) {

@Override
public void endInput(int inputId) {
// Emits marker records tagged with the operator name and the ID of the ended input.
// NOTE(review): this view is a rendered diff without +/- markers, so the two statements below are
// presumably the pre-change ("Bye") and post-change ("EndOfInput") versions of one line rather
// than two records emitted back to back -- confirm against the actual file.
output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: Bye"));
output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: EndOfInput"));
}

@Override
public void close() throws Exception {
    // the farewell marker is emitted first, then the superclass close logic runs
    final String marker = "[" + name + "]: Bye";
    output.collect(new StreamRecord<>(marker));
    super.close();
}
}

0 comments on commit ecc8020

Please sign in to comment.