Skip to content

Commit

Permalink
[FLINK-10131][network] improve logging around subpartitions
Browse files Browse the repository at this point in the history
- add task name
- add subpartition index

This closes apache#6547.
  • Loading branch information
NicoK authored and Nico Kruber committed Sep 10, 2018
1 parent d621322 commit 5f8d91f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void flush() {
@Override
public void finish() throws IOException {
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
LOG.debug("Finished {}.", this);
LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
}

private boolean add(BufferConsumer bufferConsumer, boolean finish) {
Expand Down Expand Up @@ -132,7 +132,7 @@ public void release() {
isReleased = true;
}

LOG.debug("Released {}.", this);
LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);

if (view != null) {
view.releaseAllResources();
Expand Down Expand Up @@ -224,7 +224,8 @@ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener avail
"Subpartition %s of is being (or already has been) consumed, " +
"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());

LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
parent.getOwningTaskName(), index, parent.getPartitionId());

readView = new PipelinedSubpartitionView(this, availabilityListener);
if (!buffers.isEmpty()) {
Expand Down Expand Up @@ -268,8 +269,8 @@ public String toString() {
}

return String.format(
"PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
"PipelinedSubpartition#%d [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
index, numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ public JobID getJobId() {
return jobId;
}

public String getOwningTaskName() {
return owningTaskName;
}

public ResultPartitionID getPartitionId() {
return partitionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public synchronized void finish() throws IOException {
if (spillWriter != null) {
spillWriter.close();
}
LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
}

@Override
Expand Down Expand Up @@ -180,6 +181,8 @@ public synchronized void release() throws IOException {
isReleased = true;
}

LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);

if (view != null) {
view.releaseAllResources();
}
Expand Down Expand Up @@ -236,8 +239,8 @@ public int releaseMemory() throws IOException {
long spilledBytes = spillFinishedBufferConsumers(isFinished);
int spilledBuffers = numberOfBuffers - buffers.size();

LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
spilledBytes, spilledBuffers, index, parent.getPartitionId());
LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());

return spilledBuffers;
}
Expand Down Expand Up @@ -300,9 +303,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {

@Override
public String toString() {
return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
"%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
}

Expand Down

0 comments on commit 5f8d91f

Please sign in to comment.