Skip to content

Commit

Permalink
[FLINK-10022][network][metrics] add metrics for input/output buffers
Browse files Browse the repository at this point in the history
This closes apache#6551.
  • Loading branch information
NicoK authored and Nico Kruber committed Aug 15, 2018
1 parent e307f92 commit a40a659
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 6 deletions.
31 changes: 31 additions & 0 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,27 @@ Thus, in order to infer the metric identifier:
<td>The number of bytes this task reads from a remote source per second.</td>
<td>Meter</td>
</tr>
<tr>
<th rowspan="6"><strong>Task</strong></th>
<td>numBuffersInLocal</td>
<td>The total number of network buffers this task has read from a local source.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBuffersInLocalPerSecond</td>
<td>The number of network buffers this task reads from a local source per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numBuffersInRemote</td>
<td>The total number of network buffers this task has read from a remote source.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBuffersInRemotePerSecond</td>
<td>The number of network buffers this task reads from a remote source per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numBytesOut</td>
<td>The total number of bytes this task has emitted.</td>
Expand All @@ -1253,6 +1274,16 @@ Thus, in order to infer the metric identifier:
<td>The number of bytes this task emits per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numBuffersOut</td>
<td>The total number of network buffers this task has emitted.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBuffersOutPerSecond</td>
<td>The number of network buffers this task emits per second.</td>
<td>Meter</td>
</tr>
<tr>
<th rowspan="6"><strong>Task/Operator</strong></th>
<td>numRecordsIn</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class RecordWriter<T extends IOReadableWritable> {

private Counter numBytesOut = new SimpleCounter();

private Counter numBuffersOut = new SimpleCounter();

public RecordWriter(ResultPartitionWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
}
Expand Down Expand Up @@ -184,6 +186,7 @@ public void clearBuffers() {
*/
public void setMetricGroup(TaskIOMetricGroup metrics) {
numBytesOut = metrics.getNumBytesOutCounter();
numBuffersOut = metrics.getNumBuffersOutCounter();
}

/**
Expand All @@ -200,6 +203,7 @@ private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerialize
bufferBuilders[targetChannel] = Optional.empty();

numBytesOut.inc(bufferBuilder.finish());
numBuffersOut.inc();
serializer.clear();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public abstract class InputChannel {

protected final Counter numBytesIn;

protected final Counter numBuffersIn;

/** The current backoff (in ms) */
private int currentBackoff;

Expand All @@ -73,7 +75,8 @@ protected InputChannel(
ResultPartitionID partitionId,
int initialBackoff,
int maxBackoff,
Counter numBytesIn) {
Counter numBytesIn,
Counter numBuffersIn) {

checkArgument(channelIndex >= 0);

Expand All @@ -91,6 +94,7 @@ protected InputChannel(
this.currentBackoff = initial == 0 ? -1 : 0;

this.numBytesIn = numBytesIn;
this.numBuffersIn = numBuffersIn;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public LocalInputChannel(
int maxBackoff,
TaskIOMetricGroup metrics) {

super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());

this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
Expand Down Expand Up @@ -194,6 +194,7 @@ Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedE
}

numBytesIn.inc(next.buffer().getSizeUnsafe());
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public RemoteInputChannel(
int maxBackoff,
TaskIOMetricGroup metrics) {

super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter());
super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());

this.connectionId = checkNotNull(connectionId);
this.connectionManager = checkNotNull(connectionManager);
Expand Down Expand Up @@ -199,6 +199,7 @@ Optional<BufferAndAvailability> getNextBuffer() throws IOException {
}

numBytesIn.inc(next.getSizeUnsafe());
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public UnknownInputChannel(
int maxBackoff,
TaskIOMetricGroup metrics) {

super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null, null);

this.partitionManager = checkNotNull(partitionManager);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ private MetricNames() {
public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE;
public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE;

public static final String IO_NUM_BUFFERS_IN = "numBuffersIn";
public static final String IO_NUM_BUFFERS_IN_LOCAL = IO_NUM_BUFFERS_IN + "Local";
public static final String IO_NUM_BUFFERS_IN_REMOTE = IO_NUM_BUFFERS_IN + "Remote";
public static final String IO_NUM_BUFFERS_OUT = "numBuffersOut";
public static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = IO_NUM_BUFFERS_IN_LOCAL + SUFFIX_RATE;
public static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = IO_NUM_BUFFERS_IN_REMOTE + SUFFIX_RATE;
public static final String IO_NUM_BUFFERS_OUT_RATE = IO_NUM_BUFFERS_OUT + SUFFIX_RATE;

public static final String IO_CURRENT_INPUT_WATERMARK = "currentInputWatermark";
public static final String IO_CURRENT_INPUT_1_WATERMARK = "currentInput1Watermark";
public static final String IO_CURRENT_INPUT_2_WATERMARK = "currentInput2Watermark";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Counter numBytesInRemote;
private final SumCounter numRecordsIn;
private final SumCounter numRecordsOut;
private final Counter numBuffersOut;
private final Counter numBuffersInLocal;
private final Counter numBuffersInRemote;

private final Meter numBytesInRateLocal;
private final Meter numBytesInRateRemote;
private final Meter numBytesOutRate;
private final Meter numRecordsInRate;
private final Meter numRecordsOutRate;
private final Meter numBuffersOutRate;
private final Meter numBuffersInRateLocal;
private final Meter numBuffersInRateRemote;

public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
Expand All @@ -60,10 +66,18 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60));
this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));

this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter());
this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter());
this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));

this.numBuffersOut = counter(MetricNames.IO_NUM_BUFFERS_OUT);
this.numBuffersInLocal = counter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL);
this.numBuffersInRemote = counter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE);
this.numBuffersOutRate = meter(MetricNames.IO_NUM_BUFFERS_OUT_RATE, new MeterView(numBuffersOut, 60));
this.numBuffersInRateLocal = meter(MetricNames.IO_NUM_BUFFERS_IN_LOCAL_RATE, new MeterView(numBuffersInLocal, 60));
this.numBuffersInRateRemote = meter(MetricNames.IO_NUM_BUFFERS_IN_REMOTE_RATE, new MeterView(numBuffersInRemote, 60));
}

public IOMetrics createSnapshot() {
Expand Down Expand Up @@ -93,6 +107,18 @@ public Counter getNumRecordsOutCounter() {
return numRecordsOut;
}

public Counter getNumBuffersOutCounter() {
return numBuffersOut;
}

public Counter getNumBuffersInLocalCounter() {
return numBuffersInLocal;
}

public Counter getNumBuffersInRemoteCounter() {
return numBuffersInRemote;
}

public Meter getNumBytesInLocalRateMeter() {
return numBytesInRateLocal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private MockInputChannel(
int initialBackoff,
int maxBackoff) {

super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter(), new SimpleCounter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class TestInputChannel extends InputChannel {
private boolean isReleased = false;

TestInputChannel(SingleInputGate inputGate, int channelIndex) {
super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter());
super(inputGate, channelIndex, new ResultPartitionID(), 0, 0, new SimpleCounter(), new SimpleCounter());
}

public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ public void testTaskIOMetricGroup() {
taskIO.getNumBytesInLocalCounter().inc(100L);
taskIO.getNumBytesInRemoteCounter().inc(150L);
taskIO.getNumBytesOutCounter().inc(250L);
taskIO.getNumBuffersInLocalCounter().inc(1L);
taskIO.getNumBuffersInRemoteCounter().inc(2L);
taskIO.getNumBuffersOutCounter().inc(3L);

IOMetrics io = taskIO.createSnapshot();
assertEquals(32L, io.getNumRecordsIn());
assertEquals(64L, io.getNumRecordsOut());
assertEquals(100L, io.getNumBytesInLocal());
assertEquals(150L, io.getNumBytesInRemote());
assertEquals(250L, io.getNumBytesOut());
assertEquals(1L, taskIO.getNumBuffersInLocalCounter().getCount());
assertEquals(2L, taskIO.getNumBuffersInRemoteCounter().getCount());
assertEquals(3L, taskIO.getNumBuffersOutCounter().getCount());
}
}

0 comments on commit a40a659

Please sign in to comment.