[BEAM-1672] Accumulable MetricsContainers. #2649

Closed · wants to merge 3 commits
Changes from 1 commit
[BEAM-1672] Use Accumulable MetricsContainers in Spark runner.
aviemzur committed May 5, 2017
commit 71203d14bdc0ed70537b49befff40e9b93ba9454
SparkPipelineResult.java:

@@ -18,13 +18,15 @@

 package org.apache.beam.runners.spark;

+import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.beam.runners.spark.metrics.SparkMetricResults;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -41,7 +43,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
   protected final Future pipelineExecution;
   protected JavaSparkContext javaSparkContext;
   protected PipelineResult.State state;
-  private final SparkMetricResults metricResults = new SparkMetricResults();

   SparkPipelineResult(final Future<?> pipelineExecution, final JavaSparkContext javaSparkContext) {
     this.pipelineExecution = pipelineExecution;
@@ -106,7 +107,8 @@ public State waitUntilFinish(final Duration duration) {

   @Override
   public MetricResults metrics() {
-    return metricResults;
+    return asAttemptedOnlyMetricResults(
+        MetricsAccumulator.getInstance().value());
   }

   @Override
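For context on the new metrics() implementation above, a minimal caller-side sketch; the pipeline, namespace, and printing are illustrative, not part of this PR:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class MetricsQueryExample {
  public static void printCounters(Pipeline pipeline) {
    PipelineResult result = pipeline.run();
    result.waitUntilFinish();
    // metrics() now materializes MetricResults on demand from the shared
    // MetricsContainerStepMap accumulator (attempted values only).
    MetricQueryResults query = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.inNamespace("my.namespace"))  // hypothetical namespace
            .build());
    for (MetricResult<Long> counter : query.counters()) {
      System.out.println(counter.name() + ": " + counter.attempted());
    }
  }
}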
Bounded source RDD:

@@ -26,12 +26,12 @@
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.Accumulator;
@@ -65,7 +65,7 @@ public static class Bounded<T> extends RDD<WindowedValue<T>> {
   private final SparkRuntimeContext runtimeContext;
   private final int numPartitions;
   private final String stepName;
-  private final Accumulator<SparkMetricsContainer> metricsAccum;
+  private final Accumulator<MetricsContainerStepMap> metricsAccum;

   // to satisfy Scala API.
   private static final scala.collection.immutable.Seq<Dependency<?>> NIL =
Unbounded source DStream (streaming read):

@@ -25,7 +25,6 @@
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
@@ -37,6 +36,7 @@
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -190,7 +190,7 @@ public scala.collection.immutable.List<DStream<?>> dependencies() {
   public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
     // compute parent.
     scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
-    final Accumulator<SparkMetricsContainer> metricsAccum = MetricsAccumulator.getInstance();
+    final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance();
     long count = 0;
     SparkWatermarks sparkWatermark = null;
     Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -211,7 +211,7 @@ public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
             ? partitionHighWatermark : globalHighWatermarkForBatch;
         // Update metrics reported in the read
         final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS);
-        final MetricsContainer container = metadata.getMetricsContainer().getContainer(stepName);
+        final MetricsContainer container = metadata.getMetricsContainers().getContainer(stepName);
         try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
           final long readDurationMillis = metadata.getReadDurationMillis();
           if (readDurationMillis > maxReadDuration) {
@@ -220,7 +220,7 @@ public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-        metricsAccum.value().update(metadata.getMetricsContainer());
+        metricsAccum.value().updateAll(metadata.getMetricsContainers());
       }

       sparkWatermark =
@@ -260,20 +260,19 @@ public static class Metadata implements Serializable {
     private final Instant lowWatermark;
     private final Instant highWatermark;
     private final long readDurationMillis;
-    private final SparkMetricsContainer metricsContainer;
+    private final MetricsContainerStepMap metricsContainers;

     public Metadata(
         long numRecords,
         Instant lowWatermark,
         Instant highWatermark,
         final long readDurationMillis,
-        SparkMetricsContainer metricsContainer) {
+        MetricsContainerStepMap metricsContainer) {
       this.numRecords = numRecords;
       this.readDurationMillis = readDurationMillis;
-      this.metricsContainer = metricsContainer;
+      this.metricsContainers = metricsContainer;
       this.lowWatermark = lowWatermark;
       this.highWatermark = highWatermark;
-      metricsContainer.materialize();
     }

     long getNumRecords() {
@@ -292,8 +291,8 @@ public long getReadDurationMillis() {
       return readDurationMillis;
     }

-    SparkMetricsContainer getMetricsContainer() {
-      return metricsContainer;
+    MetricsContainerStepMap getMetricsContainers() {
+      return metricsContainers;
     }
   }
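A condensed sketch of the flow these hunks implement, using only methods that appear in the diff (getContainer, scopedMetricsContainer, updateAll); the class, method, and metric names are hypothetical:

import java.io.Closeable;
import java.io.IOException;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.metrics.MetricsEnvironment;

public class StepMapMergeSketch {
  public static void recordAndMerge(
      MetricsContainerStepMap batchContainers,  // e.g. Metadata#getMetricsContainers()
      MetricsContainerStepMap accumulated,      // e.g. metricsAccum.value()
      String stepName) throws IOException {
    // Writes issued inside the scope land in this step's container.
    MetricsContainer container = batchContainers.getContainer(stepName);
    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
      Metrics.counter("my.namespace", "recordsRead").inc();  // hypothetical metric
    }
    // Fold the batch's per-step containers into the running total.
    accumulated.updateAll(batchContainers);
  }
}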
MetricsAccumulator.java:

@@ -24,6 +24,7 @@
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.Accumulator;
@@ -44,7 +45,7 @@ public class MetricsAccumulator {
   private static final String ACCUMULATOR_NAME = "Beam.Metrics";
   private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "metrics";

-  private static volatile Accumulator<SparkMetricsContainer> instance = null;
+  private static volatile Accumulator<MetricsContainerStepMap> instance = null;
   private static volatile FileSystem fileSystem;
   private static volatile Path checkpointFilePath;

@@ -58,11 +59,13 @@ public static void init(SparkPipelineOptions opts, JavaSparkContext jsc) {
     Optional<CheckpointDir> maybeCheckpointDir =
         opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
             : Optional.<CheckpointDir>absent();
-    Accumulator<SparkMetricsContainer> accumulator =
-        jsc.sc().accumulator(new SparkMetricsContainer(), ACCUMULATOR_NAME,
+    Accumulator<MetricsContainerStepMap> accumulator =
+        jsc.sc().accumulator(
+            new MetricsContainerStepMap(),
+            ACCUMULATOR_NAME,
             new MetricsAccumulatorParam());
     if (maybeCheckpointDir.isPresent()) {
-      Optional<SparkMetricsContainer> maybeRecoveredValue =
+      Optional<MetricsContainerStepMap> maybeRecoveredValue =
           recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get());
       if (maybeRecoveredValue.isPresent()) {
         accumulator.setValue(maybeRecoveredValue.get());
@@ -75,22 +78,23 @@ public static void init(SparkPipelineOptions opts, JavaSparkContext jsc) {
     }
   }

-  public static Accumulator<SparkMetricsContainer> getInstance() {
+  public static Accumulator<MetricsContainerStepMap> getInstance() {
     if (instance == null) {
       throw new IllegalStateException("Metrics accumulator has not been instantiated");
     } else {
       return instance;
     }
   }

-  private static Optional<SparkMetricsContainer> recoverValueFromCheckpoint(
+  private static Optional<MetricsContainerStepMap> recoverValueFromCheckpoint(
       JavaSparkContext jsc,
       CheckpointDir checkpointDir) {
     try {
       Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
       checkpointFilePath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME);
       fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration());
-      SparkMetricsContainer recoveredValue = Checkpoint.readObject(fileSystem, checkpointFilePath);
+      MetricsContainerStepMap recoveredValue =
+          Checkpoint.readObject(fileSystem, checkpointFilePath);
       if (recoveredValue != null) {
         LOG.info("Recovered metrics from checkpoint.");
         return Optional.of(recoveredValue);
@@ -117,7 +121,7 @@ private static void checkpoint() throws IOException {
   }

   /**
-   * Spark Listener which checkpoints {@link SparkMetricsContainer} values for fault-tolerance.
+   * Spark Listener which checkpoints {@link MetricsContainerStepMap} values for fault-tolerance.
    */
   public static class AccumulatorCheckpointingSparkListener extends JavaStreamingListener {
     @Override
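A sketch of the intended lifecycle, assuming init is invoked once on the driver before any call to getInstance (where the runner calls init is outside this diff):

import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;

public class AccumulatorLifecycleSketch {
  public static MetricResults run(SparkPipelineOptions opts, JavaSparkContext jsc) {
    // Driver side, once per job: creates the accumulator, recovering a
    // checkpointed MetricsContainerStepMap in streaming mode if one exists.
    MetricsAccumulator.init(opts, jsc);
    // Anywhere afterwards: the singleton accumulator is globally reachable.
    Accumulator<MetricsContainerStepMap> accum = MetricsAccumulator.getInstance();
    // Reading metrics converts the current value to attempted-only results.
    return asAttemptedOnlyMetricResults(accum.value());
  }
}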
MetricsAccumulatorParam.java:

@@ -18,25 +18,31 @@

 package org.apache.beam.runners.spark.metrics;

+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.spark.AccumulatorParam;


 /**
  * Metrics accumulator param.
  */
-class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> {
+class MetricsAccumulatorParam implements AccumulatorParam<MetricsContainerStepMap> {
[Review thread on this line]

staslev (Contributor): Perhaps this could be made an inner static class of MetricsContainerStepMap so that usages would look like so: new MetricsContainerStepMap.AccumulatorParam().

aviemzur (Member, Author): The problem with this is AccumulatorParam is a Spark interface.

staslev (Contributor, Apr 30, 2017): Why is this an issue? It is instantiated here, so instead of new MetricsAccumulatorParam() it could be new MetricsContainerStepMap.AccumulatorParam(). Am I missing something?

aviemzur (Member, Author, May 1, 2017): The problem is MetricsContainerStepMap is in sdks-java-core which is not dependent on Spark.
   @Override
-  public SparkMetricsContainer addAccumulator(SparkMetricsContainer c1, SparkMetricsContainer c2) {
-    return c1.update(c2);
+  public MetricsContainerStepMap addAccumulator(
+      MetricsContainerStepMap c1,
+      MetricsContainerStepMap c2) {
+    return addInPlace(c1, c2);
   }

   @Override
-  public SparkMetricsContainer addInPlace(SparkMetricsContainer c1, SparkMetricsContainer c2) {
-    return c1.update(c2);
+  public MetricsContainerStepMap addInPlace(
+      MetricsContainerStepMap c1,
+      MetricsContainerStepMap c2) {
+    c1.updateAll(c2);
+    return c1;
   }

   @Override
-  public SparkMetricsContainer zero(SparkMetricsContainer initialValue) {
-    return new SparkMetricsContainer();
+  public MetricsContainerStepMap zero(MetricsContainerStepMap initialValue) {
+    return new MetricsContainerStepMap();
   }
 }
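For readers less familiar with Spark 1.x accumulators, a hedged sketch of the AccumulatorParam contract this class fulfills; the demo class is hypothetical and sits in the same package because MetricsAccumulatorParam is package-private:

package org.apache.beam.runners.spark.metrics;

import org.apache.beam.sdk.metrics.MetricsContainerStepMap;

public class AccumulatorParamSketch {
  public static void demo() {
    MetricsAccumulatorParam param = new MetricsAccumulatorParam();
    // zero(): the identity element Spark hands each task before it starts.
    MetricsContainerStepMap empty = param.zero(new MetricsContainerStepMap());
    // addInPlace(): folds the right step map into the left and returns it;
    // addAccumulator() delegates to the same merge.
    MetricsContainerStepMap partial1 = new MetricsContainerStepMap();
    MetricsContainerStepMap partial2 = new MetricsContainerStepMap();
    MetricsContainerStepMap merged = param.addInPlace(partial1, partial2);
    assert merged == partial1;  // the merge mutates and reuses the left operand
  }
}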
SparkBeamMetric.java:

@@ -18,6 +18,8 @@

 package org.apache.beam.runners.spark.metrics;

+import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 import com.codahale.metrics.Metric;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
@@ -27,20 +29,23 @@
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.MetricsFilter;


 /**
- * An adapter between the {@link SparkMetricsContainer} and Codahale's {@link Metric} interface.
+ * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
  */
 class SparkBeamMetric implements Metric {
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]";
   private static final String ILLEGAL_CHARACTERS_AND_PERIOD = "[^A-Za-z0-9_-]";

-  private final SparkMetricResults metricResults = new SparkMetricResults();
-
   Map<String, ?> renderAll() {
     Map<String, Object> metrics = new HashMap<>();
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(
+            MetricsAccumulator.getInstance().value());
     MetricQueryResults metricQueryResults =
         metricResults.queryMetrics(MetricsFilter.builder().build());
     for (MetricResult<Long> metricResult : metricQueryResults.counters()) {
SparkBeamMetricSource.java:

@@ -24,7 +24,7 @@

 /**
  * A Spark {@link Source} that is tailored to expose a {@link SparkBeamMetric},
- * wrapping an underlying {@link SparkMetricsContainer} instance.
+ * wrapping an underlying {@link org.apache.beam.sdk.metrics.MetricResults} instance.
  */
 public class SparkBeamMetricSource implements Source {
   private static final String METRIC_NAME = "Metrics";