Skip to content

Commit

Permalink
[FLINK-13051][runtime] Replace the non-selectable stream task with th…
Browse files Browse the repository at this point in the history
…e input-selectable one
  • Loading branch information
sunhaibotb authored and pnowojski committed Aug 30, 2019
1 parent ac0799a commit dc74248
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.OutputTag;

import org.slf4j.Logger;
Expand Down Expand Up @@ -283,9 +282,7 @@ public <IN1, IN2, OUT> void addCoOperator(
TypeInformation<OUT> outTypeInfo,
String operatorName) {

Class<? extends AbstractInvokable> vertexClass =
taskOperatorFactory.isOperatorSelectiveReading(Thread.currentThread().getContextClassLoader()) ?
TwoInputSelectableStreamTask.class : TwoInputStreamTask.class;
Class<? extends AbstractInvokable> vertexClass = TwoInputSelectableStreamTask.class;

addNode(vertexID, slotSharingGroup, coLocationGroup, vertexClass, taskOperatorFactory, operatorName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

/**
* Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
* for {@link StreamOneInputProcessor} and {@link StreamTwoInputProcessor}.
* for {@link StreamOneInputProcessor} and {@link StreamTwoInputSelectableProcessor}.
*/
@Internal
public class InputProcessorUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;

import static org.apache.flink.util.Preconditions.checkNotNull;
import javax.annotation.Nullable;

/**
* This handler is mainly used for selecting the next available input index
Expand All @@ -30,19 +30,24 @@
@Internal
public class TwoInputSelectionHandler {

@Nullable
private final InputSelectable inputSelector;

private InputSelection inputSelection;

private int availableInputsMask;

public TwoInputSelectionHandler(InputSelectable inputSelectable) {
this.inputSelector = checkNotNull(inputSelectable);
public TwoInputSelectionHandler(@Nullable InputSelectable inputSelectable) {
this.inputSelector = inputSelectable;
this.availableInputsMask = (int) new InputSelection.Builder().select(1).select(2).build().getInputMask();
}

void nextSelection() {
inputSelection = inputSelector.nextSelection();
if (inputSelector == null) {
inputSelection = InputSelection.ALL;
} else {
inputSelection = inputSelector.nextSelection();
}
}

int selectNextInputIndex(int lastReadInputIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.io.IOException;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A {@link StreamTask} for executing a {@link TwoInputStreamOperator} and supporting
* the {@link TwoInputStreamOperator} to select input for reading.
Expand All @@ -49,8 +47,8 @@ protected void createInputProcessor(
TypeSerializer<IN1> inputDeserializer1,
TypeSerializer<IN2> inputDeserializer2) throws IOException {

checkState(headOperator instanceof InputSelectable);
TwoInputSelectionHandler twoInputSelectionHandler = new TwoInputSelectionHandler((InputSelectable) headOperator);
TwoInputSelectionHandler twoInputSelectionHandler = new TwoInputSelectionHandler(
headOperator instanceof InputSelectable ? (InputSelectable) headOperator : null);

this.inputProcessor = new StreamTwoInputSelectableProcessor<>(
inputGates1, inputGates2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {

TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public ProcessingTimeService getProcessingTimeService() {
}

/**
* This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
* This must be overwritten for OneInputStreamTask or TwoInputSelectableStreamTask test harnesses.
*/
protected void initializeInputs() throws IOException, InterruptedException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testMultipleSetupsThrowsException() {
// expected
}

harness = new StreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
harness = new StreamTaskTestHarness<>(TwoInputSelectableStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator())
.chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -63,26 +61,16 @@
import static org.junit.Assert.assertEquals;

/**
* Tests for {@link TwoInputStreamTask} and {@link TwoInputSelectableStreamTask}. Theses tests
* implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}
* and {@link org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor}.
* Tests for {@link TwoInputSelectableStreamTask}. Theses tests implicitly also test the
* {@link org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor}.
*
* <p>Note:<br>
* We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is
* used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all
* TwoInputStreamOperators.
* used as a representative to test {@link TwoInputSelectableStreamTask}, since {@link TwoInputSelectableStreamTask}
* is used for all {@link TwoInputStreamOperator}s.
*/
@RunWith(Parameterized.class)
public class TwoInputStreamTaskTest {

@Parameterized.Parameter
public boolean isInputSelectable;

@Parameterized.Parameters(name = "isInputSelectable = {0}")
public static List<Boolean> parameters() {
return Arrays.asList(Boolean.FALSE, Boolean.TRUE);
}

/**
* This test verifies that open() and close() are correctly called. This test also verifies
* that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input
Expand All @@ -93,13 +81,12 @@ public static List<Boolean> parameters() {
public void testOpenCloseAndTimestamps() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
CoStreamMap<String, Integer, String> coMapOperator = isInputSelectable ?
new AnyReadingCoStreamMap<>(new TestOpenCloseMapFunction()) : new CoStreamMap<>(new TestOpenCloseMapFunction());
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new TestOpenCloseMapFunction());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());

Expand Down Expand Up @@ -141,16 +128,15 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<String, Integer, String>(
isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

StreamConfig streamConfig = testHarness.getStreamConfig();
CoStreamMap<String, Integer, String> coMapOperator = isInputSelectable ?
new AnyReadingCoStreamMap<>(new IdentityMap()) : new CoStreamMap<>(new IdentityMap());
CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap());
streamConfig.setStreamOperator(coMapOperator);
streamConfig.setOperatorID(new OperatorID());

Expand Down Expand Up @@ -252,14 +238,10 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {
@Test
@SuppressWarnings("unchecked")
public void testCheckpointBarriers() throws Exception {
if (isInputSelectable) {
// In the case of selective reading, checkpoints are not currently supported, and we skip this test
return;
}

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<String, Integer, String>(
TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
Expand Down Expand Up @@ -341,14 +323,10 @@ public void testCheckpointBarriers() throws Exception {
@Test
@SuppressWarnings("unchecked")
public void testOvertakingCheckpointBarriers() throws Exception {
if (isInputSelectable) {
// In the case of selective reading, checkpoints are not currently supported, and we skip this test
return;
}

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Expand Down Expand Up @@ -423,7 +401,7 @@ public void testOvertakingCheckpointBarriers() throws Exception {
@Test
public void testOperatorMetricReuse() throws Exception {
final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator())
Expand Down Expand Up @@ -495,11 +473,10 @@ public InputSelection nextSelection() {
@Test
public void testWatermarkMetrics() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(
isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

CoStreamMap<String, Integer, String> headOperator = isInputSelectable ?
new AnyReadingCoStreamMap<>(new IdentityMap()) : new CoStreamMap<>(new IdentityMap());
CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap());
final OperatorID headOperatorId = new OperatorID();

OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator = new OneInputStreamTaskTest.WatermarkMetricOperator();
Expand Down Expand Up @@ -600,18 +577,15 @@ public TaskMetricGroup getMetricGroup() {
@Test
public void testHandlingEndOfInput() throws Exception {
final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
3,
2,
new int[] {1, 2, 2},
TwoInputSelectableStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);

testHarness
.setupOperatorChain(
new OperatorID(),
isInputSelectable ? new TestBoundedAndSelectableTwoInputOperator("Operator0") : new TestBoundedTwoInputOperator("Operator0"))
new TestBoundedTwoInputOperator("Operator0"))
.chain(
new OperatorID(),
new TestBoundedOneInputStreamOperator("Operator1"),
Expand All @@ -623,36 +597,19 @@ public void testHandlingEndOfInput() throws Exception {
testHarness.invoke();
testHarness.waitForTaskRunning();

TestBoundedTwoInputOperator headOperator = (TestBoundedTwoInputOperator) testHarness.getTask().headOperator;

testHarness.processElement(new StreamRecord<>("Hello-1"), 0, 0);
testHarness.endInput(0, 0);
testHarness.processElement(new StreamRecord<>("Hello-2"), 0, 1);
testHarness.endInput(0, 1);

testHarness.waitForInputProcessing();

testHarness.processElement(new StreamRecord<>("Hello-3"), 1, 0);
testHarness.processElement(new StreamRecord<>("Hello-4"), 1, 1);
testHarness.processElement(new StreamRecord<>("Hello-2"), 1, 0);
testHarness.endInput(1, 0);
testHarness.endInput(1, 1);

testHarness.waitForInputProcessing();

testHarness.processElement(new StreamRecord<>("Hello-5"), 2, 0);
testHarness.processElement(new StreamRecord<>("Hello-6"), 2, 1);
testHarness.endInput(2, 0);
testHarness.endInput(2, 1);

testHarness.waitForInputProcessing();
testHarness.waitForTaskCompletion();

expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1"));
expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-2"));
expectedOutput.add(new StreamRecord<>("[Operator0-1]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-3"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-4"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-5"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-6"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-2"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: Bye"));
expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));

Expand Down Expand Up @@ -723,31 +680,5 @@ public String map2(Integer value) throws Exception {
return value.toString();
}
}

private static class AnyReadingCoStreamMap<IN1, IN2, OUT> extends CoStreamMap<IN1, IN2, OUT>
implements InputSelectable {

public AnyReadingCoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
super(mapper);
}

@Override
public InputSelection nextSelection() {
return InputSelection.ALL;
}
}

private static class TestBoundedAndSelectableTwoInputOperator
extends TestBoundedTwoInputOperator implements InputSelectable {

public TestBoundedAndSelectableTwoInputOperator(String name) {
super(name);
}

@Override
public InputSelection nextSelection() {
return InputSelection.ALL;
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@


/**
* Test harness for testing a {@link TwoInputStreamTask} or a {@link TwoInputSelectableStreamTask}.
* Test harness for testing a {@link TwoInputSelectableStreamTask}.
*
* <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements
* and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
Expand Down Expand Up @@ -243,6 +241,7 @@ private void buildJoin(
joinAndAssert(operator, buildInput, probeInput, expectOutSize, expectOutKeySize, expectOutVal, false);
}

@SuppressWarnings("unchecked")
static void joinAndAssert(
Object operator,
MutableObjectIterator<BinaryRow> input1,
Expand All @@ -256,9 +255,7 @@ static void joinAndAssert(
new IntType(), new IntType(), new IntType(), new IntType());
TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> testHarness =
new TwoInputStreamTaskTestHarness<>(
(operator instanceof InputSelectable || operator instanceof StreamOperatorFactory) ?
TwoInputSelectableStreamTask::new :
TwoInputStreamTask::new,
TwoInputSelectableStreamTask::new,
2, 1, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo, baseRowType);
testHarness.memorySize = 36 * 1024 * 1024;
testHarness.getExecutionConfig().enableObjectReuse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
Expand Down Expand Up @@ -242,7 +242,7 @@ public static LinkedBlockingQueue<Object> join(
new IntType(), new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
final TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputStreamTask::new, 2, 1, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo,
TwoInputSelectableStreamTask::new, 2, 1, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo,
joinedInfo);

// Deep pit!!! Cause in TwoInputStreamTaskTestHarness, one record one buffer.
Expand Down
Loading

0 comments on commit dc74248

Please sign in to comment.