[BEAM-4775] Converting MonitoringInfos to MetricResults in PortableRunner #9843
Conversation
Force-pushed from 5440307 to 67e73d3.
Run Python PreCommit
We'll have to rely on r: @Ardagan as Alex is not working on this anymore.
    monitoring_infos._is_user_distribution_monitoring_info(x)
]

return beam_job_api_pb2.GetJobMetricsResponse(
FYI, here is some background. I was not aware that MetricResult existed in proto form now. Creating a proto was suggested but not pursued at the time.
https://s.apache.org/get-metrics-api
When I last worked here, MetricResult did not have a proto format. The plan was to use MonitoringInfos as the language-agnostic format, and MetricResult would then be a language-specific format (Python, Java, Go, etc.). Each runner should provide a way to return MonitoringInfos, and each language would have a library to convert MonitoringInfos to MetricResult protos.
It seems like you might be using a different approach: using MetricResult protos as the language-agnostic solution.
It's hard for me to review; I don't think I am up to date on the current plan/usage of these protos.
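As a rough illustration of the plan described above (the helper names below are hypothetical, not the actual Beam API), a per-language conversion library might look like:

```python
# Hypothetical sketch: the runner hands back MonitoringInfos, and a
# per-language library turns them into language-specific MetricResults.
# `extract_value` and the returned tuples are placeholders, not Beam's API.
def monitoring_infos_to_metric_results(monitoring_info_list, extract_value):
  results = []
  for mi in monitoring_info_list:
    labels = dict(mi.labels)
    key = (labels.get('NAMESPACE', ''),
           labels.get('NAME', ''),
           labels.get('PTRANSFORM', ''))
    results.append((key, extract_value(mi)))
  return results
```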
Args:
  monitoring_info_list: An iterable of MonitoringInfo objects.
  user_metrics_only: If true, includes user metrics only.
Please add a Returns section to the pydoc comment.
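A hedged sketch of what the requested docstring could look like (the function name and the Returns wording are assumptions about the function's actual shape):

```python
def from_monitoring_infos(monitoring_info_list, user_metrics_only=False):
  """Groups MonitoringInfo objects into metric results.

  Args:
    monitoring_info_list: An iterable of MonitoringInfo objects.
    user_metrics_only: If true, includes user metrics only.

  Returns:
    Counters, distributions and gauges, each keyed by MetricKey
    (assumed shape; adjust to what the function really returns).
  """
```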
+1
@staticmethod
def _create_metric_key(monitoring_info):
  step_name = ParseMonitoringInfoMixin._get_step_name(monitoring_info)
_get_step_name seems like it should live in monitoring_infos.py.
@staticmethod
def _get_step_name(monitoring_info):
  keys_to_check = [monitoring_infos.PTRANSFORM_LABEL,
This doesn't seem correct. Why would you consider the step name to be under labels other than PTRANSFORM_LABEL?
See the MonitoringInfo specs defined here:
https://github.com/apache/beam/blob/d4afbabf38a3ab557625c9c091ed5f06ca6731ce/model/pipeline/src/main/proto/metrics.proto
PTRANSFORM_LABEL is the only one used for this purpose.
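A minimal sketch of the suggested approach, reading the step name from PTRANSFORM_LABEL only (the helper name is illustrative):

```python
from apache_beam.metrics import monitoring_infos

def get_step_name(monitoring_info):
  # Only the PTRANSFORM label identifies a step; PCollection-scoped metrics
  # (e.g. element counts) simply have no step name here.
  return monitoring_info.labels.get(monitoring_infos.PTRANSFORM_LABEL)
```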
I'm running a simple test (sdks/python/apache_beam/testing/load_tests/pardo_test.py) on FlinkRunner and I see a lot of metrics with a PCOLLECTION label. They have different PCOLLECTION label values (u'ref_PCollection_PCollection_16', u'ref_PCollection_PCollection_10', and so on), but their urn is the same: beam:metric:element_count:v1. Shouldn't we include all of these metrics in the final result? If I didn't consider the step name to be under labels other than PTRANSFORM_LABEL, their MetricKey would be like ('', MetricName('beam', 'metric:element_count:v1')). In that case, some of the metrics would be lost, because their MetricKeys would be the same.
No, the intention is that URN + labels defines the metric instance.
As an analogy, think of the URN as the class of the metric; URN + labels then defines the object instance.
I don't quite remember exactly what MetricKey contained, but generally our collection objects for metrics need to account for URN + labels to correctly identify the metric instance.
MetricResult was originally designed just for user metrics, which did not have labels, only a name and namespace. The labels concept was introduced to MonitoringInfos later; the name and namespace were then reworked to be labels.
See the user metric MonitoringInfoSpec, which defines what should be populated on a MonitoringInfo:
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
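Illustrative only, not the actual MetricKey implementation: treating URN plus labels as the identity of a metric instance keeps two element_count MonitoringInfos with different PCOLLECTION labels distinct.

```python
def metric_instance_id(monitoring_info):
  # The URN picks the "class" of metric; the full label set picks the instance.
  return (monitoring_info.urn,
          tuple(sorted(monitoring_info.labels.items())))
```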
In the Dataflow runner we have to translate SDK PTransform and PCollection names back to user-defined names. I'm still checking the code to see if that is required here as well.
Some additional background:
The ElementCount metric is defined only by a PCollection name. If you need to get a step name from it, you need a mapping from PCollections to step names; a PCollection name is not correct to treat as a step name.
As Alex mentioned, each metric is defined by a URN + a list of labels. Each URN can have a different set of labels that uniquely identify the metric, so for now we need to treat each URN differently. For example, the same PCollection can technically be relevant to different step names, and choosing a specific step name might not be trivial.
I see your point now.
I think it might be too difficult (if possible at all) to implement such a mapping right here. Since this is a portable runner, such logic would have to be aware of every runner's implementation details.
I'm going to stay with checking PTRANSFORM_LABEL only for now.
If you want to aggregate away the other labels, you could take this approach and sum everything that has the same PTRANSFORM label.
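A hedged sketch of that aggregation (`counter_value` is a hypothetical accessor for the decoded payload, not a Beam function):

```python
from collections import defaultdict

from apache_beam.metrics import monitoring_infos

def sum_by_ptransform(monitoring_info_list, counter_value):
  # Collapse every label except PTRANSFORM by summing values per transform.
  totals = defaultdict(int)
  for mi in monitoring_info_list:
    step = mi.labels.get(monitoring_infos.PTRANSFORM_LABEL, '')
    totals[step] += counter_value(mi)
  return dict(totals)
```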
@@ -361,16 +363,31 @@ def add_runner_options(parser):
       state_stream, cleanup_callbacks)


-class PortableMetrics(metrics.metric.MetricResults):
+class PortableMetrics(metric.MetricResults,
+                      portable_metrics.ParseMonitoringInfoMixin):
Please remove the use of double inheritance. You can use the methods from portable_metrics.ParseMonitoringInfoMixin by having that file define the methods at module level instead of in a class, since it only defines static methods.
from apache_beam.metrics.metric import MetricName


class ParseMonitoringInfoMixin(object):
Please remove this class and instead define the methods at module level, without a class. All the methods here are static, so there is no need for a class/object instance.
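A sketch of the suggested layout, with the helpers defined at module level in portable_metrics.py rather than on a mixin class (the NAMESPACE/NAME lookups are illustrative, not necessarily how Beam extracts them):

```python
# portable_metrics.py
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metric import MetricName


def _get_step_name(monitoring_info):
  return monitoring_info.labels.get(monitoring_infos.PTRANSFORM_LABEL, '')


def _create_metric_key(monitoring_info):
  labels = monitoring_info.labels
  return MetricKey(
      _get_step_name(monitoring_info),
      MetricName(labels.get('NAMESPACE', ''), labels.get('NAME', '')))


# Callers such as PortableMetrics then import this module and call
# portable_metrics._create_metric_key(mi); no second base class is needed.
```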
    bq_table=self.metrics_namespace,
    bq_dataset=self.metrics_dataset,
)
self.metrics_monitor = MetricsReader(
I am not familiar with MetricsReader.
We use MetricsReader to push metrics obtained from PipelineResult to BigQuery in load tests. This diff is just a fix to prevent system metrics from being published, since we don't need them in BigQuery.
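Not MetricsReader's actual API, just an illustration of the fix's intent: keep system metrics out of the BigQuery publish step by retaining only results in the user-defined metrics namespace.

```python
def user_metric_results_only(metric_results, metrics_namespace):
  # Drop runner/system metrics; the load tests only publish user metrics.
  return [r for r in metric_results
          if r.key.metric.namespace == metrics_namespace]
```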
Happy to keep reviewing, but please have Ardagan review as well.
Thanks for your review so far, I've pushed some fixes.
Run Python PreCommit
def _create_metric_key(monitoring_info):
  step_name = monitoring_infos.get_step_name(monitoring_info)
  if not step_name:
    raise ValueError("Step name is empty")
Dump the monitoring info into the error message; otherwise this error is not descriptive. Ideally, change the text to state what you expect here, for example: "Monitoring info should contain XXX field. Monitoring info: {}", or in this case: "Failed to deduce step_name from MonitoringInfo: {}".
"Failed to deduce step_name..." looks good.
try:
  key = _create_metric_key(mi)
except ValueError:
  # Skip MonitoringInfos for which a metric key cannot be created.
  continue
Please have @Ardagan approve as well. I think you've addressed my concerns.
Force-pushed from 092e32f to cbeda3e.
FnApiRunner and PortableRunner share the same set of unit tests. Tests that check metrics were, however, disabled in the PortableRunner test suites. This commit removes that limitation.
Using the committed property could have been a source of errors if the runner doesn't support committed metrics.
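For example, a test reading metrics portably would use the attempted values rather than committed ones (the counter name is a placeholder, and `result` is assumed to be a PipelineResult from a portable runner):

```python
from apache_beam.metrics.metric import MetricsFilter

# Given a PipelineResult `result` from a portable runner:
counters = result.metrics().query(
    MetricsFilter().with_name('my_counter'))['counters']
total = sum(c.attempted for c in counters)  # committed may be unsupported
```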
Force-pushed from cbeda3e to f53e47a.
run python precommit
Need to get tests green before merging. LGTM overall.
There are no more comments, so I'll merge the PR.
Thanks!
This PR enables users to retrieve metrics from portable runners via the query() call.
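A minimal end-to-end usage sketch of what this enables (the namespace, counter name, and runner options are placeholders):

```python
import apache_beam as beam
from apache_beam.metrics.metric import Metrics, MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions

def count_it(element):
  Metrics.counter('my_namespace', 'my_counter').inc()
  return element

# e.g. pass --runner=PortableRunner --job_endpoint=localhost:8099
options = PipelineOptions()
p = beam.Pipeline(options=options)
_ = p | beam.Create([1, 2, 3]) | beam.Map(count_it)
result = p.run()
result.wait_until_finish()

# Query the user counter back from the runner via MonitoringInfos.
query_result = result.metrics().query(
    MetricsFilter().with_namespace('my_namespace').with_name('my_counter'))
for counter in query_result['counters']:
  print(counter.key.step, counter.key.metric.name, counter.attempted)
```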
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Choose reviewer(s) and mention them in a comment (R: @username).
- Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.

See the Contributor Guide for more tips on how to make the review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.