Skip to content

Commit

Permalink
[FLINK-14231][task] Add StreamOperatorWrapper that handles the close,…
Browse files Browse the repository at this point in the history
… endInput and other related logic of an operator

For each operator in the operator chain, all of its inputs must have ended completely before its "endInput()" method is executed. For
the operator chain in a task, such as "OP1 -> OP2 -> ...", after the (source/network) inputs of OP1 have finished,
the operators on the chain are closed in the following order:

1. quiesce the ProcessingTimeService of OP1 to prevent pending timers from firing, but wait for the timers that are
   already running to finish.
2. call OP1#close()
3. call OP2#endInput()
4. quiesce the ProcessingTimeService of OP2 to prevent pending timers from firing, but wait for the timers that are
   already running to finish.
5. call OP2#close()
...
  • Loading branch information
sunhaibotb authored and pnowojski committed Feb 21, 2020
1 parent ab642cb commit 56ce696
Show file tree
Hide file tree
Showing 12 changed files with 705 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.state.api.output;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
Expand All @@ -28,13 +29,15 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

import java.util.Iterator;
import java.util.Optional;

/**
* A stream task that pulls elements from an {@link Iterable} instead of the network. After all
Expand Down Expand Up @@ -67,16 +70,17 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo
@Override
protected void init() throws Exception {
Preconditions.checkState(
operatorChain.getAllOperators().length == 1,
operatorChain.getNumberOfOperators() == 1,
"BoundedStreamTask's should only run a single operator");

// re-initialize the operator with the correct collector.
StreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
headOperator = StreamOperatorFactoryUtil.createOperator(
Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService = StreamOperatorFactoryUtil.createOperator(
operatorFactory,
this,
configuration,
new CollectorWrapper<>(collector));
headOperator = headOperatorAndTimeService.f0;
headOperator.initializeState();
headOperator.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import java.util.Optional;

/**
* A utility to instantiate new operators with a given factory.
*/
Expand All @@ -34,9 +37,9 @@ public class StreamOperatorFactoryUtil {
* @param containingTask the containing task.
* @param configuration the configuration of the operator.
* @param output the output of the operator.
* @return a newly created and configured operator.
* @return a newly created and configured operator, and the {@link ProcessingTimeService} instance it can access.
*/
public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(
public static <OUT, OP extends StreamOperator<OUT>> Tuple2<OP, Optional<ProcessingTimeService>> createOperator(
StreamOperatorFactory<OUT> operatorFactory,
StreamTask<OUT, ?> containingTask,
StreamConfig configuration,
Expand All @@ -48,11 +51,13 @@ public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(
((YieldingOperatorFactory) operatorFactory).setMailboxExecutor(mailboxExecutor);
}

ProcessingTimeService processingTimeService = null;
if (operatorFactory instanceof ProcessingTimeServiceAware) {
ProcessingTimeService processingTimeService = containingTask.getProcessingTimeServiceFactory().createProcessingTimeService(mailboxExecutor);
processingTimeService = containingTask.getProcessingTimeServiceFactory().createProcessingTimeService(mailboxExecutor);
((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
}

return operatorFactory.createStreamOperator(containingTask, configuration, output);
OP op = operatorFactory.createStreamOperator(containingTask, configuration, output);
return new Tuple2<>(op, Optional.ofNullable(processingTimeService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
Expand Down Expand Up @@ -74,6 +73,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -91,16 +91,19 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea

private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);

/**
* Stores all operators on this chain in reverse order.
*/
private final StreamOperator<?>[] allOperators;

private final RecordWriterOutput<?>[] streamOutputs;

private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;

private final OP headOperator;
/**
* For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing
* feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and
* {@code tailOperatorWrapper} are null.
*/
@Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper;
@Nullable private final StreamOperatorWrapper<?, ?> tailOperatorWrapper;

private final int numOperators;

private final OperatorEventDispatcherImpl operatorEventDispatcher;

Expand Down Expand Up @@ -151,34 +154,42 @@ public OperatorChain(
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
List<StreamOperatorWrapper<?, ?>> allOpWrappers = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOps,
allOpWrappers,
containingTask.getMailboxExecutorFactory());

if (operatorFactory != null) {
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();

headOperator = StreamOperatorFactoryUtil.createOperator(
Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService = StreamOperatorFactoryUtil.createOperator(
operatorFactory,
containingTask,
configuration,
output);

OP headOperator = headOperatorAndTimeService.f0;
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
this.headOperatorWrapper = createOperatorWrapper(headOperator, containingTask, configuration, headOperatorAndTimeService.f1);

// add head operator to end of chain
allOpWrappers.add(headOperatorWrapper);

this.tailOperatorWrapper = allOpWrappers.get(0);
} else {
headOperator = null;
checkState(allOpWrappers.size() == 0);
this.headOperatorWrapper = null;
this.tailOperatorWrapper = null;
}

// add head operator to end of chain
allOps.add(headOperator);
this.numOperators = allOpWrappers.size();

this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
linkOperatorWrappers(allOpWrappers);

success = true;
}
Expand All @@ -197,16 +208,21 @@ public OperatorChain(

@VisibleForTesting
OperatorChain(
StreamOperator<?>[] allOperators,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
RecordWriterOutput<?>[] streamOutputs,
WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
OP headOperator) {
StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {

this.allOperators = checkNotNull(allOperators);
this.streamOutputs = checkNotNull(streamOutputs);
this.chainEntryPoint = checkNotNull(chainEntryPoint);
this.headOperator = checkNotNull(headOperator);
this.operatorEventDispatcher = null;

checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
this.tailOperatorWrapper = allOperatorWrappers.get(0);
this.numOperators = allOperatorWrappers.size();

linkOperatorWrappers(allOperatorWrappers);
}

@Override
Expand Down Expand Up @@ -251,12 +267,8 @@ public void broadcastCheckpointCancelMarker(long id) throws IOException {
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// go forward through the operator chain and tell each operator
// to prepare the checkpoint
final StreamOperator<?>[] operators = this.allOperators;
for (int i = operators.length - 1; i >= 0; --i) {
final StreamOperator<?> op = operators[i];
if (op != null) {
op.prepareSnapshotPreBarrier(checkpointId);
}
for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
}
}

Expand All @@ -266,7 +278,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
* @param inputId the input ID starts from 1 which indicates the first input.
*/
public void endHeadOperatorInput(int inputId) throws Exception {
endOperatorInput(headOperator, inputId);
endOperatorInput(getHeadOperator(), inputId);
}

/**
Expand All @@ -276,7 +288,7 @@ public void endHeadOperatorInput(int inputId) throws Exception {
* @param streamOperator non-head operator for ending the only input.
*/
public void endNonHeadOperatorInput(StreamOperator<?> streamOperator) throws Exception {
checkState(streamOperator != headOperator);
checkState(streamOperator != getHeadOperator());
endOperatorInput(streamOperator, 1);
}

Expand All @@ -292,8 +304,25 @@ public RecordWriterOutput<?>[] getStreamOutputs() {
return streamOutputs;
}

public StreamOperator<?>[] getAllOperators() {
return allOperators;
/**
* Returns an {@link Iterable} which traverses all operators in forward topological order.
*
* <p>Convenience overload that delegates to {@link #getAllOperators(boolean)} with
* {@code reverse == false}.
*
* @return an iterable over the operator wrappers of this chain, starting from the head wrapper
*/
public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators() {
return getAllOperators(false);
}

/**
 * Provides an {@link Iterable} over the operator wrappers of this chain, in either forward or
 * reverse topological order.
 *
 * @param reverse {@code true} to begin at the tail wrapper and traverse in reverse order;
 *                {@code false} to begin at the head wrapper and traverse in forward order
 * @return an iterable positioned at the wrapper matching the requested direction
 */
public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators(boolean reverse) {
	if (reverse) {
		// reverse traversal starts from the last operator of the chain
		return new StreamOperatorWrapper.ReadIterator(tailOperatorWrapper, true);
	}

	// forward traversal starts from the first operator of the chain
	return new StreamOperatorWrapper.ReadIterator(headOperatorWrapper, false);
}

/**
* Returns the number of operators in this chain, i.e. the size of the operator wrapper list
* recorded when the chain was constructed (zero if the chain holds no operators).
*/
public int getNumberOfOperators() {
return numOperators;
}

public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getChainEntryPoint() {
Expand Down Expand Up @@ -326,21 +355,9 @@ public void releaseOutputs() {
}
}

@Nullable
public OP getHeadOperator() {
return headOperator;
}

public int getChainLength() {
return allOperators == null ? 0 : allOperators.length;
}

public boolean hasSelectiveReadingOperator() {
for (StreamOperator operator : allOperators) {
if (operator instanceof InputSelectable) {
return true;
}
}
return false;
return (headOperatorWrapper == null) ? null : headOperatorWrapper.getStreamOperator();
}

// ------------------------------------------------------------------------
Expand All @@ -353,7 +370,7 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
MailboxExecutorFactory mailboxExecutorFactory) {
List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

Expand All @@ -376,7 +393,7 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators,
allOperatorWrappers,
outputEdge.getOutputTag(),
mailboxExecutorFactory);
allOutputs.add(new Tuple2<>(output, outputEdge));
Expand Down Expand Up @@ -432,7 +449,7 @@ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOp
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
OutputTag<IN> outputTag,
MailboxExecutorFactory mailboxExecutorFactory) {
// create the output that the operator writes to first. this may recursively create more operators
Expand All @@ -442,17 +459,19 @@ private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOp
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators,
allOperatorWrappers,
mailboxExecutorFactory);

// now create the operator and give it the output collector to write its output to
OneInputStreamOperator<IN, OUT> chainedOperator = StreamOperatorFactoryUtil.createOperator(
Tuple2<OneInputStreamOperator<IN, OUT>, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
StreamOperatorFactoryUtil.createOperator(
operatorConfig.getStreamOperatorFactory(userCodeClassloader),
containingTask,
operatorConfig,
chainedOperatorOutput);

allOperators.add(chainedOperator);
OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorAndTimeService.f0;
allOperatorWrappers.add(createOperatorWrapper(chainedOperator, containingTask, operatorConfig, chainedOperatorAndTimeService.f1));

WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
Expand Down Expand Up @@ -491,6 +510,34 @@ private RecordWriterOutput<OUT> createStreamOutput(
return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}

/**
 * Connects the given operator wrappers to each other so that they can be traversed in forward
 * topological order via "next" and in reverse via "previous".
 *
 * <p>The first list element is given no successor and the last list element is given no
 * predecessor by this method.
 *
 * @param allOperatorWrappers the operator wrappers in reverse topological order, i.e. the
 *                            tail of the chain comes first and the head comes last
 */
private void linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorWrappers) {
	// Because the list runs tail-first, the wrapper handled in the preceding iteration
	// is the downstream neighbour of the wrapper handled in the current one.
	StreamOperatorWrapper<?, ?> downstream = null;
	for (int i = 0; i < allOperatorWrappers.size(); i++) {
		final StreamOperatorWrapper<?, ?> upstream = allOperatorWrappers.get(i);
		if (downstream != null) {
			downstream.setPrevious(upstream);
		}
		upstream.setNext(downstream);
		downstream = upstream;
	}
}

/**
 * Wraps the given operator into a {@link StreamOperatorWrapper}, handing it the operator's
 * processing time service (if any) and a mailbox executor created for the operator's chain index.
 *
 * @param operator the operator to wrap
 * @param containingTask the task whose mailbox executor factory supplies the executor
 * @param operatorConfig the operator's configuration; its chain index is passed to the executor factory
 * @param processingTimeService the processing time service the operator can access, if present
 * @return the newly created wrapper around {@code operator}
 */
private <T, P extends StreamOperator<T>> StreamOperatorWrapper<T, P> createOperatorWrapper(
		P operator,
		StreamTask<?, ?> containingTask,
		StreamConfig operatorConfig,
		Optional<ProcessingTimeService> processingTimeService) {

	final MailboxExecutorFactory executorFactory = containingTask.getMailboxExecutorFactory();

	return new StreamOperatorWrapper<>(
		operator,
		processingTimeService,
		executorFactory.createExecutor(operatorConfig.getChainIndex()));
}

// ------------------------------------------------------------------------
// Collectors for output chaining
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 56ce696

Please sign in to comment.