Skip to content

Commit

Permalink
[FLINK-19174][metrics] Fix idle and backpressured time accuracy on lo…
Browse files Browse the repository at this point in the history
…ng sleeps

In particularly, if task is idling forever, as there are no new records incomming
previous version would report idleTime as 0% and busyTime as 100% which is incorrect.

In this version, idleTime metric is aware that idling period has started and can take
that into account when updating it's value.
  • Loading branch information
pnowojski committed Jan 6, 2021
1 parent b71f858 commit 7aafc4c
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -125,8 +126,9 @@ public void testAddCounter() throws Exception {
public void testAddGauge() throws Exception {
String gaugeName = "gauge";

int gaugesBefore = reporter.getGauges().size();
taskMetricGroup.gauge(gaugeName, null);
assertTrue(reporter.getGauges().isEmpty());
assertThat(reporter.getGauges().size(), is(gaugesBefore));

Gauge<Long> gauge = () -> null;
taskMetricGroup.gauge(gaugeName, gauge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.function.SupplierWithException;

Expand Down Expand Up @@ -64,7 +62,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
/** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */
private BufferBuilder broadcastBufferBuilder;

private Meter backPressuredTimeMsPerSecond = new MeterView(new SimpleCounter());
private TimerGauge backPressuredTimeMsPerSecond = new TimerGauge();

public BufferWritingResultPartition(
String owningTaskName,
Expand Down Expand Up @@ -335,11 +333,10 @@ private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
return bufferBuilder;
}

backPressuredTimeMsPerSecond.markStart();
try {
long start = System.currentTimeMillis();
bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
long backPressuredTime = System.currentTimeMillis() - start;
backPressuredTimeMsPerSecond.markEvent(backPressuredTime);
backPressuredTimeMsPerSecond.markEnd();
return bufferBuilder;
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for buffer");
Expand Down Expand Up @@ -378,7 +375,7 @@ private void ensureBroadcastMode() {
}

@VisibleForTesting
public Meter getBackPressuredTimeMsPerSecond() {
public TimerGauge getBackPressuredTimeMsPerSecond() {
return backPressuredTimeMsPerSecond;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.metrics;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.View;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

/**
* {@link TimerGauge} measures how much time is spent in a given state, with entry into that state
* being signaled by {@link #markStart()}. Measuring is stopped by {@link #markEnd()}. This class in
* particularly takes care of the case, when {@link #update()} is called when some measurement
* started but has not yet finished. For example even if next {@link #markEnd()} call is expected to
* happen in a couple of hours, the returned value will account for this ongoing measurement.
*/
public class TimerGauge implements Gauge<Long>, View {
private final Clock clock;

private long previousCount;
private long currentCount;
private long currentMeasurementStart;

public TimerGauge() {
this(SystemClock.getInstance());
}

public TimerGauge(Clock clock) {
this.clock = clock;
}

public synchronized void markStart() {
if (currentMeasurementStart == 0) {
currentMeasurementStart = clock.absoluteTimeMillis();
}
}

public synchronized void markEnd() {
if (currentMeasurementStart != 0) {
currentCount += clock.absoluteTimeMillis() - currentMeasurementStart;
currentMeasurementStart = 0;
}
}

@Override
public synchronized void update() {
if (currentMeasurementStart != 0) {
long now = clock.absoluteTimeMillis();
currentCount += now - currentMeasurementStart;
currentMeasurementStart = now;
}
previousCount = Math.max(Math.min(currentCount / UPDATE_INTERVAL_SECONDS, 1000), 0);
currentCount = 0;
}

@Override
public synchronized Long getValue() {
return previousCount;
}

@VisibleForTesting
public synchronized long getCount() {
return currentCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.TimerGauge;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -46,9 +47,9 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Meter numRecordsInRate;
private final Meter numRecordsOutRate;
private final Meter numBuffersOutRate;
private final Meter idleTimePerSecond;
private final TimerGauge idleTimePerSecond;
private final Gauge busyTimePerSecond;
private final Meter backPressuredTimePerSecond;
private final TimerGauge backPressuredTimePerSecond;

private volatile boolean busyTimeEnabled;

Expand All @@ -71,10 +72,9 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
this.numBuffersOutRate =
meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut));

this.idleTimePerSecond =
meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter()));
this.idleTimePerSecond = gauge(MetricNames.TASK_IDLE_TIME, new TimerGauge());
this.backPressuredTimePerSecond =
meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter()));
gauge(MetricNames.TASK_BACK_PRESSURED_TIME, new TimerGauge());
this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
}

Expand Down Expand Up @@ -106,11 +106,11 @@ public Counter getNumBuffersOutCounter() {
return numBuffersOut;
}

public Meter getIdleTimeMsPerSecond() {
public TimerGauge getIdleTimeMsPerSecond() {
return idleTimePerSecond;
}

public Meter getBackPressuredTimePerSecond() {
public TimerGauge getBackPressuredTimePerSecond() {
return backPressuredTimePerSecond;
}

Expand All @@ -119,7 +119,7 @@ public void setEnableBusyTime(boolean enabled) {
}

private double getBusyTimePerSecond() {
double busyTime = idleTimePerSecond.getRate() + backPressuredTimePerSecond.getRate();
double busyTime = idleTimePerSecond.getValue() + backPressuredTimePerSecond.getValue();
return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -510,7 +511,7 @@ public void testIdleAndBackPressuredTime() throws IOException, InterruptedExcept
assertNotNull(buffer);

// back-pressured time is zero when there is buffer available.
assertEquals(0, resultPartition.getBackPressuredTimeMsPerSecond().getCount());
assertThat(resultPartition.getBackPressuredTimeMsPerSecond().getValue(), equalTo(0L));

CountDownLatch syncLock = new CountDownLatch(1);
final Thread requestThread =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// ----------------------------------------------------------------------------
// This class is largely adapted from "com.google.common.base.Preconditions",
// which is part of the "Guava" library.
//
// Because of frequent issues with dependency conflicts, this class was
// added to the Flink code base to reduce dependency on Guava.
// ----------------------------------------------------------------------------

package org.apache.flink.runtime.metrics;

import org.apache.flink.metrics.View;
import org.apache.flink.util.clock.ManualClock;

import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/** Tests for {@link TimerGauge}. */
public class TimerGaugeTest {
private static final long SLEEP = 10;

@Test
public void testBasicUsage() {
ManualClock clock = new ManualClock(42_000_000);
TimerGauge gauge = new TimerGauge(clock);

gauge.update();
assertThat(gauge.getValue(), is(0L));

gauge.markStart();
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
gauge.markEnd();
gauge.update();

assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS));
}

@Test
public void testUpdateWithoutMarkingEnd() {
ManualClock clock = new ManualClock(42_000_000);
TimerGauge gauge = new TimerGauge(clock);

gauge.markStart();
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);
gauge.update();

assertThat(gauge.getValue(), greaterThanOrEqualTo(SLEEP / View.UPDATE_INTERVAL_SECONDS));
}

@Test
public void testGetWithoutUpdate() {
ManualClock clock = new ManualClock(42_000_000);
TimerGauge gauge = new TimerGauge(clock);

gauge.markStart();
clock.advanceTime(SLEEP, TimeUnit.MILLISECONDS);

assertThat(gauge.getValue(), is(0L));

gauge.markEnd();

assertThat(gauge.getValue(), is(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

import org.junit.Test;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;

/** Tests for the {@link TaskIOMetricGroup}. */
public class TaskIOMetricGroupTest {
@Test
public void testTaskIOMetricGroup() {
public void testTaskIOMetricGroup() throws InterruptedException {
TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
TaskIOMetricGroup taskIO = task.getIOMetricGroup();

Expand All @@ -52,14 +54,21 @@ public void testTaskIOMetricGroup() {
taskIO.getNumBytesInCounter().inc(100L);
taskIO.getNumBytesOutCounter().inc(250L);
taskIO.getNumBuffersOutCounter().inc(3L);
taskIO.getIdleTimeMsPerSecond().markEvent(2L);
taskIO.getIdleTimeMsPerSecond().markStart();
taskIO.getBackPressuredTimePerSecond().markStart();
long sleepTime = 2L;
Thread.sleep(sleepTime);
taskIO.getIdleTimeMsPerSecond().markEnd();
taskIO.getBackPressuredTimePerSecond().markEnd();

IOMetrics io = taskIO.createSnapshot();
assertEquals(32L, io.getNumRecordsIn());
assertEquals(64L, io.getNumRecordsOut());
assertEquals(100L, io.getNumBytesIn());
assertEquals(250L, io.getNumBytesOut());
assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
assertEquals(2L, taskIO.getIdleTimeMsPerSecond().getCount());
assertThat(taskIO.getIdleTimeMsPerSecond().getCount(), greaterThanOrEqualTo(sleepTime));
assertThat(
taskIO.getBackPressuredTimePerSecond().getCount(), greaterThanOrEqualTo(sleepTime));
}
}
Loading

0 comments on commit 7aafc4c

Please sign in to comment.