This closes apache/beam-site#206
davorbonaci committed Apr 21, 2017
Commit 17d4de1 (2 parents: bd6db50 + 1ebae83)
Showing 1 changed file with 92 additions and 3 deletions: website/src/documentation/programming-guide.md
In the Beam SDK, each transform has a generic `apply` method <span class="language-py">(or pipe operator `|`)</span>. You apply a transform to an input `PCollection`, and the operation returns an output `PCollection`. This takes the general form:

```py
[Output PCollection] = [Input PCollection] | [Transform]
```

Because Beam uses a generic `apply` method for `PCollection`, you can both chain transforms sequentially and also apply transforms that contain other transforms nested within (called [composite transforms](#transforms-composite) in the Beam SDKs).

How you apply your pipeline's transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where the nodes are `PCollection`s and the edges are transforms. For example, you can chain transforms to create a sequential pipeline, like this one:
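
For instance, a minimal sketch of such a chain in Java might look like the following (the input values and the `ExtractWordsFn` DoFn here are illustrative placeholders, not the guide's own example):

```java
// Each apply() returns a new PCollection, so transforms chain directly
// from one another's output.
PCollection<String> lines =
    pipeline.apply(Create.of("to be or not to be", "that is the question"));

PCollection<KV<String, Long>> wordCounts = lines
    .apply(ParDo.of(new ExtractWordsFn()))   // split each line into words
    .apply(Count.<String>perElement());      // count occurrences of each word
```
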
The resulting workflow graph from the branching pipeline above looks like this:

[Branching Graph Graphic]

You can also build your own [composite transforms](#transforms-composite) that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.
### Transforms in the Beam SDK
While `ParDo` always produces a main output `PCollection` (as the return value from `apply`), you can also have your `ParDo` produce any number of additional output `PCollection`s.
## <a name="transforms-composite"></a>Composite Transforms
> **Note:** This section is in progress ([BEAM-1452](https://issues.apache.org/jira/browse/BEAM-1452)).

Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one `ParDo`, `Combine`, `GroupByKey`, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.

The Beam SDK comes packed with many useful composite transforms. See the API reference pages for a list of transforms:

* [Pre-written Beam transforms for Java]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/package-summary.html)
* [Pre-written Beam transforms for Python]({{ site.baseurl }}/documentation/sdks/pydoc/{{ site.release_latest }}/apache_beam.transforms.html)
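
For example, a minimal sketch of applying one of these pre-written transforms (the pipeline and input values are illustrative):

```java
PCollection<Integer> values = pipeline.apply(Create.of(1, 2, 3, 4));

// Sum.integersGlobally() is a pre-written composite transform from the SDK
// that combines all elements into a single total.
PCollection<Integer> total = values.apply(Sum.integersGlobally());
```
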
### An example of a composite transform

The `CountWords` transform in the [WordCount example program]({{ site.baseurl }}/get-started/wordcount-example/) is an example of a composite transform. `CountWords` is a `PTransform` subclass that consists of multiple nested transforms.

In its `expand` method, the `CountWords` transform applies the following transform operations:

1. It applies a `ParDo` on the input `PCollection` of text lines, producing an output `PCollection` of individual words.
2. It applies the Beam SDK library transform `Count` on the `PCollection` of words, producing a `PCollection` of key/value pairs. Each key represents a word in the text, and each value represents the number of times that word appeared in the original data.

Note that this is also an example of nested composite transforms, as `Count` is, by itself, a composite transform.

Your composite transform's parameters and return value must match the initial input type and final return type for the entire transform, even if the transform's intermediate data changes type multiple times.

```java
public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}
```
```py
Python code snippet coming soon (BEAM-1926)
```
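
Once defined, `CountWords` is applied just like any built-in transform; a brief sketch (the input lines are illustrative):

```java
PCollection<String> lines =
    pipeline.apply(Create.of("some input text", "more input text"));

// Apply the composite transform exactly as you would a pre-written transform.
PCollection<KV<String, Long>> wordCounts = lines.apply(new CountWords());
```
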
### Creating a composite transform

To create your own composite transform, create a subclass of the `PTransform` class and override the `expand` method to specify the actual processing logic. You can then use this transform just as you would a built-in transform from the Beam SDK.

{:.language-java}
For the `PTransform` class type parameters, you pass the `PCollection` types that your transform takes as input, and produces as output. To take multiple `PCollection`s as input, or produce multiple `PCollection`s as output, use one of the multi-collection types for the relevant type parameter.
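
For instance, a minimal sketch of a transform that accepts several `PCollection`s at once by using `PCollectionList` as its input type parameter (the class name is illustrative):

```java
static class MergeStrings
    extends PTransform<PCollectionList<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollectionList<String> inputs) {
    // Flatten merges every PCollection in the list into a single PCollection.
    return inputs.apply(Flatten.<String>pCollections());
  }
}
```
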
The following code sample shows how to declare a `PTransform` that accepts a `PCollection` of `String`s for input, and outputs a `PCollection` of `Integer`s:
```java
static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {
  ...
}
```
```py
Python code snippet coming soon (BEAM-1926)
```
#### Overriding the expand method
Within your `PTransform` subclass, you'll need to override the `expand` method. The `expand` method is where you add the processing logic for the `PTransform`. Your override of `expand` must accept the appropriate type of input `PCollection` as a parameter, and specify the output `PCollection` as the return value.

The following code sample shows how to override `expand` for the `ComputeWordLengths` class declared in the previous example:

```java
static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {
  @Override
  public PCollection<Integer> expand(PCollection<String> words) {
    ...
    // transform logic goes here
    ...
  }
}
```

```py
Python code snippet coming soon (BEAM-1926)
```

As long as you override the `expand` method in your `PTransform` subclass to accept the appropriate input `PCollection`(s) and return the corresponding output `PCollection`(s), you can include as many transforms as you want. These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libraries.
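
For instance, here is a sketch of how the `ComputeWordLengths` body might combine an SDK library transform with a core transform (the filtering step is an illustrative assumption, not part of the original example):

```java
static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {
  @Override
  public PCollection<Integer> expand(PCollection<String> words) {
    return words
        // SDK library transform: drop empty strings.
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Core transform: map each word to its length.
        .apply(ParDo.of(new DoFn<String, Integer>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element().length());
          }
        }));
  }
}
```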

**Note:** The `expand` method of a `PTransform` is not meant to be invoked directly by the user of a transform. Instead, you should call the `apply` method on the `PCollection` itself, with the transform as an argument. This allows transforms to be nested within the structure of your pipeline.
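
Concretely, using the `ComputeWordLengths` transform from above (the `words` collection is assumed to already exist in the pipeline):

```java
// Call apply() on the input PCollection, passing the composite transform;
// do not call ComputeWordLengths.expand() directly.
PCollection<Integer> wordLengths = words.apply(new ComputeWordLengths());
```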

#### PTransform Style Guide

When you create a new `PTransform`, be sure to read the [PTransform Style Guide]({{ site.baseurl }}/contribute/ptransform-style-guide/). The guide contains additional helpful information such as style guidelines, logging and testing guidance, and language-specific considerations.

## <a name="io"></a>Pipeline I/O
