Skip to content

Commit

Permalink
Introduced hybrid event model
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Aug 1, 2012
1 parent 9d3c5a2 commit e6d7fd6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Iterator;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;

public abstract class AbstractOutputChannelContext implements OutputChannelContext {
Expand All @@ -44,10 +45,40 @@ public void queueTransferEnvelope(final TransferEnvelope transferEnvelope) {

final Iterator<AbstractEvent> it = transferEnvelope.getEventList().iterator();
while (it.hasNext()) {
this.forwardingChain.offerEvent(it.next());

final AbstractEvent event = it.next();
if (event instanceof AbstractTaskEvent) {
processEventAsynchronously(event);
} else {
processEventSynchronously(event);
}
}
}

/**
* Processes an event received from the framework in a synchronous fashion, i.e. the event processing is done by the
* thread the event is destined for (usually the task thread).
*
* @param event
* the event to be processed
*/
protected void processEventSynchronously(final AbstractEvent event) {

this.forwardingChain.offerEvent(event);
}

/**
* Processes an event received from the framework in an asynchronous fashion, i.e. the event processing is done by
* the thread which delivers the event.
*
* @param event
* the event to be processed
*/
protected void processEventAsynchronously(final AbstractEvent event) {

// The default implementation does nothing
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void processEvent(final AbstractEvent event) {
} else if (event instanceof ReceiverNotFoundEvent) {
this.lastSequenceNumberWithReceiverNotFound = ((ReceiverNotFoundEvent) event).getSequenceNumber();
} else if (event instanceof AbstractTaskEvent) {
this.byteBufferedOutputChannel.processEvent(event);
throw new IllegalStateException("Received synchronous task event " + event);
}

getNext().processEvent(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package eu.stratosphere.nephele.taskmanager.runtime;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedOutputChannel;
Expand Down Expand Up @@ -77,4 +78,13 @@ public ChannelType getType() {

return this.byteBufferedOutputChannel.getType();
}

/**
* {@inheritDoc}
*/
@Override
protected void processEventAsynchronously(final AbstractEvent event) {

this.byteBufferedOutputChannel.processEvent(event);
}
}

0 comments on commit e6d7fd6

Please sign in to comment.