[BEAM-1672] Accumulable MetricsContainers. #2649
Conversation
Force-pushed 55bcc26 to 46e2ecf.
R: @bjchambers for Java SDK and Direct runner changes.
    DistributionData update = metricUpdate.getUpdate();

    // update flink metric
    FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
It looks like this is removing the support for exporting Beam Metrics to Flink Metrics. (The same holds for updateCounters() and updateGauge().)
Unfortunately this is not tested right now since this is somewhat hard to do. I created a Jira issue for this: https://issues.apache.org/jira/browse/BEAM-2056
Yeah, missed this part. I'll try to implement a similar solution to what is done with Spark's metrics sink.
We use Spark's implementation of MetricResults and report these metrics to Spark's metrics sink:
https://github.com/apache/beam/blob/v0.6.0/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/WithMetricsSupport.java
https://github.com/apache/beam/blob/v0.6.0/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
In this PR this is now done with AccumulatedAttemptedMetricResults. I'll try to use this to report these to Flink metrics similarly to how we report them to Spark's metrics sink.
Force-pushed 16dcc79 to ebbb72a.
    // Report the physical metrics from the end of this step.
    context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());

    synchronized (context) {
This shouldn't need to synchronize.
- Synchronizing when there are multiple threads executing will be expensive.
- Each processing thread should have a dedicated evaluation context and a dedicated metrics container, and only the processing thread should be writing to it.
- The metrics container (I believe) uses Atomic values, so that reading will receive a reasonable value.
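A minimal sketch of the point in the last bullet, assuming the cells are backed by AtomicLong (the class and method names below are illustrative, not Beam's actual MetricsContainer API): a reader can observe the counter while a writer thread is incrementing it, without any external lock, and simply sees some recent value.

```java
import java.util.concurrent.atomic.AtomicLong;

class AtomicCellRead {
  static final AtomicLong counter = new AtomicLong();

  // Returns true when the snapshot read is sane and the final count is exact.
  static boolean demo() {
    counter.set(0);
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 1000; i++) {
        counter.incrementAndGet();
      }
    });
    writer.start();
    long snapshot = counter.get(); // lock-free read while the writer runs
    try {
      writer.join();
    } catch (InterruptedException e) {
      return false;
    }
    return snapshot >= 0 && snapshot <= 1000 && counter.get() == 1000;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints true
  }
}
```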
The reason for synchronization is exactly what you remarked on: the evaluation context is shared between several threads.
There is synchronization in the existing metrics implementation for this reason as well (see DirectMetrics.java).
I agree that each thread having its own evaluation context would be better but I don't think this should be changed as part of this PR.
I will see if there is a different point in the flow in which the containers can be merged without synchronization.
Ah, I forgot this was in the DirectRunner. That sounds reasonable, although @tgroh may have an idea if there is somewhere better to merge containers.
    finishBundle(evaluator, enforcements);

    synchronized (context) {
      context.getCommittedMetrics().update(stepName, metricsContainer);
Why does this need to be synchronized?
Same answer as below.
    // Report the physical metrics after each element
    MetricUpdates deltas = metricsContainer.getUpdates();
    if (deltas != null) {
      context.getMetrics().updatePhysical(inputBundle, deltas);
Reporting attempted counters during processing allows a tentative value to be read during processing. Is there a reason we removed that functionality?
To do this with the accumulable metrics containers would mean synchronization after each element is processed (as was done in the existing direct metrics implementation) as well as creating a separate metrics container instance for each element.
If reporting attempted counters during processing is a crucial feature for the Direct runner we can restore the previous implementation and not use the accumulable version. I think, however, that the code reuse and reduction of LOC outweigh this fairly niche feature in the direct runner.
Ideas for other options that preserve functionality and code reduction:
1. Have the DirectRunner report tentative metrics every Nth element (reduces the amount of synchronization).
2. Allow copying the MetricsContainer. Then, the DirectRunner can create a copy and put it on a queue for a separate thread that actually does the update aggregation. This will avoid synchronization.
3. Extending (2): I believe it should be safe for multiple threads to read a MetricsContainer even if it is being modified by one thread, since it uses Atomic values. Given that, we could just pass a reference to the MetricsContainer to the queue, and have the aggregating thread maintain control of the aggregated MetricsContainers.
Options 2 or 3 are also most like how this would happen within a distributed runner, where the actual aggregation happens outside of the processing thread.
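Option 3 could look roughly like the sketch below. ToyContainer, report, and drain are all hypothetical stand-ins (a real Beam MetricsContainer holds many cells, not a single AtomicLong); the point is only the lock-free handoff: processing threads enqueue container references, and a single aggregator thread owns the merged state, so no lock is needed around per-element updates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class QueueBasedAggregation {

  // Toy stand-in for a per-bundle MetricsContainer: one atomic counter per step.
  static class ToyContainer {
    final String step;
    final AtomicLong count = new AtomicLong();
    ToyContainer(String step) { this.step = step; }
  }

  static final ConcurrentLinkedQueue<ToyContainer> queue = new ConcurrentLinkedQueue<>();
  static final Map<String, Long> aggregated = new ConcurrentHashMap<>();

  // Called by processing threads: cheap, lock-free handoff of a reference.
  static void report(ToyContainer container) { queue.add(container); }

  // Called only by the single aggregator thread: drains and merges.
  static void drain() {
    ToyContainer container;
    while ((container = queue.poll()) != null) {
      aggregated.merge(container.step, container.count.get(), Long::sum);
    }
  }

  static long demo() {
    queue.clear();
    aggregated.clear();
    ToyContainer first = new ToyContainer("ParDo1");
    first.count.addAndGet(7);
    ToyContainer second = new ToyContainer("ParDo1");
    second.count.addAndGet(3);
    report(first);
    report(second);
    drain();
    return aggregated.get("ParDo1");
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 10
  }
}
```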
Could this be a separate ticket and PR, or does this block merging this change? Master has already changed and this PR must be rebased again.
This could be done in a way similar to how we update the state of the WatermarkManager, where we toss something on a concurrent queue, try to acquire a lock, and update the visible state, but give up if the lock is already held (the updates are eventually applied the rest of the way). See e.g. https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L912
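The pattern described here might be sketched as below; all names (TryLockRefresh, Pending, visible) are illustrative, not Beam's actual WatermarkManager code. Every caller enqueues its update first and then tries the lock; whichever thread holds the lock drains the whole queue, so an update whose caller lost the race is still applied by the lock holder.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

class TryLockRefresh {

  static class Pending {
    final String step;
    final long delta;
    Pending(String step, long delta) { this.step = step; this.delta = delta; }
  }

  static final ConcurrentLinkedQueue<Pending> pending = new ConcurrentLinkedQueue<>();
  static final ReentrantLock lock = new ReentrantLock();
  static final Map<String, Long> visible = new ConcurrentHashMap<>();

  static void update(String step, long delta) {
    pending.add(new Pending(step, delta)); // always enqueue first
    if (lock.tryLock()) {                  // give up immediately if contended
      try {
        Pending next;
        while ((next = pending.poll()) != null) {
          visible.merge(next.step, next.delta, Long::sum);
        }
      } finally {
        lock.unlock();
      }
    }
    // If tryLock failed, the current lock holder drains our entry for us.
  }

  static long demo() {
    pending.clear();
    visible.clear();
    update("read", 1);
    update("read", 2);
    return visible.get("read");
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 3
  }
}
```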
That sounds fine, but as we're approaching 2 weeks on this PR's review and the code on master has diverged yet again, I suggest we open this as a future ticket. We can give code references, suggest how this change should be implemented, and perhaps tag it with starter to encourage contributions.
@tgroh WDYT? Can you open such a ticket?
I guess this could be OK, although it is concerning that we are regressing existing behavior as part of a refactoring...
Alternatively we could not change direct runner to use accumulable metrics containers and open a task for someone to do that in a future PR.
I will go with this option.
Sounds good -- created https://issues.apache.org/jira/browse/BEAM-2186 to track direct runner updates.
    String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey());
    - Long update = metricUpdate.getUpdate();
    + Long update = metricResult.attempted();
Q (for my understanding): Are flink accumulators aggregated across all attempts at processing, or only across successfully checkpointed processing?
Actually neither: aggregator values are lost in case of failure so it's "aggregated since last failure".
    /**
     * Implementation of attempted {@link MetricResults} using accumulated {@link MetricsContainers}.
     */
    public class AccumulatedAttemptedMetricResults extends AccumulatedMetricResults {
What makes this special to attempted metrics? Couldn't it be used for committed metrics as well? Is it just that when given an AttemptedAndCommitted it extracts the Attempted? If yes, would it make sense to parameterize it on that part, so it could be used for either?
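The parameterization being suggested might look like the following sketch; every name here (ParameterizedResults, AttemptedAndCommitted, resultOf) is illustrative rather than Beam's actual API. Instead of a subclass hard-wired to extract the attempted value, an extractor function is passed in, so the same class serves both attempted and committed results.

```java
import java.util.function.Function;

class ParameterizedResults {

  // Hypothetical pair of values recorded for one metric.
  static class AttemptedAndCommitted {
    final long attempted;
    final long committed;
    AttemptedAndCommitted(long attempted, long committed) {
      this.attempted = attempted;
      this.committed = committed;
    }
  }

  // The extractor decides which side of the pair this instance exposes.
  private final Function<AttemptedAndCommitted, Long> extractor;

  ParameterizedResults(Function<AttemptedAndCommitted, Long> extractor) {
    this.extractor = extractor;
  }

  long resultOf(AttemptedAndCommitted update) {
    return extractor.apply(update);
  }

  public static void main(String[] args) {
    AttemptedAndCommitted update = new AttemptedAndCommitted(12, 10);
    System.out.println(new ParameterizedResults(x -> x.attempted).resultOf(update)); // prints 12
    System.out.println(new ParameterizedResults(x -> x.committed).resultOf(update)); // prints 10
  }
}
```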
👍
    @Override
    public void inc() {
      add(1);
    public void update(Long n) {
Could also solve this using the approach taken in Sum Long CombineFn. Specifically, instead of MetricCell<Counter, Long>, make this MetricCell<Counter, long[]>, where the array is expected to have one element. This avoids the boxing/unboxing.
Alternatively, the boxing/unboxing here may be OK as long as we aren't using a Long as the accumulator/value holder.
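A minimal sketch of the long[] trick (the class name UnboxedCounterCell is hypothetical): a one-element array is a mutable holder, so incrementing mutates in place rather than allocating a new boxed Long per update. Note this toy is not thread-safe; a real cell with concurrent writers would still need an AtomicLong or external synchronization.

```java
class UnboxedCounterCell {
  // One-element array as a mutable accumulator, per the Sum Long CombineFn idea.
  private final long[] value = new long[] {0L};

  void add(long n) { value[0] += n; }

  void inc() { add(1); }

  long get() { return value[0]; }

  public static void main(String[] args) {
    UnboxedCounterCell cell = new UnboxedCounterCell();
    cell.inc();
    cell.add(41);
    System.out.println(cell.get()); // prints 42
  }
}
```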
    /**
     * Update value of this cell.
     */
    void update(DataT data);
Does this do a set or a merge with the given data? If it does a set, then the method below should be called merge or something else to distinguish the behaviors. Either way, the doc should indicate this.
It does a set, which is why I chose update over merge, gave it a void return type, and stated in the Javadoc that it updates the value of the cell.
Is this unclear? Should I use a different method name?
I'd be tempted to call this set or setValue then, or at least update the Javadoc to say that it sets the value of this cell to {@code data}, or something like that.
The thing is, it isn't a set: it combines the input with the existing value, which is why I opted for update as the name.
We could change this to combineAndSet or mergeAndSet if that is clearer.
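The semantics being debated can be illustrated with a toy cell (CombiningCell is a hypothetical name, not Beam's MetricCell): update() is not a plain setter, it combines the incoming value with the cell's current value and stores the result, which is what motivates the combineAndSet/mergeAndSet naming suggestion.

```java
class CombiningCell {
  private long value;

  // "update" here is combine-and-set: value <- combine(value, data),
  // with addition standing in for the combine function.
  void update(long data) { value = value + data; }

  long get() { return value; }

  public static void main(String[] args) {
    CombiningCell cell = new CombiningCell();
    cell.update(5);
    cell.update(7); // combines with the existing 5 rather than overwriting it
    System.out.println(cell.get()); // prints 12
  }
}
```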
    import java.util.concurrent.ConcurrentHashMap;

    /**
     * Metrics containers by step.
May be worth calling this something like "MetricsContainerStepMap" or something, to make it clear the behavior this provides?
👍
    }

    public void updateAll(MetricsContainers other) {
      for (Map.Entry<String, MetricsContainer> container : other.metricsContainers.entrySet()) {
Does this need to do any locking? Either way, should extend the documentation to describe how this should be used by the runner, and what requirements/guarantees it provides in terms of thread safety.
It depends on how the runner uses it. It isn't thread-safe. I'll add Javadoc to reflect this.
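As an illustration of what a non-thread-safe updateAll amounts to (StepCounters is a toy stand-in, not the real MetricsContainers class): it iterates the other map's per-step entries and merges each into this one, creating missing entries, with no locking at all, so correctness depends on the runner calling it from a single thread.

```java
import java.util.HashMap;
import java.util.Map;

class StepCounters {
  private final Map<String, Long> countersByStep = new HashMap<>();

  void add(String step, long delta) {
    countersByStep.merge(step, delta, Long::sum);
  }

  // Not thread-safe: assumes a single writer, so callers needing
  // concurrent merges must synchronize externally.
  void updateAll(StepCounters other) {
    for (Map.Entry<String, Long> entry : other.countersByStep.entrySet()) {
      countersByStep.merge(entry.getKey(), entry.getValue(), Long::sum);
    }
  }

  Long get(String step) { return countersByStep.get(step); }

  public static void main(String[] args) {
    StepCounters mine = new StepCounters();
    mine.add("s1", 5);
    StepCounters theirs = new StepCounters();
    theirs.add("s1", 2);
    mine.updateAll(theirs);
    System.out.println(mine.get("s1")); // prints 7
  }
}
```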
    /**
     * Implementation of {@link MetricResults} by means of accumulated {@link MetricsContainers}.
     */
    public class AccumulatedMetricResults extends MetricResults {
I think this class should have more documentation and should have some tests written to verify/document its behavior.
👍
Force-pushed ac9e386 to b87a860.
Rebased on top of master.
As far as my experience with (Spark) accumulators goes, things might slip when moving from test setups (i.e. local cluster), to real-world distributed clusters.
Since I understand this has already been tested on an actual cluster, I have only a few minor style comments.
     * {@link MetricsContainerStepMap}.
     */
    public Map<String, MetricsContainer> asMap() {
      return new HashMap<>(metricsContainers);
Why is a new HashMap instance needed here?
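One plausible reason, sketched here as an assumption rather than a confirmed answer from the thread (SnapshotMap is a hypothetical stand-in for the real class): copying into a fresh HashMap hands callers a snapshot, so mutating the returned map cannot corrupt the container's private state.

```java
import java.util.HashMap;
import java.util.Map;

class SnapshotMap {
  private final Map<String, Integer> internal = new HashMap<>();

  void put(String key, int value) { internal.put(key, value); }

  // Defensive copy: callers get a snapshot, never the live internal map.
  Map<String, Integer> asMap() { return new HashMap<>(internal); }

  int size() { return internal.size(); }

  public static void main(String[] args) {
    SnapshotMap container = new SnapshotMap();
    container.put("step1", 1);
    Map<String, Integer> snapshot = container.asMap();
    snapshot.put("rogue", 99); // only mutates the copy
    System.out.println(container.size()); // prints 1
  }
}
```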
    import org.apache.spark.AccumulatorParam;

    /**
     * Metrics accumulator param.
     */
    - class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> {
    + class MetricsAccumulatorParam implements AccumulatorParam<MetricsContainerStepMap> {
Perhaps this could be made an inner static class of MetricsContainerStepMap so that usages would look like so: new MetricsContainerStepMap.AccumulatorParam().
The problem with this is AccumulatorParam is a Spark interface.
Why is this an issue? It is instantiated here, so instead of new MetricsAccumulatorParam() it could be new MetricsContainerStepMap.AccumulatorParam(). Am I missing something?
The problem is MetricsContainerStepMap is in sdks-java-core, which is not dependent on Spark.
     * Implementation of {@link MetricResults} using
     * {@link MetricsContainerStepMap MetricsContainerStepMaps}.
     */
    public class AccumulatedMetricResults extends MetricResults {
I'm wondering if a better name could be given here. Perhaps PipelineMetricResults, to indicate that these MetricResults pertain to all steps in a given pipeline.
👍
     *
     * <p>This constructor is intended for runners which support both attempted and committed metrics.
     */
    public AccumulatedMetricResults(
I have a personal preference towards static factory methods here to avoid leaking the concrete type out, i.e., AccumulatedMetricResults.of(...), or PipelineMetricResults.of(...) if you adopt the naming suggested above.
This is, however, a personal preference (by definition), so it extends MatterOfTaste.
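The static-factory idea can be sketched as follows; Results and AccumulatedResults are hypothetical names, not the PR's actual classes. Callers only ever see the abstract type returned by of(...), so the concrete implementation can change or stay package-private without touching caller code.

```java
abstract class Results {
  abstract String describe();

  // Static factory: the concrete class never leaks into caller code.
  static Results of(String pipelineName) {
    return new AccumulatedResults(pipelineName);
  }

  private static final class AccumulatedResults extends Results {
    private final String pipelineName;

    AccumulatedResults(String pipelineName) { this.pipelineName = pipelineName; }

    @Override
    String describe() { return "metric results for " + pipelineName; }
  }

  public static void main(String[] args) {
    System.out.println(Results.of("wordcount").describe()); // prints "metric results for wordcount"
  }
}
```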
👍
@staslev Addressed your comments in my latest commit.
I'd import the static methods and use them. Other than that, the parts I have looked at LGTM.
I looked over the Flink parts again. This LGTM, now that Flink Metrics support is retained. 👍
Force-pushed aeb3000 to f943e18.
Rebased on top of master. Two test failures are from flaky tests; verified they passed locally.
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- Make sure the PR title is formatted like: [BEAM-<Jira issue #>] Description of pull request
- Make sure tests pass via mvn clean verify. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes.)
- Replace <Jira issue #> in the title with the actual Jira issue number, if there is one.
- If this contribution is large, please file an Apache Individual Contributor License Agreement.