Skip to content

Commit

Permalink
[FLINK-2586] Unstable Storm Compatibility Tests
Browse files Browse the repository at this point in the history
 - added BLOCKING flag to FlinkLocalCluster
 - added NullTerminatingSpout and SpoutOutputCollectorObserver plus tests
 - reworked test accordingly
   - set BLOCKING flag for ITCases
   - make infinite spouts finite using NullTerminatingSpout
 - removed sleep time to make the tests stable
 - fixed bug in BoltSplitITCase and SpoutSplitITCase
   - exception in VerifyAndEnrichBolt was swallowed and the test would not fail (replaced by an error flag)
 - reduced sleep-time in BoltSplitITCase and SpoutSplitITCase to reduce testing time
 - fixed WrapperSetupHelperTest
   - reworked to original version with more than two inputs
     (was limited to two inputs because more than two inputs per bolt was not supported in the meantime, which is now fixed)
minor code cleanup (removed unused imports, indenting, etc.)
fixed small error in documentation

This closes apache#1502.
  • Loading branch information
mjsax committed Jan 14, 2016
1 parent e4d8944 commit d1c93d2
Show file tree
Hide file tree
Showing 25 changed files with 423 additions and 74 deletions.
2 changes: 1 addition & 1 deletion docs/apis/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1438,7 +1438,7 @@ someStream.map(...).disableChaining();
<tr>
<td>Start a new resource group</td>
<td>
<p>Start a new resource group containing the map and the subsequent operators.
<p>Start a new resource group containing the filter and the subsequent operators.
{% highlight java %}
someStream.filter(...).startNewResourceGroup();
{% endhighlight %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
Expand Down Expand Up @@ -63,11 +62,10 @@ public static void main(final String[] args) throws Exception {
// execute program locally
Config conf = new Config();
conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);
cluster.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.NullTerminatingSpout;
import org.apache.flink.storm.util.TupleOutputFormatter;

import storm.starter.bolt.PrinterBolt;
import storm.starter.bolt.SingleJoinBolt;

Expand All @@ -37,12 +39,16 @@ public static void main(String[] args) throws Exception {
final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender", "hobbies"));
final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

Config conf = new Config();
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("gender", genderSpout);

builder.setSpout("age", ageSpout);
// only required to stabilize integration test
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);
final NullTerminatingSpout finalGenderSpout = new NullTerminatingSpout(genderSpout);
final NullTerminatingSpout finalAgeSpout = new NullTerminatingSpout(ageSpout);

builder.setSpout("gender", finalGenderSpout);
builder.setSpout("age", finalAgeSpout);
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
.fieldsGrouping("gender", new Fields("id"))
.fieldsGrouping("age", new Fields("id"));
Expand All @@ -56,9 +62,6 @@ public static void main(String[] args) throws Exception {
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("join");
}

Config conf = new Config();
conf.setDebug(true);

String[] hobbies = new String[] {"reading", "biking", "travelling", "watching tv"};

for (int i = 0; i < 10; i++) {
Expand All @@ -79,10 +82,6 @@ public static void main(String[] args) throws Exception {

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("joinTopology", conf, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

cluster.shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* Prints incoming tweets. Tweets can be filtered by keywords.
*/
public class PrintSampleStream {
public class PrintSampleStream {
public static void main(String[] args) throws Exception {
String consumerKey = args[0];
String consumerSecret = args[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class VerifyAndEnrichBolt extends BaseRichBolt {
private final String token;
private OutputCollector collector;

public static boolean errorOccured = false;

public VerifyAndEnrichBolt(boolean evenOrOdd) {
this.evenOrOdd = evenOrOdd;
this.token = evenOrOdd ? "even" : "odd";
Expand All @@ -48,7 +50,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
@Override
public void execute(Tuple input) {
if ((input.getInteger(0) % 2 == 0) != this.evenOrOdd) {
throw new RuntimeException("Invalid number detected.");
errorOccured = true;
}
this.collector.emit(new Values(this.token, input.getInteger(0)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.flink.storm.wordcount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
Expand Down Expand Up @@ -60,10 +61,9 @@ public static void main(final String[] args) throws Exception {
final TopologyBuilder builder = WordCountTopology.buildTopology();

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

Config conf = new Config();
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.flink.storm.wordcount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
Expand Down Expand Up @@ -61,11 +62,9 @@ public static void main(final String[] args) throws Exception {
final TopologyBuilder builder = WordCountTopology.buildTopology(false);

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

Config conf = new Config();
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.NullTerminatingSpout;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.TupleOutputFormatter;
import org.apache.flink.storm.wordcount.operators.BoltCounter;
Expand Down Expand Up @@ -67,7 +69,8 @@ public static TopologyBuilder buildTopology(boolean indexOrName) {
// read the text file from given input path
final String[] tokens = textPath.split(":");
final String inputFile = tokens[tokens.length - 1];
builder.setSpout(spoutId, new WordCountFileSpout(inputFile));
// inserting NullTerminatingSpout only required to stabilize integration test
builder.setSpout(spoutId, new NullTerminatingSpout(new WordCountFileSpout(inputFile)));
} else {
builder.setSpout(spoutId, new WordCountInMemorySpout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.flink.storm.split;

import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.junit.Assert;
import org.junit.Test;

public class BoltSplitITCase {

@Test
public void testTopology() throws Exception {
SplitStreamBoltLocal.main(new String[] { "0", "/dev/null" });
Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public static TopologyBuilder buildTopology() {
final String[] tokens = outputPath.split(":");
final String outputFile = tokens[tokens.length - 1];
builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
} else {
builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
}

return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ public static TopologyBuilder buildTopology() {
final String[] tokens = outputPath.split(":");
final String outputFile = tokens[tokens.length - 1];
builder.setBolt(sinkId, new BoltFileSink(outputFile, formatter))
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
} else {
builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
}

return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitStreamBoltLocal {
public final static String topologyId = "Bolt split stream example";
Expand All @@ -41,7 +40,8 @@ public static void main(final String[] args) throws Exception {
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);
// run topology for 5 seconds
Utils.sleep(5 * 1000);

cluster.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitStreamSpoutLocal {
public final static String topologyId = "Spout split stream example";
Expand All @@ -41,7 +40,8 @@ public static void main(final String[] args) throws Exception {
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);
// run topology for 5 seconds
Utils.sleep(5 * 1000);

cluster.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
*/
package org.apache.flink.storm.split;

import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.junit.Assert;
import org.junit.Test;

public class SpoutSplitITCase {

@Test
public void testTopology() throws Exception {
SplitStreamSpoutLocal.main(new String[] { "0", "/dev/null" });
Assert.assertFalse(VerifyAndEnrichBolt.errorOccured);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.flink.storm.tests;

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
Expand Down Expand Up @@ -61,12 +61,9 @@ protected void testProgram() throws Exception {
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

// TODO kill does no do anything so far
cluster.killTopology(topologyId);
Config conf = new Config();
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.flink.storm.tests;

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
Expand All @@ -27,7 +27,6 @@
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.streaming.util.StreamingProgramTestBase;


public class StormUnionITCase extends StreamingProgramTestBase {

private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n"
Expand Down Expand Up @@ -76,12 +75,9 @@ protected void testProgram() throws Exception {

// execute program locally
final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

// TODO kill does no do anything so far
cluster.killTopology(topologyId);
Config conf = new Config();
conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));
cluster.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected void postSubmit() throws Exception {

@Override
protected void testProgram() throws Exception {
WordCountLocal.main(new String[]{this.textPath, this.resultPath});
WordCountLocal.main(new String[] { this.textPath, this.resultPath });
}

}
Loading

0 comments on commit d1c93d2

Please sign in to comment.