Skip to content

Commit

Permalink
[FLINK-12983][metrics] replace descriptive histogram's storage back-end
Browse files Browse the repository at this point in the history
Instead of using `DescriptiveStatistics` and its `ResizableDoubleArray` storage
back-end, implement an own circular array based on a fixed-size double array
that wraps around. This significantly improves the time needed to add new
values. It also ensures that access is synchronized which `ResizableDoubleArray`
does not guarantee.
  • Loading branch information
NicoK committed Aug 21, 2019
1 parent 4452be3 commit f57a615
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,63 @@
*/
public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram {

private final DescriptiveStatistics descriptiveStatistics;

private long elementsSeen = 0L;
private final CircularDoubleArray descriptiveStatistics;

public DescriptiveStatisticsHistogram(int windowSize) {
this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
this.descriptiveStatistics = new CircularDoubleArray(windowSize);
}

@Override
public void update(long value) {
elementsSeen += 1L;
this.descriptiveStatistics.addValue(value);
}

@Override
public long getCount() {
return this.elementsSeen;
return this.descriptiveStatistics.getElementsSeen();
}

@Override
public HistogramStatistics getStatistics() {
return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
}

/**
* Fixed-size array that wraps around at the end and has a dynamic start position.
*/
static class CircularDoubleArray {
private final double[] backingArray;
private int nextPos = 0;
private boolean fullSize = false;
private long elementsSeen = 0;

CircularDoubleArray(int windowSize) {
this.backingArray = new double[windowSize];
}

synchronized void addValue(double value) {
backingArray[nextPos] = value;
++elementsSeen;
++nextPos;
if (nextPos == backingArray.length) {
nextPos = 0;
fullSize = true;
}
}

synchronized double[] toUnsortedArray() {
final int size = getSize();
double[] result = new double[size];
System.arraycopy(backingArray, 0, result, 0, result.length);
return result;
}

private synchronized int getSize() {
return fullSize ? backingArray.length : nextPos;
}

private synchronized long getElementsSeen() {
return elementsSeen;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
public class DescriptiveStatisticsHistogramStatistics extends HistogramStatistics {
private final CommonMetricsSnapshot statisticsSummary = new CommonMetricsSnapshot();

public DescriptiveStatisticsHistogramStatistics(DescriptiveStatistics latencyHistogram) {
latencyHistogram.apply(statisticsSummary);
public DescriptiveStatisticsHistogramStatistics(
DescriptiveStatisticsHistogram.CircularDoubleArray histogramValues) {
statisticsSummary.evaluate(histogramValues.toUnsortedArray());
}

@Override
Expand Down

0 comments on commit f57a615

Please sign in to comment.