Skip to content

Commit

Permalink
Finished implementation of plugin communication from job manager to t…
Browse files Browse the repository at this point in the history
…ask maangers
  • Loading branch information
Daniel Warneke committed Nov 18, 2011
1 parent ba452fb commit a169e82
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,6 @@ public void deploy(final JobID jobID, final AbstractInstance instance,
return;
}

// Method executionGraph field of vertex is immutable, so no need to synchronized access
final ExecutionGraph eg = verticesToBeDeployed.get(0).getExecutionGraph();

for (final ExecutionVertex vertex : verticesToBeDeployed) {

// Check vertex state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private JobManagerStub(final TaskManager taskManager, final PluginID pluginID) {
@Override
public void sendData(final IOReadableWritable data) throws IOException {

this.taskManager.sendData(this.pluginID, data);
this.taskManager.sendDataToJobManager(this.pluginID, data);
}

/**
Expand All @@ -55,7 +55,7 @@ public void sendData(final IOReadableWritable data) throws IOException {
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {

return this.taskManager.requestData(this.pluginID, data);
return this.taskManager.requestDataFromJobManager(this.pluginID, data);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;

public interface TaskManagerPlugin {
public interface TaskManagerPlugin extends PluginCommunication {

/**
* Registers a new incoming task with this task manager plugin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
*
* @author warneke
*/
public class TaskManager implements TaskOperationProtocol {
public class TaskManager implements TaskOperationProtocol, PluginCommunicationProtocol {

private static final Log LOG = LogFactory.getLog(TaskManager.class);

Expand Down Expand Up @@ -937,7 +937,7 @@ public void run() {
* @throws IOException
* thrown if an I/O error occurs during the RPC call
*/
public void sendData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
public void sendDataToJobManager(final PluginID pluginID, final IOReadableWritable data) throws IOException {

synchronized (this.pluginCommunicationService) {
this.pluginCommunicationService.sendData(pluginID, data);
Expand All @@ -955,10 +955,42 @@ public void sendData(final PluginID pluginID, final IOReadableWritable data) thr
* @throws IOException
* thrown if an I/O error occurs during the RPC call
*/
public IOReadableWritable requestData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
public IOReadableWritable requestDataFromJobManager(final PluginID pluginID, final IOReadableWritable data)
throws IOException {

synchronized (this.pluginCommunicationService) {
return this.pluginCommunicationService.requestData(pluginID, data);
}
}

/**
* {@inheritDoc}
*/
@Override
public void sendData(final PluginID pluginID, final IOReadableWritable data) throws IOException {

final TaskManagerPlugin tmp = this.taskManagerPlugins.get(pluginID);
if (tmp == null) {
LOG.error("Cannot find task manager plugin for plugin ID " + pluginID);
return;
}

tmp.sendData(data);

}

/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final PluginID pluginID, final IOReadableWritable data) throws IOException {

final TaskManagerPlugin tmp = this.taskManagerPlugins.get(pluginID);
if (tmp == null) {
LOG.error("Cannot find task manager plugin for plugin ID " + pluginID);
return null;
}

return tmp.requestData(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@

package eu.stratosphere.nephele.streaming;

import java.io.IOException;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.jobgraph.JobID;
Expand Down Expand Up @@ -130,4 +133,24 @@ public void unregisterTask(final ExecutionVertexID id, final Environment environ
// Nothing to do here
}

/**
* {@inheritDoc}
*/
@Override
public void sendData(final IOReadableWritable data) throws IOException {

// TODO Implement me
}

/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {

// TODO Implement me

return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@

package eu.stratosphere.score;

import java.io.IOException;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;

public final class ScoreTaskManagerPlugin implements TaskManagerPlugin {
Expand Down Expand Up @@ -51,4 +54,22 @@ public void unregisterTask(final ExecutionVertexID id, final Environment environ
// TODO Auto-generated method stub

}

/**
* {@inheritDoc}
*/
@Override
public void sendData(final IOReadableWritable data) throws IOException {
// TODO Auto-generated method stub

}

/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {
// TODO Auto-generated method stub
return null;
}
}

0 comments on commit a169e82

Please sign in to comment.