Skip to content

Commit

Permalink
Move AccumulatedMetricResults to be an inner class of MetricContainer…
Browse files Browse the repository at this point in the history
…StempMap
  • Loading branch information
aviemzur committed Apr 30, 2017
1 parent b87a860 commit aeb3000
Show file tree
Hide file tree
Showing 8 changed files with 448 additions and 455 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.AccumulatedMetricResults;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
Expand Down Expand Up @@ -375,7 +375,7 @@ public State getState() {

@Override
public MetricResults metrics() {
return new AccumulatedMetricResults(
return MetricsContainerStepMap.asMetricResults(
evaluationContext.getAttemptedMetrics(),
evaluationContext.getCommittedMetrics());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.AccumulatedMetricResults;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.joda.time.Duration;
Expand Down Expand Up @@ -75,7 +74,7 @@ public State waitUntilFinish(Duration duration) {

@Override
public MetricResults metrics() {
return new AccumulatedMetricResults(
return MetricsContainerStepMap.asAttemptedOnlyMetricResults(
(MetricsContainerStepMap) aggregators.get(FlinkMetricContainer.ACCUMULATOR_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.metrics.AccumulatedMetricResults;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.MetricsFilter;
Expand Down Expand Up @@ -81,8 +81,8 @@ MetricsContainer getMetricsContainer(String stepName) {
}

void updateMetrics() {
AccumulatedMetricResults metricResults =
new AccumulatedMetricResults(metricsAccumulator.getLocalValue());
MetricResults metricResults =
MetricsContainerStepMap.asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
MetricQueryResults metricQueryResults =
metricResults.queryMetrics(MetricsFilter.builder().build());
updateCounters(metricQueryResults.counters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.AccumulatedMetricResults;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -111,7 +111,8 @@ public State waitUntilFinish(final Duration duration) {

@Override
public MetricResults metrics() {
return new AccumulatedMetricResults(MetricsAccumulator.getInstance().value());
return MetricsContainerStepMap.asAttemptedOnlyMetricResults(
MetricsAccumulator.getInstance().value());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.metrics.AccumulatedMetricResults;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.MetricsFilter;

Expand All @@ -41,8 +41,9 @@ class SparkBeamMetric implements Metric {

Map<String, ?> renderAll() {
Map<String, Object> metrics = new HashMap<>();
AccumulatedMetricResults metricResults =
new AccumulatedMetricResults(MetricsAccumulator.getInstance().value());
MetricResults metricResults =
MetricsContainerStepMap.asAttemptedOnlyMetricResults(
MetricsAccumulator.getInstance().value());
MetricQueryResults metricQueryResults =
metricResults.queryMetrics(MetricsFilter.builder().build());
for (MetricResult<Long> metricResult : metricQueryResults.counters()) {
Expand Down
Loading

0 comments on commit aeb3000

Please sign in to comment.