Skip to content

Commit

Permalink
Merge branch 'union' into version02_wo_dm
Browse files Browse the repository at this point in the history
Conflicts:
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/InputGate.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
  • Loading branch information
Daniel Warneke committed Dec 7, 2011
2 parents 9958569 + 70816f5 commit 38ad0ec
Show file tree
Hide file tree
Showing 10 changed files with 809 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ public class InputGate<T extends Record> extends AbstractGate<T> implements IORe
*/
private InputGateListener[] inputGateListeners = null;


* The listener object to be notified when a channel has at least one record available.
*/
private RecordAvailabilityListener<T> recordAvailabilityListener = null;

/**
* If the value of this variable is set to <code>true</code>, the input gate is closed.
*/
private boolean isClosed = false;

private final EventNotificationManager eventNotificationManager = new EventNotificationManager();

/**
* The channel to read from next.
*/
Expand Down Expand Up @@ -333,6 +345,11 @@ public T readRecord(final T target) throws IOException, InterruptedException {
while (true) {

if (this.channelToReadFrom == -1) {

if (this.isClosed()) {
return null;
}

this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
}
try {
Expand All @@ -341,6 +358,7 @@ record = this.getInputChannel(this.channelToReadFrom).readRecord(target);
// System.out.println("### Caught EOF exception at channel " + channelToReadFrom + "(" +
// this.getInputChannel(channelToReadFrom).getType().toString() + ")");
if (this.isClosed()) {
this.channelToReadFrom = -1;
return null;
}
}
Expand All @@ -365,6 +383,10 @@ public void notifyRecordIsAvailable(int channelIndex) {

this.availableChannels.add(Integer.valueOf(channelIndex));
this.availableChannels.notify();

if (this.recordAvailabilityListener != null) {
this.recordAvailabilityListener.reportRecordAvailability(this);
}
}
}

Expand Down Expand Up @@ -481,13 +503,19 @@ public void write(DataOutput out) throws IOException {
@Override
public boolean isClosed() throws IOException, InterruptedException {

if (this.isClosed) {
return true;
}

for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
if (!inputChannel.isClosed()) {
return false;
}
}

this.isClosed = true;

return true;
}

Expand Down Expand Up @@ -580,4 +608,34 @@ public void releaseAllChannelResources() {
it.next().releaseResources();
}
}

boolean hasRecordAvailable() throws IOException {

if (this.channelToReadFrom == -1) {

if (this.isClosed()) {
return true;
}

synchronized (this.availableChannels) {

return !(this.availableChannels.isEmpty());
}
}

return true;
}

void registerRecordAvailabilityListener(final RecordAvailabilityListener<T> listener) {

synchronized (this.availableChannels) {

if (this.recordAvailabilityListener != null) {
throw new IllegalStateException(this.recordAvailabilityListener
+ " is already registered as a record availability listener");
}

this.recordAvailabilityListener = listener;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.io;

import eu.stratosphere.nephele.types.Record;

/**
* This interface can be implemented by a class which shall be notified by an input gate when one of the its connected
* input channels has at least one record available for reading.
*
* @author warneke
* @param <T>
* the type of record transported through the corresponding input gate
*/
public interface RecordAvailabilityListener<T extends Record> {

/**
* This method is called by an input gate when one of its connected input channels has at least one record available
* for reading.
*
* @param inputGate
* the input gate which has at least one record available
*/
void reportRecordAvailability(InputGate<T> inputGate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,70 @@ public T next() throws IOException, InterruptedException {

return retVal;
}

/**
* Registers a new listener object with the assigned input gate.
*
* @param inputGateListener
* the listener object to register
*/
public void registerInputGateListener(InputGateListener inputGateListener) {

this.inputGate.registerInputGateListener(inputGateListener);
}

/**
* Subscribes the listener object to receive events of the given type.
*
* @param eventListener
* the listener object to register
* @param eventType
* the type of event to register the listener for
*/
public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {

// Delegate call to input gate
this.inputGate.subscribeToEvent(eventListener, eventType);
}

/**
* Removes the subscription for events of the given type for the listener object.
*
* @param eventListener
* the listener object to cancel the subscription for
* @param eventType
* the type of the event to cancel the subscription for
*/
public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {

// Delegate call to input gate
this.inputGate.unsubscribeFromEvent(eventListener, eventType);
}

/**
* Publishes an event.
*
* @param event
* the event to be published
* @throws IOException
* thrown if an error occurs while transmitting the event
* @throws InterruptedException
* thrown if the thread is interrupted while waiting for the event to be published
*/
public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {

// Delegate call to input gate
this.inputGate.publishEvent(event);
}

/**
* Exposes the input gate which is used by this record reader. This method should have
* package visibility only.
*
* @return the input gate used by this record reader
*/
InputGate<T> getInputGate() {

return this.inputGate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.io;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;

import eu.stratosphere.nephele.types.Record;

public final class UnionRecordReader<T extends Record> implements Reader<T>, RecordAvailabilityListener<T> {

private final Set<InputGate<T>> inputGates;

private InputGate<T> nextInputGateToReadFrom = null;

private IOException ioException = null;

private InterruptedException interruptedExecption = null;

/**
* Queue with indices of channels that store at least one available record.
*/
private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>();

private T nextRecord = null;

public UnionRecordReader(final RecordReader<T>[] recordReaders) {

if (recordReaders == null) {
throw new IllegalArgumentException("Provided argument recordReaders is null");
}

if (recordReaders.length < 2) {
throw new IllegalArgumentException(
"The union record reader must at least be initialized with two individual record readers");
}

this.inputGates = new HashSet<InputGate<T>>(recordReaders.length);
for (final RecordReader<T> rr : recordReaders) {
final InputGate<T> inputGate = rr.getInputGate();
inputGate.registerRecordAvailabilityListener(this);
this.inputGates.add(inputGate);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean hasNext() {

if (this.nextRecord != null) {
return true;
}

try {
this.nextRecord = readNextRecord();
} catch (IOException ioe) {
this.ioException = ioe;
return true;
} catch (InterruptedException ie) {
this.interruptedExecption = ie;
return true;
}

if (this.nextRecord != null) {
return true;
}

return false;
}

/**
* {@inheritDoc}
*/
@Override
public T next() throws IOException, InterruptedException {

if (this.nextRecord == null && this.inputGates.isEmpty()) {
throw new NoSuchElementException();
}

if (this.ioException != null) {
throw this.ioException;
}

if (this.interruptedExecption != null) {
throw this.interruptedExecption;
}

if (this.nextRecord == null) {
this.nextRecord = readNextRecord();
}

T retVal = this.nextRecord;
this.nextRecord = null;

return retVal;
}

/**
* Reads the next record from one of the underlying input gates.
*
* @return the next record from the underlying input gates or <code>null</code> if all underlying input gates are
* closed.
* @throws IOException
* thrown if one of the underlying input gates experienced an IOException
* @throws InterruptedException
* thrown if one of the underlying input gates experienced an InterruptedException
*/
private T readNextRecord() throws IOException, InterruptedException {

while (true) {

if (this.inputGates.isEmpty()) {
return null;
}

if (this.nextInputGateToReadFrom == null) {

synchronized (this.availableInputGates) {

while (this.availableInputGates.isEmpty()) {
this.availableInputGates.wait();
}

this.nextInputGateToReadFrom = this.availableInputGates.pop();
}
}

if (this.nextInputGateToReadFrom.hasRecordAvailable()) {

final T record = this.nextInputGateToReadFrom.readRecord();
if (record == null) { // Gate is closed
this.inputGates.remove(this.nextInputGateToReadFrom);
this.nextInputGateToReadFrom = null;
} else {
return record;
}
} else {
this.nextInputGateToReadFrom = null;
}

}

}

/**
* {@inheritDoc}
*/
@Override
public void reportRecordAvailability(final InputGate<T> inputGate) {

synchronized (this.availableInputGates) {
this.availableInputGates.add(inputGate);
this.availableInputGates.notify();

}
}
}
Loading

0 comments on commit 38ad0ec

Please sign in to comment.