From 88636799ee68cae80b093436ff2e5eace0675b95 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 12 Nov 2015 14:39:45 +0100 Subject: [PATCH] [FLINK-2837][storm] various improvements for the compatibility layer - refactor to use Storm's topology builder - remove FlinkTopologyBuilder - instantiate context-based StreamExecutionEnvironment (local or remote) - remove some of the Flink and Storm behavior replicating classes - modify FlinkTopology to parse Storm topology directly - replace StormTestBase with StreamingTestBase - add print example - FlinkTopologyBuilder changes (check if all inputs are available before processing) - correct package typo - two input support - add join example - update docs This closes #1398. --- docs/apis/storm_compatibility.md | 12 +- flink-contrib/flink-storm-examples/pom.xml | 10 +- .../ExclamationLocal.java | 14 +- .../ExclamationTopology.java | 14 +- .../ExclamationWithBolt.java | 7 +- .../ExclamationWithSpout.java | 5 +- .../operators/ExclamationBolt.java | 2 +- .../flink/storm/join/SingleJoinExample.java | 88 ++++ .../flink/storm/print/PrintSampleStream.java | 61 +++ .../flink/storm/util/FiniteFileSpout.java | 2 - .../flink/storm/util/FiniteInMemorySpout.java | 3 - .../wordcount/BoltTokenizerWordCount.java | 1 - .../wordcount/BoltTokenizerWordCountPojo.java | 1 - .../BoltTokenizerWordCountWithNames.java | 1 - .../flink/storm/wordcount/WordCountLocal.java | 13 +- .../storm/wordcount/WordCountLocalByName.java | 13 +- .../wordcount/WordCountRemoteByClient.java | 7 +- .../wordcount/WordCountRemoteBySubmitter.java | 7 +- .../storm/wordcount/WordCountTopology.java | 11 +- .../operators/WordCountInMemorySpout.java | 7 +- .../ExclamationWithBoltITCase.java | 5 +- .../ExclamationWithSpoutITCase.java | 5 +- .../StormExclamationLocalITCase.java | 5 +- .../flink/storm/join/SingleJoinITCase.java | 58 +++ .../flink/storm/split/SplitBoltTopology.java | 8 +- .../flink/storm/split/SplitSpoutTopology.java | 8 +- .../storm/split/SplitStreamBoltLocal.java | 16 +- .../storm/split/SplitStreamSpoutLocal.java | 16 +- .../tests/StormFieldsGroupingITCase.java | 11 +- .../storm/tests/operators/TaskIdBolt.java | 3 + .../flink/storm/util/StormTestBase.java | 117 ----- .../BoltTokenizerWordCountITCase.java | 5 +- .../BoltTokenizerWordCountPojoITCase.java | 5 +- ...BoltTokenizerWordCountWithNamesITCase.java | 5 +- .../wordcount/SpoutSourceWordCountITCase.java | 5 +- .../storm/wordcount/WordCountLocalITCase.java | 5 +- .../wordcount/WordCountLocalNamedITCase.java | 6 +- .../src/test/resources/log4j-test.properties | 2 +- .../apache/flink/storm/api/FlinkClient.java | 4 +- .../flink/storm/api/FlinkLocalCluster.java | 23 +- .../storm/api/FlinkOutputFieldsDeclarer.java | 1 - .../apache/flink/storm/api/FlinkTopology.java | 483 ++++++++++++++++-- .../flink/storm/api/FlinkTopologyBuilder.java | 421 --------------- .../apache/flink/storm/util/StormConfig.java | 7 +- .../flink/storm/util/StormStreamSelector.java | 8 +- .../flink/storm/wrappers/BoltWrapper.java | 89 ++-- .../storm/wrappers/BoltWrapperTwoInput.java | 134 +++++ .../storm/wrappers/FlinkTopologyContext.java | 3 +- .../wrappers/SetupOutputFieldsDeclarer.java | 4 +- .../flink/storm/wrappers/SpoutCollector.java | 3 +- .../flink/storm/wrappers/SpoutWrapper.java | 8 +- .../flink/storm/wrappers/StormTuple.java | 54 +- .../storm/wrappers/WrapperSetupHelper.java | 23 +- .../api/FlinkOutputFieldsDeclarerTest.java | 2 - .../storm/api/FlinkTopologyBuilderTest.java | 81 --- .../flink/storm/api/FlinkTopologyTest.java | 73 ++- .../org/apache/flink/storm/api/TestBolt.java | 4 +- .../org/apache/flink/storm/api/TestSpout.java | 4 +- .../flink/storm/api/TestTopologyBuilder.java | 29 -- .../storm/util/StormStreamSelectorTest.java | 6 +- .../flink/storm/util/TestDummyBolt.java | 4 +- .../flink/storm/util/TestDummySpout.java | 4 +- .../org/apache/flink/storm/util/TestSink.java | 8 +- .../storm/wrappers/BoltCollectorTest.java | 2 - .../flink/storm/wrappers/BoltWrapperTest.java | 11 +- .../wrappers/FlinkTopologyContextTest.java | 6 +- .../SetupOutputFieldsDeclarerTest.java | 2 - .../storm/wrappers/SpoutCollectorTest.java | 2 - .../storm/wrappers/SpoutWrapperTest.java | 1 - .../flink/storm/wrappers/StormTupleTest.java | 37 +- .../wrappers/WrapperSetupHelperTest.java | 36 +- 71 files changed, 1132 insertions(+), 1009 deletions(-) rename flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/{excamation => exclamation}/ExclamationLocal.java (86%) rename flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/{excamation => exclamation}/ExclamationTopology.java (93%) rename flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/{excamation => exclamation}/ExclamationWithBolt.java (97%) rename flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/{excamation => exclamation}/ExclamationWithSpout.java (99%) rename flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/{excamation => exclamation}/operators/ExclamationBolt.java (97%) create mode 100644 flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java create mode 100644 flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java create mode 100644 flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java delete mode 100644 flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java delete mode 100644 flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java create mode 100644 flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java delete mode 100644 flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java delete mode 100644 flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md index 103b605f2c2d9..fe6bf35b68ef6 100644 --- a/docs/apis/storm_compatibility.md +++ b/docs/apis/storm_compatibility.md @@ -57,20 +57,18 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes: -- `TopologyBuilder` replaced by `FlinkTopologyBuilder` - `StormSubmitter` replaced by `FlinkSubmitter` - `NimbusClient` and `Client` replaced by `FlinkClient` - `LocalCluster` replaced by `FlinkLocalCluster` In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology. -The actual runtime code, ie, Spouts and Bolts, can be uses *unmodified*. -If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively. -If a parameter is not specified, the value is taken from `flink-conf.yaml`. +The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*. +If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
~~~java -FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new FlinkTopology(); +TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder // actual topology assembling code and used Spouts/Bolts can be used as-is builder.setSpout("source", new FileSpout(inputFilePath)); @@ -81,12 +79,12 @@ builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("count Config conf = new Config(); if(runLocal) { // submit to test cluster FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("WordCount", conf, builder.createTopology()); + cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); } else { // submit to remote cluster // optional // conf.put(Config.NIMBUS_HOST, "remoteHost"); // conf.put(Config.NIMBUS_THRIFT_PORT, 6123); - FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology()); + FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology()); } ~~~
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml index cee6cac94ae00..6f3a0506e96ec 100644 --- a/flink-contrib/flink-storm-examples/pom.xml +++ b/flink-contrib/flink-storm-examples/pom.xml @@ -61,6 +61,14 @@ under the License. ${project.version} test + + + org.apache.storm + storm-starter + 0.9.4 + + + @@ -226,7 +234,7 @@ under the License. - +