Skip to content

Commit

Permalink
Added sample plugin implementation for SCORE
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Oct 9, 2011
1 parent 35eba7b commit bb6aad0
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@
<module>nephele</module>
<module>pact</module>
<module>build-tools</module>
<module>score</module>
<module>stratosphere-dist</module>
</modules>
</project>
68 changes: 68 additions & 0 deletions score/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http:https://maven.apache.org/POM/4.0.0" xmlns:xsi="http:https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http:https://maven.apache.org/POM/4.0.0 http:https://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>stratosphere</artifactId>
<groupId>eu.stratosphere</groupId>
<version>0.2</version>
</parent>

<groupId>eu.stratosphere</groupId>
<artifactId>score</artifactId>
<version>0.2</version>
<name>score</name>

<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>nephele-server</artifactId>
<version>${version}</version>
</dependency>

</dependencies>

<reporting>
<plugins>
</plugins>
</reporting>

<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<systemPropertyVariables>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<log.level>WARN</log.level>
</systemPropertyVariables>
<excludes>
<exclude>**/*TestBase*.class</exclude>
</excludes>
<forkMode>once</forkMode>
</configuration>
</plugin>

<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.7</version>
<configuration>
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
<forkMode>always</forkMode>
<threadCount>1</threadCount>
<perCoreThreadCount>false</perCoreThreadCount>
</configuration>
</plugin>

</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.score;

import eu.stratosphere.nephele.execution.ExecutionListener;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.execution.ResourceUtilizationSnapshot;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;

public class ScoreExecutionListener implements ExecutionListener {

private final ExecutionVertex executionVertex;

ScoreExecutionListener(final ExecutionVertex executionVertex) {
this.executionVertex = executionVertex;
}

/**
* {@inheritDoc}
*/
@Override
public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
final ExecutionState newExecutionState, final String optionalMessage) {

System.out.println("SCORE received execution state update for vertex " + this.executionVertex + ": "
+ newExecutionState);
}

/**
* {@inheritDoc}
*/
@Override
public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
// TODO Auto-generated method stub

}

/**
* {@inheritDoc}
*/
@Override
public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
// TODO Auto-generated method stub

}

/**
* {@inheritDoc}
*/
@Override
public void initialExecutionResourcesExhausted(final JobID jobID, final ExecutionVertexID vertexID,
final ResourceUtilizationSnapshot resourceUtilizationSnapshot) {
// TODO Auto-generated method stub

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.score;

import java.util.Iterator;

import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.plugins.JobManagerPlugin;

public final class ScoreJobManagerPlugin implements JobManagerPlugin {

/**
* {@inheritDoc}
*/
@Override
public JobGraph rewriteJobGraph(final JobGraph jobGraph) {

// Nothing to do here

return jobGraph;
}

/**
* {@inheritDoc}
*/
@Override
public ExecutionGraph rewriteExecutionGraph(final ExecutionGraph executionGraph) {

synchronized (executionGraph) {

// Register for events
final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(executionGraph, true);

while (it.hasNext()) {
final ExecutionVertex vertex = it.next();
vertex.registerExecutionListener(new ScoreExecutionListener(vertex));
}
}

return executionGraph;
}

/**
* {@inheritDoc}
*/
@Override
public void shutdown() {
// TODO Auto-generated method stub

}
}
65 changes: 65 additions & 0 deletions score/src/main/java/eu/stratosphere/score/ScorePluginLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.score;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.plugins.AbstractPluginLoader;
import eu.stratosphere.nephele.plugins.JobManagerPlugin;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;

/**
* A plugin loader for the SCORE (Stratosphere Continuous Re-optimization) module.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public final class ScorePluginLoader extends AbstractPluginLoader {

private ScoreJobManagerPlugin jobManagerPlugin = null;

private ScoreTaskManagerPlugin taskManagerPlugin = null;

public ScorePluginLoader(final Configuration pluginConfiguration) {
super(pluginConfiguration);
}

/**
* {@inheritDoc}
*/
@Override
public synchronized JobManagerPlugin getJobManagerPlugin() {

if (this.jobManagerPlugin == null) {
this.jobManagerPlugin = new ScoreJobManagerPlugin();
}

return this.jobManagerPlugin;
}

/**
* {@inheritDoc}
*/
@Override
public synchronized TaskManagerPlugin getTaskManagerPlugin() {

if (this.taskManagerPlugin == null) {
this.taskManagerPlugin = new ScoreTaskManagerPlugin(getPluginConfiguration());
}

return this.taskManagerPlugin;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.score;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;

public final class ScoreTaskManagerPlugin implements TaskManagerPlugin {

ScoreTaskManagerPlugin(final Configuration pluginConfiguration) {
}

/**
* {@inheritDoc}
*/
@Override
public void shutdown() {
// TODO Auto-generated method stub
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
#######################################################################################################################
##
## Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
##
## Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http:https://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
## an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
## specific language governing permissions and limitations under the License.
##
#######################################################################################################################
-->
<plugins>
<plugin>
<name>SCORE</name>
<class>eu.stratosphere.score.ScorePluginLoader</class>
<configuration>
<property>
<key>testkey</key>
<value>testvalue</value>
</property>
</configuration>
</plugin>
</plugins>

0 comments on commit bb6aad0

Please sign in to comment.