Skip to content

Commit

Permalink
[FLINK-13051][runtime] Rename TwoInputSelectableStreamTask and Stream…
Browse files Browse the repository at this point in the history
…TwoInputSelectableProcessor
  • Loading branch information
sunhaibotb authored and pnowojski committed Aug 30, 2019
1 parent 7e21ec8 commit ce55783
Show file tree
Hide file tree
Showing 16 changed files with 37 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.OutputTag;

import org.slf4j.Logger;
Expand Down Expand Up @@ -282,7 +282,7 @@ public <IN1, IN2, OUT> void addCoOperator(
TypeInformation<OUT> outTypeInfo,
String operatorName) {

Class<? extends AbstractInvokable> vertexClass = TwoInputSelectableStreamTask.class;
Class<? extends AbstractInvokable> vertexClass = TwoInputStreamTask.class;

addNode(vertexID, slotSharingGroup, coLocationGroup, vertexClass, taskOperatorFactory, operatorName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

/**
* Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
* for {@link StreamOneInputProcessor} and {@link StreamTwoInputSelectableProcessor}.
* for {@link StreamOneInputProcessor} and {@link StreamTwoInputProcessor}.
*/
@Internal
public class InputProcessorUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.ExceptionUtils;

import java.io.IOException;
Expand All @@ -45,13 +46,13 @@
import static org.apache.flink.util.Preconditions.checkState;

/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}.
* Input reader for {@link TwoInputStreamTask}.
*
* @param <IN1> The type of the records that arrive on the first input
* @param <IN2> The type of the records that arrive on the second input
*/
@Internal
public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements StreamInputProcessor {
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {

private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;

Expand Down Expand Up @@ -83,7 +84,7 @@ public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements Stream

private boolean isPrepared;

public StreamTwoInputSelectableProcessor(
public StreamTwoInputProcessor(
Collection<InputGate> inputGates1,
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* This handler is mainly used for selecting the next available input index
* in {@link StreamTwoInputSelectableProcessor}.
* in {@link StreamTwoInputProcessor}.
*/
@Internal
public class TwoInputSelectionHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
import org.apache.flink.streaming.runtime.io.TwoInputSelectionHandler;

import java.io.IOException;
Expand All @@ -34,9 +34,9 @@
* the {@link TwoInputStreamOperator} to select input for reading.
*/
@Internal
public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> {
public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> {

public TwoInputSelectableStreamTask(Environment env) {
public TwoInputStreamTask(Environment env) {
super(env);
}

Expand All @@ -50,7 +50,7 @@ protected void createInputProcessor(
TwoInputSelectionHandler twoInputSelectionHandler = new TwoInputSelectionHandler(
headOperator instanceof InputSelectable ? (InputSelectable) headOperator : null);

this.inputProcessor = new StreamTwoInputSelectableProcessor<>(
this.inputProcessor = new StreamTwoInputProcessor<>(
inputGates1, inputGates2,
inputDeserializer1, inputDeserializer2,
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {

TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private void testBase(
// Utilities
// ------------------------------------------------------------------------

private static class TestSelectiveReadingTask<IN1, IN2, OUT> extends TwoInputSelectableStreamTask<IN1, IN2, OUT> {
private static class TestSelectiveReadingTask<IN1, IN2, OUT> extends TwoInputStreamTask<IN1, IN2, OUT> {

private volatile boolean started;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public ProcessingTimeService getProcessingTimeService() {
}

/**
* This must be overwritten for OneInputStreamTask or TwoInputSelectableStreamTask test harnesses.
* This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
*/
protected void initializeInputs() throws IOException, InterruptedException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testMultipleSetupsThrowsException() {
// expected
}

harness = new StreamTaskTestHarness<>(TwoInputSelectableStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
harness = new StreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
harness.setupOperatorChain(new OperatorID(), new TwoInputTestOperator())
.chain(new OperatorID(), new TestOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.util.TestBoundedTwoInputOperator;
Expand All @@ -61,12 +62,12 @@
import static org.junit.Assert.assertEquals;

/**
* Tests for {@link TwoInputSelectableStreamTask}. Theses tests implicitly also test the
* {@link org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor}.
* Tests for {@link TwoInputStreamTask}. Theses tests implicitly also test the
* {@link StreamTwoInputProcessor}.
*
* <p>Note:<br>
* We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is
* used as a representative to test {@link TwoInputSelectableStreamTask}, since {@link TwoInputSelectableStreamTask}
* used as a representative to test {@link TwoInputStreamTask}, since {@link TwoInputStreamTask}
* is used for all {@link TwoInputStreamOperator}s.
*/
public class TwoInputStreamTaskTest {
Expand All @@ -80,7 +81,7 @@ public class TwoInputStreamTaskTest {
public void testOpenCloseAndTimestamps() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

Expand Down Expand Up @@ -126,7 +127,7 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
Expand Down Expand Up @@ -238,7 +239,7 @@ public void testCheckpointBarriers() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();
Expand Down Expand Up @@ -322,7 +323,7 @@ public void testOvertakingCheckpointBarriers() throws Exception {

final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
2, 2, new int[] {1, 2},
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Expand Down Expand Up @@ -397,7 +398,7 @@ public void testOvertakingCheckpointBarriers() throws Exception {
@Test
public void testOperatorMetricReuse() throws Exception {
final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

testHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator())
Expand Down Expand Up @@ -470,7 +471,7 @@ public InputSelection nextSelection() {
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap());
Expand Down Expand Up @@ -574,7 +575,7 @@ public TaskMetricGroup getMetricGroup() {
@Test
public void testHandlingEndOfInput() throws Exception {
final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@


/**
* Test harness for testing a {@link TwoInputSelectableStreamTask}.
* Test harness for testing a {@link TwoInputStreamTask}.
*
* <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements
* and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
Expand Down Expand Up @@ -255,7 +255,7 @@ static void joinAndAssert(
new IntType(), new IntType(), new IntType(), new IntType());
TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new,
TwoInputStreamTask::new,
2, 1, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo, baseRowType);
testHarness.memorySize = 36 * 1024 * 1024;
testHarness.getExecutionConfig().enableObjectReuse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
Expand Down Expand Up @@ -242,7 +242,7 @@ public static LinkedBlockingQueue<Object> join(
new IntType(), new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
final TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> testHarness =
new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new, 2, 1, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo,
TwoInputStreamTask::new, 2, 1, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo,
joinedInfo);

// Deep pit!!! Cause in TwoInputStreamTaskTestHarness, one record one buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.table.dataformat.BaseRow;
Expand Down Expand Up @@ -78,7 +78,7 @@ private void init(boolean leftOut, boolean rightOut, boolean buildLeft) throws E
HashJoinType type = HashJoinType.of(buildLeft, leftOut, rightOut);
HashJoinOperator operator = newOperator(33 * 32 * 1024, type, !buildLeft);
testHarness = new TwoInputStreamTaskTestHarness<>(
TwoInputSelectableStreamTask::new, 2, 2, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo, joinedInfo);
TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo, joinedInfo);
testHarness.memorySize = 36 * 1024 * 1024;
testHarness.getExecutionConfig().enableObjectReuse();
testHarness.setupOutputForSingletonOperatorChain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.table.dataformat.BinaryRow;
Expand Down Expand Up @@ -140,7 +140,7 @@ public void testFullJoin() throws Exception {

private TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> buildSortMergeJoin(StreamOperator operator) throws Exception {
final TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> testHarness =
new TwoInputStreamTaskTestHarness<>(TwoInputSelectableStreamTask::new, 2, 2,
new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, 2, 2,
new int[]{1, 2}, typeInfo, (TypeInformation) typeInfo, joinedInfo);

testHarness.memorySize = 36 * 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.junit.Assert.assertEquals;

/**
* Tests for selective reading of {@code TwoInputSelectableStreamTask}.
* Tests for selective reading of {@code TwoInputStreamTask}.
*/
public class StreamTaskSelectiveReadingITCase {
@Test
Expand Down

0 comments on commit ce55783

Please sign in to comment.