Skip to content

Commit

Permalink
[FLINK-11878][runtime] Implement the runtime handling of BoundedOneIn…
Browse files Browse the repository at this point in the history
…put and BoundedMultiInput
  • Loading branch information
sunhaibotb authored and pnowojski committed Jun 24, 2019
1 parent 1ee24d6 commit 3b82310
Show file tree
Hide file tree
Showing 21 changed files with 552 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,29 @@ public long getInputMask() {
return inputMask;
}

/**
* Tests if the input specified by {@code inputId} is selected.
*
* @param inputId The input id, see the description of {@code inputId} in {@link Builder#select(int)}.
* @return {@code true} if the input is selected, {@code false} otherwise.
*/
public boolean isInputSelected(int inputId) {
return (inputMask & (1L << (inputId - 1))) != 0;
}

/**
* Tests if all inputs are selected.
*
* @return {@code true} if the input mask equals -1, {@code false} otherwise.
*/
public boolean areAllInputsSelected() {
return inputMask == -1L;
}

/**
* Tells whether or not the input mask includes all of two inputs.
*
* @return {@code true} if the input mask includes all of two inputs, false otherwise.
* @return {@code true} if the input mask includes all of two inputs, {@code false} otherwise.
*/
public boolean isALLMaskOf2() {
return isALLMaskOf2;
Expand Down Expand Up @@ -120,6 +139,16 @@ public static final class Builder {

private long inputMask = 0;

/**
* Returns a {@code Builder} that uses the input mask of the specified {@code selection}
* as the initial mask.
*/
public static Builder from(InputSelection selection) {
Builder builder = new Builder();
builder.inputMask = selection.inputMask;
return builder;
}

/**
* Selects an input identified by the given {@code inputId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

Expand Down Expand Up @@ -55,13 +56,17 @@ public StreamSource(SRC sourceFunction) {
this.chainingStrategy = ChainingStrategy.HEAD;
}

public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final OperatorChain<?, ?> operatorChain) throws Exception {

run(lockingObject, streamStatusMaintainer, output, operatorChain);
}

public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {
final Output<StreamRecord<OUT>> collector,
final OperatorChain<?, ?> operatorChain) throws Exception {

final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

Expand Down Expand Up @@ -96,9 +101,14 @@ public void run(final Object lockingObject,

// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
// a final watermark that indicates that we reached the end of event-time, and end inputs
// of the operator chain
if (!isCanceledOrStopped()) {
advanceToEndOfEventTime();

synchronized (lockingObject) {
operatorChain.endInput(1);
}
}
} finally {
// make sure that the context is closed in any case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import org.slf4j.Logger;
Expand Down Expand Up @@ -69,6 +70,8 @@ public class StreamInputProcessor<IN> {

private final Object lock;

private final OperatorChain<?, ?> operatorChain;

// ---------------- Status and Watermark Valve ------------------

/** Valve that controls how watermarks and stream statuses are forwarded. */
Expand Down Expand Up @@ -96,7 +99,8 @@ public StreamInputProcessor(
OneInputStreamOperator<IN, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge watermarkGauge,
String taskName) throws IOException {
String taskName,
OperatorChain<?, ?> operatorChain) throws IOException {

InputGate inputGate = InputGateUtil.createInputGate(inputGates);

Expand All @@ -120,6 +124,8 @@ public StreamInputProcessor(

this.watermarkGauge = watermarkGauge;
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

this.operatorChain = checkNotNull(operatorChain);
}

public boolean processInput() throws Exception {
Expand All @@ -128,7 +134,7 @@ public boolean processInput() throws Exception {
StreamElement recordOrMark = input.pollNextNullable();
if (recordOrMark == null) {
input.isAvailable().get();
return !input.isFinished();
return !checkFinished();
}
int channel = input.getLastChannel();
checkState(channel != StreamTaskInput.UNSPECIFIED);
Expand Down Expand Up @@ -174,6 +180,16 @@ private void initializeNumRecordsIn() {
}
}

private boolean checkFinished() throws Exception {
boolean isFinished = input.isFinished();
if (isFinished) {
synchronized (lock) {
operatorChain.endInput(1);
}
}
return isFinished;
}

public void cleanup() throws Exception {
input.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import java.util.Optional;

Expand Down Expand Up @@ -88,6 +90,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {

private final Object lock;

private final OperatorChain<?, ?> operatorChain;

// ---------------- Status and Watermark Valves ------------------

/**
Expand Down Expand Up @@ -124,6 +128,9 @@ public class StreamTwoInputProcessor<IN1, IN2> {

private Counter numRecordsIn;

private final BitSet finishedChannels1;
private final BitSet finishedChannels2;

private boolean isFinished;

@SuppressWarnings("unchecked")
Expand All @@ -142,7 +149,8 @@ public StreamTwoInputProcessor(
TaskIOMetricGroup metrics,
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge,
String taskName) throws IOException {
String taskName,
OperatorChain<?, ?> operatorChain) throws IOException {

final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

Expand Down Expand Up @@ -191,6 +199,11 @@ public StreamTwoInputProcessor(
this.input1WatermarkGauge = input1WatermarkGauge;
this.input2WatermarkGauge = input2WatermarkGauge;
metrics.gauge("checkpointAlignmentTime", barrierHandler::getAlignmentDurationNanos);

this.operatorChain = checkNotNull(operatorChain);

this.finishedChannels1 = new BitSet();
this.finishedChannels2 = new BitSet();
}

public boolean processInput() throws Exception {
Expand Down Expand Up @@ -294,7 +307,7 @@ else if (recordOrWatermark.isLatencyMarker()) {
}
}

private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws Exception {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
Expand All @@ -307,6 +320,30 @@ private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOExceptio
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}

handleEndOfPartitionEvent(bufferOrEvent.getChannelIndex());
}
}

private void handleEndOfPartitionEvent(int channelIndex) throws Exception {
int finishedInputId = -1;

if (channelIndex < numInputChannels1) {
finishedChannels1.set(channelIndex);
if (finishedChannels1.cardinality() == numInputChannels1) {
finishedInputId = 1;
}
} else {
finishedChannels2.set(channelIndex - numInputChannels1);
if (finishedChannels2.cardinality() == numInputChannels2) {
finishedInputId = 2;
}
}

if (finishedInputId > 0) {
synchronized (lock) {
operatorChain.endInput(finishedInputId);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
private final StreamTaskInput input1;
private final StreamTaskInput input2;

private final OperatorChain<?, ?> operatorChain;

/**
* Valves that control how watermarks and stream statuses from the 2 inputs are forwarded.
*/
Expand Down Expand Up @@ -103,7 +106,8 @@ public StreamTwoInputSelectableProcessor(
StreamStatusMaintainer streamStatusMaintainer,
TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge) {
WatermarkGauge input2WatermarkGauge,
OperatorChain<?, ?> operatorChain) {

checkState(streamOperator instanceof InputSelectable);

Expand All @@ -126,6 +130,8 @@ public StreamTwoInputSelectableProcessor(
unionedInputGate2.getNumberOfInputChannels(),
new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1));

this.operatorChain = checkNotNull(operatorChain);

this.firstStatus = StreamStatus.ACTIVE;
this.secondStatus = StreamStatus.ACTIVE;

Expand Down Expand Up @@ -329,10 +335,13 @@ private void waitForOneInput(StreamTaskInput input)
setAvailableInput(input.getInputIndex());
}

private boolean checkFinished() {
private boolean checkFinished() throws Exception {
if (getInput(lastReadInputIndex).isFinished()) {
inputSelection = (lastReadInputIndex == 0) ? InputSelection.SECOND : InputSelection.FIRST;
// TODO: adds the runtime handling of the BoundedMultiInput interface;

synchronized (lock) {
operatorChain.endInput(getInputId(lastReadInputIndex));
}
}

return input1.isFinished() && input2.isFinished();
Expand All @@ -350,6 +359,10 @@ private StreamTaskInput getInput(int inputIndex) {
return inputIndex == 0 ? input1 : input2;
}

private int getInputId(int inputIndex) {
return inputIndex + 1;
}

private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {

private final TwoInputStreamOperator<IN1, IN2, ?> operator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void init() throws Exception {
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge,
getTaskNameWithSubtaskAndId());
getTaskNameWithSubtaskAndId(),
operatorChain);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
Expand Down
Loading

0 comments on commit 3b82310

Please sign in to comment.