Skip to content

Commit

Permalink
Merge pull request apache#11663 from [BEAM-9767]: Fix flaky streaming…
Browse files Browse the repository at this point in the history
… wordcount

[BEAM-9767]: Fix flaky streaming wordcount
  • Loading branch information
pabloem committed May 11, 2020
2 parents 6d5a5cf + 45ddff8 commit 5d33f9c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,13 @@ def read(self, *labels):

reader = StreamingCacheSource(
self._cache_dir, labels, self._is_cache_complete).read(tail=False)
header = next(reader)

# Return an empty iterator if there is nothing in the file yet. This can
# only happen when tail is False.
try:
header = next(reader)
except StopIteration:
return iter([]), -1
return StreamingCache.Reader([header], [reader]).read(), 1

def read_multiple(self, labels):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ def test_exists(self):
cache.write([TestStreamFileRecord()], 'my_label')
self.assertTrue(cache.exists('my_label'))

def test_empty(self):
  """Reading a label that was written with no records yields nothing."""
  label = repr(CacheKey('arbitrary_key', '', '', ''))
  cache = StreamingCache(cache_dir=None)

  # Nothing has been written under this label yet, so it must not exist.
  self.assertFalse(cache.exists(label))

  cache.write([], label)
  reader, _ = cache.read(label)

  # Draining the reader of an empty cache must produce an empty list.
  self.assertFalse(list(reader))

def test_single_reader(self):
"""Tests that we expect to see all the correctly emitted TestStreamPayloads.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def process(self, element):
@unittest.skipIf(
sys.version_info < (3, 5, 3),
'The tests require at least Python 3.6 to work.')
@timeout(30)
@timeout(60)
def test_streaming_wordcount(self):
class WordExtractingDoFn(beam.DoFn):
def process(self, element):
Expand Down

0 comments on commit 5d33f9c

Please sign in to comment.