Skip to content

Commit

Permalink
Extended abstract instance to support plugin communication
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Nov 18, 2011
1 parent 1147122 commit ba452fb
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.plugins.PluginID;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;
import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
Expand Down Expand Up @@ -67,6 +70,11 @@ public abstract class AbstractInstance extends NetworkNode {
*/
private TaskOperationProtocol taskManager = null;

/**
* Stores the RPC stub object for the instance's task manager plugin component.
*/
private PluginCommunicationProtocol taskManagerPluginComponent = null;

/**
* Constructs an abstract instance object.
*
Expand Down Expand Up @@ -109,6 +117,26 @@ protected TaskOperationProtocol getTaskManager() throws IOException {
return this.taskManager;
}

/**
* Creates or returns the RPC stub object for the instance's task manager plugin component.
*
* @return the RPC stub object for the instance's task manager plugin component
* @throws IOException
* thrown if the RPC stub object for the task manager plugin component cannot be created
*/
protected PluginCommunicationProtocol getTaskManagerPluginComponent() throws IOException {

if (this.taskManagerPluginComponent == null) {

this.taskManagerPluginComponent = (PluginCommunicationProtocol) RPC.getProxy(
PluginCommunicationProtocol.class, new InetSocketAddress(
getInstanceConnectionInfo().getAddress(), getInstanceConnectionInfo().getIPCPort()), NetUtils
.getSocketFactory());
}

return this.taskManagerPluginComponent;
}

/**
* Returns the type of the instance.
*
Expand Down Expand Up @@ -305,4 +333,36 @@ public synchronized void killTaskManager() throws IOException {

getTaskManager().killTaskManager();
}

/**
* Connects to the plugin component of this instance's task manager and sends data to the plugin with the given ID.
*
* @param pluginID
* the ID of the plugin to send data to
* @param data
* the data to send
* @throws IOException
* thrown if an error occurs while sending the data from the plugin
*/
public synchronized void sendData(final PluginID pluginID, final IOReadableWritable data) throws IOException {

getTaskManagerPluginComponent().sendData(pluginID, data);
}

/**
* Connects to the plugin component of this instance's task manager and requests data from the plugin with the given
* ID.
*
* @param pluginID
* the ID of the plugin to request data from
* @param data
* data to specify the request
* @return the requested data, possibly <code>null</code>
* @throws IOException
* thrown if an error occurs while requesting the data from the plugin
*/
public synchronized IOReadableWritable requestData(PluginID pluginID, IOReadableWritable data) throws IOException {

return getTaskManagerPluginComponent().requestData(pluginID, data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;

import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;

Expand All @@ -34,7 +35,7 @@ private static final class JobManagerStub implements PluginCommunication {

private final PluginID pluginID;

public JobManagerStub(final PluginCommunicationProtocol jobManager, final PluginID pluginID) {
private JobManagerStub(final PluginCommunicationProtocol jobManager, final PluginID pluginID) {
this.jobManager = jobManager;
this.pluginID = pluginID;
}
Expand Down Expand Up @@ -63,6 +64,36 @@ public IOReadableWritable requestData(final IOReadableWritable data) throws IOEx
}
}

private final static class TaskManagerStub implements PluginCommunication {

private final AbstractInstance instance;

private final PluginID pluginID;

private TaskManagerStub(final AbstractInstance instance, final PluginID pluginID) {
this.instance = instance;
this.pluginID = pluginID;
}

/**
* {@inheritDoc}
*/
@Override
public void sendData(final IOReadableWritable data) throws IOException {

this.instance.sendData(this.pluginID, data);
}

/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {

return this.instance.requestData(this.pluginID, data);
}
}

/**
* {@inheritDoc}
*/
Expand All @@ -72,4 +103,13 @@ public PluginCommunication getJobManagerComponent(final PluginID pluginID) {
return new JobManagerStub(this.jobManager, pluginID);
}

/**
* {@inheritDoc}
*/
@Override
public PluginCommunication getTaskManagerComponent(final PluginID pluginID, final AbstractInstance instance) {

return new TaskManagerStub(instance, pluginID);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

package eu.stratosphere.nephele.plugins;

import eu.stratosphere.nephele.instance.AbstractInstance;

public interface PluginLookupService {

PluginCommunication getJobManagerComponent(PluginID pluginID);

PluginCommunication getTaskManagerComponent(PluginID pluginID, AbstractInstance instance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;

import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.taskmanager.TaskManager;

Expand All @@ -34,7 +35,7 @@ private static final class JobManagerStub implements PluginCommunication {

private final PluginID pluginID;

public JobManagerStub(final TaskManager taskManager, final PluginID pluginID) {
private JobManagerStub(final TaskManager taskManager, final PluginID pluginID) {
this.taskManager = taskManager;
this.pluginID = pluginID;
}
Expand Down Expand Up @@ -66,4 +67,13 @@ public PluginCommunication getJobManagerComponent(final PluginID pluginID) {

return new JobManagerStub(this.taskManager, pluginID);
}

/**
* {@inheritDoc}
*/
@Override
public PluginCommunication getTaskManagerComponent(final PluginID pluginID, final AbstractInstance instance) {

throw new UnsupportedOperationException("getTaskManagerComponenet must not be called on this lookup service");
}
}

0 comments on commit ba452fb

Please sign in to comment.