Skip to content

Commit

Permalink
[BEAM-1672] Use Accumulable MetricsContainers in Flink runner.
Browse files Browse the repository at this point in the history
  • Loading branch information
aviemzur committed May 5, 2017
1 parent 71203d1 commit f943e18
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
*/
package org.apache.beam.runners.flink;

import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
import org.joda.time.Duration;

/**
Expand Down Expand Up @@ -72,6 +75,7 @@ public State waitUntilFinish(Duration duration) {

@Override
public MetricResults metrics() {
return new FlinkMetricResults(accumulators);
return asAttemptedOnlyMetricResults(
(MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@
*/
public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {

private final String stepName;
private final FlinkMetricContainer container;
private final DoFnRunner<InputT, OutputT> delegate;

public DoFnRunnerWithMetricsUpdate(
String stepName,
DoFnRunner<InputT, OutputT> delegate,
RuntimeContext runtimeContext) {
this.stepName = stepName;
this.delegate = delegate;
container = new FlinkMetricContainer(stepName, runtimeContext);
container = new FlinkMetricContainer(runtimeContext);
}

@Override
public void startBundle() {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
delegate.startBundle();
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -58,7 +60,7 @@ public void startBundle() {
@Override
public void processElement(final WindowedValue<InputT> elem) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
delegate.processElement(elem);
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -69,7 +71,7 @@ public void processElement(final WindowedValue<InputT> elem) {
public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp,
final TimeDomain timeDomain) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
delegate.onTimer(timerId, window, timestamp, timeDomain);
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -79,7 +81,7 @@ public void onTimer(final String timerId, final BoundedWindow window, final Inst
@Override
public void finishBundle() {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
delegate.finishBundle();
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Loading

0 comments on commit f943e18

Please sign in to comment.