[FLINK-2304] Add named attribute access to Storm compatibility layer
  - extended FlinkTuple to enable named attribute access
  - extended BoltWrapper for user defined input schema
  - extended FlinkTopologyBuilder to handle declared output schemas
  - adapted JUnit tests
  - added new examples and ITCases
  - updated READMEs
  - updated documentation

Closes apache#878
mjsax authored and gyfora committed Jul 22, 2015
1 parent 148395b commit 0332050
Showing 28 changed files with 1,430 additions and 132 deletions.
33 changes: 26 additions & 7 deletions docs/apis/storm_compatibility.md
@@ -34,7 +34,7 @@ This document shows how to use existing Storm code with Flink.
* This will be replaced by the TOC
{:toc}

### Project Configuration
# Project Configuration

Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
The code resides in the `org.apache.flink.stormcompatibility` package.
@@ -51,7 +51,7 @@ Add the following dependency to your `pom.xml` if you want to execute Storm code

**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution. Thus, you need to include `flink-storm-compatibility-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.

### Execute Storm Topologies
# Execute Storm Topologies

Flink provides a Storm-compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:

@@ -88,18 +88,18 @@ if(runLocal) { // submit to test cluster
</div>
</div>
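The full example is collapsed above; as a rough sketch of the pattern (assuming `FlinkTopologyBuilder` mirrors Storm's `TopologyBuilder`, that `FlinkLocalCluster.getLocalCluster()` and `submitTopology(...)` mirror Storm's `LocalCluster`, and using hypothetical spout and bolt classes):

```java
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("source", new SentenceSpout());       // hypothetical spout
builder.setBolt("tokenizer", new StormBoltTokenizer()) // hypothetical bolt
		.shuffleGrouping("source");

if (runLocal) { // submit to test cluster
	// assumed to mirror backtype.storm.LocalCluster
	FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
	cluster.submitTopology("wordcount", null, builder.createTopology());
}
```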

### Embed Storm Operators in Flink Streaming Programs
# Embed Storm Operators in Flink Streaming Programs

As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
The Storm compatibility layer offers a wrapper class for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).

By default, both wrappers convert Storm output tuples to Flink's `Tuple` types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
By default, both wrappers convert Storm output tuples to Flink's [Tuple](programming_guide.html#tuples-and-case-classes) types (ie, `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
For single-field output tuples, a conversion to the field's data type is also possible (eg, `String` instead of `Tuple1<String>`).

Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
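For example, for a wrapped operator that emits `(word, count)` pairs, the output type can be created from a sample object:

```java
// Build the output TypeInformation from a sample object; Flink cannot
// infer this from the wrapped Storm operator itself.
TypeInformation<Tuple2<String, Integer>> outType =
		TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0));
```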

#### Embed Spouts
## Embed Spouts

In order to use a Spout as a Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>` that serves as the first argument to `addSource(...)`.
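A minimal sketch (the spout class `SentenceSpout`, declaring a single output field `sentence`, is hypothetical):

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Wrap the spout so it can act as a regular Flink source; the explicit
// TypeInformation is required because Flink cannot infer the output type.
DataStream<Tuple1<String>> sentences = env.addSource(
		new StormSpoutWrapper<Tuple1<String>>(new SentenceSpout()),
		TypeExtractor.getForObject(new Tuple1<String>("")));
```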
@@ -126,7 +126,7 @@ Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatically
If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
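The finite variant looks the same, assuming `StormFiniteSpoutWrapper` offers the analogous single-argument constructor:

```java
// The source terminates automatically once the finite spout is exhausted
// (constructor signature assumed analogous to StormSpoutWrapper).
DataStream<Tuple1<String>> sentences = env.addSource(
		new StormFiniteSpoutWrapper<Tuple1<String>>(new SentenceSpout()),
		TypeExtractor.getForObject(new Tuple1<String>("")));
```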


#### Embed Bolts
## Embed Bolts

In order to use a Bolt as a Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>` that serves as the last argument to `transform(...)`.
@@ -149,7 +149,26 @@ DataStream<Tuple2<String, Integer>> counts = text.transform(
</div>
</div>
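The collapsed example continues along these lines; a minimal sketch (the tokenizer bolt `StormBoltTokenizer`, consuming a raw `String` and emitting `(word, 1)` pairs, and the input path are assumptions):

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile(textFilePath); // assumed input path

// Wrap the bolt; the operator name and output type must be given explicitly.
DataStream<Tuple2<String, Integer>> counts = text.transform(
		"tokenizer",
		TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
		new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer()));
```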

### Storm Compatibility Examples
### Named Attribute Access for Embedded Bolts

Bolts can access input tuple fields by name (in addition to access by index).
To use this feature with embedded Bolts, you need either a

1. [POJO](programming_guide.html#pojos) type input stream or
2. [Tuple](programming_guide.html#tuples-and-case-classes) type input stream for which you specify the input schema (ie, the name-to-index mapping)

For POJO input types, Flink accesses the fields via reflection.
For this case, Flink expects either a corresponding public member variable or public getter method.
For example, if a Bolt accesses a field by the name `sentence` (eg, `String s = input.getStringByField("sentence");`), the input POJO class must have a member variable `public String sentence;` or a method `public String getSentence() { ... };` (pay attention to camel-case naming).
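For illustration, a minimal sketch of such a POJO input type (the class itself is hypothetical):

```java
// POJO whose "sentence" field the embedded Bolt can read via
// input.getStringByField("sentence").
public class Sentence {
	public String sentence; // public member; alternatively a public getter getSentence()

	public Sentence() {} // Flink POJOs require a public no-argument constructor

	public Sentence(final String sentence) {
		this.sentence = sentence;
	}
}
```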

For `Tuple` input types, it is required to specify the input schema using Storm's `Fields` class.
For this case, the constructor of `StormBoltWrapper` takes an additional argument: `new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(new StormBoltTokenizerByName(), new Fields("sentence"))`.
The input type is `Tuple1<String>`, and `Fields("sentence")` specifies that `input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
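Putting this together, a minimal sketch (reusing the `sentences` stream from the Spout example above; `StormBoltTokenizerByName` is the Bolt referenced in this section):

```java
// Field 0 of the Tuple1 input is named "sentence", so the Bolt can call
// input.getStringByField("sentence") instead of input.getString(0).
DataStream<Tuple2<String, Integer>> counts = sentences.transform(
		"tokenizer-by-name",
		TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
		new StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(
				new StormBoltTokenizerByName(), new Fields("sentence")));
```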

See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.

# Storm Compatibility Examples

You can find more examples in the Maven module `flink-storm-compatibility-examples`.
For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).

@@ -5,7 +5,6 @@ The Storm compatibility layer allows embedding unmodified Spouts and Bolts within a regular Flink streaming program
The following Storm features are not (yet/fully) supported by the compatibility layer right now:
* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
* topology and tuple meta information (ie, `TopologyContext` is not fully supported)
* access to tuple attributes (ie, fields) only by index (access by name is coming)
* only the default stream is supported currently (ie, only a single output stream)
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring are ignored)
* for whole Storm topologies the following is not supported by Flink:
Expand Down
@@ -37,7 +37,7 @@
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {

/** the declared output schema */
private Fields outputSchema;
Fields outputSchema;

@Override
public void declare(final Fields fields) {
@@ -30,6 +30,8 @@
import backtype.storm.topology.IRichStateSpout;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
@@ -60,6 +62,8 @@ public class FlinkTopologyBuilder {
private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
/** All user bolts by their ID */
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
/** All declared output schemas by operator ID */
private final HashMap<String, Fields> outputSchemas = new HashMap<String, Fields>();

/**
* Creates a Flink program that uses the specified spouts and bolts.
@@ -79,6 +83,7 @@ public FlinkTopology createTopology() {

final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userSpout.declareOutputFields(declarer);
this.outputSchemas.put(spoutId, declarer.outputSchema);

/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
* and StormCollector)
@@ -118,6 +123,7 @@ public FlinkTopology createTopology() {

final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userBolt.declareOutputFields(declarer);
this.outputSchemas.put(boltId, declarer.outputSchema);

final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();

@@ -162,7 +168,7 @@ public FlinkTopology createTopology() {
final TypeInformation<?> outType = declarer.getOutputType();

final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
new StormBoltWrapper(userBolt));
new StormBoltWrapper(userBolt, this.outputSchemas.get(producerId)));
if (outType != null) {
// only for non-sink nodes
availableOperators.put(boltId, operator);
@@ -68,7 +68,7 @@ public AbstractStormCollector(final int numberOfAttributes) throws UnsupportedOperationException {
}
} else {
throw new UnsupportedOperationException(
"SimpleStormBoltWrapper can handle not more then 25 attributes, but "
"Flink cannot handle more then 25 attributes, but "
+ this.numberOfAttributes + " are declared by the given bolt");
}
}
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.stormcompatibility.wrappers;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
@@ -29,7 +29,6 @@
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;



@@ -53,6 +52,8 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
private final IRichBolt bolt;
/** Number of attributes of the bolt's output tuples */
private final int numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
private final Fields inputSchema;

/**
* We have to use this because Operators must output
@@ -61,25 +62,44 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
private TimestampedCollector<OUT> flinkCollector;

/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
* such that it can be used within a Flink streaming program. The output type will be one of
* {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
* for POJO input types. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's
* declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @throws IllegalArgumentException
* If the number of declared output attributes is not within range [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
this(bolt, false);
this(bolt, null, false);
}

/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
* such that it can be used within a Flink streaming program. The output type can be any type if
* parameter {@code rawOutput} is {@code true} and the bolt's number of declared output tuples
* is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
* {@link Tuple25} depending on the bolt's declared number of attributes.
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
* {@link Tuple1} to {@link Tuple25}. The output type will be one of {@link Tuple1} to {@link Tuple25} depending on
* the bolt's declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @throws IllegalArgumentException
* If the number of declared output attributes is not within range [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
throws IllegalArgumentException {
this(bolt, inputSchema, false);
}

/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. As no input schema is defined, attribute-by-name access is only possible
* for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
* bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
* of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
@@ -91,8 +111,34 @@ public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
* not 1 or if {@code rawOutput} is {@code false} and the number of declared output
* attributes is not within range [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput)
throws IllegalArgumentException {
this(bolt, null, rawOutput);
}

/**
* Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
* used within a Flink streaming program. The given input schema enables attribute-by-name access for input types
* {@link Tuple1} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutput
* Set to {@code true} if a single attribute output stream should not be of type
* {@link Tuple1} but be of a raw type.
* @throws IllegalArgumentException
* If {@code rawOutput} is {@code true} and the number of declared output attributes is
* not 1 or if {@code rawOutput} is {@code false} and the number of declared output
* attributes is not within range [1;25].
*/
public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, final boolean rawOutput)
throws IllegalArgumentException {
this.bolt = bolt;
this.inputSchema = inputSchema;
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
}

@@ -101,7 +147,7 @@ public void open(final Configuration parameters) throws Exception {
super.open(parameters);

final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
(StreamingRuntimeContext)super.runtimeContext, false);
super.runtimeContext, false);
flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;

@@ -122,11 +168,12 @@ public void close() throws Exception {
@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
flinkCollector.setTimestamp(element.getTimestamp());
this.bolt.execute(new StormTuple<IN>(element.getValue()));
this.bolt.execute(new StormTuple<IN>(element.getValue(), inputSchema));
}

@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}

}