
[BEAM-8949] SpannerIO integration tests #11210

Merged: 16 commits, May 14, 2020
Changes from 11 commits
1 change: 1 addition & 0 deletions CHANGES.md
@@ -87,6 +87,7 @@ Please migrate your code to use
 See the updated
 [datastore_wordcount](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py)
 for example usage.
+* Python SDK: Added integration tests and updated batch write functionality for Google Cloud Spanner transform ([BEAM-8949](https://issues.apache.org/jira/browse/BEAM-8949)).

 ## New Features / Improvements
 * Python SDK will now use Python 3 type annotations as pipeline type hints.
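
The batch-write path this changelog entry refers to is the experimental WriteToSpanner transform exercised by the diffs below. As orientation, a minimal write sketch might look like the following; the project/instance/database IDs and the Users(UserId, Key) table are placeholders mirroring this PR's integration tests, not values taken from the PR itself:

import apache_beam as beam
from apache_beam.io.gcp.experimental.spannerio import WriteMutation
from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner

# Minimal sketch: write a single row to a hypothetical Users(UserId, Key)
# table using the experimental Spanner write transform.
with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([
          WriteMutation.insert(
              table='Users', columns=('UserId', 'Key'), values=[(1, 'abc')])
      ])
      | WriteToSpanner(
          project_id='my-project',
          instance_id='my-instance',
          database_id='my-database'))
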
47 changes: 23 additions & 24 deletions sdks/python/apache_beam/io/gcp/experimental/spannerio.py
@@ -185,7 +185,7 @@
 from apache_beam.pvalue import TaggedOutput
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import ptransform_fn
-from apache_beam.transforms import window
+from apache_beam.transforms.combiners import ToList
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
@@ -984,7 +984,7 @@ def delete(table, keyset):
   })


-@with_input_types(typing.Union[MutationGroup, TaggedOutput])
+@with_input_types(typing.List[MutationGroup])
 @with_output_types(MutationGroup)
 class _BatchFn(DoFn):
   """
@@ -995,7 +995,6 @@ def __init__(self, max_batch_size_bytes, max_number_rows, max_number_cells):
     self._max_number_rows = max_number_rows
     self._max_number_cells = max_number_cells

-  def start_bundle(self):
     self._batch = MutationGroup()
     self._size_in_bytes = 0
     self._rows = 0
@@ -1008,31 +1007,30 @@ def _reset_count(self):
     self._cells = 0

   def process(self, element):
-    mg_info = element.info
-
-    if mg_info['byte_size'] + self._size_in_bytes > self._max_batch_size_bytes \
-        or mg_info['cells'] + self._cells > self._max_number_cells \
-        or mg_info['rows'] + self._rows > self._max_number_rows:
-      # Batch is full, output the batch and resetting the count.
-      if self._batch:
-        yield self._batch
-      self._reset_count()
-
-    self._batch.extend(element)
+    for elem in element:
+      mg_info = elem.info
+      if mg_info['byte_size'] + self._size_in_bytes > \
+          self._max_batch_size_bytes \
+          or mg_info['cells'] + self._cells > self._max_number_cells \
+          or mg_info['rows'] + self._rows > self._max_number_rows:
+        # Batch is full, output the batch and resetting the count.
+        if self._batch:
+          yield self._batch
+        self._reset_count()
+      self._batch.extend(elem)

-    # total byte size of the mutation group.
-    self._size_in_bytes += mg_info['byte_size']
+      # total byte size of the mutation group.
+      self._size_in_bytes += mg_info['byte_size']

-    # total rows in the mutation group.
-    self._rows += mg_info['rows']
+      # total rows in the mutation group.
+      self._rows += mg_info['rows']

-    # total cells in the mutation group.
-    self._cells += mg_info['cells']
+      # total cells in the mutation group.
+      self._cells += mg_info['cells']

-  def finish_bundle(self):
-    if self._batch is not None:
-      yield window.GlobalWindows.windowed_value(self._batch)
-      self._batch = None
+    if self._batch:
+      yield self._batch
+      self._reset_count()


 @with_input_types(MutationGroup)
@@ -1135,6 +1133,7 @@ def expand(self, pcoll):

     batching_batchables = (
         filter_batchable_mutations['batchable']
+        | 'combine to list' >> ToList()
         | ParDo(
             _BatchFn(
                 max_batch_size_bytes=self._max_batch_size_bytes,
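
Taken together, these hunks change _BatchFn from a per-element, per-bundle batcher (flushed in finish_bundle) into a DoFn that receives the entire set of batchable mutation groups as one list, produced by the ToList combiner, and greedily packs that list into batches capped by byte size, row count, and cell count. A standalone sketch of the packing loop, with plain dicts standing in for MutationGroup.info:

def pack(groups, max_bytes, max_rows, max_cells):
  # Greedy packing, mirroring the new _BatchFn.process: flush the current
  # batch whenever adding the next group would exceed any of the three caps,
  # then emit the final, possibly partial, batch at the end.
  batch, size, rows, cells = [], 0, 0, 0
  for g in groups:
    if (size + g['byte_size'] > max_bytes or rows + g['rows'] > max_rows
        or cells + g['cells'] > max_cells):
      if batch:
        yield batch
      batch, size, rows, cells = [], 0, 0, 0
    batch.append(g)
    size += g['byte_size']
    rows += g['rows']
    cells += g['cells']
  if batch:
    yield batch


groups = [{'byte_size': 400, 'rows': 1, 'cells': 2}] * 10
print([len(b) for b in pack(groups, 1000, 50, 50)])  # [2, 2, 2, 2, 2]

Moving the final flush from finish_bundle into process is also what removes the need for the window import: the DoFn no longer has to re-wrap a leftover batch in a GlobalWindows windowed value.
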
138 changes: 138 additions & 0 deletions sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
@@ -0,0 +1,138 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import logging
import random
import sys
import unittest
import uuid

from nose.plugins.attrib import attr

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

# Protect against environments where spanner library is not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
# pylint: disable=unused-import
try:
  from google.cloud import spanner
  from apache_beam.io.gcp.experimental.spannerio import create_transaction
  from apache_beam.io.gcp.experimental.spannerio import ReadOperation
  from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
except ImportError:
  spanner = None
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
# pylint: enable=unused-import

_LOGGER = logging.getLogger(__name__)
_TEST_INSTANCE_ID = 'beam-test'


@unittest.skipIf(spanner is None, 'GCP dependencies are not installed.')
class SpannerReadTest(unittest.TestCase):
  TEST_DATABASE = None
  _database_prefix = "pybeam-read-{}"
  _data = None
  _SPANNER_CLIENT = None
  _SPANNER_INSTANCE = None

  @classmethod
  def _generate_table_name(cls):
    cls.TEST_DATABASE = cls._database_prefix.format(
        ''.join(random.sample(uuid.uuid4().hex, 15)))
    return cls.TEST_DATABASE

  @classmethod
  def _create_database(cls):
    _LOGGER.info("Creating test database: %s" % cls.TEST_DATABASE)
    instance = cls._SPANNER_INSTANCE
    database = instance.database(
        cls.TEST_DATABASE,
        ddl_statements=[
            """CREATE TABLE Users (
            UserId INT64 NOT NULL,
            Key STRING(1024)
        ) PRIMARY KEY (UserId)""",
        ])
    operation = database.create()
    _LOGGER.info("Creating database: Done! %s" % str(operation.result()))

  @classmethod
  def _add_dummy_entries(cls):
    _LOGGER.info("Dummy Data: Adding dummy data...")
    instance = cls._SPANNER_INSTANCE
    database = instance.database(cls.TEST_DATABASE)
    data = cls._data = [(x + 1, uuid.uuid4().hex) for x in range(200)]
    with database.batch() as batch:
      batch.insert(table='Users', columns=('UserId', 'Key'), values=data)

  @classmethod
  def setUpClass(cls):
    _LOGGER.info(".... PyVersion ---> %s" % str(sys.version))
    _LOGGER.info(".... Setting up!")
    cls.test_pipeline = TestPipeline(is_integration_test=True)
    cls.args = cls.test_pipeline.get_full_options_as_args()
    cls.runner_name = type(cls.test_pipeline.runner).__name__
    cls.project = cls.test_pipeline.get_option('project')
    cls.instance = (
        cls.test_pipeline.get_option('instance') or _TEST_INSTANCE_ID)
    _ = cls._generate_table_name()
    spanner_client = cls._SPANNER_CLIENT = spanner.Client()
    _LOGGER.info(".... Spanner Client created!")
    cls._SPANNER_INSTANCE = spanner_client.instance(cls.instance)
    cls._create_database()
    cls._add_dummy_entries()
    _LOGGER.info("Spanner Read IT Setup Complete...")

  @attr('IT')
  def test_read_via_table(self):
    _LOGGER.info("Spanner Read via table")
    with beam.Pipeline(argv=self.args) as p:
      r = p | ReadFromSpanner(
          self.project,
          self.instance,
          self.TEST_DATABASE,
          table="Users",
          columns=["UserId", "Key"])
      assert_that(r, equal_to(self._data))

  @attr('IT')
  def test_read_via_sql(self):
    _LOGGER.info("Running Spanner via sql")
    with beam.Pipeline(argv=self.args) as p:
      r = p | ReadFromSpanner(
          self.project,
          self.instance,
          self.TEST_DATABASE,
          sql="select * from Users")
      assert_that(r, equal_to(self._data))

  @classmethod
  def tearDownClass(cls):
    # drop the testing database after the tests
    database = cls._SPANNER_INSTANCE.database(cls.TEST_DATABASE)
    database.drop()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
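
The file above imports create_transaction and ReadOperation but only exercises the table and sql read paths. For completeness, here is a hedged sketch of the transactional path those helpers support, following the pattern in the spannerio module docstring; the IDs are placeholders, and the transaction/read_operations keywords are assumed from that docstring rather than taken from this PR:

import apache_beam as beam
from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
from apache_beam.io.gcp.experimental.spannerio import ReadOperation
from apache_beam.io.gcp.experimental.spannerio import create_transaction

with beam.Pipeline() as p:
  # Pin all reads to a single snapshot by sharing one transaction.
  transaction = p | create_transaction(
      project_id='my-project',
      instance_id='my-instance',
      database_id='my-database')
  rows = p | ReadFromSpanner(
      'my-project',
      'my-instance',
      'my-database',
      transaction=transaction,
      read_operations=[
          ReadOperation.table(table='Users', columns=['UserId', 'Key']),
          ReadOperation.query(sql='SELECT * FROM Users'),
      ])
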
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
@@ -499,6 +499,7 @@ def test_batch_byte_size(
     # and each batch should contain 25 mutations.
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()
         | beam.ParDo(
             _BatchFn(
                 max_batch_size_bytes=1450,
@@ -522,6 +523,7 @@ def test_batch_disable(self, mock_batch_snapshot_class, mock_batch_checkout):
     # either to lower value or zero
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()
         | beam.ParDo(
             _BatchFn(
                 max_batch_size_bytes=1450,
@@ -550,6 +552,7 @@ def test_batch_max_rows(self, mock_batch_snapshot_class, mock_batch_checkout):
     # (contains 5 mutation groups each).
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()
         | beam.ParDo(
             _BatchFn(
                 max_batch_size_bytes=1048576,
@@ -582,6 +585,7 @@ def test_batch_max_cells(
     # total_batches = Total Number of Cells / Max Cells
     res = (
         p | beam.Create(mutation_group)
+        | 'combine to list' >> beam.combiners.ToList()
         | beam.ParDo(
             _BatchFn(
                 max_batch_size_bytes=1048576,
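
Each of the four tests gains the same 'combine to list' stage because _BatchFn now expects a single list element rather than a stream of individual mutation groups. The combiner's behavior in isolation, runnable on the DirectRunner:

import apache_beam as beam

# ToList collapses the whole PCollection into a single one-element list,
# so the downstream DoFn's process() runs exactly once over all inputs.
with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([1, 2, 3])
      | beam.combiners.ToList()
      | beam.Map(print))  # prints [1, 2, 3] (element order not guaranteed)
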