Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1672] Accumulable MetricsContainers. #2649

Closed
wants to merge 3 commits into from

Conversation

aviemzur
Copy link
Member

@aviemzur aviemzur commented Apr 22, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@aviemzur aviemzur force-pushed the accumulable-metricscontainer branch 3 times, most recently from 55bcc26 to 46e2ecf Compare April 23, 2017 05:00
@aviemzur
Copy link
Member Author

R: @bjchambers for Java SDK and Direct runner changes.
R: @aljoscha for Flink runner changes.
R: @staslev for Spark runner changes.

DistributionData update = metricUpdate.getUpdate();

// update flink metric
FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is removing the support for exporting Beam Metrics to Flink Metrics. (Same holds for updateCounters() and updateGauge().

Unfortunately this is not tested right now since this is somewhat hard to do. I created a Jira issue for this: https://issues.apache.org/jira/browse/BEAM-2056

Copy link
Member Author

@aviemzur aviemzur Apr 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, missed this part. I'll try to implement a similar solution to what is done with Spark's metrics sink.

We use Spark's implementation of MetricResults and report these metrics to Spark's metrics sink.
https://github.com/apache/beam/blob/v0.6.0/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java
https://github.com/apache/beam/blob/v0.6.0/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java

In this PR this is now done with AccumulatedAttemptedMetricResults. I'll try to use this to report these to Flink metrics similarly to how we report these to Spark's metrics sink.

@aviemzur aviemzur force-pushed the accumulable-metricscontainer branch 2 times, most recently from 16dcc79 to ebbb72a Compare April 24, 2017 06:31
// Report the physical metrics from the end of this step.
context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());

synchronized (context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't need to syncrhonize.

  1. Synchronizing when there are multiple threads executing will be expensive.
  2. Each processing thread should have a dedicated evaluation context and a dedicated metrics container, and only the processing thread should be writing to it.
  3. The metrics container (I believe) uses Atomic values, so that reading will receive a reasonable value.

Copy link
Member Author

@aviemzur aviemzur Apr 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for synchronization is exactly what you remarked on, the evaluation context is shared between several threads.

There is synchronization in the existing metrics implementation for this reason as well DirectMetrics.java.

I agree that each thread having its own evaluation context would be better but I don't think this should be changed as part of this PR.

I will see if there is a different point in the flow in which the containers can be merged without synchronization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I forgot this was in the DirectRunner. That sounds reasonable, although @tgroh may have an idea if there is somewhere better to merge containers.

finishBundle(evaluator, enforcements);

synchronized (context) {
context.getCommittedMetrics().update(stepName, metricsContainer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be synchronized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer as below.

// Report the physical metrics after each element
MetricUpdates deltas = metricsContainer.getUpdates();
if (deltas != null) {
context.getMetrics().updatePhysical(inputBundle, deltas);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reporting attempted counters during processing allows a tentative value to be read during processing. Is there a reason we removed that functionality?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To do this with the accumulable metrics containers would mean synchronization after each element is processed (as was done in the existing direct metrics implementation) as well as creating a separate metrics container instance for each element.

If reporting attempted counters during processing is a crucial feature for Direct runner we can return the previous implementation and not use the accumulable version. I think, however, that the reuse of code and reduction of LOC >> this fairly niche feature in the direct runner.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideas for other options that preserve functionality and code reduction:

  1. Have the DirectRunner report tentative metrics every Nth element (reduces the amount of synchronization).
  2. Allow copying the MetricsContainer. Then, the DirectRunner can create a copy and put it on a queue for a separate thread that actually does the update aggregation. This will avoid synchronization.
  3. Extending (2) -- I believe it should be safe for multiple threads to read a MetricsContainer even if it is being modified by one thread, since it uses Atomic values. Given that, we could just pass a reference to the MetricsContainer to incorporate to the queue, and have it maintain control of the aggregated MetricsContainers.

2 or 3 would also be most like this would happen within a distributed runner, where the actual aggregation was happening outside of the processing thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be a separate ticket and PR? or does this block merging this change? As master has already changed and this PR must be rebased again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be done in a way similar to how we update the state of the WatermarkManager, where we toss something on a concurrent queue, try to acquire a lock and update the visible state but give up if the lock is already held (and eventually are updated the rest of the way) (e.g. https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L912)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds fine, but as we're approaching 2 weeks on this PR's review and the code on master has diverged yet again I suggest we open this as a future ticket. We can give code references and how suggest this change should be implemented and perhaps tag this with starter to encourage contributions.

@tgroh WDYT? Can you open such a ticket?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this could be OK, although it is concerning that we are regressing existing behavior as part of a refactoring...

Copy link
Member Author

@aviemzur aviemzur May 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively we could not change direct runner to use accumulable metrics containers and open a task for someone to do that in a future PR.
I will go with this option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good -- created https://issues.apache.org/jira/browse/BEAM-2186 to track direct runner updates.


String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey());
Long update = metricUpdate.getUpdate();
Long update = metricResult.attempted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q (for my understanding): Are flink accumulators aggregated across all attempts at processing, or only across successfully checkpointed processing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually neither: aggregator values are lost in case of failure so it's "aggregated since last failure".

/**
* Implementation of attempted {@link MetricResults} using accumulated {@link MetricsContainers}.
*/
public class AccumulatedAttemptedMetricResults extends AccumulatedMetricResults {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What makes this special to attempted metrics? Couldn't it be used for committed metrics as well? Is it just that when given an AttemptedAndCommitted it extracts the Attempted? If yes, would it make sense to parameterize it on that part, so it could be used for either?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@Override
public void inc() {
add(1);
public void update(Long n) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also solve this using the approach taken in Sum Long CombineFn. Specifically, instead of MetricCell<Counter, Long> make this MetricCell<Counter, long[]>, where the array is expected to be one element. This avoids the boxing/unboxing.

Alternative, the boxing/unboxing here may be OK as long as we aren't using a Long as the accumulator/value holder.

/**
* Update value of this cell.
*/
void update(DataT data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this do a set or a merge with the given data? If it does a set, then the method below should be called merge or something else to distinguish the behaviors. Either way, doc should indicate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does a set, which is why I chose update over merge, void return type and stated in the Javadoc that it updates the value of the cell.
Is this unclear? Should I use a different method name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be tempted to call this set or setValue then, or at least update the Javadoc to say that it sets the value of this cell to {@code data} or something like that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is it isn't a set it combines the input with the existing value, which is why I opted for update as the name.
We could change this to combineAndSet or mergeAndSet if that is clearer.

import java.util.concurrent.ConcurrentHashMap;

/**
* Metrics containers by step.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth calling this something like "MetricsContainerStepMap" or something, to make it clear the behavior this provides?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

public void updateAll(MetricsContainers other) {
for (Map.Entry<String, MetricsContainer> container : other.metricsContainers.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to do any locking? Either way, should extend the documentation to describe how this should be used by the runner, and what requirements/guarantees it provides in terms of thread safety.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on how the runner uses it. It isn't thread-safe. I'll add Javadoc to reflect this.

/**
* Implementation of {@link MetricResults} by means of accumulated {@link MetricsContainers}.
*/
public class AccumulatedMetricResults extends MetricResults {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class should have more documentation and should have some tests written to verify/document its behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 69.986% when pulling b87a860 on aviemzur:accumulable-metricscontainer into 14d60b2 on apache:master.

@aviemzur
Copy link
Member Author

Rebased on top of master
Addressed comments in the review.
PTAL: @bjchambers @aljoscha @staslev

Copy link
Contributor

@staslev staslev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as my experience with (Spark) accumulators goes, things might slip when moving from test setups (i.e. local cluster), to real-world distributed clusters.

Since I understand this has already been tested on an actual cluster, I have only a few minor style comments.

* {@link MetricsContainerStepMap}.
*/
public Map<String, MetricsContainer> asMap() {
return new HashMap<>(metricsContainers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is a new HashMap instance needed here?

import org.apache.spark.AccumulatorParam;


/**
* Metrics accumulator param.
*/
class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> {
class MetricsAccumulatorParam implements AccumulatorParam<MetricsContainerStepMap> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this could be made an inner static class of MetricsContainerStepMap so that usages would look like so: new MetricsContainerStepMap.AccumulatorParam().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this is AccumulatorParam is a Spark interface.

Copy link
Contributor

@staslev staslev Apr 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an issue?

It is instantiated here, so instead of new MetricsAccumulatorParam() it could be new MetricsContainerStepMap.AccumulatorParam().

Am I missing something?

Copy link
Member Author

@aviemzur aviemzur May 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is MetricsContainerStepMap is in sdks-java-core which is not dependent on Spark.

* Implementation of {@link MetricResults} using
* {@link MetricsContainerStepMap MetricsContainerStepMaps}.
*/
public class AccumulatedMetricResults extends MetricResults {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if a better name could be given here.
Perhaps PipelineMetricResults to indicate that these MetricResults pertain to all steps in a given pipeline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

*
* <p>This constructor is intended for runners which support both attempted and committed metrics.
*/
public AccumulatedMetricResults(
Copy link
Contributor

@staslev staslev Apr 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a personal preference towards static factory methods here to avoid leaking the concrete type out, i.e.,
AccumulatedMetricResults.of(...), or PipelieMetricResults.of(...) if you adopt the naming suggested above.

This is however, a personal preference (by definition) so it extends MatterOfTaste.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 69.978% when pulling b87a860 on aviemzur:accumulable-metricscontainer into 14d60b2 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 69.993% when pulling aeb3000 on aviemzur:accumulable-metricscontainer into 14d60b2 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 69.993% when pulling aeb3000 on aviemzur:accumulable-metricscontainer into 14d60b2 on apache:master.

@aviemzur
Copy link
Member Author

aviemzur commented May 1, 2017

@staslev Addressed your comments in my latest commit.

@staslev
Copy link
Contributor

staslev commented May 3, 2017

I'd import the static methods and use asMetricResults, asAttemptedOnlyMetricResults etc. directly, rather than in a qualified manner like so MetricsContainerStepMap.asAttemptedOnlyMetricResults which long-ish.

Other than that the parts I have looked at LGTM.

@aljoscha
Copy link
Contributor

aljoscha commented May 5, 2017

I looked over the Flink parts again. This LGTM, now that Flink Metrics support is retained. 👍

@aviemzur aviemzur force-pushed the accumulable-metricscontainer branch from aeb3000 to f943e18 Compare May 5, 2017 20:24
@aviemzur
Copy link
Member Author

aviemzur commented May 5, 2017

Rebased on top of master.
Removed all code changes from direct runner.
Will merge when tests pass.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 70.646% when pulling f943e18 on aviemzur:accumulable-metricscontainer into a629f73 on apache:master.

@aviemzur
Copy link
Member Author

aviemzur commented May 6, 2017

Two test failures of flaky tests, verified they passed locally.

@asfgit asfgit closed this in 019d300 May 6, 2017
aviemzur added a commit to aviemzur/beam that referenced this pull request May 6, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants