Skip to content

Commit

Permalink
Merge pull request apache#10029 from kkucharc/BEAM-6335-consume-data-…
Browse files Browse the repository at this point in the history
…insertion-pipeline-group-by-key

[BEAM-6335] Test GBK streaming reading SyntheticSources
  • Loading branch information
Ardagan committed Nov 20, 2019
2 parents e01f718 + 2c89d7f commit 5159130
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
Expand Down Expand Up @@ -73,8 +73,7 @@
*/
public class SyntheticDataPublisher {

private static final KvCoder<byte[], byte[]> RECORD_CODER =
KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());
private static final Coder RECORD_CODER = StringUtf8Coder.of();

private static Options options;

Expand Down Expand Up @@ -215,7 +214,7 @@ public PubsubMessage apply(KV<byte[], byte[]> input) {

private static byte[] encodeInputElement(KV<byte[], byte[]> input) {
try {
return encodeToByteArray(RECORD_CODER, input);
return encodeToByteArray(RECORD_CODER, new String(input.getValue(), UTF_8));
} catch (CoderException e) {
throw new RuntimeException(String.format("Couldn't encode element. Exception: %s", e));
}
Expand Down
38 changes: 25 additions & 13 deletions sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class PubSubMessageMatcher(BaseMatcher):
subscription until all expected messages are shown or timeout.
"""

def __init__(self, project, sub_name, expected_msg,
timeout=DEFAULT_TIMEOUT, with_attributes=False,
strip_attributes=None):
def __init__(self, project, sub_name, expected_msg=None,
expected_msg_len=None, timeout=DEFAULT_TIMEOUT,
with_attributes=False, strip_attributes=None):
"""Initialize PubSubMessageMatcher object.
Args:
Expand All @@ -75,22 +75,31 @@ def __init__(self, project, sub_name, expected_msg,
raise ValueError('Invalid project %s.' % project)
if not sub_name:
raise ValueError('Invalid subscription %s.' % sub_name)
if not isinstance(expected_msg, list):
if not expected_msg_len and not expected_msg:
raise ValueError('Required expected_msg: {} or expected_msg_len: {}.'
.format(expected_msg, expected_msg_len))
if expected_msg and not isinstance(expected_msg, list):
raise ValueError('Invalid expected messages %s.' % expected_msg)
if expected_msg_len and not isinstance(expected_msg_len, int):
raise ValueError('Invalid expected messages %s.' % expected_msg_len)

self.project = project
self.sub_name = sub_name
self.expected_msg = expected_msg
self.expected_msg_len = expected_msg_len or len(self.expected_msg)
self.timeout = timeout
self.messages = None
self.with_attributes = with_attributes
self.strip_attributes = strip_attributes

def _matches(self, _):
if self.messages is None:
self.messages = self._wait_for_messages(len(self.expected_msg),
self.messages = self._wait_for_messages(self.expected_msg_len,
self.timeout)
return Counter(self.messages) == Counter(self.expected_msg)
if self.expected_msg:
return Counter(self.messages) == Counter(self.expected_msg)
else:
return len(self.messages) == self.expected_msg_len

def _wait_for_messages(self, expected_num, timeout):
"""Wait for messages from given subscription."""
Expand Down Expand Up @@ -131,18 +140,21 @@ def _wait_for_messages(self, expected_num, timeout):

def describe_to(self, description):
description.append_text(
'Expected %d messages.' % len(self.expected_msg))
'Expected %d messages.' % self.expected_msg_len)

def describe_mismatch(self, _, mismatch_description):
c_expected = Counter(self.expected_msg)
c_actual = Counter(self.messages)
mismatch_description.append_text(
"Got %d messages. "
"Diffs (item, count):\n"
" Expected but not in actual: %s\n"
" Unexpected: %s" % (
len(self.messages), (c_expected - c_actual).items(),
(c_actual - c_expected).items()))
"Got %d messages. " % (
len(self.messages)))
if self.expected_msg:
mismatch_description.append_text(
"Diffs (item, count):\n"
" Expected but not in actual: %s\n"
" Unexpected: %s" % (
(c_expected - c_actual).items(),
(c_actual - c_expected).items()))
if self.with_attributes and self.strip_attributes:
mismatch_description.append_text(
'\n Stripped attributes: %r' % self.strip_attributes)
69 changes: 53 additions & 16 deletions sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from apache_beam.testing.test_utils import PullResponseMessage
from apache_beam.testing.test_utils import create_pull_response

# Protect against environments where pubsub library is not available.
try:
from google.cloud import pubsub
except ImportError:
Expand All @@ -54,14 +53,18 @@ def setUpClass(cls):
def setUp(self):
self.mock_presult = mock.MagicMock()

def init_matcher(self, with_attributes=False, strip_attributes=None):
def init_matcher(self, expected_msg=None,
with_attributes=False, strip_attributes=None):
self.pubsub_matcher = PubSubMessageMatcher(
'mock_project', 'mock_sub_name', ['mock_expected_msg'],
'mock_project', 'mock_sub_name', expected_msg,
with_attributes=with_attributes, strip_attributes=strip_attributes)

def init_counter_matcher(self, expected_msg_len=1):
self.pubsub_matcher = PubSubMessageMatcher(
'mock_project', 'mock_sub_name', expected_msg_len=expected_msg_len)

def test_message_matcher_success(self, mock_get_sub, unsued_mock):
self.init_matcher()
self.pubsub_matcher.expected_msg = [b'a', b'b']
self.init_matcher(expected_msg=[b'a', b'b'])
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {})]),
Expand All @@ -72,8 +75,8 @@ def test_message_matcher_success(self, mock_get_sub, unsued_mock):
self.assertEqual(mock_sub.acknowledge.call_count, 2)

def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
self.init_matcher(expected_msg=[PubsubMessage(b'a', {'k': 'v'})],
with_attributes=True)
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
Expand All @@ -83,8 +86,8 @@ def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})]
self.init_matcher(expected_msg=[PubsubMessage(b'a', {})],
with_attributes=True)
mock_sub = mock_get_sub.return_value
# Unexpected attribute 'k'.
mock_sub.pull.side_effect = [
Expand All @@ -96,9 +99,9 @@ def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
self.init_matcher(expected_msg=[PubsubMessage(b'a', {'k': 'v'})],
with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [create_pull_response([
PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
Expand All @@ -108,9 +111,9 @@ def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
self.init_matcher(expected_msg=[PubsubMessage(b'a', {'k': 'v'})],
with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
# Message is missing attribute 'timestamp'.
mock_sub.pull.side_effect = [create_pull_response([
Expand All @@ -122,8 +125,7 @@ def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
self.init_matcher()
self.pubsub_matcher.expected_msg = [b'a']
self.init_matcher(expected_msg=[b'a'])
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'c', {}),
Expand All @@ -140,7 +142,7 @@ def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
self.init_matcher()
self.init_matcher(expected_msg=[b'a'])
mock_sub = mock_get_sub.return_value
mock_sub.return_value.full_name.return_value = 'mock_sub'
self.pubsub_matcher.timeout = 0.1
Expand All @@ -149,6 +151,41 @@ def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
self.assertTrue(mock_sub.pull.called)
self.assertEqual(mock_sub.acknowledge.call_count, 0)

def test_message_count_matcher_below_fail(self, mock_get_sub, unused_mock):
self.init_counter_matcher(expected_msg_len=1)
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'c', {}),
PullResponseMessage(b'd', {})]),
]
with self.assertRaises(AssertionError) as error:
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
self.assertTrue(
'\nExpected: Expected 1 messages.\n but: Got 2 messages.'
in str(error.exception.args[0]))

def test_message_count_matcher_above_fail(self, mock_get_sub, unused_mock):
self.init_counter_matcher(expected_msg_len=1)
mock_sub = mock_get_sub.return_value
self.pubsub_matcher.timeout = 0.1
with self.assertRaisesRegex(AssertionError, r'Expected 1.*\n.*Got 0'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertTrue(mock_sub.pull.called)
self.assertEqual(mock_sub.acknowledge.call_count, 0)

def test_message_count_matcher_success(self, mock_get_sub, unused_mock):
self.init_counter_matcher(expected_msg_len=15)
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [create_pull_response(
[PullResponseMessage(
b'a', {'foo': 'bar'})
for _ in range(15)]
)]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
self.assertEqual(mock_sub.acknowledge.call_count, 1)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
15 changes: 9 additions & 6 deletions sdks/python/apache_beam/testing/load_tests/load_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ def parseTestPipelineOptions(self, options=None):
)
}

def setUp(self):
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))
def setUp(self, pipeline_options=None):
self.pipeline = TestPipeline(pipeline_options)
input = self.pipeline.get_option('input_options') or '{}'
self.input_options = json.loads(input)
self.project_id = self.pipeline.get_option('project')

self.metrics_dataset = self.pipeline.get_option('metrics_dataset')
Expand All @@ -66,10 +67,12 @@ def setUp(self):
)

def tearDown(self):
result = self.pipeline.run()
result.wait_until_finish()
if not hasattr(self, 'result'):
self.result = self.pipeline.run()
self.result.wait_until_finish()

self.metrics_monitor.publish_metrics(result)
if self.metrics_monitor:
self.metrics_monitor.publish_metrics(self.result)

def get_option_or_default(self, opt_name, default=0):
"""Returns a pipeline option or a default value if it was not provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def publish_metrics(self, result):
# required to prepare metrics for publishing purposes. Expected is to have
# a list of dictionaries matching the schema.
insert_dicts = self._prepare_all_metrics(metrics)
if len(insert_dicts):
if len(insert_dicts) > 0:
for publisher in self.publishers:
publisher.publish(insert_dicts)

Expand All @@ -224,10 +224,11 @@ def _get_distributions(self, distributions, metric_id):
matching_namsespace, not_matching_namespace = \
split_metrics_by_namespace_and_name(distributions, self._namespace,
RUNTIME_METRIC)
runtime_metric = RuntimeMetric(matching_namsespace, metric_id)
rows.append(runtime_metric.as_dict())

rows += get_generic_distributions(not_matching_namespace, metric_id)
if len(matching_namsespace) > 0:
runtime_metric = RuntimeMetric(matching_namsespace, metric_id)
rows.append(runtime_metric.as_dict())
if len(not_matching_namespace) > 0:
rows += get_generic_distributions(not_matching_namespace, metric_id)
return rows


Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/testing/load_tests/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Loading

0 comments on commit 5159130

Please sign in to comment.