Skip to content

Commit

Permalink
[FLINK-9136][tests] Remove StreamingProgramTestBase
Browse files Browse the repository at this point in the history
This closes apache#5817.
  • Loading branch information
zentol committed May 2, 2018
1 parent 0973e34 commit de10b40
Show file tree
Hide file tree
Showing 20 changed files with 178 additions and 338 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,25 @@
package org.apache.flink.storm.exclamation;

import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.Test;

/**
* Test for the ExclamationWithBolt example.
*/
public class ExclamationWithBoltITCase extends StreamingProgramTestBase {

protected String textPath;
protected String resultPath;
protected String exclamationNum;
public class ExclamationWithBoltITCase extends AbstractTestBase {

@Override
protected void preSubmit() throws Exception {
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
this.exclamationNum = "3";
}
@Test
public void testProgram() throws Exception {
String textPath = createTempFile("text.txt", WordCountData.TEXT);
String resultPath = getTempDirPath("result");
String exclamationNum = "3";

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
}
ExclamationWithBolt.main(new String[]{textPath, resultPath, exclamationNum});

@Override
protected void testProgram() throws Exception {
ExclamationWithBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,23 @@
package org.apache.flink.storm.exclamation;

import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.Test;

/**
* Test for the ExclamationWithSpout example.
*/
public class ExclamationWithSpoutITCase extends StreamingProgramTestBase {

protected String textPath;
protected String resultPath;
public class ExclamationWithSpoutITCase extends AbstractTestBase {

@Override
protected void preSubmit() throws Exception {
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
}
@Test
public void testProgram() throws Exception {
String textPath = createTempFile("text.txt", WordCountData.TEXT);
String resultPath = getTempDirPath("result");

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
}
ExclamationWithSpout.main(new String[]{textPath, resultPath});

@Override
protected void testProgram() throws Exception {
ExclamationWithSpout.main(new String[]{this.textPath, this.resultPath});
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,24 @@
package org.apache.flink.storm.exclamation;

import org.apache.flink.storm.exclamation.util.ExclamationData;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.Test;

/**
* Test for the ExclamationLocal example.
*/
public class StormExclamationLocalITCase extends StreamingProgramTestBase {

protected String textPath;
protected String resultPath;
protected String exclamationNum;
public class StormExclamationLocalITCase extends AbstractTestBase {

@Override
protected void preSubmit() throws Exception {
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
this.exclamationNum = "3";
}
@Test
public void testProgram() throws Exception {
String textPath = createTempFile("text.txt", WordCountData.TEXT);
String resultPath = getTempDirPath("result");
String exclamationNum = "3";

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath);
}
ExclamationLocal.main(new String[]{textPath, resultPath, exclamationNum});

@Override
protected void testProgram() throws Exception {
ExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum});
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@

package org.apache.flink.storm.join;

import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.util.AbstractTestBase;

import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;

import org.junit.Test;

/**
* Test for the SingleJoin example.
*/
public class SingleJoinITCase extends StreamingProgramTestBase {
public class SingleJoinITCase extends AbstractTestBase {

protected static String[] expectedOutput = {
"(male,20)",
Expand All @@ -40,23 +42,14 @@ public class SingleJoinITCase extends StreamingProgramTestBase {
"(female,29)"
};

protected String resultPath;

@Override
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
}

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), this.resultPath);
}

@Override
protected void testProgram() throws Exception {
@Test
public void testProgram() throws Exception {
String resultPath = getTempDirPath("result");
// We need to remove the file scheme because we can't use the Flink file system.
// (to remain compatible with Storm)
SingleJoinExample.main(new String[]{ this.resultPath.replace("file:", "") });
SingleJoinExample.main(new String[]{resultPath.replace("file:", "")});

compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), resultPath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import org.apache.flink.storm.tests.operators.TaskIdBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.MathUtils;

import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -41,24 +42,36 @@
* This test relies on the hash function used by the {@link DataStream#keyBy}, which is
* assumed to be {@link MathUtils#murmurHash}.
*/
public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
public class StormFieldsGroupingITCase extends AbstractTestBase {

private static final String topologyId = "FieldsGrouping Test";
private static final String spoutId = "spout";
private static final String boltId = "bolt";
private static final String sinkId = "sink";
private String resultPath;

@Override
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
}
@Test
public void testProgram() throws Exception {
String resultPath = this.getTempDirPath("result");

final String[] tokens = resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];

final TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
Config conf = new Config();
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();

@Override
protected void postSubmit() throws Exception {
List<String> expectedResults = Arrays.asList(
"-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947",
"-65105105", "-518907128", "-252332814");
"-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947",
"-65105105", "-518907128", "-252332814");

List<String> actualResults = new ArrayList<>();
readAllResultLines(actualResults, resultPath, new String[0], false);
Expand All @@ -82,23 +95,4 @@ protected void postSubmit() throws Exception {
}
}

@Override
protected void testProgram() throws Exception {
final String[] tokens = this.resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];

final TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
Config conf = new Config();
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,25 @@
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.tests.operators.MetaDataSpout;
import org.apache.flink.storm.tests.operators.VerifyMetaDataBolt;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.util.AbstractTestBase;

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
import org.junit.Test;

/**
* Test for meta data spouts/bolts.
*/
public class StormMetaDataITCase extends StreamingProgramTestBase {
public class StormMetaDataITCase extends AbstractTestBase {

private static final String topologyId = "FieldsGrouping Test";
private static final String spoutId = "spout";
private static final String boltId1 = "bolt1";
private static final String boltId2 = "bolt2";

@Override
protected void testProgram() throws Exception {
@Test
public void testProgram() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(spoutId, new MetaDataSpout(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.MergerBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.util.AbstractTestBase;

import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.junit.Test;

/**
* Test for the {@link MergerBolt}.
*/
public class StormUnionITCase extends StreamingProgramTestBase {
public class StormUnionITCase extends AbstractTestBase {

private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n"
+ "431529176\n" + "1260042744\n" + "1761283695\n" + "1749940626\n" + "892128508\n"
Expand All @@ -47,20 +48,11 @@ public class StormUnionITCase extends StreamingProgramTestBase {
private static final String spoutId3 = "spout3";
private static final String boltId = "merger";
private static final String sinkId = "sink";
private String resultPath;

@Override
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
}

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(RESULT, this.resultPath);
}
@Test
public void testProgram() throws Exception {
String resultPath = this.getTempDirPath("result");

@Override
protected void testProgram() throws Exception {
final TopologyBuilder builder = new TopologyBuilder();

// get input data
Expand All @@ -73,7 +65,7 @@ protected void testProgram() throws Exception {
.shuffleGrouping(spoutId2, FiniteRandomSpout.STREAM_PREFIX + 0)
.shuffleGrouping(spoutId3, FiniteRandomSpout.STREAM_PREFIX + 0);

final String[] tokens = this.resultPath.split(":");
final String[] tokens = resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);

Expand All @@ -83,6 +75,8 @@ protected void testProgram() throws Exception {
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();

compareResultsByLinesInMemory(RESULT, resultPath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,24 @@

package org.apache.flink.storm.wordcount;

import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.Test;

/**
* Test for the BoltTokenizerWordCount example.
*/
public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {

protected String textPath;
protected String resultPath;
public class BoltTokenizerWordCountITCase extends AbstractTestBase {

@Override
protected void preSubmit() throws Exception {
this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
this.resultPath = this.getTempDirPath("result");
}
@Test
public void testProgram() throws Exception {
String textPath = createTempFile("text.txt", WordCountData.TEXT);
String resultPath = getTempDirPath("result");

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
}
BoltTokenizerWordCount.main(new String[]{textPath, resultPath});

@Override
protected void testProgram() throws Exception {
BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath});
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
}

}
Loading

0 comments on commit de10b40

Please sign in to comment.