Skip to content

Commit

Permalink
Extended plugin interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Nov 17, 2011
1 parent 8d485ba commit 9defcef
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public byte[] getTag() {
return this.tag;
}

/**
* {@inheritDoc}
*/
@Override
public void write(final DataOutput out) throws IOException {

Expand All @@ -45,6 +48,9 @@ public void write(final DataOutput out) throws IOException {

}

/**
* {@inheritDoc}
*/
@Override
public void read(final DataInput in) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,34 @@

package eu.stratosphere.nephele.plugins;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;

public interface TaskManagerPlugin {

/**
* Registers a new incoming task with this task manager plugin.
*
* @param id
* the ID of the vertex representing the task
* @param jobConfiguration
* the job configuration
* @param environment
* the environment of the task
*/
void registerTask(ExecutionVertexID id, Configuration jobConfiguration, Environment environment);

/**
* Unregisters a finished, canceled, or failed task from this task manager plugin.
*
* @param id
* the ID of the vertex representing the task
* @param environment
* the environment of the task
*/
void unregisterTask(ExecutionVertexID id, Environment environment);

/**
* Called by the task manager to indicate that Nephele is about to shut down.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class TaskManager implements TaskOperationProtocol {
private final HardwareDescription hardwareDescription;

private final List<TaskManagerPlugin> taskManagerPlugins;

/**
* Stores whether the task manager has already been shut down.
*/
Expand Down Expand Up @@ -310,7 +310,7 @@ public TaskManager(String configDir) throws Exception {

// Load the plugins
this.taskManagerPlugins = PluginManager.getTaskManagerPlugins(configDir);

// Add shutdown hook for clean up tasks
Runtime.getRuntime().addShutdownHook(new TaskManagerCleanUp(this));
}
Expand Down Expand Up @@ -572,6 +572,13 @@ private TaskSubmissionResult registerTask(final ExecutionVertexID id, final Conf
}
}

// Allow plugins to register their listeners for this task
if (!this.taskManagerPlugins.isEmpty()) {
for (final TaskManagerPlugin plugin : this.taskManagerPlugins) {
plugin.registerTask(id, jobConfiguration, ee);
}
}

// The environment itself will put the task into the running task map

return null;
Expand Down Expand Up @@ -641,6 +648,13 @@ private void unregisterTask(final ExecutionVertexID id, final Task task) {
this.memoryManager.releaseAll(task.getEnvironment().getInvokable());
}

// Allow plugins to unregister their listeners for this task
if (!this.taskManagerPlugins.isEmpty()) {
for (final TaskManagerPlugin plugin : this.taskManagerPlugins) {
plugin.unregisterTask(id, task.getEnvironment());
}
}

// Check if there are still vertices running that belong to the same job
int numberOfVerticesBelongingToThisJob = 0;
synchronized (this.runningTasks) {
Expand Down Expand Up @@ -794,10 +808,10 @@ public synchronized void shutdown() {
}

// Shut down the plugins
for(final TaskManagerPlugin plugin: this.taskManagerPlugins) {
for (final TaskManagerPlugin plugin : this.taskManagerPlugins) {
plugin.shutdown();
}

this.isShutDown = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

package eu.stratosphere.nephele.streaming;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.plugins.JobManagerPlugin;

public class StreamingJobManagerPlugin implements JobManagerPlugin {

StreamingJobManagerPlugin(final Configuration pluginConfiguration) {
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public StreamingPluginLoader(final Configuration pluginConfiguration) {
public synchronized JobManagerPlugin getJobManagerPlugin() {

if (this.jobManagerPlugin == null) {
this.jobManagerPlugin = new StreamingJobManagerPlugin();
this.jobManagerPlugin = new StreamingJobManagerPlugin(getPluginConfiguration());
}

return this.jobManagerPlugin;
Expand All @@ -50,7 +50,7 @@ public synchronized JobManagerPlugin getJobManagerPlugin() {
public synchronized TaskManagerPlugin getTaskManagerPlugin() {

if (this.taskManagerPlugin == null) {
this.taskManagerPlugin = new StreamingTaskManagerPlugin();
this.taskManagerPlugin = new StreamingTaskManagerPlugin(getPluginConfiguration());
}

return this.taskManagerPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@

package eu.stratosphere.nephele.streaming;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;

public class StreamingTaskManagerPlugin implements TaskManagerPlugin {

StreamingTaskManagerPlugin(final Configuration pluginConfiguration) {
System.out.println("Task Manager plugin loaded");
}

/**
* {@inheritDoc}
*/
Expand All @@ -28,4 +35,23 @@ public void shutdown() {

}

/**
* {@inheritDoc}
*/
@Override
public void registerTask(final ExecutionVertexID id, final Configuration jobConfiguration,
final Environment environment) {
// TODO Auto-generated method stub

}

/**
* {@inheritDoc}
*/
@Override
public void unregisterTask(final ExecutionVertexID id, final Environment environment) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package eu.stratosphere.score;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;

public final class ScoreTaskManagerPlugin implements TaskManagerPlugin {
Expand All @@ -30,4 +32,23 @@ public final class ScoreTaskManagerPlugin implements TaskManagerPlugin {
public void shutdown() {
// TODO Auto-generated method stub
}

/**
* {@inheritDoc}
*/
@Override
public void registerTask(final ExecutionVertexID id, final Configuration jobConfiguration,
final Environment environment) {
// TODO Auto-generated method stub

}

/**
* {@inheritDoc}
*/
@Override
public void unregisterTask(final ExecutionVertexID id, final Environment environment) {
// TODO Auto-generated method stub

}
}

0 comments on commit 9defcef

Please sign in to comment.