Skip to content

Commit

Permalink
[FLINK-14231][task] Change StreamTask to close operators with StreamO…
Browse files Browse the repository at this point in the history
…peratorWrapper to make the endInput semantics on the chain strict
  • Loading branch information
sunhaibotb authored and pnowojski committed Feb 21, 2020
1 parent 56ce696 commit 5fce538
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -76,7 +77,7 @@
@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
private static final long serialVersionUID = 1L;

private static final String STATE_NAME = "_async_wait_operator_state_";
Expand Down Expand Up @@ -234,13 +235,11 @@ public void initializeState(StateInitializationContext context) throws Exception
}

@Override
public void close() throws Exception {
try {
waitInFlightInputsFinished();
}
finally {
super.close();
}
public void endInput() throws Exception {
// we should wait here for the data in flight to be finished. the reason is that the
// timer not in running will be forbidden to fire after this, so that when the async
// operation is stuck, it results in deadlock due to what the timeout timer is not fired
waitInFlightInputsFinished();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
Expand Down Expand Up @@ -278,25 +276,33 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
* @param inputId the input ID starts from 1 which indicates the first input.
*/
public void endHeadOperatorInput(int inputId) throws Exception {
endOperatorInput(getHeadOperator(), inputId);
if (headOperatorWrapper != null) {
headOperatorWrapper.endOperatorInput(inputId);
}
}

/**
* Ends all inputs of the non-head operator specified by {@code streamOperator})
* (now there is only one input for each non-head operator).
*
* @param streamOperator non-head operator for ending the only input.
* Executes {@link StreamOperator#initializeState()} followed by {@link StreamOperator#open()}
* of each operator in the chain of this {@link StreamTask}. State initialization and opening
* happens from <b>tail to head</b> operator in the chain, contrary to {@link StreamOperator#close()}
* which happens <b>head to tail</b>(see {@link #closeOperators(StreamTaskActionExecutor)}).
*/
public void endNonHeadOperatorInput(StreamOperator<?> streamOperator) throws Exception {
checkState(streamOperator != getHeadOperator());
endOperatorInput(streamOperator, 1);
protected void initializeStateAndOpenOperators() throws Exception {
for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
StreamOperator<?> operator = operatorWrapper.getStreamOperator();
operator.initializeState();
operator.open();
}
}

private void endOperatorInput(StreamOperator<?> streamOperator, int inputId) throws Exception {
if (streamOperator instanceof BoundedOneInput) {
((BoundedOneInput) streamOperator).endInput();
} else if (streamOperator instanceof BoundedMultiInput) {
((BoundedMultiInput) streamOperator).endInput(inputId);
/**
* Closes all operators in a chain effect way. Closing happens from <b>head to tail</b> operator
* in the chain, contrary to {@link StreamOperator#open()} which happens <b>tail to head</b>
* (see {@link #initializeStateAndOpenOperators()}).
*/
protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
if (headOperatorWrapper != null) {
headOperatorWrapper.close(actionExecutor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,7 @@ private void beforeInvoke() throws Exception {
// both the following operations are protected by the lock
// so that we avoid race conditions in the case that initializeState()
// registers a timer, that fires before the open() is called.

initializeStateAndOpen();
operatorChain.initializeStateAndOpenOperators();
});
}

Expand Down Expand Up @@ -494,14 +493,12 @@ private void afterInvoke() throws Exception {

final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

// close all operators in a chain effect way
operatorChain.closeOperators(actionExecutor);

// make sure no further checkpoint and notification actions happen.
// we make sure that no other thread is currently in the locked scope before
// we close the operators by trying to acquire the checkpoint scope lock
// we also need to make sure that no triggers fire concurrently with the close logic
// at the same time, this makes sure that during any "regular" exit where still
actionExecutor.runThrowing(() -> {
// this is part of the main logic, so if this fails, the task is considered failed
closeAllOperators();

// make sure no new timers can come
FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);
Expand Down Expand Up @@ -608,32 +605,6 @@ public final boolean isCanceled() {
return canceled;
}

/**
* Execute {@link StreamOperator#close()} of each operator in the chain of this
* {@link StreamTask}. Closing happens from <b>head to tail</b> operator in the chain,
* contrary to {@link StreamOperator#open()} which happens <b>tail to head</b>
* (see {@link #initializeStateAndOpen()}).
*/
private void closeAllOperators() throws Exception {
// We need to close them first to last, since upstream operators in the chain might emit
// elements in their close methods.
boolean isHeadOperator = true;
for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators()) {
StreamOperator<?> operator = operatorWrapper.getStreamOperator();

// The operators on the chain, except for the head operator, must be one-input operators.
// So after the upstream operator on the chain is closed, the input of its downstream operator
// reaches the end.
if (!isHeadOperator) {
operatorChain.endNonHeadOperatorInput(operator);
} else {
isHeadOperator = false;
}

operator.close();
}
}

private void shutdownAsyncThreads() throws Exception {
if (!asyncOperationsThreadPool.isShutdown()) {
asyncOperationsThreadPool.shutdownNow();
Expand Down Expand Up @@ -961,20 +932,6 @@ private void checkpointState(
checkpointingOperation.executeCheckpointing();
}

/**
* Execute {@link StreamOperator#initializeState()} followed by {@link StreamOperator#open()} of each operator in
* the chain of this {@link StreamTask}. State initialization and opening happens from <b>tail to head</b> operator
* in the chain, contrary to {@link StreamOperator#close()} which happens <b>head to tail</b>
* (see {@link #closeAllOperators()}.
*/
private void initializeStateAndOpen() throws Exception {
for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
StreamOperator<?> operator = operatorWrapper.getStreamOperator();
operator.initializeState();
operator.open();
}
}

// ------------------------------------------------------------------------
// Operator Events
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {

// wait until all async collectors in the buffer have been emitted out.
synchronized (testHarness.getCheckpointLock()) {
testHarness.endInput();
testHarness.close();
}

Expand Down Expand Up @@ -347,6 +348,7 @@ private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exceptio
expectedOutput.add(new StreamRecord<>(16, initialTime + 8));

synchronized (testHarness.getCheckpointLock()) {
testHarness.endInput();
testHarness.close();
}

Expand Down Expand Up @@ -665,6 +667,7 @@ private void testAsyncTimeout(

// wait until all async collectors in the buffer have been emitted out.
synchronized (testHarness.getCheckpointLock()) {
testHarness.endInput();
testHarness.close();
}

Expand Down Expand Up @@ -698,6 +701,7 @@ public void testTimeoutCleanup() throws Exception {
}

synchronized (harness.getCheckpointLock()) {
harness.endInput();
harness.close();
}

Expand Down Expand Up @@ -861,6 +865,7 @@ public void testRestartWithFullQueue() throws Exception {
}

synchronized (recoverHarness.getCheckpointLock()) {
recoverHarness.endInput();
recoverHarness.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

import org.junit.Test;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
Expand All @@ -60,32 +60,44 @@ public void testAvoidTaskStarvation() throws Exception {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);

testHarness.setupOperatorChain(new OperatorID(), new ReplicatingMailOperatorFactory())
.chain(new OperatorID(), new ReplicatingMailOperatorFactory(), IntSerializer.INSTANCE)
final int maxProcessingElements = 3;

testHarness.setupOperatorChain(new OperatorID(), new ReplicatingMailOperatorFactory(maxProcessingElements))
.chain(new OperatorID(), new ReplicatingMailOperatorFactory(maxProcessingElements), IntSerializer.INSTANCE)
.finish();

testHarness.invoke();
testHarness.waitForTaskRunning();

testHarness.processElement(new StreamRecord<>(0));
testHarness.processElement(new StreamRecord<>(0));
testHarness.processElement(new StreamRecord<>(0));
for (int i = 0; i < maxProcessingElements; i++) {
testHarness.processElement(new StreamRecord<>(0));
}

testHarness.endInput();
testHarness.waitForTaskCompletion();

// with each input two mails should be processed, one of each operator in the chain
List<Integer> expected = new ArrayList<>();
for (int i = 0; i < maxProcessingElements; i++) {
expected.add(i * 2);
}
List<Integer> numMailsProcessed = testHarness.getOutput().stream()
.map(element -> ((StreamRecord<Integer>) element).getValue())
.collect(Collectors.toList());
assertThat(numMailsProcessed, is(Arrays.asList(0, 2, 4)));
assertThat(numMailsProcessed, is(expected));
}

private static class ReplicatingMailOperatorFactory extends AbstractStreamOperatorFactory<Integer>
implements OneInputStreamOperatorFactory<Integer, Integer>, YieldingOperatorFactory<Integer> {

private final int maxProcessingElements;

private MailboxExecutor mailboxExecutor;

ReplicatingMailOperatorFactory(final int maxProcessingElements) {
this.maxProcessingElements = maxProcessingElements;
}

@Override
public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
this.mailboxExecutor = mailboxExecutor;
Expand All @@ -96,7 +108,7 @@ public <Operator extends StreamOperator<Integer>> Operator createStreamOperator(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<Integer>> output) {
ReplicatingMailOperator operator = new ReplicatingMailOperator(mailboxExecutor);
ReplicatingMailOperator operator = new ReplicatingMailOperator(maxProcessingElements, mailboxExecutor);
operator.setProcessingTimeService(processingTimeService);
operator.setup(containingTask, config, output);
return (Operator) operator;
Expand All @@ -114,25 +126,42 @@ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classL

private static class ReplicatingMailOperator extends AbstractStreamOperator<Integer>
implements OneInputStreamOperator<Integer, Integer> {

private final int maxProcessingElements;

private final ReplicatingMail replicatingMail;

ReplicatingMailOperator(final MailboxExecutor mailboxExecutor) {
replicatingMail = new ReplicatingMail(mailboxExecutor);
private long numProcessedElements = 0;

ReplicatingMailOperator(final int maxProcessingElements, final MailboxExecutor mailboxExecutor) {
this.maxProcessingElements = maxProcessingElements;
this.replicatingMail = new ReplicatingMail(mailboxExecutor);
}

@Override
public void processElement(StreamRecord<Integer> upstreamMailCount) throws Exception {
if (numProcessedElements >= maxProcessingElements) {
return;
}

// for the very first element, enqueue one mail that replicates itself
if (!replicatingMail.hasBeenEnqueued()) {
replicatingMail.run();
}
// output how many mails have been processed so far (from upstream and this operator)
output.collect(new StreamRecord<>(replicatingMail.getMailCount() + upstreamMailCount.getValue()));

if (++numProcessedElements == maxProcessingElements) {
replicatingMail.stop();
}
}
}

private static class ReplicatingMail implements RunnableWithException {
private int mailCount = -1;

private boolean stopped = false;

private final MailboxExecutor mailboxExecutor;

ReplicatingMail(final MailboxExecutor mailboxExecutor) {
Expand All @@ -142,7 +171,9 @@ private static class ReplicatingMail implements RunnableWithException {
@Override
public void run() {
try {
mailboxExecutor.execute(this, "Blocking mail" + ++mailCount);
if (!stopped) {
mailboxExecutor.execute(this, "Blocking mail" + ++mailCount);
}
} catch (RejectedExecutionException e) {
// during shutdown the executor will reject new mails, which is fine for us.
}
Expand All @@ -155,5 +186,9 @@ boolean hasBeenEnqueued() {
int getMailCount() {
return mailCount;
}

void stop() {
stopped = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import scala.concurrent.duration.FiniteDuration;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -602,7 +603,7 @@ public void testQuiesceTimerServiceAfterOpClose() throws Exception {
}

@Test
public void testHandlingEndOfInput() throws Exception {
public void testClosingAllOperatorsOnChainProperly() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO,
Expand All @@ -616,8 +617,6 @@ public void testHandlingEndOfInput() throws Exception {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

testHarness.invoke();
testHarness.waitForTaskRunning();

Expand All @@ -626,15 +625,16 @@ public void testHandlingEndOfInput() throws Exception {

testHarness.waitForTaskCompletion();

expectedOutput.add(new StreamRecord<>("Hello"));
expectedOutput.add(new StreamRecord<>("[Operator0]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator0]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator1]: EndOfInput"));
expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));
ArrayList<StreamRecord<String>> expected = new ArrayList<>();
Collections.addAll(expected,
new StreamRecord<>("Hello"),
new StreamRecord<>("[Operator0]: End of input"),
new StreamRecord<>("[Operator0]: Bye"),
new StreamRecord<>("[Operator1]: End of input"),
new StreamRecord<>("[Operator1]: Bye"));

TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput,
testHarness.getOutput());
final Object[] output = testHarness.getOutput().toArray();
assertArrayEquals("Output was not correct.", expected.toArray(), output);
}

private static class TestOperator
Expand Down
Loading

0 comments on commit 5fce538

Please sign in to comment.