diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md index 103b605f2c2d9..fe6bf35b68ef6 100644 --- a/docs/apis/storm_compatibility.md +++ b/docs/apis/storm_compatibility.md @@ -57,20 +57,18 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes: -- `TopologyBuilder` replaced by `FlinkTopologyBuilder` - `StormSubmitter` replaced by `FlinkSubmitter` - `NimbusClient` and `Client` replaced by `FlinkClient` - `LocalCluster` replaced by `FlinkLocalCluster` In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology. -The actual runtime code, ie, Spouts and Bolts, can be uses *unmodified*. -If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively. -If a parameter is not specified, the value is taken from `flink-conf.yaml`. +The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*. +If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
~~~java -FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new FlinkTopology(); +TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder // actual topology assembling code and used Spouts/Bolts can be used as-is builder.setSpout("source", new FileSpout(inputFilePath)); @@ -81,12 +79,12 @@ builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("count Config conf = new Config(); if(runLocal) { // submit to test cluster FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("WordCount", conf, builder.createTopology()); + cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); } else { // submit to remote cluster // optional // conf.put(Config.NIMBUS_HOST, "remoteHost"); // conf.put(Config.NIMBUS_THRIFT_PORT, 6123); - FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology()); + FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology()); } ~~~
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml index cee6cac94ae00..6f3a0506e96ec 100644 --- a/flink-contrib/flink-storm-examples/pom.xml +++ b/flink-contrib/flink-storm-examples/pom.xml @@ -61,6 +61,14 @@ under the License. ${project.version} test + + + org.apache.storm + storm-starter + 0.9.4 + + + @@ -226,7 +234,7 @@ under the License. - +