Skip to content

Commit

Permalink
Implemented streaming input and output listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Nov 17, 2011
1 parent 9defcef commit fc67fa6
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ record = this.getInputChannel(this.channelToReadFrom).readRecord(target);
}
}

if (this.inputGateListeners != null) {
for (final InputGateListener inputGateListener : this.inputGateListeners) {
inputGateListener.recordReceived(record);
}
}

return record;
}

Expand Down Expand Up @@ -427,8 +433,8 @@ public void read(DataInput in) throws IOException {
AbstractInputChannel<T> eic = null;
try {
final Constructor<AbstractInputChannel<T>> constructor = (Constructor<AbstractInputChannel<T>>) c
.getDeclaredConstructor(this.getClass(), int.class, RecordDeserializer.class, ChannelID.class,
CompressionLevel.class);
.getDeclaredConstructor(this.getClass(), int.class, RecordDeserializer.class, ChannelID.class,
CompressionLevel.class);
if (constructor == null) {
throw new IOException("Constructor is null!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package eu.stratosphere.nephele.io;

import eu.stratosphere.nephele.types.Record;

/**
* This listener interface can be used to obtain information
* about the utilization of the attached {@link InputGate}.
Expand All @@ -28,4 +30,12 @@ public interface InputGateListener {
* wait because none of its channels can currently deliver new data.
*/
void waitingForAnyChannel();

/**
* This method is called by the {@link InputGate} whenever it is about to pass a new record to the task.
*
* @param record
* the record which is about to be passed to the application
*/
void recordReceived(final Record record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class OutputGate<T extends Record> extends AbstractGate<T> {
* The class of the record transported through this output gate.
*/
private final Class<T> type;

/**
* The listener objects registered for this output gate.
*/
Expand Down Expand Up @@ -114,7 +114,7 @@ public OutputGate(final JobID jobID, final GateID gateID, final Class<T> inputCl

this.isBroadcast = isBroadcast;
this.type = inputClass;

if (this.isBroadcast) {
this.channelSelector = null;
} else {
Expand All @@ -134,7 +134,7 @@ public OutputGate(final JobID jobID, final GateID gateID, final Class<T> inputCl
public final Class<T> getType() {
return this.type;
}

/**
* Adds a new output channel to the output gate.
*
Expand Down Expand Up @@ -329,9 +329,9 @@ public void requestClose() throws IOException, InterruptedException {
*/
@Override
public boolean isClosed() throws IOException, InterruptedException {

boolean allClosed = true;

for (int i = 0; i < this.getNumberOfOutputChannels(); i++) {
final AbstractOutputChannel<T> outputChannel = this.getOutputChannel(i);
if (!outputChannel.isClosed()) {
Expand Down Expand Up @@ -362,6 +362,12 @@ public void writeRecord(T record) throws IOException, InterruptedException {
throw new InterruptedException();
}

if (this.outputGateListeners != null) {
for (final OutputGateListener outputGateListener : this.outputGateListeners) {
outputGateListener.recordEmitted(record);
}
}

if (this.isBroadcast) {

if (getChannelType() == ChannelType.INMEMORY) {
Expand Down Expand Up @@ -439,7 +445,7 @@ public void read(DataInput in) throws IOException {
constructor.setAccessible(true);

eoc = constructor.newInstance(this, i, channelID, compressionLevel);

} catch (InstantiationException e) {
LOG.error(e);
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@

package eu.stratosphere.nephele.io;

import eu.stratosphere.nephele.types.Record;

public interface OutputGateListener {

void channelCapacityExhausted(int channelIndex);

/**
* This method is called to indicate that a record has just been emitted by the task.
*
* @param record
* the record which has just been emitted
*/
void recordEmitted(final Record record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.InputGateListener;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;

public class InputGateListenerImpl implements InputGateListener {

Expand Down Expand Up @@ -61,4 +62,13 @@ public ExecutionVertexID getExecutionVertexID() {
public int getGateIndex() {
return this.gateIndex;
}

/**
* {@inheritDoc}
*/
@Override
public void recordReceived(final Record record) {
// Nothing to do here

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.OutputGateListener;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;

public class OutputGateListenerImpl implements OutputGateListener {

Expand Down Expand Up @@ -59,4 +60,12 @@ public int getAndResetCounter() {

return retval;
}

/**
* {@inheritDoc}
*/
@Override
public void recordEmitted(final Record record) {
// Nothing to do here
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package eu.stratosphere.nephele.streaming;

import eu.stratosphere.nephele.io.InputGateListener;
import eu.stratosphere.nephele.io.OutputGateListener;
import eu.stratosphere.nephele.types.Record;

public class StreamingTaskListener implements InputGateListener, OutputGateListener {

@Override
public void channelCapacityExhausted(int channelIndex) {
// TODO Auto-generated method stub

}

@Override
public void recordEmitted(Record record) {
// TODO Auto-generated method stub

}

@Override
public void waitingForAnyChannel() {
// TODO Auto-generated method stub

}

@Override
public void recordReceived(Record record) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;
import eu.stratosphere.nephele.types.Record;

public class StreamingTaskManagerPlugin implements TaskManagerPlugin {

StreamingTaskManagerPlugin(final Configuration pluginConfiguration) {
System.out.println("Task Manager plugin loaded");
}

/**
Expand All @@ -41,17 +43,27 @@ public void shutdown() {
@Override
public void registerTask(final ExecutionVertexID id, final Configuration jobConfiguration,
final Environment environment) {
// TODO Auto-generated method stub

final StreamingTaskListener listener = new StreamingTaskListener();

for (int i = 0; i < environment.getNumberOfOutputGates(); ++i) {
final OutputGate<? extends Record> outputGate = environment.getOutputGate(i);
outputGate.registerOutputGateListener(listener);
}

for (int i = 0; i < environment.getNumberOfInputGates(); ++i) {
final InputGate<? extends Record> inputGate = environment.getInputGate(i);
inputGate.registerInputGateListener(listener);
}
}

/**
* {@inheritDoc}
*/
@Override
public void unregisterTask(final ExecutionVertexID id, final Environment environment) {
// TODO Auto-generated method stub

// Nothing to do here
}

}

0 comments on commit fc67fa6

Please sign in to comment.