Skip to content

Commit

Permalink
Merge pull request apache#11634: Change TestStreamImpl to a producer/consumer pattern
Browse files Browse the repository at this point in the history

Change TestStreamImpl to a producer/consumer pattern
  • Loading branch information
pabloem committed May 15, 2020
2 parents ac190b8 + 4a69c9a commit 578694b
Showing 1 changed file with 58 additions and 12 deletions.
70 changes: 58 additions & 12 deletions sdks/python/apache_beam/runners/direct/test_stream_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import itertools
import logging
from queue import Empty as EmptyException
from queue import Queue
from threading import Thread

from apache_beam import ParDo
from apache_beam import coders
Expand Down Expand Up @@ -61,6 +64,10 @@
_LOGGER = logging.getLogger(__name__)


class _EndOfStream:
pass


class _WatermarkController(PTransform):
"""A runner-overridable PTransform Primitive to control the watermark.
Expand Down Expand Up @@ -263,8 +270,12 @@ def events_from_script(events):
return itertools.chain(events)

@staticmethod
def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive):
  """Streams events from the TestStreamService onto the shared queue.

  This is the producer half of the producer/consumer pattern: it runs on a
  dedicated thread started by ``events_from_rpc``, reads events from the
  TestStreamService over a streaming RPC, and puts each decoded event onto
  the shared queue. At the end of the stream, an ``_EndOfStream`` sentinel
  is placed on the channel to signify a successful end.

  Args:
    endpoint: Address of the TestStreamService to connect to.
    output_tags: Tags whose events are requested from the service.
    coder: Coder used to decode the streamed element payloads.
    channel: Shared queue onto which decoded events are put.
    is_alive: Zero-argument callable; when it returns False the consumer
      has exited early and this producer stops reading from the RPC.
  """
  stub_channel = grpc.insecure_channel(endpoint)
  stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel)

  event_request = beam_runner_api_pb2.EventsRequest(
      output_ids=[str(tag) for tag in output_tags])

  event_stream = stub.Events(event_request)
  for e in event_stream:
    channel.put(_TestStream.test_stream_payload_to_events(e, coder))
    if not is_alive():
      # The consumer timed out or exited early; stop pulling from the RPC
      # without enqueueing the end-of-stream sentinel.
      return
  channel.put(_EndOfStream())

@staticmethod
def events_from_rpc(endpoint, output_tags, coder):
  """Yields the events received from the given endpoint.

  This is the consumer half of the producer/consumer pattern: it starts a
  daemon thread (``_stream_events_from_rpc``) that reads from the
  TestStreamService and puts the events onto a shared queue, then yields
  all elements from that queue. Unfortunately, this is necessary because
  the GRPC API does not allow for non-blocking calls when utilizing a
  streaming RPC; the officially suggested approach is a producer/consumer
  pattern, which gives this method control over when to cancel reading
  from the RPC if the server takes too long to respond.

  Args:
    endpoint: Address of the TestStreamService to connect to.
    output_tags: Tags whose events are requested from the service.
    coder: Coder used to decode the streamed element payloads.

  Yields:
    Events decoded from the service's responses.

  Raises:
    queue.Empty: If no event arrives within the 30-second timeout.
  """
  # Shared flag read by the producer thread (through the lambda below).
  # Setting it to False shuts the producer down if the consumer exits early.
  is_alive = True

  # The shared queue that allows the producer and consumer to communicate.
  channel = Queue()  # type: Queue[Union[test_stream.Event, _EndOfStream]]
  event_stream = Thread(
      target=_TestStream._stream_events_from_rpc,
      args=(endpoint, output_tags, coder, channel, lambda: is_alive),
      daemon=True)  # daemon=True replaces the deprecated setDaemon(True).
  event_stream.start()

  # Pump the shared queue for events until the _EndOfStream sentinel is
  # reached. If the TestStreamService takes longer than expected, the queue
  # times out and an EmptyException is raised; the shared is_alive flag is
  # then cleared to shut down the producer.
  while True:
    try:
      # Raise an EmptyException if there are no events during the last
      # timeout period.
      event = channel.get(timeout=30)
      if isinstance(event, _EndOfStream):
        break
      yield event
    except EmptyException as e:
      _LOGGER.warning(
          'TestStream timed out waiting for new events from service.'
          ' Stopping pipeline.')
      is_alive = False
      raise e

@staticmethod
def test_stream_payload_to_events(payload, coder):
Expand Down

0 comments on commit 578694b

Please sign in to comment.