Skip to content

Commit

Permalink
[FLINK-12982][metrics] improve DescriptiveStatisticsHistogramStatisti…
Browse files Browse the repository at this point in the history
…cs performance

Instead of redirecting DescriptiveStatisticsHistogramStatistics calls to
DescriptiveStatistics, it takes a point-in-time snapshot using an own
UnivariateStatistic implementation that
a) calculates min, max, mean, and standard deviation in one go (as opposed to
   four iterations over the values array!)
b) caches pivots for the percentile calculation to speed up retrieval of
   multiple percentiles/quartiles

As a result, this roughly increases value retrieval performance by 120% when
accessing typical statistics in a metrics reporter, e.g. the InfluxDB reporter:
count, min, max, mean, stddev, p50, p75, p95, p98, p99, p999.
  • Loading branch information
NicoK committed Aug 21, 2019
1 parent e59b9d2 commit 4452be3
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.metrics.HistogramStatistics;

import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.rank.Percentile;
import org.apache.commons.math3.stat.ranking.NaNStrategy;

/**
* The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link DescriptiveStatistics} as a Flink {@link Histogram}.
Expand All @@ -35,9 +33,6 @@ public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.

public DescriptiveStatisticsHistogram(int windowSize) {
this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
// since we are storing Long values, we won't have NaN values
Percentile percentileImpl = new Percentile().withNaNStrategy(NaNStrategy.FIXED);
descriptiveStatistics.setPercentileImpl(percentileImpl);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,183 @@

import org.apache.flink.metrics.HistogramStatistics;

import org.apache.commons.math3.exception.MathIllegalArgumentException;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.UnivariateStatistic;
import org.apache.commons.math3.stat.descriptive.moment.SecondMoment;
import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import org.apache.commons.math3.stat.descriptive.rank.Percentile;
import org.apache.commons.math3.stat.ranking.NaNStrategy;

import java.util.Arrays;

/**
* DescriptiveStatistics histogram statistics implementation returned by {@link DescriptiveStatisticsHistogram}.
* The statistics class wraps a {@link DescriptiveStatistics} instance and forwards the method calls accordingly.
*
* <p>The statistics takes a point-in-time snapshot of a {@link DescriptiveStatistics} instance and
* allows optimised metrics retrieval from this.
*/
public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistics {
private final DescriptiveStatistics descriptiveStatistics;
private final CommonMetricsSnapshot statisticsSummary = new CommonMetricsSnapshot();

public DescriptiveStatisticsHistogramStatistics(DescriptiveStatistics latencyHistogram) {
this.descriptiveStatistics = latencyHistogram;
latencyHistogram.apply(statisticsSummary);
}

@Override
public double getQuantile(double quantile) {
return descriptiveStatistics.getPercentile(quantile * 100);
return statisticsSummary.getPercentile(quantile * 100);
}

@Override
public long[] getValues() {
return Arrays.stream(descriptiveStatistics.getValues()).mapToLong(i -> (long) i).toArray();
return Arrays.stream(statisticsSummary.getValues()).mapToLong(i -> (long) i).toArray();
}

@Override
public int size() {
return (int) descriptiveStatistics.getN();
return (int) statisticsSummary.getCount();
}

@Override
public double getMean() {
return descriptiveStatistics.getMean();
return statisticsSummary.getMean();
}

@Override
public double getStdDev() {
return descriptiveStatistics.getStandardDeviation();
return statisticsSummary.getStandardDeviation();
}

@Override
public long getMax() {
return (long) descriptiveStatistics.getMax();
return (long) statisticsSummary.getMax();
}

@Override
public long getMin() {
return (long) descriptiveStatistics.getMin();
return (long) statisticsSummary.getMin();
}

/**
* Function to extract several commonly used metrics in an optimised way, i.e. with as few runs
* over the data / calculations as possible.
*
* <p>Note that calls to {@link #evaluate(double[])} or {@link #evaluate(double[], int, int)}
* will not return a value but instead populate this class so that further values can be
* retrieved from it.
*/
private static class CommonMetricsSnapshot implements UnivariateStatistic {
private long count = 0;
private double min = Double.NaN;
private double max = Double.NaN;
private double mean = Double.NaN;
private double stddev = Double.NaN;
private Percentile percentilesImpl = new Percentile().withNaNStrategy(NaNStrategy.FIXED);

@Override
public double evaluate(final double[] values) throws MathIllegalArgumentException {
return evaluate(values, 0, values.length);
}

@Override
public double evaluate(double[] values, int begin, int length)
throws MathIllegalArgumentException {
this.count = length;
percentilesImpl.setData(values, begin, length);

SimpleStats secondMoment = new SimpleStats();
secondMoment.evaluate(values, begin, length);
this.mean = secondMoment.getMean();
this.min = secondMoment.getMin();
this.max = secondMoment.getMax();

this.stddev = new StandardDeviation(secondMoment).getResult();

return Double.NaN;
}

@Override
public CommonMetricsSnapshot copy() {
CommonMetricsSnapshot result = new CommonMetricsSnapshot();
result.count = count;
result.min = min;
result.max = max;
result.mean = mean;
result.stddev = stddev;
result.percentilesImpl = percentilesImpl.copy();
return result;
}

long getCount() {
return count;
}

double getMin() {
return min;
}

double getMax() {
return max;
}

double getMean() {
return mean;
}

double getStandardDeviation() {
return stddev;
}

double getPercentile(double p) {
return percentilesImpl.evaluate(p);
}

double[] getValues() {
return percentilesImpl.getData();
}
}

/**
* Calculates min, max, mean (first moment), as well as the second moment in one go over
* the value array.
*/
private static class SimpleStats extends SecondMoment {
private static final long serialVersionUID = 1L;

private double min = Double.NaN;
private double max = Double.NaN;

@Override
public void increment(double d) {
if (d < min || Double.isNaN(min)) {
min = d;
}
if (d > max || Double.isNaN(max)) {
max = d;
}
super.increment(d);
}

@Override
public SecondMoment copy() {
SimpleStats result = new SimpleStats();
SecondMoment.copy(this, result);
result.min = min;
result.max = max;
return result;
}

public double getMin() {
return min;
}

public double getMax() {
return max;
}

public double getMean() {
return m1;
}
}
}

0 comments on commit 4452be3

Please sign in to comment.