Skip to content

Commit

Permalink
[BEAM-7674] Combine batch and streaming BQ Streaming Insert ITs
Browse files Browse the repository at this point in the history
I added a new test - test_multiple_destination_transform_streaming
that was a streaming version of test_multiple_destination_transform.
Combining them would allow the same test to be run in both batch
and streaming.
  • Loading branch information
ttanay committed Jul 16, 2019
1 parent f7cbf88 commit a035703
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 89 deletions.
133 changes: 45 additions & 88 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
Expand Down Expand Up @@ -561,6 +562,10 @@ def test_value_provider_transform(self):

@attr('IT')
def test_multiple_destinations_transform(self):
streaming = self.test_pipeline.options.view_as(StandardOptions).streaming
if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner):
self.skipTest("TestStream is not supported on TestDataflowRunner")

output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)

Expand All @@ -576,100 +581,52 @@ def test_multiple_destinations_transform(self):

bad_record = {'language': 1, 'manguage': 2}

pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
if streaming:
pipeline_verifiers = [
PipelineStateMatcher(PipelineState.RUNNING),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
else:
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]

args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers),
experiments='use_beam_bq_sink')

with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS)

schema_table_pcv = beam.pvalue.AsDict(
p | "MakeSchemas" >> beam.Create([(full_output_table_1, schema1),
(full_output_table_2, schema2)]))

table_record_pcv = beam.pvalue.AsDict(
p | "MakeTables" >> beam.Create([('table1', full_output_table_1),
('table2', full_output_table_2)]))

input2 = p | "Broken record" >> beam.Create([bad_record])

input = (input, input2) | beam.Flatten()

r = (input
| "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=lambda x, tables: (tables['table1']
if 'language' in x
else tables['table2']),
table_side_inputs=(table_record_pcv,),
schema=lambda dest, table_map: table_map.get(dest, None),
schema_side_inputs=(schema_table_pcv,),
method='STREAMING_INSERTS'))

assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
equal_to([(full_output_table_1, bad_record)]))

@attr('IT')
def test_multiple_destinations_transform_streaming(self):
if isinstance(self.test_pipeline.runner, TestDataflowRunner):
self.skipTest("TestStream is not supported on TestDataflowRunner")
output_table_1 = '%s%s_streaming' % (self.output_table, 1)
output_table_2 = '%s%s_streaming' % (self.output_table, 2)

full_output_table_1 = '%s:%s' % (self.project, output_table_1)
full_output_table_2 = '%s:%s' % (self.project, output_table_2)

schema1 = {'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
schema2 = {'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE'}]}

bad_record = {'language': 1, 'manguage': 2}

pipeline_verifiers = [
PipelineStateMatcher(PipelineState.RUNNING),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]

args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers),
experiments='use_beam_bq_sink',
streaming=True)

with beam.Pipeline(argv=args) as p:
_SIZE = len(_ELEMENTS)
test_stream = (TestStream()
.advance_watermark_to(0)
.add_elements(_ELEMENTS[:_SIZE//2])
.advance_watermark_to(100)
.add_elements(_ELEMENTS[_SIZE//2:])
.advance_watermark_to_infinity())
input = p | test_stream
if streaming:
_SIZE = len(_ELEMENTS)
test_stream = (TestStream()
.advance_watermark_to(0)
.add_elements(_ELEMENTS[:_SIZE//2])
.advance_watermark_to(100)
.add_elements(_ELEMENTS[_SIZE//2:])
.advance_watermark_to_infinity())
input = p | test_stream
else:
input = p | beam.Create(_ELEMENTS)

schema_table_pcv = beam.pvalue.AsDict(
p | "MakeSchemas" >> beam.Create([(full_output_table_1, schema1),
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
"apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
"apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
"apache_beam.io.gcp.bigquery_test:BigQueryStreamingInsertTransformIntegrationTests\
.test_multiple_destinations_transform_streaming",
.test_multiple_destinations_transform",
]
def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
def argMap = ["runner": "TestDirectRunner",
Expand Down

0 comments on commit a035703

Please sign in to comment.