
Commit 7a02887
Merge pull request apache#9843 from kamilwu/portable-runners-metrics
[BEAM-4775] Converting MonitoringInfos to MetricResults in PortableRunner
Ardagan committed Oct 30, 2019
2 parents 1ccecd3 + f53e47a commit 7a02887
Showing 8 changed files with 152 additions and 69 deletions.
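
The net effect of this change is that metric results from a portable pipeline become queryable through the standard MetricResults interface. A minimal end-to-end sketch of the intended usage, assuming a pipeline `p` already bound to a portable runner; the metric names are illustrative, not taken from this diff:

    import apache_beam as beam
    from apache_beam.metrics.metric import Metrics
    from apache_beam.metrics.metric import MetricsFilter

    class CountingDoFn(beam.DoFn):
      def __init__(self):
        self.counter = Metrics.counter('my_namespace', 'my_counter')

      def process(self, element):
        self.counter.inc()
        yield element

    result = p.run()
    result.wait_until_finish()
    # Before this PR, PortableRunner returned no queryable metrics here; now
    # the MonitoringInfos reported by the job service are converted into
    # MetricResult objects.
    for counter in result.metrics().query(
        MetricsFilter().with_name('my_counter'))['counters']:
      print(counter.key.step, counter.committed, counter.attempted)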
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -319,6 +319,13 @@ def parse_namespace_and_name(monitoring_info_proto):
   return split[0], split[1]
 
 
+def get_step_name(monitoring_info_proto):
+  """Returns the step name for the given monitoring info, or None if the step
+  name cannot be determined."""
+  # Right now only metrics that have a PTRANSFORM label are taken into account.
+  return monitoring_info_proto.labels.get(PTRANSFORM_LABEL)
+
+
 def to_key(monitoring_info_proto):
   """Returns a key based on the URN and labels.
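
A quick sketch of the new helper's contract, assuming the MonitoringInfo proto is importable from apache_beam.portability.api.metrics_pb2 and exposes a string-to-string labels map as used above:

    from apache_beam.metrics import monitoring_infos
    from apache_beam.portability.api import metrics_pb2

    mi = metrics_pb2.MonitoringInfo()
    mi.labels[monitoring_infos.PTRANSFORM_LABEL] = 'pair_with_one'
    print(monitoring_infos.get_step_name(mi))  # 'pair_with_one'

    # Without a PTRANSFORM label there is no derivable step name.
    print(monitoring_infos.get_step_name(metrics_pb2.MonitoringInfo()))  # None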
45 changes: 13 additions & 32 deletions sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -39,13 +39,12 @@

 import apache_beam as beam  # pylint: disable=ungrouped-imports
 from apache_beam import coders
-from apache_beam import metrics
 from apache_beam.coders.coder_impl import create_InputStream
 from apache_beam.coders.coder_impl import create_OutputStream
+from apache_beam.metrics import metric
 from apache_beam.metrics import monitoring_infos
-from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.execution import MetricsEnvironment
-from apache_beam.metrics.metricbase import MetricName
 from apache_beam.options import pipeline_options
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability import common_urns
@@ -62,6 +61,7 @@
 from apache_beam.runners import runner
 from apache_beam.runners.portability import artifact_service
 from apache_beam.runners.portability import fn_api_runner_transforms
+from apache_beam.runners.portability import portable_metrics
 from apache_beam.runners.portability.fn_api_runner_transforms import create_buffer_id
 from apache_beam.runners.portability.fn_api_runner_transforms import only_element
 from apache_beam.runners.portability.fn_api_runner_transforms import split_buffer_id
@@ -1883,53 +1883,34 @@ def get(self, timeout=None):
     return self._response
 
 
-class FnApiMetrics(metrics.metric.MetricResults):
+class FnApiMetrics(metric.MetricResults):
   def __init__(self, step_monitoring_infos, user_metrics_only=True):
     """Used for querying metrics from the PipelineResult object.
 
     step_monitoring_infos: Per step metrics specified as MonitoringInfos.
-    use_monitoring_infos: If true, return the metrics based on the
-      step_monitoring_infos.
+    user_metrics_only: If true, includes user metrics only.
     """
     self._counters = {}
     self._distributions = {}
     self._gauges = {}
     self._user_metrics_only = user_metrics_only
-    self._init_metrics_from_monitoring_infos(step_monitoring_infos)
     self._monitoring_infos = step_monitoring_infos
 
-  def _init_metrics_from_monitoring_infos(self, step_monitoring_infos):
     for smi in step_monitoring_infos.values():
-      # Only include user metrics.
-      for mi in smi:
-        if (self._user_metrics_only and
-            not monitoring_infos.is_user_monitoring_info(mi)):
-          continue
-        key = self._to_metric_key(mi)
-        if monitoring_infos.is_counter(mi):
-          self._counters[key] = (
-              monitoring_infos.extract_metric_result_map_value(mi))
-        elif monitoring_infos.is_distribution(mi):
-          self._distributions[key] = (
-              monitoring_infos.extract_metric_result_map_value(mi))
-        elif monitoring_infos.is_gauge(mi):
-          self._gauges[key] = (
-              monitoring_infos.extract_metric_result_map_value(mi))
-
-  def _to_metric_key(self, monitoring_info):
-    # Right now this assumes that all metrics have a PTRANSFORM
-    transform_id = monitoring_info.labels['PTRANSFORM']
-    namespace, name = monitoring_infos.parse_namespace_and_name(monitoring_info)
-    return MetricKey(transform_id, MetricName(namespace, name))
+      counters, distributions, gauges = \
+          portable_metrics.from_monitoring_infos(smi, user_metrics_only)
+      self._counters.update(counters)
+      self._distributions.update(distributions)
+      self._gauges.update(gauges)
 
   def query(self, filter=None):
-    counters = [metrics.execution.MetricResult(k, v, v)
+    counters = [MetricResult(k, v, v)
                 for k, v in self._counters.items()
                 if self.matches(filter, k)]
-    distributions = [metrics.execution.MetricResult(k, v, v)
+    distributions = [MetricResult(k, v, v)
                      for k, v in self._distributions.items()
                      if self.matches(filter, k)]
-    gauges = [metrics.execution.MetricResult(k, v, v)
+    gauges = [MetricResult(k, v, v)
               for k, v in self._gauges.items()
               if self.matches(filter, k)]
 
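
Since FnApiRunner executes locally and reports a single final value per metric, each MetricResult above is constructed as (k, v, v), so committed and attempted always agree. A hedged sketch, assuming step_monitoring_infos as collected by the runner and assuming query() keys its result dict by MetricResults.COUNTERS and friends, as PortableMetrics does below:

    fn_metrics = FnApiMetrics(step_monitoring_infos, user_metrics_only=True)
    for counter in fn_metrics.query()['counters']:
      assert counter.committed == counter.attempted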
25 changes: 24 additions & 1 deletion sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -31,6 +31,7 @@
 import grpc
 from google.protobuf import text_format
 
+from apache_beam.metrics import monitoring_infos
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -107,6 +108,26 @@ def stop(self, timeout=1):
     if os.path.exists(self._staging_dir) and self._cleanup_staging_dir:
       shutil.rmtree(self._staging_dir, ignore_errors=True)
 
+  def GetJobMetrics(self, request, context=None):
+    if request.job_id not in self._jobs:
+      raise LookupError("Job {} does not exist".format(request.job_id))
+
+    result = self._jobs[request.job_id].result
+    monitoring_info_list = []
+    for mi in result._monitoring_infos_by_stage.values():
+      monitoring_info_list.extend(mi)
+
+    # Filter out system metrics
+    user_monitoring_info_list = [
+        x for x in monitoring_info_list
+        if monitoring_infos._is_user_monitoring_info(x) or
+        monitoring_infos._is_user_distribution_monitoring_info(x)
+    ]
+
+    return beam_job_api_pb2.GetJobMetricsResponse(
+        metrics=beam_job_api_pb2.MetricResults(
+            committed=user_monitoring_info_list))
+
 
 class SubprocessSdkWorker(object):
   """Manages a SDK worker implemented as a subprocess communicating over grpc.
@@ -176,6 +197,7 @@ def __init__(self,
     self._log_queues = []
     self.state = beam_job_api_pb2.JobState.STARTING
     self.daemon = True
+    self.result = None
 
   @property
   def state(self):
@@ -204,11 +226,12 @@ def run(self):
   def _run_job(self):
     with JobLogHandler(self._log_queues):
       try:
-        fn_api_runner.FnApiRunner(
+        result = fn_api_runner.FnApiRunner(
             provision_info=self._provision_info).run_via_runner_api(
                 self._pipeline_proto)
         logging.info('Successfully completed job.')
         self.state = beam_job_api_pb2.JobState.DONE
+        self.result = result
       except:  # pylint: disable=bare-except
         logging.exception('Error running pipeline.')
         logging.exception(traceback)
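
With the result now retained on the job object, a Job API client can fetch the user MonitoringInfos that GetJobMetrics exposes. A sketch under stated assumptions: the endpoint and job id below are placeholders, not values from this diff:

    import grpc

    from apache_beam.portability.api import beam_job_api_pb2
    from apache_beam.portability.api import beam_job_api_pb2_grpc

    channel = grpc.insecure_channel('localhost:8099')  # placeholder endpoint
    stub = beam_job_api_pb2_grpc.JobServiceStub(channel)
    response = stub.GetJobMetrics(
        beam_job_api_pb2.GetJobMetricsRequest(job_id='job-1'))
    for mi in response.metrics.committed:  # user MonitoringInfos only, per above
      print(mi.urn, mi.labels)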
69 changes: 69 additions & 0 deletions sdks/python/apache_beam/runners/portability/portable_metrics.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricKey
+from apache_beam.metrics.metric import MetricName
+
+
+def from_monitoring_infos(monitoring_info_list, user_metrics_only=False):
+  """Groups MonitoringInfo objects into counters, distributions and gauges.
+
+  Args:
+    monitoring_info_list: An iterable of MonitoringInfo objects.
+    user_metrics_only: If true, includes user metrics only.
+
+  Returns:
+    A tuple containing three dictionaries: counters, distributions and gauges,
+    respectively. Each dictionary contains (MetricKey, metric result) pairs.
+  """
+  counters = {}
+  distributions = {}
+  gauges = {}
+
+  for mi in monitoring_info_list:
+    if (user_metrics_only and
+        not monitoring_infos.is_user_monitoring_info(mi)):
+      continue
+
+    try:
+      key = _create_metric_key(mi)
+    except ValueError as e:
+      logging.debug(str(e))
+      continue
+    metric_result = monitoring_infos.extract_metric_result_map_value(mi)
+
+    if monitoring_infos.is_counter(mi):
+      counters[key] = metric_result
+    elif monitoring_infos.is_distribution(mi):
+      distributions[key] = metric_result
+    elif monitoring_infos.is_gauge(mi):
+      gauges[key] = metric_result
+
+  return counters, distributions, gauges
+
+
+def _create_metric_key(monitoring_info):
+  step_name = monitoring_infos.get_step_name(monitoring_info)
+  if not step_name:
+    raise ValueError('Failed to deduce step_name from MonitoringInfo: {}'
+                     .format(monitoring_info))
+  namespace, name = monitoring_infos.parse_namespace_and_name(monitoring_info)
+  return MetricKey(step_name, MetricName(namespace, name))
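
A hedged usage sketch for the new module: `infos` below is a placeholder iterable of MonitoringInfo protos (for example one of the per-stage lists the runner collects), and the MetricKey/MetricName attribute names are as defined in apache_beam.metrics:

    from apache_beam.runners.portability import portable_metrics

    counters, distributions, gauges = portable_metrics.from_monitoring_infos(
        infos, user_metrics_only=True)
    for key, value in counters.items():
      print(key.step, key.metric.namespace, key.metric.name, value)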
32 changes: 24 additions & 8 deletions sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -28,7 +28,8 @@
 import grpc
 
 from apache_beam import version as beam_version
-from apache_beam import metrics
+from apache_beam.metrics import metric
+from apache_beam.metrics.execution import MetricResult
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.options.pipeline_options import SetupOptions
@@ -41,6 +42,7 @@
 from apache_beam.runners.job import utils as job_utils
 from apache_beam.runners.portability import fn_api_runner_transforms
 from apache_beam.runners.portability import job_server
+from apache_beam.runners.portability import portable_metrics
 from apache_beam.runners.portability import portable_stager
 from apache_beam.runners.worker import sdk_worker_main
 from apache_beam.runners.worker import worker_pool_main
@@ -366,16 +368,30 @@ def add_runner_options(parser):
                           state_stream, cleanup_callbacks)
 
 
-class PortableMetrics(metrics.metric.MetricResults):
+class PortableMetrics(metric.MetricResults):
   def __init__(self, job_metrics_response):
-    # TODO(lgajowy): Convert portable metrics to MetricResults
-    # and allow querying them (BEAM-4775)
-    pass
+    metrics = job_metrics_response.metrics
+    self.attempted = portable_metrics.from_monitoring_infos(metrics.attempted)
+    self.committed = portable_metrics.from_monitoring_infos(metrics.committed)
+
+  @staticmethod
+  def _combine(committed, attempted, filter):
+    all_keys = set(committed.keys()) | set(attempted.keys())
+    return [
+        MetricResult(key, committed.get(key), attempted.get(key))
+        for key in all_keys
+        if metric.MetricResults.matches(filter, key)
+    ]
 
   def query(self, filter=None):
-    return {'counters': [],
-            'distributions': [],
-            'gauges': []}
+    counters, distributions, gauges = [
+        self._combine(x, y, filter)
+        for x, y in zip(self.committed, self.attempted)
+    ]
+
+    return {self.COUNTERS: counters,
+            self.DISTRIBUTIONS: distributions,
+            self.GAUGES: gauges}
 
 
 class PipelineResult(runner.PipelineResult):
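
The union of keys in _combine means a metric reported on only one side still surfaces, with None on the missing side. An illustrative sketch with placeholder keys and values, relying on matches(None, key) accepting every key:

    from apache_beam.runners.portability.portable_runner import PortableMetrics

    merged = PortableMetrics._combine({'k1': 10, 'k2': 7},
                                      {'k2': 9, 'k3': 1}, None)
    # Yields, in some order: MetricResult('k1', 10, None),
    # MetricResult('k2', 7, 9) and MetricResult('k3', None, 1).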
32 changes: 10 additions & 22 deletions sdks/python/apache_beam/testing/load_tests/load_test.py
@@ -52,32 +52,24 @@ def setUp(self):
     self.input_options = json.loads(self.pipeline.get_option('input_options'))
     self.project_id = self.pipeline.get_option('project')
 
-    self.publish_to_big_query = self.pipeline.get_option('publish_to_big_query')
     self.metrics_dataset = self.pipeline.get_option('metrics_dataset')
     self.metrics_namespace = self.pipeline.get_option('metrics_table')
 
-    if not self.are_metrics_collected():
-      logging.info('Metrics will not be collected')
-      self.metrics_monitor = None
-    else:
-      self.metrics_monitor = MetricsReader(
-          project_name=self.project_id,
-          bq_table=self.metrics_namespace,
-          bq_dataset=self.metrics_dataset,
-      )
+    self.metrics_monitor = MetricsReader(
+        publish_to_bq=self.pipeline.get_option('publish_to_big_query') ==
+        'true',
+        project_name=self.project_id,
+        bq_table=self.metrics_namespace,
+        bq_dataset=self.metrics_dataset,
+        # Apply filter to prevent system metrics from being published
+        filters=MetricsFilter().with_namespace(self.metrics_namespace)
+    )
 
   def tearDown(self):
     result = self.pipeline.run()
     result.wait_until_finish()
 
-    if self.metrics_monitor:
-      self.metrics_monitor.publish_metrics(result)
-
-  def apply_filter(self, allowed):
-    """Prevents metrics from namespaces other than specified in the argument
-    from being published."""
-    if allowed:
-      self.metrics_monitor.filters = MetricsFilter().with_namespaces(allowed)
+    self.metrics_monitor.publish_metrics(result)
 
   def get_option_or_default(self, opt_name, default=0):
     """Returns a pipeline option or a default value if it was not provided.
@@ -92,10 +84,6 @@ def get_option_or_default(self, opt_name, default=0):
     except ValueError as exc:
       self.fail(str(exc))
 
-  def are_metrics_collected(self):
-    return self.publish_to_big_query == 'true' and None not in (
-        self.project_id, self.metrics_dataset, self.metrics_namespace)
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
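
For reference, a sketch of the pipeline options that setUp() now reads; the values and the input_options keys below are placeholders, not prescribed by this diff:

    options = [
        '--project=my-project',
        '--publish_to_big_query=true',
        '--metrics_dataset=load_tests',
        '--metrics_table=pardo_metrics',
        '--input_options={"num_records": 300, "key_size": 5, "value_size": 15}',
    ]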
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -171,7 +171,7 @@ class MetricsReader(object):
   publishers = []
 
   def __init__(self, project_name=None, bq_table=None, bq_dataset=None,
-               filters=None):
+               publish_to_bq=False, filters=None):
     """Initializes :class:`MetricsReader` .
 
     Args:
@@ -182,7 +182,8 @@ def __init__(self, project_name=None, bq_table=None, bq_dataset=None,
"""
self._namespace = bq_table
self.publishers.append(ConsoleMetricsPublisher())
check = project_name and bq_table and bq_dataset

check = project_name and bq_table and bq_dataset and publish_to_bq
if check:
bq_publisher = BigQueryMetricsPublisher(
project_name, bq_table, bq_dataset)
@@ -311,8 +312,8 @@ def _prepare_runtime_metrics(self, distributions):
     min_values = []
     max_values = []
     for dist in distributions:
-      min_values.append(dist.committed.min)
-      max_values.append(dist.committed.max)
+      min_values.append(dist.result.min)
+      max_values.append(dist.result.max)
     # finding real start
     min_value = min(min_values)
     # finding real end
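
A hedged sketch of the revised constructor contract: BigQuery publishing is now opt-in via publish_to_bq, and a namespace filter keeps system metrics out of the published set. All values below are placeholders, and the module path assumes MetricsReader lives in load_test_metrics_utils as inferred above:

    from apache_beam.metrics.metric import MetricsFilter
    from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader

    reader = MetricsReader(
        project_name='my-project',
        bq_table='pardo_metrics',
        bq_dataset='load_tests',
        publish_to_bq=True,
        filters=MetricsFilter().with_namespace('pardo_metrics'))
    reader.publish_metrics(result)  # `result` is a finished PipelineResult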
2 changes: 0 additions & 2 deletions sdks/python/apache_beam/testing/load_tests/pardo_test.py
@@ -138,8 +138,6 @@
 class ParDoTest(LoadTest):
   def setUp(self):
     super(ParDoTest, self).setUp()
-    if self.are_metrics_collected():
-      self.apply_filter([self.metrics_namespace])
     self.iterations = self.get_option_or_default('iterations')
     self.number_of_counters = self.get_option_or_default('number_of_counters')
     self.number_of_operations = self.get_option_or_default(
