[BEAM-10125] adding cross-language KafkaIO integration test
ihji committed May 29, 2020
1 parent 6e42344 commit 9e5dd68
Showing 1 changed file with 141 additions and 0 deletions.
sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py: 141 additions, 0 deletions
@@ -0,0 +1,141 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Integration test for Python cross-language pipelines for Java KafkaIO."""

from __future__ import absolute_import

import contextlib
import logging
import os
import socket
import subprocess
import time
import typing
import unittest

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.io.external.kafka import WriteToKafka
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline


class CrossLanguageKafkaIO(object):
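  """Builds write and read pipelines that exercise cross-language KafkaIO."""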
def __init__(self, bootstrap_servers, topic, expansion_service=None):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.expansion_service = expansion_service
self.sum_counter = Metrics.counter('source', 'elements_sum')

def build_write_pipeline(self, pipeline):
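    """Writes 1000 numbered records to the Kafka topic via WriteToKafka."""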
_ = (
pipeline
| 'Impulse' >> beam.Impulse()
| 'Generate' >> beam.FlatMap(lambda x: range(1000)) # pylint: disable=range-builtin-not-iterating
| 'Reshuffle' >> beam.Reshuffle()
| 'MakeKV' >> beam.Map(lambda x:
(b'', str(x).encode())).with_output_types(
typing.Tuple[bytes, bytes])
| 'WriteToKafka' >> WriteToKafka(
producer_config={'bootstrap.servers': self.bootstrap_servers},
topic=self.topic,
expansion_service=self.expansion_service))

def build_read_pipeline(self, pipeline):
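    """Reads the records back, sums the decoded values into a counter."""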
_ = (
pipeline
| 'ReadFromKafka' >> ReadFromKafka(
consumer_config={
'bootstrap.servers': self.bootstrap_servers,
'auto.offset.reset': 'earliest'
},
topics=[self.topic],
expansion_service=self.expansion_service)
| 'Windowing' >> beam.WindowInto(
beam.window.FixedWindows(300),
trigger=beam.transforms.trigger.AfterProcessingTime(60),
accumulation_mode=beam.transforms.trigger.AccumulationMode.
DISCARDING)
| 'DecodingValue' >> beam.Map(lambda elem: int(elem[1].decode()))
| 'CombineGlobally' >> beam.CombineGlobally(sum).without_defaults()
| 'SetSumCounter' >> beam.Map(self.sum_counter.inc))

def run_xlang_kafkaio(self, pipeline):
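    """Builds the write and read halves on one pipeline and runs it."""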
self.build_write_pipeline(pipeline)
self.build_read_pipeline(pipeline)
pipeline.run(False)


@unittest.skipUnless(
os.environ.get('LOCAL_KAFKA_JAR'),
"LOCAL_KAFKA_JAR environment var is not provided.")
class CrossLanguageKafkaIOTest(unittest.TestCase):
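  """Tests cross-language KafkaIO against a locally started Kafka server."""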
def get_open_port(self):
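    """Returns a free local TCP port for the Kafka/Zookeeper services."""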
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except: # pylint: disable=bare-except
# Above call will fail for nodes that only support IPv6.
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.bind(('localhost', 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port

@contextlib.contextmanager
def local_kafka_service(self, local_kafka_jar_file):
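    """Starts a local Kafka server from the given jar and yields its port."""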
kafka_port = str(self.get_open_port())
zookeeper_port = str(self.get_open_port())
kafka_server = None
try:
kafka_server = subprocess.Popen(
['java', '-jar', local_kafka_jar_file, kafka_port, zookeeper_port])
      time.sleep(3)  # Give the local Kafka/Zookeeper services time to start.
yield kafka_port
finally:
if kafka_server:
kafka_server.kill()

def get_options(self):
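    """Returns portable Flink runner options used by the test pipeline."""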
options = PipelineOptions([
'--runner',
'FlinkRunner',
'--parallelism',
'2',
'--experiment',
'beam_fn_api'
])
return options

def test_kafkaio_write(self):
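    """Runs the write pipeline against a locally started Kafka service."""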
local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
with self.local_kafka_service(local_kafka_jar) as kafka_port:
options = self.get_options()
p = TestPipeline(options=options)
p.not_use_test_runner_api = True
CrossLanguageKafkaIO('localhost:%s' % kafka_port,
'xlang_kafkaio_test').build_write_pipeline(p)
job = p.run()
job.wait_until_finish()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
