[FLINK-10571][storm] Remove topology support
zentol committed Jan 9, 2019
1 parent 43a7497 commit 7a9a6ad
Showing 45 changed files with 29 additions and 4,371 deletions.
53 changes: 4 additions & 49 deletions docs/dev/libs/storm_compatibility.md
@@ -26,10 +26,7 @@ under the License.
[Flink streaming]({{ site.baseurl }}/dev/datastream_api.html) is compatible with Apache Storm interfaces and therefore allows
reusing code that was implemented for Storm.

-You can:
-
-- execute a whole Storm `Topology` in Flink.
-- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+You can use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.

This document shows how to use existing Storm code with Flink.

@@ -60,48 +57,9 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t
If you want to avoid large uber-jars, you can manually copy `storm-core-0.9.4.jar`, `json-simple-1.1.jar`, and `flink-storm-{{site.version}}.jar` into Flink's `lib/` folder on each cluster node (*before* the cluster is started).
In this case, it is sufficient to include only your own Spout and Bolt classes (and their internal dependencies) in the program jar.

-# Execute Storm Topologies
-
-Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:
-
-- `StormSubmitter` replaced by `FlinkSubmitter`
-- `NimbusClient` and `Client` replaced by `FlinkClient`
-- `LocalCluster` replaced by `FlinkLocalCluster`
-
-In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
-The actual runtime code, i.e., Spouts and Bolts, can be used *unmodified*.
-If a topology is executed on a remote cluster, the parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
-
-// the actual topology assembling code and the used Spouts/Bolts can be used as-is
-builder.setSpout("source", new FileSpout(inputFilePath));
-builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
-builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
-builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("counter");
-
-Config conf = new Config();
-if (runLocal) { // submit to test cluster
-	// replaces: LocalCluster cluster = new LocalCluster();
-	FlinkLocalCluster cluster = new FlinkLocalCluster();
-	cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
-} else { // submit to remote cluster
-	// optional
-	// conf.put(Config.NIMBUS_HOST, "remoteHost");
-	// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-	// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
-	FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
-}
-{% endhighlight %}
-</div>
-</div>

# Embed Storm Operators in Flink Streaming Programs

-As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
+Spouts and Bolts can be embedded into regular streaming programs.
The Storm compatibility layer offers a wrapper class for each, namely `SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).

By default, both wrappers convert Storm output tuples to Flink's [Tuple]({{site.baseurl}}/dev/api_concepts.html#tuples-and-case-classes) types (i.e., `Tuple0` to `Tuple25`, according to the number of fields of the Storm tuples).
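
As a minimal sketch of this embedded usage (assuming the `FileSpout` and `BoltTokenizer` classes from the WordCount examples, and an `inputFilePath` variable), a Spout becomes a source via `SpoutWrapper` and a Bolt becomes an operator via `DataStream.transform(...)`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// embed a Storm Spout as a Flink source; the default output stream is
// emitted as raw String values
DataStream<String> text = env.addSource(
	new SpoutWrapper<String>(new FileSpout(inputFilePath), new String[] { Utils.DEFAULT_STREAM_ID }),
	TypeExtractor.getForClass(String.class));

// embed a Storm Bolt as a regular Flink operator
DataStream<Tuple2<String, Integer>> tokens = text.transform(
	"tokenizer",                                                    // operator name
	TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
	new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));
{% endhighlight %}
</div>
</div>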
@@ -182,9 +140,8 @@ See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/fli

In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is given to the `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
This `Map` is provided by the user next to the topology and gets forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
-If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.

-For embedded usage, Flink's configuration mechanism must be used.
+To replicate this functionality, Flink's configuration mechanism must be used.
A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
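
For illustration, a global configuration could be assembled and registered as follows (a sketch only; the parameter name `wordsFile` and its value are made up):

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// only String keys are supported; "wordsFile" is a made-up example parameter
Configuration config = new Configuration();
config.setString("wordsFile", "/path/to/words.txt");

// the configuration is forwarded to Spout.open(...) and Bolt.prepare(...)
// of all embedded Spouts and Bolts
env.getConfig().setGlobalJobParameters(config);
{% endhighlight %}
</div>
</div>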
@@ -211,9 +168,8 @@ env.getConfig().setGlobalJobParameters(config);
## Multiple Output Streams

Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
-If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., there is no special attention required &ndash; it works as in regular Storm.

-For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
+The output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`.
Flink already provides the predefined output selector `StormStreamSelector<T>` for `.split(...)`.
Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
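
Putting the pieces together, consuming a single declared stream could look like the following sketch (`SomeSpout`, its output streams `"even"` and `"odd"`, and the `Integer` payload type are assumptions for illustration):

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// SomeSpout is an assumed Storm Spout that declares the output streams
// "even" and "odd"; its output is wrapped in SplitStreamType
DataStream<SplitStreamType<Integer>> multiStream = env.addSource(
	new SpoutWrapper<SplitStreamType<Integer>>(new SomeSpout()),
	TypeExtractor.getForObject(new SplitStreamType<Integer>()));

// split by stream name, using the predefined selector
SplitStream<SplitStreamType<Integer>> splitStream =
	multiStream.split(new StormStreamSelector<Integer>());

// select the "even" stream and strip the SplitStreamType wrapper
DataStream<Integer> evenStream =
	splitStream.select("even").map(new SplitStreamMapper<Integer>());
{% endhighlight %}
</div>
</div>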

@@ -282,7 +238,6 @@ To run the examples, you need to assemble a correct jar file.

There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
Compare `pom.xml` to see how both jars are built.
-Furthermore, there is one example for whole Storm topologies (`WordCount-StormTopology.jar`).

You can run each of those examples via `bin/flink run <jarname>.jar`. The correct entry point class is contained in each jar's manifest file.

109 changes: 0 additions & 109 deletions flink-contrib/flink-storm-examples/pom.xml
@@ -133,16 +133,6 @@ under the License.
						</goals>
						<configuration>
							<artifactItems>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class
-									</includes>
-								</artifactItem>
								<artifactItem>
									<groupId>org.apache.flink</groupId>
									<artifactId>flink-storm_${scala.binary.version}</artifactId>
@@ -284,112 +274,13 @@ under the License.
						</configuration>
					</execution>

-					<!-- WordCount Storm topology -->
-					<!-- Example for whole topologies (i.e., if FlinkTopology is used) -->
-					<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in the jar.
-						However, we excluded 'defaults.yaml' in the dependency-plugin to get a clean Eclipse environment.
-						Thus, 'defaults.yaml' is not available for maven-jar-plugin.
-						Nevertheless, we register an empty jar with the corresponding name, such that the final jar can be installed to the local maven repository.
-						We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
-					<execution>
-						<id>WordCount-StormTopology</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount</finalName>
-							<classifier>StormTopology</classifier>
-						</configuration>
-					</execution>

					<execution>
						<goals>
							<goal>test-jar</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

-			<!-- WordCount Storm topology -->
-			<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in the jar -->
-			<!-- Build the StormTopology jar to overwrite the empty jar created with maven-jar-plugin. -->
-			<plugin>
-				<artifactId>maven-shade-plugin</artifactId>
-				<groupId>org.apache.maven.plugins</groupId>
-				<executions>
-					<execution>
-						<id>WordCount-StormTopology</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<finalName>WordCount-StormTopology</finalName>
-
-							<artifactSet>
-								<includes>
-									<include>org.apache.storm:storm-core</include>
-									<!-- Storm's recursive dependencies -->
-									<include>org.yaml:snakeyaml</include>
-									<include>com.googlecode.json-simple:json-simple</include>
-									<include>org.apache.flink:flink-storm_${scala.binary.version}</include>
-									<include>org.apache.flink:flink-storm-examples_${scala.binary.version}</include>
-								</includes>
-							</artifactSet>
-							<filters>
-								<filter>
-									<artifact>org.apache.storm:storm-core</artifact>
-									<includes>
-										<include>defaults.yaml</include>
-										<include>org/apache/storm/*.class</include>
-										<include>org/apache/storm/topology/*.class</include>
-										<include>org/apache/storm/spout/*.class</include>
-										<include>org/apache/storm/task/*.class</include>
-										<include>org/apache/storm/tuple/*.class</include>
-										<include>org/apache/storm/generated/*.class</include>
-										<include>org/apache/storm/metric/**/*.class</include>
-										<include>org/apache/storm/utils/*.class</include>
-										<include>org/apache/storm/serialization/*.class</include>
-										<include>org/apache/storm/curator/**/*.class</include>
-										<include>org/apache/storm/grouping/**/*.class</include>
-										<include>org/apache/storm/thrift/**/*.class</include>
-										<!-- Storm's recursive dependencies -->
-										<include>org/json/simple/**/*.class</include>
-										<include>org/yaml/snakeyaml/**/*.class</include>
-										<include>org/apache/storm/shade/**/*.class</include>
-									</includes>
-								</filter>
-								<filter>
-									<artifact>org.apache.flink:flink-storm-examples_${scala.binary.version}</artifact>
-									<includes>
-										<include>org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.class
-										</include>
-										<include>org/apache/flink/storm/wordcount/WordCountTopology.class</include>
-										<include>org/apache/flink/storm/wordcount/operators/*.class</include>
-										<include>org/apache/flink/storm/util/*.class</include>
-										<include>org/apache/flink/storm/wordcount/util/WordCountData.class</include>
-									</includes>
-								</filter>
-								<filter>
-									<artifact>org.apache.flink:flink-storm_${scala.binary.version}</artifact>
-									<includes>
-										<include>org/apache/flink/storm/api/*.class</include>
-										<include>org/apache/flink/storm/util/*.class</include>
-										<include>org/apache/flink/storm/wrappers/*.class</include>
-									</includes>
-								</filter>
-							</filters>
-							<transformers>
-								<transformer
-									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter</mainClass>
-								</transformer>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
		</plugins>

		<pluginManagement>
