Skip to content

Commit

Permalink
Fixed apache#258
Browse files Browse the repository at this point in the history
  • Loading branch information
sewen committed Jul 18, 2012
1 parent ecf79bc commit bdcbb5e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.io.EOFException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.io.InputGate;
Expand All @@ -43,8 +46,10 @@
* @param <T>
* the type of record that can be transported through this channel
*/
public abstract class AbstractByteBufferedInputChannel<T extends Record> extends AbstractInputChannel<T> {

public abstract class AbstractByteBufferedInputChannel<T extends Record> extends AbstractInputChannel<T>
{
private static final Log LOG = LogFactory.getLog(AbstractByteBufferedInputChannel.class);

/**
* The deserializer used to deserialize records.
*/
Expand Down Expand Up @@ -191,16 +196,14 @@ public void close() throws IOException, InterruptedException {
}

// This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
if (this.getType() == ChannelType.NETWORK) {
if (!this.brokerAggreedToCloseChannel) {
while (!this.brokerAggreedToCloseChannel) {

requestReadBufferFromBroker();
if (this.dataBuffer != null) {
releasedConsumedReadBuffer();
}
Thread.sleep(500);
if (this.getType() == ChannelType.NETWORK || this.getType() == ChannelType.INMEMORY) {
while (!this.brokerAggreedToCloseChannel) {
requestReadBufferFromBroker();
if (this.dataBuffer != null) {
releasedConsumedReadBuffer();
continue;
}
Thread.sleep(200);
}
}

Expand Down Expand Up @@ -267,7 +270,7 @@ public void processEvent(AbstractEvent event) {
.getCurrentInternalCompressionLibraryIndex());
} else {
// TODO: Handle unknown event
System.out.println("Received unknown event:" + event);
LOG.error("Received unknown event: " + event);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void releaseConsumedReadBuffer(final Buffer buffer) {
this.envelopeConsumptionLog.reportEnvelopeConsumed(this.byteBufferedInputChannel);

if (buffer.remaining() > 0) {
LOG.error("consumedReadBuffer has " + buffer.remaining() + " unconsumed bytes left!!");
LOG.warn("ConsumedReadBuffer has " + buffer.remaining() + " unconsumed bytes left (early end of reading?).");
}

// Recycle consumed read buffer
Expand Down

0 comments on commit bdcbb5e

Please sign in to comment.