Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-1672] Accumulable MetricsContainers. #2649

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[BEAM-1672] Make MetricsContainers accumulable.
  • Loading branch information
aviemzur committed May 5, 2017
commit e0ca7a71587ea65d7976d2c0335e3d52e715ad57
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public boolean apply(@Nullable WindowedValue<V> input) {
.isBefore(timerInternals.currentInputWatermarkTime());
if (expired) {
// The element is too late for this window.
droppedDueToLateness.inc();
droppedDueToLateness.update(1L);
WindowTracing.debug(
"GroupAlsoByWindow: Dropping element at {} for key: {}; "
+ "window: {} since it is too far behind inputWatermark: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* indirection.
*/
@Experimental(Kind.METRICS)
public class CounterCell implements MetricCell<Long> {
public class CounterCell implements MetricCell<Counter, Long> {

private final DirtyState dirty = new DirtyState();
private final AtomicLong value = new AtomicLong();
Expand All @@ -41,12 +41,25 @@ public class CounterCell implements MetricCell<Long> {
*/
CounterCell() {}

/** Increment the counter by the given amount. */
private void add(long n) {
/**
* Increment the counter by the given amount.
* @param n value to increment by. Can be negative to decrement.
*/
public void update(long n) {
value.addAndGet(n);
dirty.afterModification();
}

@Override
public void update(Long n) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also solve this using the approach taken in Sum Long CombineFn. Specifically, instead of MetricCell<Counter, Long> make this MetricCell<Counter, long[]>, where the array is expected to be one element. This avoids the boxing/unboxing.

Alternative, the boxing/unboxing here may be OK as long as we aren't using a Long as the accumulator/value holder.

throw new UnsupportedOperationException("CounterCell.update(Long n) should not be used"
+ " as it performs unnecessary boxing/unboxing. Use CounterCell.update(long n) instead.");
}

@Override public void update(MetricCell<Counter, Long> other) {
update((long) other.getCumulative());
}

@Override
public DirtyState getDirty() {
return dirty;
Expand All @@ -56,12 +69,4 @@ public DirtyState getDirty() {
public Long getCumulative() {
return value.get();
}

public void inc() {
add(1);
}

public void inc(long n) {
add(n);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.beam.sdk.metrics;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Expand All @@ -41,7 +42,7 @@
* completed.
*/
@Experimental(Kind.METRICS)
class DirtyState {
class DirtyState implements Serializable {
private enum State {
/** Indicates that there have been changes to the MetricCell since last commit. */
DIRTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* of indirection.
*/
@Experimental(Kind.METRICS)
public class DistributionCell implements MetricCell<DistributionData> {
public class DistributionCell implements MetricCell<Distribution, DistributionData> {

private final DirtyState dirty = new DirtyState();
private final AtomicReference<DistributionData> value =
Expand All @@ -42,15 +42,25 @@ public class DistributionCell implements MetricCell<DistributionData> {
*/
DistributionCell() {}

/** Increment the counter by the given amount. */
/** Increment the distribution by the given amount. */
public void update(long n) {
update(DistributionData.singleton(n));
}

@Override
public void update(DistributionData data) {
DistributionData original;
do {
original = value.get();
} while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n))));
} while (!value.compareAndSet(original, original.combine(data)));
dirty.afterModification();
}

@Override
public void update(MetricCell<Distribution, DistributionData> other) {
update(other.getCumulative());
}

@Override
public DirtyState getDirty() {
return dirty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,33 @@
* of indirection.
*/
@Experimental(Experimental.Kind.METRICS)
public class GaugeCell implements MetricCell<GaugeData> {
public class GaugeCell implements MetricCell<Gauge, GaugeData> {

private final DirtyState dirty = new DirtyState();
private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());

/** Set the gauge to the given value. */
public void set(long value) {
update(GaugeData.create(value));
}

@Override
public void update(GaugeData data) {
GaugeData original;
do {
original = gaugeValue.get();
} while (!gaugeValue.compareAndSet(original, original.combine(data)));
dirty.afterModification();
}

@Override
public void update(MetricCell<Gauge, GaugeData> other) {
GaugeData original;
do {
original = gaugeValue.get();
} while (!gaugeValue.compareAndSet(original, original.combine(GaugeData.create(value))));
} while (!gaugeValue.compareAndSet(original, original.combine(other.getCumulative())));
dirty.afterModification();
update(other.getCumulative());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@
*/
package org.apache.beam.sdk.metrics;

import java.io.Serializable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;

/**
* A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a
* specific metric name in a single context.
*
* @param <UserT> The type of the user interface for reporting changes to this cell.
* @param <DataT> The type of metric data stored (and extracted) from this cell.
*/
@Experimental(Kind.METRICS)
public interface MetricCell<DataT> {
public interface MetricCell<UserT extends Metric, DataT> extends Serializable {

/**
* Update value of this cell.
*/
void update(DataT data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this do a set or a merge with the given data? If it does a set, then the method below should be called merge or something else to distinguish the behaviors. Either way, doc should indicate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does a set, which is why I chose update over merge, void return type and stated in the Javadoc that it updates the value of the cell.
Is this unclear? Should I use a different method name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be tempted to call this set or setValue then, or at least update the Javadoc to say that it sets the value of this cell to {@code data} or something like that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is it isn't a set it combines the input with the existing value, which is why I opted for update as the name.
We could change this to combineAndSet or mergeAndSet if that is clearer.


/**
* Update value of this cell by merging the value of another cell.
*/
void update(MetricCell<UserT, DataT> other);

/**
* Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private DelegatingCounter(MetricName name) {
@Override public void inc(long n) {
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
container.getCounter(name).inc(n);
container.getCounter(name).update(n);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Expand All @@ -37,7 +38,7 @@
* cumulative values/updates.
*/
@Experimental(Kind.METRICS)
public class MetricsContainer {
public class MetricsContainer implements Serializable {

private final String stepName;

Expand Down Expand Up @@ -96,7 +97,7 @@ public GaugeCell getGauge(MetricName metricName) {
return gauges.get(metricName);
}

private <UpdateT, CellT extends MetricCell<UpdateT>>
private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
MetricsMap<MetricName, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
Expand All @@ -120,8 +121,8 @@ public MetricUpdates getUpdates() {
extractUpdates(gauges));
}

private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
for (MetricCell<?> cell : cells.values()) {
private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
for (MetricCell<?, ?> cell : cells.values()) {
cell.getDirty().afterCommit();
}
}
Expand All @@ -133,9 +134,10 @@ private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells
public void commitUpdates() {
commitUpdates(counters);
commitUpdates(distributions);
commitUpdates(gauges);
}

private <UpdateT, CellT extends MetricCell<UpdateT>>
private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
MetricsMap<MetricName, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
Expand All @@ -156,4 +158,21 @@ public MetricUpdates getCumulative() {
extractCumulatives(distributions),
extractCumulatives(gauges));
}

/**
* Update values of this {@link MetricsContainer} by merging the value of another cell.
*/
public void update(MetricsContainer other) {
updateCells(counters, other.counters);
updateCells(distributions, other.distributions);
updateCells(gauges, other.gauges);
}

private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, DataT>> void updateCells(
MetricsMap<MetricName, CellT> current,
MetricsMap<MetricName, CellT> updates) {
for (Map.Entry<MetricName, CellT> counter : updates.entries()) {
current.get(counter.getKey()).update(counter.getValue());
}
}
}
Loading