Skip to content

Commit

Permalink
[ BEAM-3788] Updates kafka.py pydocs (apache#11928)
Browse files Browse the repository at this point in the history
* Updates kafka.py pydocs
  • Loading branch information
chamikaramj committed Jun 9, 2020
1 parent 564cef0 commit bfa8019
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 29 deletions.
87 changes: 62 additions & 25 deletions sdks/python/apache_beam/io/external/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,67 @@
# limitations under the License.
#

"""
PTransforms for supporting Kafka in Python pipelines. These transforms do not
run a Kafka client in Python. Instead, they expand to ExternalTransforms
which the Expansion Service resolves to the Java SDK's KafkaIO. In other
words: they are cross-language transforms.
Note: To use these transforms, you need to start a Java Expansion Service.
Please refer to the portability documentation on how to do that. Flink Users
can use the built-in Expansion Service of the Flink Runner's Job Server. The
expansion service address has to be provided when instantiating the
transforms.
If you start Flink's Job Server, the expansion service will be started on
port 8097. This is also the configured default for this transform. For a
different address, please set the expansion_service parameter.
For more information see:
- https://beam.apache.org/documentation/runners/flink/
"""Unbounded source and sink transforms for
`Kafka <href="https://kafka.apache.org/>`_.
These transforms are currently supported by Beam portable runners (for
example, portable Flink and Spark) as well as Dataflow runner.
**Setup**
Transforms provided in this module are cross-language transforms
implemented in the Beam Java SDK. During the pipeline construction, Python SDK
will connect to a Java expansion service to expand these transforms.
To facilitate this, a small amount of setup is needed before using these
transforms in a Beam Python pipeline.
There are several ways to setup cross-language Kafka transforms.
* Option 1: use the default expansion service
* Option 2: specify a custom expansion service
See below for details regarding each of these options.
*Option 1: Use the default expansion service*
This is the recommended and easiest setup option for using Python Kafka
transforms. This option is only available for Beam 2.22.0 and later.
This option requires following pre-requisites before running the Beam
pipeline.
* Install Java runtime in the computer from where the pipeline is constructed
and make sure that 'java' command is available.
In this option, Python SDK will either download (for released Beam version) or
build (when running from a Beam Git clone) a expansion service jar and use
that to expand transforms. Currently Kafka transforms use the
'beam-sdks-java-io-expansion-service' jar for this purpose.
*Option 2: specify a custom expansion service*
In this option, you startup your own expansion service and provide that as
a parameter when using the transforms provided in this module.
This option requires following pre-requisites before running the Beam
pipeline.
* Startup your own expansion service.
* Update your pipeline to provide the expansion service address when
initiating Kafka transforms provided in this module.
Flink Users can use the built-in Expansion Service of the Flink Runner's
Job Server. If you start Flink's Job Server, the expansion service will be
started on port 8097. For a different address, please set the
expansion_service parameter.
**More information**
For more information regarding cross-language transforms see:
- https://beam.apache.org/roadmap/portability/
For more information specific to Flink runner see:
- https://beam.apache.org/documentation/runners/flink/
"""

# pytype: skip-file
Expand Down Expand Up @@ -68,11 +110,7 @@ class ReadFromKafka(ExternalTransform):
each item in the specified Kafka topics. If no Kafka Deserializer for
key/value is provided, then the data will be returned as a raw byte array.
Note: Runners need to support translating Read operations in order to use
this source. At the moment only the Flink Runner supports this.
Experimental; no backwards compatibility guarantees. It requires special
preparation of the Java SDK. See BEAM-7870.
Experimental; no backwards compatibility guarantees.
"""

# Returns the key/value data as raw byte arrays
Expand Down Expand Up @@ -135,8 +173,7 @@ class WriteToKafka(ExternalTransform):
If no Kafka Serializer for key/value is provided, then key/value are
assumed to be byte arrays.
Experimental; no backwards compatibility guarantees. It requires special
preparation of the Java SDK. See BEAM-7870.
Experimental; no backwards compatibility guarantees.
"""

# Default serializer which passes raw bytes to Kafka
Expand Down
4 changes: 0 additions & 4 deletions website/www/site/content/en/documentation/io/built-in.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ This table contains I/O transforms that are currently planned or in-progress. St
<td>Apache DistributedLog</td><td>Java</td>
<td><a href="https://issues.apache.org/jira/browse/BEAM-607">BEAM-607</a></td>
</tr>
<tr>
<td>Apache Kafka</td><td>Python</td>
<td><a href="https://issues.apache.org/jira/browse/BEAM-3788">BEAM-3788</a></td>
</tr>
<tr>
<td>Apache Sqoop</td><td>Java</td>
<td><a href="https://issues.apache.org/jira/browse/BEAM-67">BEAM-67</a></td>
Expand Down

0 comments on commit bfa8019

Please sign in to comment.