From bdcbb5e5a8db3615cad224e19f783615313fb140 Mon Sep 17 00:00:00 2001 From: sewen Date: Wed, 18 Jul 2012 14:32:27 +0200 Subject: [PATCH] Fixed #258 --- .../AbstractByteBufferedInputChannel.java | 27 ++++++++++--------- .../runtime/RuntimeInputChannelContext.java | 2 +- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java index 5ec5a5c792c8a..dcc0f48eb8a2c 100644 --- a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java +++ b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java @@ -18,6 +18,9 @@ import java.io.EOFException; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import eu.stratosphere.nephele.event.task.AbstractEvent; import eu.stratosphere.nephele.event.task.AbstractTaskEvent; import eu.stratosphere.nephele.io.InputGate; @@ -43,8 +46,10 @@ * @param * the type of record that can be transported through this channel */ -public abstract class AbstractByteBufferedInputChannel extends AbstractInputChannel { - +public abstract class AbstractByteBufferedInputChannel extends AbstractInputChannel +{ + private static final Log LOG = LogFactory.getLog(AbstractByteBufferedInputChannel.class); + /** * The deserializer used to deserialize records. */ @@ -191,16 +196,14 @@ public void close() throws IOException, InterruptedException { } // This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed - if (this.getType() == ChannelType.NETWORK) { - if (!this.brokerAggreedToCloseChannel) { - while (!this.brokerAggreedToCloseChannel) { - - requestReadBufferFromBroker(); - if (this.dataBuffer != null) { - releasedConsumedReadBuffer(); - } - Thread.sleep(500); + if (this.getType() == ChannelType.NETWORK || this.getType() == ChannelType.INMEMORY) { + while (!this.brokerAggreedToCloseChannel) { + requestReadBufferFromBroker(); + if (this.dataBuffer != null) { + releasedConsumedReadBuffer(); + continue; } + Thread.sleep(200); } } @@ -267,7 +270,7 @@ public void processEvent(AbstractEvent event) { .getCurrentInternalCompressionLibraryIndex()); } else { // TODO: Handle unknown event - System.out.println("Received unknown event:" + event); + LOG.error("Received unknown event: " + event); } } diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java index 1c20142d2b019..9df1d8008dc47 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java @@ -154,7 +154,7 @@ public void releaseConsumedReadBuffer(final Buffer buffer) { this.envelopeConsumptionLog.reportEnvelopeConsumed(this.byteBufferedInputChannel); if (buffer.remaining() > 0) { - LOG.error("consumedReadBuffer has " + buffer.remaining() + " unconsumed bytes left!!"); + LOG.warn("ConsumedReadBuffer has " + buffer.remaining() + " unconsumed bytes left (early end of reading?)."); } // Recycle consumed read buffer