Skip to content

Commit

Permalink
Merge branch 'stratosphere'
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabian Hueske committed Mar 28, 2011
2 parents ce2cfb6 + ff20d8a commit f7d9b82
Show file tree
Hide file tree
Showing 24 changed files with 806 additions and 533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
package eu.stratosphere.nephele.io;

import java.io.IOException;
import java.util.List;

import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
import eu.stratosphere.nephele.types.Record;

/**
Expand All @@ -32,6 +30,4 @@ public interface Reader<T extends Record> {
boolean hasNext();

T next() throws IOException, InterruptedException;

List<AbstractInputChannel<T>> getInputChannels();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
package eu.stratosphere.nephele.io;

import java.io.IOException;
import java.util.List;

import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.nephele.types.Record;
Expand All @@ -35,7 +33,6 @@
* the type of the record that can be read from this record reader
*/

// FIXME added Writer<T> to make this at least mock-able ... still requires refactoring (en)
public class RecordReader<T extends Record> implements Reader<T> {

/**
Expand Down Expand Up @@ -306,15 +303,6 @@ public T next() throws IOException, InterruptedException {
return retVal;
}

/**
* Returns the list of InputChannels that feed this RecordReader.
*
* @return the list of InputChannels that feed this RecordReader
*/
public List<AbstractInputChannel<T>> getInputChannels() {
return this.inputGate.getInputChannels();
}

/**
* Registers a new listener object with the assigned input gate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,14 @@ public abstract class AbstractByteBufferedInputChannel<T extends Record> extends

private Buffer compressedDataBuffer = null;

/**
* The buffered record to be returned next on a read request.
*/
private T bufferedRecord = null;

private ByteBufferedInputChannelBroker inputChannelBroker = null;

private final Object synchronisationObject = new Object();

private boolean brokerAggreedToCloseChannel = false;

private T bufferedRecord = null;

/**
* The Decompressor-Object to decompress incoming data
*/
Expand Down Expand Up @@ -107,43 +104,40 @@ public AbstractByteBufferedInputChannel(InputGate<T> inputGate, int channelIndex
*/
private T deserializeNextRecord() throws IOException {

T nextRecord = null;

while (true) {

if (this.uncompressedDataBuffer == null) {

synchronized (this.synchronisationObject) {

if (this.ioException != null) {
throw this.ioException;
}
if(this.bufferedRecord != null) {
final T record = this.bufferedRecord;
this.bufferedRecord = null;
return record;
}

if (this.uncompressedDataBuffer == null) {

requestReadBuffersFromBroker();
}
synchronized (this.synchronisationObject) {

if (this.uncompressedDataBuffer == null) {
break;
if (this.ioException != null) {
throw this.ioException;
}

if (this.decompressor != null) {
this.decompressor.decompress();
}
requestReadBuffersFromBroker();
}

nextRecord = this.deserializationBuffer.readData(this.uncompressedDataBuffer);

// If deserialization was successful, exit the loop
if (nextRecord != null) {
break;
if (this.uncompressedDataBuffer == null) {
return null;
}

// Make sure the loop will eventually terminate
if (this.uncompressedDataBuffer.remaining() == 0) {
releasedConsumedReadBuffer();
if (this.decompressor != null) {
this.decompressor.decompress();
}
}

final T nextRecord = this.deserializationBuffer.readData(this.uncompressedDataBuffer);

if (this.uncompressedDataBuffer.remaining() == 0) {
releasedConsumedReadBuffer();
this.bufferedRecord = nextRecord;
return null;
}

return nextRecord;
}

Expand Down Expand Up @@ -171,17 +165,11 @@ private void requestReadBuffersFromBroker() {
@Override
public T readRecord() throws IOException {

T returnRecord = null;

if (isClosed()) {
throw new EOFException();
}

returnRecord = this.bufferedRecord;

this.bufferedRecord = deserializeNextRecord();

return returnRecord;
return deserializeNextRecord();
}

/**
Expand Down Expand Up @@ -220,7 +208,6 @@ public void close() throws IOException, InterruptedException {
if (this.uncompressedDataBuffer != null) {
releasedConsumedReadBuffer();
}
this.bufferedRecord = null;

/*
* Send close event to indicate the input channel has successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public ExecutionState getExecutionState() {
* @param instanceName
* the name of the instance this vertex currently runs on
*/
public void setInstanceName(String instanceName) {
public void setInstanceName(final String instanceName) {
this.instanceName = instanceName;
}

Expand All @@ -278,7 +278,7 @@ public void setInstanceName(String instanceName) {
* @param instanceType
* the type of instance this vertex currently runs on
*/
public void setInstanceType(String instanceType) {
public void setInstanceType(final String instanceType) {
this.instanceType = instanceType;
}

Expand Down
Loading

0 comments on commit f7d9b82

Please sign in to comment.