Skip to content

Commit

Permalink
[FLINK-14230][datastream] Remove the BoundedOneInput implementation o…
Browse files Browse the repository at this point in the history
…f AsyncWaitOperator and ContinuousFileReaderOperator

After "endInput" of the downstream operator on the chain is invoked correctly, we revert
the changes of PR#9298 and PR#9221.
  • Loading branch information
sunhaibotb authored and zhijiangW committed Oct 28, 2019
1 parent ecc8020 commit d4be257
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
Expand Down Expand Up @@ -62,7 +61,7 @@
*/
@Internal
public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, BoundedOneInput {
implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -202,11 +201,6 @@ public void close() throws Exception {
output.close();
}

@Override
public void endInput() throws Exception {
waitSplitReaderFinished();
}

private void waitSplitReaderFinished() throws InterruptedException {
// make sure that we hold the checkpointing lock
assert Thread.holdsLock(checkpointLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
Expand Down Expand Up @@ -76,7 +75,7 @@
@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;

private static final String STATE_NAME = "_async_wait_operator_state_";
Expand Down Expand Up @@ -234,11 +233,6 @@ public void initializeState(StateInitializationContext context) throws Exception

}

@Override
public void endInput() throws Exception {
waitInFlightInputsFinished();
}

@Override
public void close() throws Exception {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,30 +204,6 @@ public static void countDown() {
}
}

/**
* AsyncFunction supports a specific delay(ms) before async invocation.
*/
private static class DelayedAsyncFunction extends MyAsyncFunction {

private final long delayed;

public DelayedAsyncFunction(long delayed) {
this.delayed = delayed;
}

@Override
public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
executorService.submit(() -> {
try {
Thread.sleep(delayed);
} catch (InterruptedException e) {
resultFuture.completeExceptionally(e);
}
resultFuture.complete(Collections.singletonList(input * 2));
});
}
}

/**
* A special {@link LazyAsyncFunction} for timeout handling.
* Complete the result future with 3 times the input when the timeout occurred.
Expand Down Expand Up @@ -1023,44 +999,6 @@ private <IN, OUT> SingleOutputStreamOperator<OUT> addAsyncOperatorLegacyChained(
return in.transform("async wait operator", outTypeInfo, factory);
}

/**
* Delay a while before async invocation to check whether end input waits for all elements finished or not.
*/
@Test
public void testEndInput() throws Exception {
final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
createTestHarness(new DelayedAsyncFunction(10), -1, 2, AsyncDataStream.OutputMode.ORDERED);

final long initialTime = 0L;
final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
expectedOutput.add(new Watermark(initialTime + 2));
expectedOutput.add(new StreamRecord<>(6, initialTime + 3));

testHarness.open();

try {
synchronized (testHarness.getCheckpointLock()) {
testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
testHarness.processWatermark(new Watermark(initialTime + 2));
testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
}

// wait until all async collectors in the buffer have been emitted out.
synchronized (testHarness.getCheckpointLock()) {
testHarness.endInput();
}

TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
} finally {
synchronized (testHarness.getCheckpointLock()) {
testHarness.close();
}
}
}

private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> createTestHarness(
AsyncFunction<Integer, OUT> function,
long timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -170,12 +169,4 @@ public void processWatermark(Watermark mark) throws Exception {
public long getCurrentWatermark() {
return currentWatermark;
}

public void endInput() throws Exception {
if (getOneInputOperator() instanceof BoundedOneInput) {
((BoundedOneInput) getOneInputOperator()).endInput();
} else {
throw new UnsupportedOperationException("The operator is not BoundedOneInput");
}
}
}

This file was deleted.

0 comments on commit d4be257

Please sign in to comment.