Skip to content

Commit

Permalink
Implemented piggybacking of lookup information
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Aug 10, 2012
1 parent a17486d commit f79b81e
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package eu.stratosphere.nephele.taskmanager.bytebuffered;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -145,7 +146,14 @@ public void register(final Task task, final Set<ChannelID> activeOutputChannels)

// Add routing entry to receiver cache to reduce latency
if (outputChannelContext.getType() == ChannelType.INMEMORY) {
addReceiverListHint(outputChannelContext);
addReceiverListHint(outputChannelContext.getChannelID(),
outputChannelContext.getConnectedChannelID());
}

// Add routing entry to receiver cache to save lookup for data arriving at the output channel
if (outputChannelContext.getType() == ChannelType.NETWORK) {
addReceiverListHint(outputChannelContext.getConnectedChannelID(),
outputChannelContext.getChannelID());
}

if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -173,7 +181,7 @@ public void register(final Task task, final Set<ChannelID> activeOutputChannels)

// Add routing entry to receiver cache to reduce latency
if (inputChannelContext.getType() == ChannelType.INMEMORY) {
addReceiverListHint(inputChannelContext);
addReceiverListHint(inputChannelContext.getChannelID(), inputChannelContext.getConnectedChannelID());
}

this.registeredChannels.put(inputChannelContext.getChannelID(), inputChannelContext);
Expand Down Expand Up @@ -406,6 +414,12 @@ private void processEnvelopeWithBuffer(final TransferEnvelope transferEnvelope,
if (receiverList.hasRemoteReceivers()) {

final List<RemoteReceiver> remoteReceivers = receiverList.getRemoteReceivers();

// Generate sender hint before sending the first envelope over the network
if (transferEnvelope.getSequenceNumber() == 0) {
generateSenderHint(transferEnvelope, remoteReceivers);
}

for (final RemoteReceiver remoteReceiver : remoteReceivers) {

TransferEnvelope dup = null;
Expand All @@ -432,7 +446,7 @@ private void processEnvelopeWithBuffer(final TransferEnvelope transferEnvelope,
}
}

private boolean processEnvelopeEnvelopeWithoutBuffer(final TransferEnvelope transferEnvelope,
private void processEnvelopeEnvelopeWithoutBuffer(final TransferEnvelope transferEnvelope,
final TransferEnvelopeReceiverList receiverList) throws IOException, InterruptedException {

// No need to copy anything
Expand All @@ -450,23 +464,71 @@ private boolean processEnvelopeEnvelopeWithoutBuffer(final TransferEnvelope tran
channelContext.queueTransferEnvelope(transferEnvelope);
}

final Iterator<RemoteReceiver> remoteIt = receiverList.getRemoteReceivers().iterator();
if (!receiverList.hasRemoteReceivers()) {
return;
}

// Generate sender hint before sending the first envelope over the network
final List<RemoteReceiver> remoteReceivers = receiverList.getRemoteReceivers();
if (transferEnvelope.getSequenceNumber() == 0) {
generateSenderHint(transferEnvelope, remoteReceivers);
}

final Iterator<RemoteReceiver> remoteIt = remoteReceivers.iterator();

while (remoteIt.hasNext()) {

final RemoteReceiver remoteReceiver = remoteIt.next();
this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, transferEnvelope);
}
}

private void addReceiverListHint(final ChannelID source, final ChannelID localReceiver) {

final TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(localReceiver);

return true;
if (this.receiverCache.put(source, receiverList) != null) {
LOG.warn("Receiver cache already contained entry for " + source);
}
}

private void addReceiverListHint(final ChannelContext channelContext) {
private void addReceiverListHint(final ChannelID source, final RemoteReceiver remoteReceiver) {

TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(channelContext);
final TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(remoteReceiver);

if (this.receiverCache.put(channelContext.getChannelID(), receiverList) != null) {
LOG.warn("Receiver cache already contained entry for " + channelContext.getChannelID());
if (this.receiverCache.put(source, receiverList) != null) {
LOG.warn("Receiver cache already contained entry for " + source);
}
}

private void generateSenderHint(final TransferEnvelope transferEnvelope, final List<RemoteReceiver> remoteReceivers) {

final ChannelContext channelContext = this.registeredChannels.get(transferEnvelope.getSource());
if (channelContext == null) {
LOG.error("Cannot find channel context for channel ID " + transferEnvelope.getSource());
return;
}

// Only generate sender hints for output channels
if (channelContext.isInputChannel()) {
return;
}

final ChannelID remoteSourceID = channelContext.getConnectedChannelID();
final int connectionIndex = remoteReceivers.get(0).getConnectionIndex();
final InetSocketAddress isa = new InetSocketAddress(this.localConnectionInfo.getAddress(),
this.localConnectionInfo.getDataPort());

final RemoteReceiver remoteReceiver = new RemoteReceiver(isa, connectionIndex);
final TransferEnvelope senderHint = SenderHintEvent.createEnvelopeWithEvent(transferEnvelope, remoteSourceID,
remoteReceiver);

final Iterator<RemoteReceiver> remoteIt = remoteReceivers.iterator();

while (remoteIt.hasNext()) {

final RemoteReceiver rr = remoteIt.next();
this.networkConnectionManager.queueEnvelopeForTransfer(rr, senderHint);
}
}

Expand All @@ -486,39 +548,41 @@ private TransferEnvelopeReceiverList getReceiverList(final JobID jobID, final Ch

TransferEnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);

if (receiverList == null) {
if (receiverList != null) {
return receiverList;
}

while (true) {
while (true) {

if (Thread.currentThread().isInterrupted()) {
break;
}
if (Thread.currentThread().isInterrupted()) {
break;
}

ConnectionInfoLookupResponse lookupResponse;
synchronized (this.channelLookupService) {
lookupResponse = this.channelLookupService.lookupConnectionInfo(
this.localConnectionInfo, jobID, sourceChannelID);
}
ConnectionInfoLookupResponse lookupResponse;
synchronized (this.channelLookupService) {
lookupResponse = this.channelLookupService.lookupConnectionInfo(
this.localConnectionInfo, jobID, sourceChannelID);
}

if (lookupResponse.isJobAborting()) {
break;
}
if (lookupResponse.isJobAborting()) {
break;
}

if (lookupResponse.receiverNotFound()) {
LOG.error("Cannot find task(s) waiting for data from source channel with ID " + sourceChannelID);
break;
}
if (lookupResponse.receiverNotFound()) {
LOG.error("Cannot find task(s) waiting for data from source channel with ID " + sourceChannelID);
break;
}

if (lookupResponse.receiverNotReady()) {
Thread.sleep(500);
continue;
}
if (lookupResponse.receiverNotReady()) {
Thread.sleep(500);
continue;
}

if (lookupResponse.receiverReady()) {
receiverList = new TransferEnvelopeReceiverList(lookupResponse);
break;
}
if (lookupResponse.receiverReady()) {
receiverList = new TransferEnvelopeReceiverList(lookupResponse);
break;
}

}

if (receiverList != null) {
Expand Down Expand Up @@ -581,6 +645,18 @@ public void processEnvelopeFromInputChannel(final TransferEnvelope transferEnvel
public void processEnvelopeFromNetwork(final TransferEnvelope transferEnvelope, boolean freeSourceBuffer)
throws IOException, InterruptedException {

// Check if the envelope is the special envelope with the sender hint event
if (SenderHintEvent.isSenderHintEvent(transferEnvelope)) {

// Check if this is the final destination of the sender hint event before adding it
final SenderHintEvent seh = (SenderHintEvent) transferEnvelope.getEventList().get(0);
if (this.registeredChannels.get(seh.getSource()) != null) {

addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
return;
}
}

processEnvelope(transferEnvelope, freeSourceBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.taskmanager.bytebuffered;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.EventList;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;

public final class SenderHintEvent extends AbstractEvent {

/**
* The sequence number that will be set for transfer envelopes which contain the sender hint event.
*/
private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;

private final ChannelID source;

private final RemoteReceiver remoteReceiver;

SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {

if (source == null) {
throw new IllegalArgumentException("Argument source must not be null");
}

if (remoteReceiver == null) {
throw new IllegalArgumentException("Argument remoteReceiver must not be null");
}

this.source = source;
this.remoteReceiver = remoteReceiver;
}

public SenderHintEvent() {

this.source = new ChannelID();
this.remoteReceiver = new RemoteReceiver();
}

public ChannelID getSource() {

return this.source;
}

public RemoteReceiver getRemoteReceiver() {

return this.remoteReceiver;
}

/**
* {@inheritDoc}
*/
@Override
public void write(final DataOutput out) throws IOException {

this.source.write(out);
this.remoteReceiver.write(out);
}

/**
* {@inheritDoc}
*/
@Override
public void read(final DataInput in) throws IOException {

this.source.read(in);
this.remoteReceiver.read(in);
}

public static TransferEnvelope createEnvelopeWithEvent(final TransferEnvelope originalEnvelope,
final ChannelID source, final RemoteReceiver remoteReceiver) {

final TransferEnvelope transferEnvelope = new TransferEnvelope(SENDER_HINT_SEQUENCE_NUMBER,
originalEnvelope.getJobID(), originalEnvelope.getSource());

final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver);
transferEnvelope.addEvent(senderEvent);

return transferEnvelope;
}

static boolean isSenderHintEvent(final TransferEnvelope transferEnvelope) {

if (transferEnvelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) {
return false;
}

if (transferEnvelope.getBuffer() != null) {
return false;
}

final EventList eventList = transferEnvelope.getEventList();
if (eventList == null) {
return false;
}

if (eventList.size() != 1) {
return false;
}

if (!(eventList.get(0) instanceof SenderHintEvent)) {
return false;
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.List;

import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;

Expand All @@ -47,20 +45,24 @@ public TransferEnvelopeReceiverList(final ConnectionInfoLookupResponse cilr) {
this.remoteReceivers = Collections.unmodifiableList(cilr.getRemoteTargets());
}

public TransferEnvelopeReceiverList(final ChannelContext channelContext) {

if (channelContext.getType() != ChannelType.INMEMORY) {
throw new IllegalArgumentException(
"Transfer envelope receiver lists can only be constructed from in-memory channels.");
}
public TransferEnvelopeReceiverList(final ChannelID localReceiver) {

final List<ChannelID> lr = new ArrayList<ChannelID>(1);
lr.add(channelContext.getConnectedChannelID());
lr.add(localReceiver);

this.localReceivers = Collections.unmodifiableList(lr);
this.remoteReceivers = Collections.emptyList();
}

public TransferEnvelopeReceiverList(final RemoteReceiver remoteReceiver) {

final List<RemoteReceiver> rr = new ArrayList<RemoteReceiver>(1);
rr.add(remoteReceiver);

this.localReceivers = Collections.emptyList();
this.remoteReceivers = Collections.unmodifiableList(rr);
}

public boolean hasLocalReceivers() {

return (!this.localReceivers.isEmpty());
Expand Down

0 comments on commit f79b81e

Please sign in to comment.