Skip to content

Commit

Permalink
[hotfix][test] Separate some shared inner classes from test classes i…
Browse files Browse the repository at this point in the history
…nto the separate files
  • Loading branch information
sunhaibotb authored and pnowojski committed Aug 19, 2019
1 parent e9c9b0e commit 9042a15
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
import org.apache.flink.util.ExceptionUtils;

import org.junit.Test;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void testAnyOrderedReading() throws Exception {
expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));

testBase(new AnyReadingStreamOperator("Operator0"), true, expectedOutput, true);
testBase(new TestAnyModeReadingStreamOperator("Operator0"), true, expectedOutput, true);
}

@Test
Expand All @@ -71,7 +73,7 @@ public void testAnyUnorderedReading() throws Exception {
expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));

testBase(new AnyReadingStreamOperator("Operator0"), false, expectedOutput, false);
testBase(new TestAnyModeReadingStreamOperator("Operator0"), false, expectedOutput, false);
}

@Test
Expand All @@ -85,7 +87,7 @@ public void testSequentialReading() throws Exception {
expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));

testBase(new SequentialReadingStreamOperator("Operator0"), false, expectedOutput, true);
testBase(new TestSequentialReadingStreamOperator("Operator0"), false, expectedOutput, true);
}

@Test
Expand Down Expand Up @@ -210,73 +212,6 @@ public void startProcessing() {
}
}

private static class AnyReadingStreamOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, Integer, String>, InputSelectable {

private final String name;

AnyReadingStreamOperator(String name) {
super();

this.name = name;
}

@Override
public InputSelection nextSelection() {
return InputSelection.ALL;
}

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
}

@Override
public void processElement2(StreamRecord<Integer> element) {
output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
}
}

/**
* Test operator for sequential reading.
*/
public static class SequentialReadingStreamOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, Integer, String>, InputSelectable, BoundedMultiInput {

private final String name;

private InputSelection inputSelection;

public SequentialReadingStreamOperator(String name) {
super();

this.name = name;
this.inputSelection = InputSelection.FIRST;
}

@Override
public InputSelection nextSelection() {
return inputSelection;
}

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
}

@Override
public void processElement2(StreamRecord<Integer> element) {
output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
}

@Override
public void endInput(int inputId) {
if (inputId == 1) {
inputSelection = InputSelection.SECOND;
}
}
}

private static class SpecialRuleReadingStreamOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, Integer, String>, InputSelectable, BoundedMultiInput {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.util.TestBoundedTwoInputOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;

import org.junit.Assert;
Expand Down Expand Up @@ -737,38 +737,8 @@ public InputSelection nextSelection() {
}
}

/**
* Uses to test handling the EndOfInput notification.
*/
private static class TestBoundedTwoInputOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, String, String>, BoundedMultiInput {

private static final long serialVersionUID = 1L;

private final String name;

public TestBoundedTwoInputOperator(String name) {
this.name = name;
}

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
}

@Override
public void processElement2(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
}

@Override
public void endInput(int inputId) {
output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: Bye"));
}
}

private static class TestBoundedAndSelectableTwoInputOperator
extends TestBoundedTwoInputOperator implements InputSelectable {
extends TestBoundedTwoInputOperator implements InputSelectable {

public TestBoundedAndSelectableTwoInputOperator(String name) {
super(name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.util;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
* A test operator class for any mode reading.
*/
public class TestAnyModeReadingStreamOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, Integer, String>, InputSelectable {

private final String name;

public TestAnyModeReadingStreamOperator(String name) {
super();

this.name = name;
}

@Override
public InputSelection nextSelection() {
return InputSelection.ALL;
}

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
}

@Override
public void processElement2(StreamRecord<Integer> element) {
output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.util;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
* A test operator class implementing {@link BoundedMultiInput}.
*/
public class TestBoundedTwoInputOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, String, String>, BoundedMultiInput {

private static final long serialVersionUID = 1L;

private final String name;

public TestBoundedTwoInputOperator(String name) {
this.name = name;
}

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
}

@Override
public void processElement2(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
}

@Override
public void endInput(int inputId) {
output.collect(new StreamRecord<>("[" + name + "-" + inputId + "]: Bye"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.util;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
* A test operator class for sequential reading.
*/
public class TestSequentialReadingStreamOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<String, Integer, String>, InputSelectable, BoundedMultiInput {

private final String name;

private InputSelection inputSelection;

public TestSequentialReadingStreamOperator(String name) {
super();

this.name = name;
this.inputSelection = InputSelection.FIRST;
}

@Override
public InputSelection nextSelection() {
return inputSelection;
}

@Override
public void processElement1(StreamRecord<String> element) {
output.collect(element.replace("[" + name + "-1]: " + element.getValue()));
}

@Override
public void processElement2(StreamRecord<Integer> element) {
output.collect(element.replace("[" + name + "-2]: " + element.getValue()));
}

@Override
public void endInput(int inputId) {
if (inputId == 1) {
inputSelection = InputSelection.SECOND;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTaskSelectiveReadingTest.SequentialReadingStreamOperator;
import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

import org.junit.Test;
Expand Down Expand Up @@ -60,7 +60,7 @@ public void testSequentialReading() throws Exception {
.setParallelism(2);
TestListResultSink<String> resultSink = new TestListResultSink<>();

TwoInputStreamOperator<String, Integer, String> twoInputStreamOperator = new SequentialReadingStreamOperator("Operator0");
TwoInputStreamOperator<String, Integer, String> twoInputStreamOperator = new TestSequentialReadingStreamOperator("Operator0");
twoInputStreamOperator.setChainingStrategy(ChainingStrategy.NEVER);

source0.connect(source1)
Expand Down

0 comments on commit 9042a15

Please sign in to comment.