Skip to content

Commit

Permalink
[hotfix] [network] Add DEBUG log messages to intermediate results
Browse files Browse the repository at this point in the history
This adds log messages about created result partition consumers and
spilling.
  • Loading branch information
uce committed Jul 13, 2016
1 parent d17fe4f commit 565f941
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,11 @@ public boolean isReleased() {
public Throwable getFailureCause() {
return parent.getFailureCause();
}

@Override
public String toString() {
return String.format("PipelinedSubpartitionView(index: %d) of ResultPartition %s",
parent.index,
parent.parent.getPartitionId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,11 @@ public ResultSubpartitionView createSubpartitionView(int index, BufferProvider b

checkElementIndex(index, subpartitions.length, "Subpartition not found.");

return subpartitions[index].createReadView(bufferProvider);
ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider);

LOG.debug("Created {}", readView);

return readView;
}

public Throwable getFailureCause() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,16 @@ public int releaseMemory() throws IOException {

final int numberOfBuffers = buffers.size();

long spilledBytes = 0;

// Spill all buffers
for (int i = 0; i < numberOfBuffers; i++) {
spillWriter.writeBlock(buffers.remove(0));
Buffer buffer = buffers.remove(0);
spilledBytes += buffer.getSize();
spillWriter.writeBlock(buffer);
}

LOG.debug("Spilling {} buffers of {}.", numberOfBuffers, this);
LOG.debug("Spilled {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());

return numberOfBuffers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,11 @@ public boolean isReleased() {
public Throwable getFailureCause() {
return parent.getFailureCause();
}

@Override
public String toString() {
return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s",
parent.index,
parent.parent.getPartitionId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
/** Flag indicating whether we reached EOF at the file reader. */
private volatile boolean hasReachedEndOfFile;

/** Spilled file size */
private final long fileSize;

SpilledSubpartitionViewAsyncIO(
ResultSubpartition parent,
BufferProvider bufferProvider,
Expand Down Expand Up @@ -114,6 +117,8 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {

this.readBatchSize = readBatchSize;

this.fileSize = asyncFileReader.getSize();

// Trigger the initial read requests
readNextBatchAsync();
}
Expand Down Expand Up @@ -347,6 +352,14 @@ public void requestFailed(Buffer buffer, IOException error) {
}
}

@Override
public String toString() {
return String.format("SpilledSubpartitionView[async](index: %d, file size: %d bytes) of ResultPartition %s",
parent.index,
fileSize,
parent.parent.getPartitionId());
}

/**
* Callback from the buffer provider.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
/** Flag indicating whether all resources have been released. */
private AtomicBoolean isReleased = new AtomicBoolean();

/** Spilled file size */
private final long fileSize;

SpilledSubpartitionViewSyncIO(
ResultSubpartition parent,
int memorySegmentSize,
Expand All @@ -71,6 +74,8 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
if (initialSeekPosition > 0) {
fileReader.seekToPosition(initialSeekPosition);
}

this.fileSize = fileReader.getSize();
}

@Override
Expand Down Expand Up @@ -117,6 +122,14 @@ public Throwable getFailureCause() {
return parent.getFailureCause();
}

@Override
public String toString() {
return String.format("SpilledSubpartitionView[sync](index: %d, file size: %d bytes) of ResultPartition %s",
parent.index,
fileSize,
parent.parent.getPartitionId());
}

/**
* A buffer pool to provide buffer to read the file into.
*
Expand Down

0 comments on commit 565f941

Please sign in to comment.