Skip to content

Commit

Permalink
Fixed shutdown logic in task unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed May 21, 2013
1 parent 07aba3c commit fff0171
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import eu.stratosphere.pact.runtime.task.PactTaskContext;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;

public class DriverTestBase<S extends Stub> implements PactTaskContext<S, PactRecord>
{
public class DriverTestBase<S extends Stub> implements PactTaskContext<S, PactRecord> {
protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;

protected static final int PAGE_SIZE = 32 * 1024;
Expand Down Expand Up @@ -131,9 +131,8 @@ public void setNumFileHandlesForSort(int numFileHandles) {
this.numFileHandles = numFileHandles;
}

public void testDriver(PactDriver<S, PactRecord> driver, Class<? extends S> stubClass)
throws Exception
{
public void testDriver(PactDriver<S, PactRecord> driver, Class<? extends S> stubClass) throws Exception {

this.driver = driver;
driver.setup(this);

Expand Down Expand Up @@ -192,49 +191,33 @@ public void testDriver(PactDriver<S, PactRecord> driver, Class<? extends S> stub
}
}

public void cancel() throws Exception
{
public void cancel() throws Exception {
this.running = false;
this.driver.cancel();
}

// --------------------------------------------------------------------------------------------

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getTaskConfig()
*/
@Override
public TaskConfig getTaskConfig() {
return this.taskConfig;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getUserCodeClassLoader()
*/
@Override
public ClassLoader getUserCodeClassLoader() {
return getClass().getClassLoader();
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getIOManager()
*/
@Override
public IOManager getIOManager() {
return this.ioManager;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getMemoryManager()
*/
@Override
public MemoryManager getMemoryManager() {
return this.memManager;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getInput(int)
*/
@Override
public <X> MutableObjectIterator<X> getInput(int index) {
MutableObjectIterator<PactRecord> in = this.inputs.get(index);
Expand All @@ -253,53 +236,35 @@ public <X> MutableObjectIterator<X> getInput(int index) {
return input;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getInputSerializer(int)
*/
@Override
public <X> TypeSerializer<X> getInputSerializer(int index) {
@SuppressWarnings("unchecked")
TypeSerializer<X> serializer = (TypeSerializer<X>) PactRecordSerializer.get();
return serializer;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getInputComparator(int)
*/
@Override
public <X> TypeComparator<X> getInputComparator(int index) {
@SuppressWarnings("unchecked")
TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index);
return comparator;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getStub()
*/
@Override
public S getStub() {
return this.stub;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getOutputCollector()
*/
@Override
public Collector<PactRecord> getOutputCollector() {
return this.output;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#getOwningNepheleTask()
*/
@Override
public AbstractInvokable getOwningNepheleTask() {
return this.owner;
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.task.PactTaskContext#formatLogString(java.lang.String)
*/
@Override
public String formatLogString(String message) {
return "Driver Tester: " + message;
Expand All @@ -308,26 +273,20 @@ public String formatLogString(String message) {
// --------------------------------------------------------------------------------------------

@After
public void shutdownSorters() throws Exception
{
public void shutdownAll() throws Exception {
// 1st, shutdown sorters
for (UnilateralSortMerger<?> sorter : this.sorters) {
if (sorter != null)
sorter.close();
}
this.sorters.clear();
}

@After
public void shutdownIOManager() throws Exception
{

// 2nd, shutdown I/O
this.ioManager.shutdown();
Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
}

@After
public void shutdownMemoryManager() throws Exception
{
final MemoryManager memMan = getMemoryManager();
// last, verify all memory is returned and shutdown mem manager
MemoryManager memMan = getMemoryManager();
if (memMan != null) {
Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
memMan.shutdown();
Expand All @@ -336,33 +295,26 @@ public void shutdownMemoryManager() throws Exception

// --------------------------------------------------------------------------------------------

private static final class ListOutputCollector implements Collector<PactRecord>
{
private final List<PactRecord> output;
private static final class ListOutputCollector implements Collector<PactRecord> {

private final List<PactRecord> output;

public ListOutputCollector(List<PactRecord> outputList) {
this.output = outputList;
}


/* (non-Javadoc)
* @see eu.stratosphere.pact.common.stubs.Collector#collect(java.lang.Object)
*/
@Override
public void collect(PactRecord record) {
this.output.add(record.createCopy());
}

/* (non-Javadoc)
* @see eu.stratosphere.pact.common.stubs.Collector#close()
*/
@Override
public void close() {}
}

public static final class CountingOutputCollector implements Collector<PactRecord>
{
public static final class CountingOutputCollector implements Collector<PactRecord> {
private int num;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -48,12 +49,14 @@
import eu.stratosphere.pact.test.util.minicluster.ClusterProvider;
import eu.stratosphere.pact.test.util.minicluster.ClusterProviderPool;

/**
* @author Erik Nijkamp
* @author Fabian Hueske
*/

public abstract class TestBase {

/* Make sure there is a valid log4j appender */
static {
BasicConfigurator.configure();
}

private static final int MINIMUM_HEAP_SIZE_MB = 192;

private static final Log LOG = LogFactory.getLog(TestBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -47,6 +48,11 @@

public abstract class TestBase2 {

/* Make sure there is a valid log4j appender */
static {
BasicConfigurator.configure();
}

private static final int MINIMUM_HEAP_SIZE_MB = 192;

protected final Configuration config;
Expand Down

0 comments on commit fff0171

Please sign in to comment.