Skip to content

Commit

Permalink
Merge pull request apache#11440 from [BEAM-9767] Add a timeout to the…
Browse files Browse the repository at this point in the history
… TestStream GRPC and fix the Streaming cache timeout

[BEAM-9767] Add a timeout to the TestStream GRPC and fix the Streaming cache timeout
  • Loading branch information
pabloem committed Apr 21, 2020
2 parents 623c5ed + fc37a04 commit 7acbff4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
20 changes: 17 additions & 3 deletions sdks/python/apache_beam/runners/direct/test_stream_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from __future__ import print_function

import itertools
import logging

from apache_beam import ParDo
from apache_beam import coders
Expand Down Expand Up @@ -57,6 +58,8 @@
'Exception: grpc was not able to be imported. '
'Skip importing all grpc related moduels.')

_LOGGER = logging.getLogger(__name__)


class _WatermarkController(PTransform):
"""A runner-overridable PTransform Primitive to control the watermark.
Expand Down Expand Up @@ -270,9 +273,20 @@ def events_from_rpc(endpoint, output_tags, coder):
event_request = beam_runner_api_pb2.EventsRequest(
output_ids=[str(tag) for tag in output_tags])

event_stream = stub.Events(event_request)
for e in event_stream:
yield _TestStream.test_stream_payload_to_events(e, coder)
event_stream = stub.Events(event_request, timeout=30)
try:
while True:
yield _TestStream.test_stream_payload_to_events(
next(event_stream), coder)
except StopIteration:
return
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
_LOGGER.warning(
'TestStream timed out waiting for new events from service.'
' Stopping pipeline.')
return
raise e

@staticmethod
def test_stream_payload_to_events(payload, coder):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,11 @@ def __init__(
def _wait_until_file_exists(self, timeout_secs=30):
"""Blocks until the file exists for a maximum of timeout_secs.
"""
now_secs = time.time()
timeout_timestamp_secs = now_secs + timeout_secs

# Wait for up to `timeout_secs` for the file to be available.
start = time.time()
while not os.path.exists(self._path):
time.sleep(1)
if time.time() - start > timeout_timestamp_secs:
if time.time() - start > timeout_secs:
from apache_beam.runners.interactive.pipeline_instrument import CacheKey
pcollection_var = CacheKey.from_str(self._labels[-1]).var
raise RuntimeError(
Expand Down

0 comments on commit 7acbff4

Please sign in to comment.