Skip to content

Commit

Permalink
Minor cleanup of tests using TestStream. (apache#10188)
Browse files Browse the repository at this point in the history
* Minor cleanup of tests using TestStream. Added advance_watermark_to_infinity to end of TestStream.

* Combines multiple lines into one.
  • Loading branch information
acrites authored and lukecwik committed Dec 20, 2019
1 parent 7d05c3b commit b06fb9d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 29 deletions.
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/fileio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ def test_streaming_complex_timing(self):
# TODO(BEAM-3759): Add many firings per window after getting PaneInfo.
ts.advance_processing_time(5)
ts.advance_watermark_to(timestamp)
ts.advance_watermark_to_infinity()

def no_colon_file_naming(*args):
file_name = fileio.destination_prefix_naming()(*args)
Expand Down Expand Up @@ -572,7 +573,8 @@ def test_streaming_different_file_types(self):
.add_elements([next(input), next(input)])
.advance_watermark_to(30)
.add_elements([next(input), next(input)])
.advance_watermark_to(40))
.advance_watermark_to(40)
.advance_watermark_to_infinity())

def no_colon_file_naming(*args):
file_name = fileio.destination_prefix_naming()(*args)
Expand Down
41 changes: 14 additions & 27 deletions sdks/python/apache_beam/transforms/trigger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,19 +422,6 @@ def test_trigger_encoding(self):

class TriggerPipelineTest(unittest.TestCase):

def setUp(self):
# Use state on the TestCase class, since other references would be pickled
# into a closure and not have the desired side effects.
TriggerPipelineTest.all_records = []

def record_dofn(self):
class RecordDoFn(beam.DoFn):

def process(self, element):
TriggerPipelineTest.all_records.append(element)

return RecordDoFn()

def test_after_count(self):
with TestPipeline() as p:
def construct_timestamped(k_t):
Expand Down Expand Up @@ -471,29 +458,28 @@ def test_multiple_accumulating_firings(self):
if i % 5 == 0:
ts.advance_watermark_to(i)
ts.advance_processing_time(5)
ts.advance_watermark_to_infinity()

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (p
| ts
| beam.WindowInto(
FixedWindows(10),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
trigger=AfterWatermark(
early=AfterAll(
AfterCount(1), AfterProcessingTime(5))
))
| beam.GroupByKey()
| beam.FlatMap(lambda x: x[1])
| beam.ParDo(self.record_dofn()))
records = (p
| ts
| beam.WindowInto(
FixedWindows(10),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
trigger=AfterWatermark(
early=AfterAll(
AfterCount(1), AfterProcessingTime(5))
))
| beam.GroupByKey()
| beam.FlatMap(lambda x: x[1]))

# The trigger should fire twice. Once after 5 seconds, and once after 10.
# The firings should accumulate the output.
first_firing = [str(i) for i in elements if i <= 5]
second_firing = [str(i) for i in elements]
self.assertListEqual(first_firing + second_firing,
TriggerPipelineTest.all_records)
assert_that(records, equal_to(first_firing + second_firing))


class TranscriptTest(unittest.TestCase):
Expand Down Expand Up @@ -816,6 +802,7 @@ def keyed(key, values):
else:
raise ValueError('Unexpected action: %s' % action)
test_stream.add_elements([json.dumps(('expect', []))])
test_stream.advance_watermark_to_infinity()

read_test_stream = test_stream | beam.Map(json.loads)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ def test_in_streaming_mode(self):
.advance_watermark_to(start_time +
GroupIntoBatchesTest.NUM_ELEMENTS)
.advance_watermark_to_infinity())
pipeline = TestPipeline()
pipeline = TestPipeline(options=StandardOptions(streaming=True))
# window duration is 6 and batch size is 5, so output batch size should be
# 5 (flush because of batchSize reached)
expected_0 = 5
Expand Down

0 comments on commit b06fb9d

Please sign in to comment.