From 90846d7fef5df651bb5fdba27c8bce29fcae4c4e Mon Sep 17 00:00:00 2001 From: StephanEwen Date: Wed, 12 Feb 2014 02:52:51 +0100 Subject: [PATCH] Merge broadcast variable runtime. Extend runtime for iterative algorithms. Add iterative kmeans test for runtime code. --- .../common/functions/AbstractFunction.java | 2 + .../java/record/kmeans/KMeansIterative.java | 15 +- .../java/record/kmeans/KMeansSingleStep.java | 56 +++- .../task/AbstractIterativePactTask.java | 88 ++--- .../iterative/task/IterationHeadPactTask.java | 7 +- .../AbstractCachedBuildSideMatchDriver.java | 19 +- .../task/BuildFirstCachedMatchDriver.java | 10 +- .../task/BuildSecondCachedMatchDriver.java | 10 +- .../pact/runtime/task/RegularPactTask.java | 166 +++++---- .../task/chaining/ChainedCombineDriver.java | 2 +- .../runtime/task/chaining/ChainedDriver.java | 18 +- .../task/chaining/ChainedMapDriver.java | 2 +- .../SynchronousChainedCombineDriver.java | 2 +- .../pact/runtime/task/util/TaskConfig.java | 160 +++++---- .../pact/runtime/udf/RuntimeUDFContext.java | 4 - .../BroadcastVarsNepheleITCase.java | 7 +- .../KMeansIterativeNepheleITCase.java | 314 ++++++++++++++++++ .../KMeansStepITCase.java | 122 +------ .../IterativeKMeansITCase.java | 18 +- .../test/iterative/IterativeKMeansITCase.java | 50 +-- .../test/iterative/nephele/JobGraphUtils.java | 13 +- .../CustomCompensatableDanglingPageRank.java | 4 +- ...ensatableDanglingPageRankWithCombiner.java | 4 +- .../CompensatableDanglingPageRank.java | 4 +- .../ImprovedAdjacencyListInputFormat.java | 4 +- .../ImprovedDanglingPageRankInputFormat.java | 4 +- .../PackagedProgramEndToEndTest.java | 12 +- .../test/testdata/KMeansData.java | 82 +++++ 28 files changed, 772 insertions(+), 427 deletions(-) create mode 100644 stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java create mode 100644 stratosphere-tests/src/test/java/eu/stratosphere/test/testdata/KMeansData.java diff --git 
a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/AbstractFunction.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/AbstractFunction.java index 12c89daaf44b4..baf46f6efa670 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/AbstractFunction.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/AbstractFunction.java @@ -22,6 +22,8 @@ */ public abstract class AbstractFunction implements Function, Serializable { + private static final long serialVersionUID = 1L; + // -------------------------------------------------------------------------------------------- // Runtime context access // -------------------------------------------------------------------------------------------- diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansIterative.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansIterative.java index 5998af1e19c98..f96ef7e5f5df8 100644 --- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansIterative.java +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansIterative.java @@ -20,17 +20,20 @@ import eu.stratosphere.api.common.operators.BulkIteration; import eu.stratosphere.api.common.operators.FileDataSink; import eu.stratosphere.api.common.operators.FileDataSource; +import eu.stratosphere.api.java.record.io.CsvInputFormat; import eu.stratosphere.api.java.record.operators.MapOperator; import eu.stratosphere.api.java.record.operators.ReduceOperator; +import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.PointBuilder; import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.RecomputeClusterCenter; import 
eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.SelectNearestCenter; -import eu.stratosphere.example.java.record.kmeans.udfs.PointInFormat; import eu.stratosphere.example.java.record.kmeans.udfs.PointOutFormat; +import eu.stratosphere.types.DoubleValue; import eu.stratosphere.types.IntValue; public class KMeansIterative implements Program, ProgramDescription { + private static final long serialVersionUID = 1L; @Override public Plan getPlan(String... args) { @@ -42,10 +45,16 @@ public Plan getPlan(String... args) { int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 2); // create DataSourceContract for data point input - FileDataSource dataPoints = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points"); + @SuppressWarnings("unchecked") + FileDataSource pointsSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), dataPointInput, "Data Points"); // create DataSourceContract for cluster center input - FileDataSource clusterPoints = new FileDataSource(new PointInFormat(), clusterInput, "Centers"); + @SuppressWarnings("unchecked") + FileDataSource clustersSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), clusterInput, "Centers"); + + MapOperator dataPoints = MapOperator.builder(new PointBuilder()).name("Build data points").input(pointsSource).build(); + + MapOperator clusterPoints = MapOperator.builder(new PointBuilder()).name("Build cluster points").input(clustersSource).build(); BulkIteration iter = new BulkIteration("k-means loop"); iter.setInput(clusterPoints); diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansSingleStep.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansSingleStep.java index c928e29be4337..72ddc1ef9dba4 100644 --- 
a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansSingleStep.java +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/record/kmeans/KMeansSingleStep.java @@ -30,11 +30,12 @@ import eu.stratosphere.api.common.operators.FileDataSource; import eu.stratosphere.api.java.record.functions.MapFunction; import eu.stratosphere.api.java.record.functions.ReduceFunction; +import eu.stratosphere.api.java.record.io.CsvInputFormat; +import eu.stratosphere.api.java.record.io.DelimitedOutputFormat; import eu.stratosphere.api.java.record.operators.MapOperator; import eu.stratosphere.api.java.record.operators.ReduceOperator; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.example.java.record.kmeans.udfs.PointInFormat; -import eu.stratosphere.example.java.record.kmeans.udfs.PointOutFormat; +import eu.stratosphere.types.DoubleValue; import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Record; import eu.stratosphere.types.Value; @@ -43,6 +44,7 @@ public class KMeansSingleStep implements Program, ProgramDescription { + private static final long serialVersionUID = 1L; @Override public Plan getPlan(String... args) { @@ -53,10 +55,16 @@ public Plan getPlan(String... args) { String output = (args.length > 3 ? 
args[3] : ""); // create DataSourceContract for data point input - FileDataSource dataPoints = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points"); + @SuppressWarnings("unchecked") + FileDataSource pointsSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), dataPointInput, "Data Points"); // create DataSourceContract for cluster center input - FileDataSource clusterPoints = new FileDataSource(new PointInFormat(), clusterInput, "Centers"); + @SuppressWarnings("unchecked") + FileDataSource clustersSource = new FileDataSource(new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class), clusterInput, "Centers"); + + MapOperator dataPoints = MapOperator.builder(new PointBuilder()).name("Build data points").input(pointsSource).build(); + + MapOperator clusterPoints = MapOperator.builder(new PointBuilder()).name("Build cluster points").input(clustersSource).build(); // create CrossOperator for distance computation MapOperator findNearestClusterCenters = MapOperator.builder(new SelectNearestCenter()) @@ -175,6 +183,7 @@ public static final class SelectNearestCenter extends MapFunction implements Ser public void open(Configuration parameters) throws Exception { Collection clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers"); + centers.clear(); for (Record r : clusterCenters) { centers.add(new PointWithId(r.getField(0, IntValue.class).getValue(), r.getField(1, Point.class))); } @@ -190,7 +199,7 @@ public void open(Configuration parameters) throws Exception { */ @Override public void map(Record dataPointRecord, Collector out) { - Point p = dataPointRecord.getField(0, Point.class); + Point p = dataPointRecord.getField(1, Point.class); double nearestDistance = Double.MAX_VALUE; int centerId = -1; @@ -202,6 +211,7 @@ public void map(Record dataPointRecord, Collector out) { // update nearest cluster if necessary if (distance < 
nearestDistance) { + nearestDistance = distance; centerId = center.id; } } @@ -257,4 +267,40 @@ private final Record sumPointsAndCount(Iterator dataPoints) { return next; } } + + public static final class PointBuilder extends MapFunction { + + private static final long serialVersionUID = 1L; + + @Override + public void map(Record record, Collector out) throws Exception { + double x = record.getField(1, DoubleValue.class).getValue(); + double y = record.getField(2, DoubleValue.class).getValue(); + double z = record.getField(3, DoubleValue.class).getValue(); + + record.setField(1, new Point(x, y, z)); + out.collect(record); + } + } + + public static final class PointOutFormat extends DelimitedOutputFormat { + + private static final long serialVersionUID = 1L; + + private static final String format = "%d|%.1f|%.1f|%.1f|"; + + @Override + public int serializeRecord(Record rec, byte[] target) throws Exception { + int id = rec.getField(0, IntValue.class).getValue(); + Point p = rec.getField(1, Point.class); + + byte[] bytes = String.format(format, id, p.x, p.y, p.z).getBytes(); + if (bytes.length > target.length) { + return -bytes.length; + } else { + System.arraycopy(bytes, 0, target, 0, bytes.length); + return bytes.length; + } + } + } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java index 1e74a3eaa3660..11b391390ef8f 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java @@ -43,9 +43,6 @@ import org.apache.commons.logging.LogFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; /** * The base class for all tasks able 
to participate in an iteration. @@ -54,30 +51,28 @@ public abstract class AbstractIterativePactTask extends implements Terminable { private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class); - - - private final AtomicBoolean terminationRequested = new AtomicBoolean(false); - - private RuntimeAggregatorRegistry iterationAggregators; - private List iterativeInputs = new ArrayList(); - - private String brokerKey; + protected LongSumAggregator worksetAggregator; - private int superstepNum = 1; + protected BlockingBackChannel worksetBackChannel; protected boolean isWorksetIteration; protected boolean isWorksetUpdate; protected boolean isSolutionSetUpdate; + - protected LongSumAggregator worksetAggregator; + private RuntimeAggregatorRegistry iterationAggregators; - protected BlockingBackChannel worksetBackChannel; + private String brokerKey; + + private int superstepNum = 1; + + private volatile boolean terminationRequested; // -------------------------------------------------------------------------------------------- - // Wrapping methods to supplement behavior of the regular Pact Task + // Main life cycle methods that implement the iterative behavior // -------------------------------------------------------------------------------------------- @Override @@ -93,8 +88,6 @@ protected void initialize() throws Exception { excludeFromReset(i); } } - // initialize the repeatable driver - resDriver.initialize(); } TaskConfig config = getLastTasksConfig(); @@ -118,9 +111,20 @@ protected void initialize() throws Exception { @Override public void run() throws Exception { - if (!inFirstIteration()) { + if (inFirstIteration()) { + if (this.driver instanceof ResettablePactDriver) { + // initialize the repeatable driver + ((ResettablePactDriver) this.driver).initialize(); + } + } else { reinstantiateDriver(); resetAllInputs(); + + // re-read the iterative broadcast variables + for (int i : this.iterativeBroadcastInputs) { + final String name = 
getTaskConfig().getBroadcastInputName(i); + readAndSetBroadcastInput(i, name, this.runtimeUdfContext); + } } // call the parent to execute the superstep @@ -138,35 +142,14 @@ protected void closeLocalStrategiesAndCaches() { try { resDriver.teardown(); } catch (Throwable t) { - log.error("Error shutting down a resettable driver.", t); + log.error("Error while shutting down an iterative operator.", t); } } } } @Override - protected MutableObjectIterator createInputIterator(int i, MutableReader inputReader, TypeSerializer serializer) { - - final MutableObjectIterator inIter = super.createInputIterator(i, inputReader, serializer); - final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i); - - if (numberOfEventsUntilInterrupt < 0) { - throw new IllegalArgumentException(); - } - else if (numberOfEventsUntilInterrupt > 0) { - inputReader.setIterative(numberOfEventsUntilInterrupt); - this.iterativeInputs.add(i); - - if (log.isDebugEnabled()) { - log.debug(formatLogString("Input [" + i + "] reads in supersteps with [" + - + numberOfEventsUntilInterrupt + "] event(s) till next superstep.")); - } - } - return inIter; - } - - @Override - public RuntimeUDFContext getRuntimeContext(String taskName) { + public RuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup()); } @@ -223,7 +206,7 @@ public RuntimeAggregatorRegistry getIterationAggregators() { protected void checkForTerminationAndResetEndOfSuperstepState() throws IOException { // sanity check that there is at least one iterative input reader - if (this.iterativeInputs.isEmpty()) + if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) throw new IllegalStateException(); // check whether this step ended due to end-of-superstep, or proper close @@ -262,6 +245,23 @@ protected void 
checkForTerminationAndResetEndOfSuperstepState() throws IOExcepti } } } + + for (int inputNum : this.iterativeBroadcastInputs) { + MutableReader reader = this.broadcastInputReaders[inputNum]; + + if (reader.isInputClosed()) { + anyClosed = true; + } + else { + // sanity check that the BC input is at the end of the superstep + if (!reader.hasReachedEndOfSuperstep()) { + throw new IllegalStateException("An iterative broadcast input has not been fully consumed."); + } + + allClosed = false; + reader.startNextSuperstep(); + } + } // sanity check whether we saw the same state (end-of-superstep or termination) on all inputs if (allClosed != anyClosed) { @@ -275,12 +275,12 @@ protected void checkForTerminationAndResetEndOfSuperstepState() throws IOExcepti @Override public boolean terminationRequested() { - return this.terminationRequested.get(); + return this.terminationRequested; } @Override public void requestTermination() { - this.terminationRequested.set(true); + this.terminationRequested = true; } @Override diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java index b7e216fc3e6f6..1aafb03f0cef4 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java @@ -249,9 +249,10 @@ public void run() throws Exception { solutionSet = initHashTable(); // read the initial solution set - @SuppressWarnings("unchecked") - MutableObjectIterator solutionSetInput = (MutableObjectIterator) createInputIterator( - initialSolutionSetInput, inputReaders[initialSolutionSetInput], solutionTypeSerializer); +// @SuppressWarnings("unchecked") +// MutableObjectIterator solutionSetInput = (MutableObjectIterator) createInputIterator( +// 
initialSolutionSetInput, inputReaders[initialSolutionSetInput], solutionTypeSerializer); + MutableObjectIterator solutionSetInput = getInput(initialSolutionSetInput); readInitialSolutionSet(solutionSet, solutionSetInput); SolutionSetBroker.instance().handIn(brokerKey, solutionSet); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java index 8cceedf0352ea..cf9102ff7a619 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.java @@ -34,9 +34,14 @@ public abstract class AbstractCachedBuildSideMatchDriver extends M protected volatile MutableHashTable hashJoin; - protected abstract int getBuildSideIndex(); + private final int buildSideIndex; - protected abstract int getProbeSideIndex(); + private final int probeSideIndex; + + protected AbstractCachedBuildSideMatchDriver(int buildSideIndex, int probeSideIndex) { + this.buildSideIndex = buildSideIndex; + this.probeSideIndex = probeSideIndex; + } // -------------------------------------------------------------------------------------------- @@ -45,7 +50,7 @@ public boolean isInputResettable(int inputNum) { if (inputNum < 0 || inputNum > 1) { throw new IndexOutOfBoundsException(); } - return inputNum == getBuildSideIndex(); + return inputNum == buildSideIndex; } @Override @@ -66,12 +71,12 @@ public void initialize() throws Exception { List memSegments = this.taskContext.getMemoryManager().allocatePages( this.taskContext.getOwningNepheleTask(), numMemoryPages); - if (getBuildSideIndex() == 0 && getProbeSideIndex() == 1) { + if (buildSideIndex == 0 && probeSideIndex == 1) { MutableHashTable hashJoin = new MutableHashTable(serializer1, serializer2, comparator1, comparator2, 
pairComparatorFactory.createComparator21(comparator1, comparator2), memSegments, this.taskContext.getIOManager()); this.hashJoin = hashJoin; hashJoin.open(input1, EmptyMutableObjectIterator.get()); - } else if (getBuildSideIndex() == 1 && getProbeSideIndex() == 0) { + } else if (buildSideIndex == 1 && probeSideIndex == 0) { MutableHashTable hashJoin = new MutableHashTable(serializer2, serializer1, comparator2, comparator1, pairComparatorFactory.createComparator12(comparator1, comparator2), memSegments, this.taskContext.getIOManager()); this.hashJoin = hashJoin; @@ -92,7 +97,7 @@ public void run() throws Exception { final GenericJoiner matchStub = this.taskContext.getStub(); final Collector collector = this.taskContext.getOutputCollector(); - if (getBuildSideIndex() == 0) { + if (buildSideIndex == 0) { final TypeSerializer buildSideSerializer = taskContext. getInputSerializer(0); final TypeSerializer probeSideSerializer = taskContext. getInputSerializer(1); @@ -117,7 +122,7 @@ public void run() throws Exception { matchStub.join(buildSideRecordFirst, probeSideRecord, collector); } } - } else if (getBuildSideIndex() == 1) { + } else if (buildSideIndex == 1) { final TypeSerializer buildSideSerializer = taskContext. getInputSerializer(1); final TypeSerializer probeSideSerializer = taskContext. 
getInputSerializer(0); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildFirstCachedMatchDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildFirstCachedMatchDriver.java index 0b0462eec6f4c..b7f4809f91f26 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildFirstCachedMatchDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildFirstCachedMatchDriver.java @@ -15,13 +15,7 @@ public class BuildFirstCachedMatchDriver extends AbstractCachedBuildSideMatchDriver { - @Override - protected int getBuildSideIndex() { - return 0; - } - - @Override - protected int getProbeSideIndex() { - return 1; + public BuildFirstCachedMatchDriver() { + super(0, 1); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildSecondCachedMatchDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildSecondCachedMatchDriver.java index 82a8d8401a4bc..e4bfee9dc3e49 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildSecondCachedMatchDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/BuildSecondCachedMatchDriver.java @@ -15,13 +15,7 @@ public class BuildSecondCachedMatchDriver extends AbstractCachedBuildSideMatchDriver { - @Override - protected int getBuildSideIndex() { - return 1; - } - - @Override - protected int getProbeSideIndex() { - return 0; + public BuildSecondCachedMatchDriver() { + super(1, 0); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java index 5b9754f3aa67a..1d10b2a5cda48 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java +++ 
b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java @@ -15,7 +15,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; @@ -27,7 +26,6 @@ import eu.stratosphere.api.common.distributions.DataDistribution; import eu.stratosphere.api.common.functions.Function; import eu.stratosphere.api.common.functions.GenericReducer; -import eu.stratosphere.api.common.functions.RuntimeContext; import eu.stratosphere.api.common.typeutils.TypeComparator; import eu.stratosphere.api.common.typeutils.TypeComparatorFactory; import eu.stratosphere.api.common.typeutils.TypeSerializer; @@ -97,9 +95,14 @@ public class RegularPactTask extends AbstractTask implem protected volatile PactDriver driver; /** - * The instantiated user code of this task's main driver. + * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf. */ protected S stub; + + /** + * The udf's runtime context. + */ + protected RuntimeUDFContext runtimeUdfContext; /** * The collector that forwards the user code's results. May forward to a channel or to chained drivers within @@ -134,6 +137,10 @@ public class RegularPactTask extends AbstractTask implem */ protected MutableObjectIterator[] broadcastInputIterators; + protected int[] iterativeInputs; + + protected int[] iterativeBroadcastInputs; + /** * The local strategies that are applied on the inputs. */ @@ -151,11 +158,10 @@ public class RegularPactTask extends AbstractTask implem protected volatile SpillingResettableMutableObjectIterator[] resettableInputs; /** - * The inputs to the driver. Return the readers' data after the application of the local strategy + * The inputs to the operator. Return the readers' data after the application of the local strategy * and the temp-table barrier. */ protected MutableObjectIterator[] inputs; - /** * The serializers for the input data type. 
@@ -215,7 +221,7 @@ public class RegularPactTask extends AbstractTask implem protected volatile boolean running = true; // -------------------------------------------------------------------------------------------- - // Nephele Task Interface + // Task Interface // -------------------------------------------------------------------------------------------- @@ -294,8 +300,55 @@ public void invoke() throws Exception { try { int numInputs = driver.getNumberOfInputs(); int numBroadcastInputs = this.config.getNumBroadcastInputs(); + initInputsSerializersAndComparators(numInputs); initBroadcastInputsSerializers(numBroadcastInputs); + + // set the iterative status for inputs and broadcast inputs + { + List iterativeInputs = new ArrayList(); + + for (int i = 0; i < numInputs; i++) { + final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i); + + if (numberOfEventsUntilInterrupt < 0) { + throw new IllegalArgumentException(); + } + else if (numberOfEventsUntilInterrupt > 0) { + this.inputReaders[i].setIterative(numberOfEventsUntilInterrupt); + iterativeInputs.add(i); + + if (LOG.isDebugEnabled()) { + LOG.debug(formatLogString("Input [" + i + "] reads in supersteps with [" + + + numberOfEventsUntilInterrupt + "] event(s) till next superstep.")); + } + } + } + this.iterativeInputs = asArray(iterativeInputs); + } + + { + List iterativeBcInputs = new ArrayList(); + + for (int i = 0; i < numBroadcastInputs; i++) { + final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i); + + if (numberOfEventsUntilInterrupt < 0) { + throw new IllegalArgumentException(); + } + else if (numberOfEventsUntilInterrupt > 0) { + this.broadcastInputReaders[i].setIterative(numberOfEventsUntilInterrupt); + iterativeBcInputs.add(i); + + if (LOG.isDebugEnabled()) { + LOG.debug(formatLogString("Broadcast input [" + i + "] reads in supersteps with [" + + + numberOfEventsUntilInterrupt + "] event(s) 
till next superstep.")); + } + } + } + this.iterativeBroadcastInputs = asArray(iterativeBcInputs); + } + initLocalStrategies(numInputs); } catch (Exception e) { throw new RuntimeException("Initializing the input processing failed" + @@ -310,6 +363,12 @@ public void invoke() throws Exception { // pre main-function initialization initialize(); + + // read the broadcast variables + for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) { + final String name = this.config.getBroadcastInputName(i); + readAndSetBroadcastInput(i, name, this.runtimeUdfContext); + } // the work goes here run(); @@ -359,6 +418,7 @@ public void setUserCodeClassLoader(ClassLoader cl) { // -------------------------------------------------------------------------------------------- protected void initialize() throws Exception { + // create the operator try { this.driver.setup(this); } @@ -367,6 +427,9 @@ protected void initialize() throws Exception { "' , caused an error: " + t.getMessage(), t); } + this.runtimeUdfContext = createRuntimeContext(getEnvironment().getTaskName()); + + // instantiate the UDF try { final Class userCodeFunctionType = this.driver.getStubType(); // if the class is null, the driver has no user code @@ -374,11 +437,30 @@ protected void initialize() throws Exception { this.stub = initStub(userCodeFunctionType); } } catch (Exception e) { - throw new RuntimeException("Initializing the user code and the configuration failed" + + throw new RuntimeException("Initializing the UDF" + e.getMessage() == null ? "." 
: ": " + e.getMessage(), e); } } + protected void readAndSetBroadcastInput(int inputNum, String bcVarName, RuntimeUDFContext context) throws IOException { + // drain the broadcast inputs + + @SuppressWarnings("unchecked") + final MutableObjectIterator reader = (MutableObjectIterator) this.broadcastInputIterators[inputNum]; + + @SuppressWarnings("unchecked") + final TypeSerializer serializer = (TypeSerializer) this.broadcastInputSerializers[inputNum]; + + ArrayList collection = new ArrayList(); + + X record = serializer.createInstance(); + while (this.running && reader.next(record)) { + collection.add(record); + record = serializer.createInstance(); + } + context.setBroadcastVariable(bcVarName, collection); + } + protected void run() throws Exception { // ---------------------------- Now, the actual processing starts ------------------------ // check for asynchronous canceling @@ -405,23 +487,6 @@ protected void run() throws Exception { return; } - // drain the broadcast inputs - for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) { - final String name = this.config.getBroadcastInputName(i); - @SuppressWarnings("unchecked") - final MutableObjectIterator reader = (MutableObjectIterator) this.broadcastInputIterators[i]; - @SuppressWarnings("unchecked") - final TypeSerializer serializer = (TypeSerializer) this.broadcastInputSerializers[i]; - - Collection collection = new ArrayList(); - Object record = serializer.createInstance(); - while (this.running && reader.next(record)) { - collection.add(record); - record = serializer.createInstance(); - } - this.stub.getRuntimeContext().setBroadcastVariable(name, collection); - } - // start all chained tasks RegularPactTask.openChainedTasks(this.chainedTasks, this); @@ -612,7 +677,7 @@ protected S initStub(Class stubSuperClass) throws Exception { throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + stubSuperClass.getName() + "' as is required."); } - 
stub.setRuntimeContext(getRuntimeContext(getEnvironment().getTaskName())); + stub.setRuntimeContext(this.runtimeUdfContext); return stub; } catch (ClassCastException ccex) { @@ -709,7 +774,7 @@ protected void initInputsSerializersAndComparators(int numInputs) throws Excepti this.inputComparators[i] = comparatorFactory.createComparator(); } - this.inputIterators[i] = createInputIterator(i, this.inputReaders[i], this.inputSerializers[i]); + this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]); } } @@ -725,7 +790,7 @@ protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exc final TypeSerializerFactory serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader); this.broadcastInputSerializers[i] = serializerFactory.getSerializer(); - this.broadcastInputIterators[i] = createInputIterator(i, this.broadcastInputReaders[i], this.broadcastInputSerializers[i]); + this.broadcastInputIterators[i] = createInputIterator(this.broadcastInputReaders[i], this.broadcastInputSerializers[i]); } } @@ -937,9 +1002,7 @@ private TypeComparator getLocalStrategyComparator(int inputNum) throws Ex return compFact.createComparator(); } - protected MutableObjectIterator createInputIterator(int inputIndex, - MutableReader inputReader, TypeSerializer serializer) - { + protected MutableObjectIterator createInputIterator(MutableReader inputReader, TypeSerializer serializer) { if (serializer.getClass() == RecordSerializer.class) { // record specific deserialization @SuppressWarnings("unchecked") @@ -969,7 +1032,7 @@ protected void initOutputs() throws Exception { this.output = initOutputs(this, this.userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs); } - public RuntimeUDFContext getRuntimeContext(String taskName) { + public RuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), 
env.getIndexInSubtaskGroup()); } @@ -1075,7 +1138,6 @@ public TypeSerializer getInputSerializer(int index) { } - @Override public TypeComparator getInputComparator(int index) { if (this.inputComparators == null) { @@ -1089,16 +1151,6 @@ else if (index < 0 || index >= this.driver.getNumberOfInputs()) { final TypeComparator comparator = (TypeComparator) this.inputComparators[index]; return comparator; } - - /** - * Gets the serializer for the output data type of the main (i.e. the non-chained) driver. - * - * @return The serializer for the output data type of the main driver. - */ - public TypeSerializer getOutputTypeSerializer() { - TypeSerializerFactory factory = this.config.getOutputSerializer(this.userCodeClassLoader); - return factory.getSerializer(); - } // ============================================================================================ // Static Utilities @@ -1121,18 +1173,9 @@ public TypeSerializer getOutputTypeSerializer() { * * @return The string for logging. */ - public static String constructLogString(String message, String taskName, AbstractInvokable parent) - { - final StringBuilder bld = new StringBuilder(128); - bld.append(message); - bld.append(':').append(' '); - bld.append(taskName); - bld.append(' ').append('('); - bld.append(parent.getEnvironment().getIndexInSubtaskGroup() + 1); - bld.append('/'); - bld.append(parent.getEnvironment().getCurrentNumberOfSubtasks()); - bld.append(')'); - return bld.toString(); + public static String constructLogString(String message, String taskName, AbstractInvokable parent) { + return message + ": " + taskName + " (" + (parent.getEnvironment().getIndexInSubtaskGroup() + 1) + + '/' + parent.getEnvironment().getCurrentNumberOfSubtasks() + ')'; } /** @@ -1144,8 +1187,7 @@ public static String constructLogString(String message, String taskName, Abstrac * @param parent The parent task, whose information is included in the log message. * @throws Exception Always thrown. 
*/ - public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception - { + public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception { String taskName; if (ex instanceof ExceptionInChainedStubException) { do { @@ -1457,7 +1499,17 @@ public static T instantiateUserCode(TaskConfig config, ClassLoader cl, Class return stub; } catch (ClassCastException ccex) { - throw new RuntimeException("The stub class is not a proper subclass of " + superClass.getName(), ccex); + throw new RuntimeException("The UDF class is not a proper subclass of " + superClass.getName(), ccex); + } + } + + private static int[] asArray(List list) { + int[] a = new int[list.size()]; + + int i = 0; + for (int val : list) { + a[i++] = val; } + return a; } } \ No newline at end of file diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedCombineDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedCombineDriver.java index f1813c669eb6c..fc9d50f4e6004 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedCombineDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedCombineDriver.java @@ -54,7 +54,7 @@ public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") final GenericReducer combiner = RegularPactTask.instantiateUserCode(config, userCodeClassLoader, GenericReducer.class); - combiner.setRuntimeContext(getRuntimeContext(parent, taskName)); + combiner.setRuntimeContext(getUdfRuntimeContext()); this.combiner = combiner; } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java index 5ffc787258b0b..ddcac258958bb 100644 --- 
a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedDriver.java @@ -35,6 +35,8 @@ public abstract class ChainedDriver implements Collector { protected Collector outputCollector; protected ClassLoader userCodeClassLoader; + + private RuntimeUDFContext udfContext; public void setup(TaskConfig config, String taskName, Collector outputCollector, @@ -44,6 +46,13 @@ public void setup(TaskConfig config, String taskName, Collector outputCollec this.taskName = taskName; this.outputCollector = outputCollector; this.userCodeClassLoader = userCodeClassLoader; + + if (parent instanceof RegularPactTask) { + this.udfContext = ((RegularPactTask) parent).createRuntimeContext(taskName); + } else { + Environment env = parent.getEnvironment(); + this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup()); + } setup(parent); } @@ -64,13 +73,8 @@ public void setup(TaskConfig config, String taskName, Collector outputCollec public abstract void collect(IT record); - protected RuntimeContext getRuntimeContext(AbstractInvokable parent, String name) { - if (parent instanceof RegularPactTask) { - return ((RegularPactTask) parent).getRuntimeContext(name); - } else { - Environment env = parent.getEnvironment(); - return new RuntimeUDFContext(name, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup()); - } + protected RuntimeContext getUdfRuntimeContext() { + return this.udfContext; } @SuppressWarnings("unchecked") diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedMapDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedMapDriver.java index fe3a823ac7c6f..359dc49d1fdc5 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedMapDriver.java +++ 
b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ChainedMapDriver.java @@ -31,7 +31,7 @@ public void setup(AbstractInvokable parent) { final GenericMapper mapper = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericMapper.class); this.mapper = mapper; - mapper.setRuntimeContext(getRuntimeContext(parent, this.taskName)); + mapper.setRuntimeContext(getUdfRuntimeContext()); } @Override diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java index 067aace276b9f..bc57766e76484 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java @@ -69,7 +69,7 @@ public void setup(AbstractInvokable parent) { final GenericReducer combiner = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericReducer.class); this.combiner = combiner; - combiner.setRuntimeContext(getRuntimeContext(parent, this.taskName)); + combiner.setRuntimeContext(getUdfRuntimeContext()); } @Override diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java index 8ffd611626f9a..de2d455440586 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java @@ -49,31 +49,31 @@ */ public class TaskConfig { - private static final String TASK_NAME = "pact.task.name"; + private static final String TASK_NAME = "taskname"; // ------------------------------------ User Code 
--------------------------------------------- - private static final String STUB_OBJECT = "pact.stub.udf-object"; + private static final String STUB_OBJECT = "udf"; - private static final String STUB_PARAM_PREFIX = "pact.stub.param."; + private static final String STUB_PARAM_PREFIX = "udf.param."; // -------------------------------------- Driver ---------------------------------------------- - private static final String DRIVER_CLASS = "pact.driver.class"; + private static final String DRIVER_CLASS = "driver.class"; - private static final String DRIVER_STRATEGY = "pact.driver.strategy"; + private static final String DRIVER_STRATEGY = "driver.strategy"; - private static final String DRIVER_COMPARATOR_FACTORY_PREFIX = "pact.driver.comp."; + private static final String DRIVER_COMPARATOR_FACTORY_PREFIX = "driver.comp."; - private static final String DRIVER_COMPARATOR_PARAMETERS_PREFIX = "pact.driver.comp.params."; + private static final String DRIVER_COMPARATOR_PARAMETERS_PREFIX = "driver.comp.params."; - private static final String DRIVER_PAIR_COMPARATOR_FACTORY = "pact.driver.paircomp"; + private static final String DRIVER_PAIR_COMPARATOR_FACTORY = "driver.paircomp"; // -------------------------------------- Inputs ---------------------------------------------- - private static final String NUM_INPUTS = "pact.in.num"; + private static final String NUM_INPUTS = "in.num"; - private static final String NUM_BROADCAST_INPUTS = "pact.in.broadcast.num"; + private static final String NUM_BROADCAST_INPUTS = "in.bc.num"; /* * If one input has multiple predecessors (bag union), multiple @@ -86,132 +86,134 @@ public class TaskConfig { * Hence, "pact.in.num" would be 3, "pact.size.inputGroup.0" * would be 2, and "pact.size.inputGroup.1" would be 1. 
*/ - private static final String INPUT_GROUP_SIZE_PREFIX = "pact.in.groupsize."; + private static final String INPUT_GROUP_SIZE_PREFIX = "in.groupsize."; - private static final String BROADCAST_INPUT_GROUP_SIZE_PREFIX = "pact.in.broadcast.groupsize."; + private static final String BROADCAST_INPUT_GROUP_SIZE_PREFIX = "in.bc.groupsize."; - private static final String INPUT_TYPE_SERIALIZER_FACTORY_PREFIX = "pact.in.serializer."; + private static final String INPUT_TYPE_SERIALIZER_FACTORY_PREFIX = "in.serializer."; - private static final String BROADCAST_INPUT_TYPE_SERIALIZER_FACTORY_PREFIX = "pact.in.broadcast.serializer."; + private static final String BROADCAST_INPUT_TYPE_SERIALIZER_FACTORY_PREFIX = "in.bc.serializer."; - private static final String INPUT_TYPE_SERIALIZER_PARAMETERS_PREFIX = "pact.in.serializer.param."; + private static final String INPUT_TYPE_SERIALIZER_PARAMETERS_PREFIX = "in.serializer.param."; - private static final String BROADCAST_INPUT_TYPE_SERIALIZER_PARAMETERS_PREFIX = "pact.in.broadcast.serializer.param."; + private static final String BROADCAST_INPUT_TYPE_SERIALIZER_PARAMETERS_PREFIX = "in.bc.serializer.param."; - private static final String INPUT_LOCAL_STRATEGY_PREFIX = "pact.in.strategy."; + private static final String INPUT_LOCAL_STRATEGY_PREFIX = "in.strategy."; - private static final String INPUT_STRATEGY_COMPARATOR_FACTORY_PREFIX = "pact.in.comparator."; + private static final String INPUT_STRATEGY_COMPARATOR_FACTORY_PREFIX = "in.comparator."; - private static final String INPUT_STRATEGY_COMPARATOR_PARAMETERS_PREFIX = "pact.in.comparator.param."; + private static final String INPUT_STRATEGY_COMPARATOR_PARAMETERS_PREFIX = "in.comparator.param."; - private static final String INPUT_DAM_PREFIX = "pact.in.dam."; + private static final String INPUT_DAM_PREFIX = "in.dam."; - private static final String INPUT_REPLAYABLE_PREFIX = "pact.in.dam.replay."; + private static final String INPUT_REPLAYABLE_PREFIX = "in.dam.replay."; - private static 
final String INPUT_DAM_MEMORY_PREFIX = "pact.in.dam.mem."; + private static final String INPUT_DAM_MEMORY_PREFIX = "in.dam.mem."; - private static final String BROADCAST_INPUT_NAME_PREFIX = "pact.in.broadcast.name."; + private static final String BROADCAST_INPUT_NAME_PREFIX = "in.broadcast.name."; // -------------------------------------- Outputs --------------------------------------------- - private static final String OUTPUTS_NUM = "pact.out.num"; + private static final String OUTPUTS_NUM = "out.num"; - private static final String OUTPUT_TYPE_SERIALIZER_FACTORY = "pact.out.serializer"; + private static final String OUTPUT_TYPE_SERIALIZER_FACTORY = "out.serializer"; - private static final String OUTPUT_TYPE_SERIALIZER_PARAMETERS_PREFIX = "pact.out.serializer.param."; + private static final String OUTPUT_TYPE_SERIALIZER_PARAMETERS_PREFIX = "out.serializer.param."; - private static final String OUTPUT_SHIP_STRATEGY_PREFIX = "pact.out.shipstrategy."; + private static final String OUTPUT_SHIP_STRATEGY_PREFIX = "out.shipstrategy."; - private static final String OUTPUT_TYPE_COMPARATOR_FACTORY_PREFIX = "pact.out.comp."; + private static final String OUTPUT_TYPE_COMPARATOR_FACTORY_PREFIX = "out.comp."; - private static final String OUTPUT_TYPE_COMPARATOR_PARAMETERS_PREFIX = "pact.out.comp.param."; + private static final String OUTPUT_TYPE_COMPARATOR_PARAMETERS_PREFIX = "out.comp.param."; - private static final String OUTPUT_DATA_DISTRIBUTION_CLASS = "pact.out.distribution.class"; + private static final String OUTPUT_DATA_DISTRIBUTION_CLASS = "out.distribution.class"; - private static final String OUTPUT_DATA_DISTRIBUTION_PREFIX = "pact.out.distribution."; + private static final String OUTPUT_DATA_DISTRIBUTION_PREFIX = "out.distribution."; // ------------------------------------- Chaining --------------------------------------------- - private static final String CHAINING_NUM_STUBS = "pact.chaining.num"; + private static final String CHAINING_NUM_STUBS = "chaining.num"; - 
private static final String CHAINING_TASKCONFIG_PREFIX = "pact.chaining.taskconfig."; + private static final String CHAINING_TASKCONFIG_PREFIX = "chaining.taskconfig."; - private static final String CHAINING_TASK_PREFIX = "pact.chaining.task."; + private static final String CHAINING_TASK_PREFIX = "chaining.task."; - private static final String CHAINING_TASKNAME_PREFIX = "pact.chaining.taskname."; + private static final String CHAINING_TASKNAME_PREFIX = "chaining.taskname."; // ------------------------------------ Memory & Co ------------------------------------------- - private static final String MEMORY_DRIVER = "pact.memory.driver"; + private static final String MEMORY_DRIVER = "memory.driver"; - private static final String MEMORY_INPUT_PREFIX = "pact.memory.input."; + private static final String MEMORY_INPUT_PREFIX = "memory.input."; - private static final String FILEHANDLES_DRIVER = "pact.filehandles.driver"; + private static final String FILEHANDLES_DRIVER = "filehandles.driver"; - private static final String FILEHANDLES_INPUT_PREFIX = "pact.filehandles.input."; + private static final String FILEHANDLES_INPUT_PREFIX = "filehandles.input."; - private static final String SORT_SPILLING_THRESHOLD_DRIVER = "pact.sort-spill-threshold.driver"; + private static final String SORT_SPILLING_THRESHOLD_DRIVER = "sort-spill-threshold.driver"; - private static final String SORT_SPILLING_THRESHOLD_INPUT_PREFIX = "pact.sort-spill-threshold.input."; + private static final String SORT_SPILLING_THRESHOLD_INPUT_PREFIX = "sort-spill-threshold.input."; // ----------------------------------- Iterations --------------------------------------------- - private static final String NUMBER_OF_ITERATIONS = "pact.iterative.num-iterations"; + private static final String NUMBER_OF_ITERATIONS = "iterative.num-iterations"; - private static final String NUMBER_OF_EOS_EVENTS_PREFIX = "pact.iterative.num-eos-events."; + private static final String NUMBER_OF_EOS_EVENTS_PREFIX = 
"iterative.num-eos-events."; - private static final String ITERATION_HEAD_ID = "pact.iterative.head.id"; + private static final String NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX = "iterative.num-eos-events.bc."; - private static final String ITERATION_WORKSET_MARKER = "pact.iterative.is-workset"; + private static final String ITERATION_HEAD_ID = "iterative.head.id"; - private static final String ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION = "pact.iterative.head.ps-input-index"; + private static final String ITERATION_WORKSET_MARKER = "iterative.is-workset"; - private static final String ITERATION_HEAD_INDEX_OF_SOLUTIONSET = "pact.iterative.head.ss-input-index"; + private static final String ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION = "iterative.head.ps-input-index"; - private static final String ITERATION_HEAD_BACKCHANNEL_MEMORY = "pact.iterative.head.backchannel-memory"; + private static final String ITERATION_HEAD_INDEX_OF_SOLUTIONSET = "iterative.head.ss-input-index"; - private static final String ITERATION_HEAD_SOLUTION_SET_MEMORY = "pact.iterative.head.solutionset-memory"; + private static final String ITERATION_HEAD_BACKCHANNEL_MEMORY = "iterative.head.backchannel-memory"; - private static final String ITERATION_HEAD_FINAL_OUT_CONFIG_PREFIX = "pact.iterative.head.out."; + private static final String ITERATION_HEAD_SOLUTION_SET_MEMORY = "iterative.head.solutionset-memory"; - private static final String ITERATION_HEAD_SYNC_OUT_INDEX = "pact.iterative.head.sync-index."; + private static final String ITERATION_HEAD_FINAL_OUT_CONFIG_PREFIX = "iterative.head.out."; - private static final String ITERATION_CONVERGENCE_CRITERION = "pact.iterative.terminationCriterion"; + private static final String ITERATION_HEAD_SYNC_OUT_INDEX = "iterative.head.sync-index."; - private static final String ITERATION_CONVERGENCE_CRITERION_AGG_NAME = "pact.iterative.terminationCriterion.agg.name"; + private static final String ITERATION_CONVERGENCE_CRITERION = "iterative.terminationCriterion"; - 
private static final String ITERATION_NUM_AGGREGATORS = "pact.iterative.num-aggs"; + private static final String ITERATION_CONVERGENCE_CRITERION_AGG_NAME = "iterative.terminationCriterion.agg.name"; - private static final String ITERATION_AGGREGATOR_NAME_PREFIX = "pact.iterative.agg.name."; + private static final String ITERATION_NUM_AGGREGATORS = "iterative.num-aggs"; - private static final String ITERATION_AGGREGATOR_PREFIX = "pact.iterative.agg.data."; + private static final String ITERATION_AGGREGATOR_NAME_PREFIX = "iterative.agg.name."; - private static final String ITERATION_SOLUTION_SET_SERIALIZER = "pact.iterative.ss-serializer"; + private static final String ITERATION_AGGREGATOR_PREFIX = "iterative.agg.data."; - private static final String ITERATION_SOLUTION_SET_SERIALIZER_PARAMETERS = "pact.iterative.ss-serializer.params"; + private static final String ITERATION_SOLUTION_SET_SERIALIZER = "iterative.ss-serializer"; - private static final String ITERATION_SOLUTION_SET_COMPARATOR = "pact.iterative.ss-comparator"; + private static final String ITERATION_SOLUTION_SET_SERIALIZER_PARAMETERS = "iterative.ss-serializer.params"; - private static final String ITERATION_SOLUTION_SET_COMPARATOR_PARAMETERS = "pact.iterative.ss-comparator.params"; + private static final String ITERATION_SOLUTION_SET_COMPARATOR = "iterative.ss-comparator"; - private static final String ITERATION_SOLUTION_SET_PROBER_SERIALIZER = "pact.iterative.ss-prober-serializer"; + private static final String ITERATION_SOLUTION_SET_COMPARATOR_PARAMETERS = "iterative.ss-comparator.params"; - private static final String ITERATION_SOLUTION_SET_PROBER_SERIALIZER_PARAMETERS = "pact.iterative.ss-prober-serializer.params"; + private static final String ITERATION_SOLUTION_SET_PROBER_SERIALIZER = "iterative.ss-prober-serializer"; - private static final String ITERATION_SOLUTION_SET_PROBER_COMPARATOR = "pact.iterative.ss-prober-comparator"; + private static final String 
ITERATION_SOLUTION_SET_PROBER_SERIALIZER_PARAMETERS = "iterative.ss-prober-serializer.params"; - private static final String ITERATION_SOLUTION_SET_PROBER_COMPARATOR_PARAMETERS = "pact.iterative.ss-prober-comparator.params"; + private static final String ITERATION_SOLUTION_SET_PROBER_COMPARATOR = "iterative.ss-prober-comparator"; - private static final String ITERATION_SOLUTION_SET_PAIR_COMPARATOR = "pact.iterative.ss-pair-comparator"; + private static final String ITERATION_SOLUTION_SET_PROBER_COMPARATOR_PARAMETERS = "iterative.ss-prober-comparator.params"; - private static final String ITERATION_SOLUTION_SET_UPDATE = "pact.iterative.ss-update"; + private static final String ITERATION_SOLUTION_SET_PAIR_COMPARATOR = "iterative.ss-pair-comparator"; - private static final String ITERATION_SOLUTION_SET_UPDATE_SKIP_REPROBE = "pact.iterative.ss-update-fast"; + private static final String ITERATION_SOLUTION_SET_UPDATE = "iterative.ss-update"; + + private static final String ITERATION_SOLUTION_SET_UPDATE_SKIP_REPROBE = "iterative.ss-update-fast"; - private static final String ITERATION_SOLUTION_SET_UPDATE_WAIT = "pact.iterative.ss-wait"; + private static final String ITERATION_SOLUTION_SET_UPDATE_WAIT = "iterative.ss-wait"; - private static final String ITERATION_WORKSET_UPDATE = "pact.iterative.ws-update"; + private static final String ITERATION_WORKSET_UPDATE = "iterative.ws-update"; // ---------------------------------- Miscellaneous ------------------------------------------- @@ -265,8 +267,7 @@ public void setStubWrapper(UserCodeWrapper wrapper) { } @SuppressWarnings("unchecked") - public UserCodeWrapper getStubWrapper(ClassLoader cl) - { + public UserCodeWrapper getStubWrapper(ClassLoader cl) { try { return (UserCodeWrapper) InstantiationUtil.readObjectFromConfig(this.config, STUB_OBJECT, cl); } catch (ClassNotFoundException e) { @@ -791,6 +792,23 @@ public int getNumberOfEventsUntilInterruptInIterativeGate(int inputGateIndex) { return 
this.config.getInteger(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex, 0); } + public void setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(int bcGateIndex, int numEvents) { + if (bcGateIndex < 0) { + throw new IllegalArgumentException(); + } + if (numEvents <= 0) { + throw new IllegalArgumentException(); + } + this.config.setInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex, numEvents); + } + + public int getNumberOfEventsUntilInterruptInIterativeBroadcastGate(int bcGateIndex) { + if (bcGateIndex < 0) { + throw new IllegalArgumentException(); + } + return this.config.getInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex, 0); + } + public void setIterationId(int id) { if (id < 0) { throw new IllegalArgumentException(); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/udf/RuntimeUDFContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/udf/RuntimeUDFContext.java index 11d4331fbf4db..5b73c9f5ca09a 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/udf/RuntimeUDFContext.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/udf/RuntimeUDFContext.java @@ -122,10 +122,6 @@ private Accumulator getAccumulator(String name, } public void setBroadcastVariable(String name, Collection value) { - if (this.broadcastVars.containsKey(name)) { - throw new IllegalArgumentException("The broadcast variable '" + name - + "' already exists and cannot be added."); - } this.broadcastVars.put(name, value); } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java index 7845e02ea7774..bfbcc0add1a39 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java 
@@ -67,9 +67,8 @@ public class BroadcastVarsNepheleITCase extends TestBase2 { protected String resultPath; - public BroadcastVarsNepheleITCase() { - super(new Configuration()); - } + + public static final String getInputPoints(int numPoints, int numDimensions, long seed) { if (numPoints < 1 || numPoints > 1000000) @@ -176,6 +175,8 @@ protected void postSubmit() throws Exception { public static final class DotProducts extends MapFunction { + private static final long serialVersionUID = 1L; + private final Record result = new Record(3); private final LongValue lft = new LongValue(); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java new file mode 100644 index 0000000000000..96b0ba80e69fd --- /dev/null +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java @@ -0,0 +1,314 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ * + **********************************************************************************************************************/ +package eu.stratosphere.test.broadcastvars; + +import eu.stratosphere.api.common.io.FileOutputFormat; +import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; +import eu.stratosphere.api.common.typeutils.TypeComparatorFactory; +import eu.stratosphere.api.common.typeutils.TypeSerializerFactory; +import eu.stratosphere.api.java.record.io.CsvInputFormat; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.PointBuilder; +import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.PointOutFormat; +import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.RecomputeClusterCenter; +import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.SelectNearestCenter; +import eu.stratosphere.nephele.io.DistributionPattern; +import eu.stratosphere.nephele.io.channels.ChannelType; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException; +import eu.stratosphere.nephele.jobgraph.JobInputVertex; +import eu.stratosphere.nephele.jobgraph.JobOutputVertex; +import eu.stratosphere.nephele.jobgraph.JobTaskVertex; +import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask; +import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask; +import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask; +import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparatorFactory; +import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordSerializerFactory; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.pact.runtime.task.DriverStrategy; +import eu.stratosphere.pact.runtime.task.MapDriver; +import eu.stratosphere.pact.runtime.task.NoOpDriver; +import eu.stratosphere.pact.runtime.task.ReduceDriver; +import 
eu.stratosphere.pact.runtime.task.chaining.ChainedMapDriver; +import eu.stratosphere.pact.runtime.task.util.LocalStrategy; +import eu.stratosphere.pact.runtime.task.util.TaskConfig; +import eu.stratosphere.test.iterative.nephele.JobGraphUtils; +import eu.stratosphere.test.testdata.KMeansData; +import eu.stratosphere.test.util.TestBase2; +import eu.stratosphere.types.DoubleValue; +import eu.stratosphere.types.IntValue; +import eu.stratosphere.util.LogUtils; + +public class KMeansIterativeNepheleITCase extends TestBase2 { + + private static final int ITERATION_ID = 42; + + private static final int MEMORY_PER_CONSUMER = 2; + + protected String dataPath; + protected String clusterPath; + protected String resultPath; + + public KMeansIterativeNepheleITCase() { + LogUtils.initializeDefaultConsoleLogger(); + } + + @Override + protected void preSubmit() throws Exception { + dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS); + clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultPath); + } + + @Override + protected JobGraph getJobGraph() throws Exception { + return createJobGraph(dataPath, clusterPath, this.resultPath, 1, 20); + } + + // ------------------------------------------------------------------------------------------------------------- + // Job vertex builder methods + // ------------------------------------------------------------------------------------------------------------- + + private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { + @SuppressWarnings("unchecked") + CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class); + JobInputVertex pointsInput = 
JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks, numSubTasks); + { + TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration()); + taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + taskConfig.setOutputSerializer(serializer); + + TaskConfig chainedMapper = new TaskConfig(new Configuration()); + chainedMapper.setDriverStrategy(DriverStrategy.MAP); + chainedMapper.setStubWrapper(new UserCodeObjectWrapper(new PointBuilder())); + chainedMapper.addOutputShipStrategy(ShipStrategyType.FORWARD); + chainedMapper.setOutputSerializer(serializer); + + taskConfig.addChainedTask(ChainedMapDriver.class, chainedMapper, "Build points"); + } + + return pointsInput; + } + + private static JobInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory serializer) { + @SuppressWarnings("unchecked") + CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class); + JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks, numSubTasks); + + { + TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration()); + taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + taskConfig.setOutputSerializer(serializer); + + TaskConfig chainedMapper = new TaskConfig(new Configuration()); + chainedMapper.setDriverStrategy(DriverStrategy.MAP); + chainedMapper.setStubWrapper(new UserCodeObjectWrapper(new PointBuilder())); + chainedMapper.addOutputShipStrategy(ShipStrategyType.FORWARD); + chainedMapper.setOutputSerializer(serializer); + + taskConfig.addChainedTask(ChainedMapDriver.class, chainedMapper, "Build centers"); + } + + return modelsInput; + } + + private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory serializer) { + + JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, 
"Output", numSubTasks, numSubTasks); + + { + TaskConfig taskConfig = new TaskConfig(output.getConfiguration()); + taskConfig.addInputToGroup(0); + taskConfig.setInputSerializer(serializer, 0); + + taskConfig.setStubWrapper(new UserCodeObjectWrapper(new PointOutFormat())); + taskConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, resultPath); + } + + return output; + } + + private static JobTaskVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory serializer) { + JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks, numSubTasks); + + TaskConfig headConfig = new TaskConfig(head.getConfiguration()); + headConfig.setIterationId(ITERATION_ID); + + // initial input / partial solution + headConfig.addInputToGroup(0); + headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0); + headConfig.setInputSerializer(serializer, 0); + + // back channel / iterations + headConfig.setBackChannelMemory(MEMORY_PER_CONSUMER * JobGraphUtils.MEGABYTE); + + // output into iteration. 
broadcasting the centers + headConfig.setOutputSerializer(serializer); + headConfig.addOutputShipStrategy(ShipStrategyType.BROADCAST); + + // final output + TaskConfig headFinalOutConfig = new TaskConfig(new Configuration()); + headFinalOutConfig.setOutputSerializer(serializer); + headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig); + + // the sync + headConfig.setIterationHeadIndexOfSyncOutput(2); + + // the driver + headConfig.setDriver(NoOpDriver.class); + headConfig.setDriverStrategy(DriverStrategy.UNARY_NO_OP); + + return head; + } + + private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory inputSerializer, + TypeSerializerFactory broadcastVarSerializer, TypeSerializerFactory outputSerializer, + TypeComparatorFactory outputComparator) + { + JobTaskVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class, + "Map (Select nearest center)", jobGraph, numSubTasks, numSubTasks); + + TaskConfig intermediateConfig = new TaskConfig(mapper.getConfiguration()); + intermediateConfig.setIterationId(ITERATION_ID); + + intermediateConfig.setDriver(MapDriver.class); + intermediateConfig.setDriverStrategy(DriverStrategy.MAP); + intermediateConfig.addInputToGroup(0); + intermediateConfig.setInputSerializer(inputSerializer, 0); + + intermediateConfig.setOutputSerializer(outputSerializer); + intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + intermediateConfig.setOutputComparator(outputComparator, 0); + + intermediateConfig.setBroadcastInputName("centers", 0); + intermediateConfig.addBroadcastInputToGroup(0); + intermediateConfig.setBroadcastInputSerializer(broadcastVarSerializer, 0); + + // the udf + intermediateConfig.setStubWrapper(new UserCodeObjectWrapper(new SelectNearestCenter())); + + return mapper; + } + + private static JobTaskVertex createReducer(JobGraph jobGraph, int numSubTasks, 
TypeSerializerFactory inputSerializer, + TypeComparatorFactory inputComparator, TypeSerializerFactory outputSerializer) + { + // ---------------- the tail (reduce) -------------------- + + JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph, + numSubTasks, numSubTasks); + + TaskConfig tailConfig = new TaskConfig(tail.getConfiguration()); + tailConfig.setIterationId(ITERATION_ID); + tailConfig.setIsWorksetUpdate(); + + // inputs and driver + tailConfig.setDriver(ReduceDriver.class); + tailConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP); + tailConfig.addInputToGroup(0); + tailConfig.setInputSerializer(inputSerializer, 0); + tailConfig.setDriverComparator(inputComparator, 0); + + tailConfig.setInputLocalStrategy(0, LocalStrategy.SORT); + tailConfig.setInputComparator(inputComparator, 0); + tailConfig.setMemoryInput(0, MEMORY_PER_CONSUMER * JobGraphUtils.MEGABYTE); + tailConfig.setFilehandlesInput(0, 128); + tailConfig.setSpillingThresholdInput(0, 0.9f); + + // output + tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + tailConfig.setOutputSerializer(outputSerializer); + + // the udf + tailConfig.setStubWrapper(new UserCodeObjectWrapper(new RecomputeClusterCenter())); + + return tail; + } + + public JobOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) { + JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop); + TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); + syncConfig.setNumberOfIterations(numIterations); + syncConfig.setIterationId(ITERATION_ID); + return sync; + } + + // ------------------------------------------------------------------------------------------------------------- + // Job graph assembly: inputs, iteration head, mapper, reducer (tail), sync, and output + // ------------------------------------------------------------------------------------------------------------- + + private JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath, int 
numSubTasks, int numIterations) throws JobGraphDefinitionException { + + // -- init ------------------------------------------------------------------------------------------------- + final TypeSerializerFactory serializer = RecordSerializerFactory.get(); + @SuppressWarnings("unchecked") + final TypeComparatorFactory int0Comparator = new RecordComparatorFactory(new int[] { 0 }, new Class[] { IntValue.class }); + + JobGraph jobGraph = new JobGraph("KMeans Iterative"); + + // -- vertices --------------------------------------------------------------------------------------------- + JobInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer); + JobInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer); + + JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer); + JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator); + + JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer); + + JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks, numSubTasks); + + JobOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks); + + JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + + // -- edges ------------------------------------------------------------------------------------------------ + JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE); + + JobGraphUtils.connect(centers, head, ChannelType.NETWORK, DistributionPattern.POINTWISE); + + JobGraphUtils.connect(head, mapper, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + new TaskConfig(mapper.getConfiguration()).setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks); + new TaskConfig(mapper.getConfiguration()).setInputCached(0, true); + new 
TaskConfig(mapper.getConfiguration()).setInputMaterializationMemory(0, MEMORY_PER_CONSUMER * JobGraphUtils.MEGABYTE); + + JobGraphUtils.connect(mapper, reducer, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks); + + JobGraphUtils.connect(reducer, fakeTailOutput, ChannelType.NETWORK, DistributionPattern.POINTWISE); + + JobGraphUtils.connect(head, output, ChannelType.NETWORK, DistributionPattern.POINTWISE); + + JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + + + + // -- instance sharing ------------------------------------------------------------------------------------- + points.setVertexToShareInstancesWith(output); + centers.setVertexToShareInstancesWith(output); + head.setVertexToShareInstancesWith(output); + mapper.setVertexToShareInstancesWith(output); + reducer.setVertexToShareInstancesWith(output); + fakeTailOutput.setVertexToShareInstancesWith(output); + sync.setVertexToShareInstancesWith(output); + + return jobGraph; + } +} diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleRecordPrograms/KMeansStepITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleRecordPrograms/KMeansStepITCase.java index a4124ca7c9a29..4ab9352eafb70 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleRecordPrograms/KMeansStepITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleRecordPrograms/KMeansStepITCase.java @@ -15,111 +15,39 @@ import java.io.BufferedReader; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.StringTokenizer; import org.junit.Assert; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import eu.stratosphere.api.common.Plan; -import 
eu.stratosphere.api.common.operators.GenericDataSink; -import eu.stratosphere.api.java.record.operators.CrossOperator; -import eu.stratosphere.api.java.record.operators.ReduceOperator; -import eu.stratosphere.compiler.PactCompiler; -import eu.stratosphere.configuration.Configuration; import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep; +import eu.stratosphere.test.testdata.KMeansData; import eu.stratosphere.test.util.TestBase2; -@RunWith(Parameterized.class) public class KMeansStepITCase extends TestBase2 { - public final static String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n" - + "3|1.60|70.11|16.32|\n" + "4|2.43|19.81|89.56|\n" + "5|67.99|9.00|14.48|\n" + "6|87.80|84.49|55.83|\n" - + "7|90.26|42.99|53.29|\n" + "8|51.36|6.16|9.35|\n" + "9|12.43|9.52|12.54|\n" + "10|80.01|8.78|29.74|\n" - + "11|92.76|2.93|80.07|\n" + "12|46.32|100.00|22.98|\n" + "13|34.11|45.61|58.60|\n" - + "14|68.82|16.36|96.60|\n" + "15|81.47|76.45|28.40|\n" + "16|65.55|40.21|43.43|\n" - + "17|84.22|88.56|13.31|\n" + "18|36.99|68.36|57.12|\n" + "19|28.87|37.69|91.04|\n" - + "20|31.56|13.22|86.00|\n" + "21|18.49|34.45|54.52|\n" + "22|13.33|94.02|92.07|\n" - + "23|91.19|81.62|55.06|\n" + "24|85.78|39.02|25.58|\n" + "25|94.41|47.07|78.23|\n" - + "26|90.62|10.43|80.20|\n" + "27|31.52|85.81|39.79|\n" + "28|24.65|77.98|26.35|\n" - + "29|69.34|75.79|63.96|\n" + "30|22.56|78.61|66.66|\n" + "31|91.74|83.82|73.92|\n" - + "32|76.64|89.53|44.66|\n" + "33|36.02|73.01|92.32|\n" + "34|87.86|18.94|10.74|\n" - + "35|91.94|34.61|5.20|\n" + "36|12.52|47.01|95.29|\n" + "37|44.01|26.19|78.50|\n" - + "38|26.20|73.36|10.08|\n" + "39|15.21|17.37|54.33|\n" + "40|27.96|94.81|44.41|\n" - + "41|26.44|44.81|70.88|\n" + "42|53.29|26.69|2.40|\n" + "43|23.94|11.50|1.71|\n" - + "44|19.00|25.48|50.80|\n" + "45|82.26|1.88|58.08|\n" + "46|47.56|82.54|82.73|\n" - + "47|51.54|35.10|32.95|\n" + "48|86.71|55.51|19.08|\n" + "49|54.16|23.68|32.41|\n" - + 
"50|71.81|32.83|46.66|\n" + "51|20.70|14.19|64.96|\n" + "52|57.17|88.56|55.23|\n" - + "53|91.39|49.38|70.55|\n" + "54|47.90|62.07|76.03|\n" + "55|55.70|37.77|30.15|\n" - + "56|87.87|74.62|25.95|\n" + "57|95.70|45.04|15.27|\n" + "58|41.61|89.37|24.45|\n" - + "59|82.19|20.84|11.13|\n" + "60|49.88|2.62|18.62|\n" + "61|16.42|53.30|74.13|\n" - + "62|38.37|72.62|35.16|\n" + "63|43.26|49.59|92.56|\n" + "64|28.96|2.36|78.49|\n" - + "65|88.41|91.43|92.55|\n" + "66|98.61|79.58|33.03|\n" + "67|4.94|18.65|30.78|\n" - + "68|75.89|79.30|63.90|\n" + "69|93.18|76.26|9.50|\n" + "70|73.43|70.50|76.49|\n" - + "71|78.64|90.87|34.49|\n" + "72|58.47|63.07|8.82|\n" + "73|69.74|54.36|64.43|\n" - + "74|38.47|36.60|33.39|\n" + "75|51.07|14.75|2.54|\n" + "76|24.18|16.85|15.00|\n" - + "77|7.56|50.72|93.45|\n" + "78|64.28|97.01|57.31|\n" + "79|85.30|24.13|76.57|\n" - + "80|72.78|30.78|13.11|\n" + "81|18.42|17.45|32.20|\n" + "82|87.44|74.98|87.90|\n" - + "83|38.30|17.77|37.33|\n" + "84|63.62|7.90|34.23|\n" + "85|8.84|67.87|30.65|\n" - + "86|76.12|51.83|80.12|\n" + "87|32.30|74.79|4.39|\n" + "88|41.73|45.34|18.66|\n" - + "89|58.13|18.43|83.38|\n" + "90|98.10|33.46|83.07|\n" + "91|17.76|4.10|88.51|\n" - + "92|60.58|18.15|59.96|\n" + "93|50.11|33.25|85.64|\n" + "94|97.74|60.93|38.97|\n" - + "95|76.31|52.50|95.43|\n" + "96|7.71|85.85|36.26|\n" + "97|9.32|72.21|42.17|\n" - + "98|71.29|51.88|57.62|\n" + "99|31.39|7.27|88.74|"; - - public final static String CLUSTERCENTERS = "0|1.96|65.04|20.82|\n" + "1|53.99|84.23|81.59|\n" + "2|97.28|74.50|40.32|\n" - + "3|63.57|24.53|87.07|\n" + "4|28.10|43.27|86.53|\n" + "5|99.51|62.70|64.48|\n" + "6|30.31|30.36|80.46|"; - - private final String NEWCLUSTERCENTERS = "0|28.47|54.80|21.88|\n" + "1|52.74|80.10|73.03|\n" - + "2|83.92|60.45|25.17|\n" + "3|70.73|20.18|67.06|\n" + "4|22.51|47.19|86.23|\n" + "5|82.70|53.79|68.68|\n" - + "6|29.74|19.17|59.16|"; - protected String dataPath; protected String clusterPath; protected String resultPath; - public 
KMeansStepITCase(Configuration config) { - super(config); - } - protected String getNewCenters() { - return NEWCLUSTERCENTERS; + return KMeansData.CENTERS_AFTER_ONE_STEP; } @Override protected void preSubmit() throws Exception { - final String dataPointDir = "dataPoints"; - - dataPath = getTempDirPath(dataPointDir); - resultPath = getTempFilePath("iter_1"); - - int numPartitions = 4; - String[] splits = splitInputString(DATAPOINTS, '\n', numPartitions); - - for (int i = 0; i < numPartitions; i++) { - String split = splits[i]; - createTempFile(dataPointDir + "/part_" + i + ".txt", split); - } - - // create cluster path and copy data - clusterPath = createTempFile("iter_0", CLUSTERCENTERS); + dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS); + clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS); + resultPath = getTempDirPath("result"); } - @Override protected Plan getTestJob() { KMeansSingleStep kmi = new KMeansSingleStep(); - - Plan plan = kmi.getPlan(config.getString("KMeansIterationTest#NoSubtasks", "1"), - dataPath, clusterPath, resultPath); - - setParameterToCross(plan, config.getString("KMeansIterationTest#ForwardSide", null), PactCompiler.HINT_SHIP_STRATEGY_FORWARD); - return plan; + return kmi.getPlan("4", dataPath, clusterPath, resultPath); } @@ -208,42 +136,4 @@ public int compare(String o1, String o2) { } } } - - @Parameters - public static Collection getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("KMeansIterationTest#NoSubtasks", 4); - config1.setString("KMeansIterationTest#ForwardSide", PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT); - - Configuration config2 = new Configuration(); - config2.setInteger("KMeansIterationTest#NoSubtasks", 4); - config2.setString("KMeansIterationTest#ForwardSide", PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT); - - return toParameterList(config1, config2); - } - - private String[] splitInputString(String inputString, char splitChar, int 
noSplits) { - String splitString = inputString.toString(); - String[] splits = new String[noSplits]; - int partitionSize = (splitString.length() / noSplits) - 2; - - // split data file and copy parts - for (int i = 0; i < noSplits - 1; i++) { - int cutPos = splitString.indexOf(splitChar, (partitionSize < splitString.length() ? partitionSize - : (splitString.length() - 1))); - splits[i] = splitString.substring(0, cutPos) + "\n"; - splitString = splitString.substring(cutPos + 1); - } - splits[noSplits - 1] = splitString; - - return splits; - } - - public static void setParameterToCross(Plan p, String key, String value) { - GenericDataSink sink = p.getDataSinks().iterator().next(); - ReduceOperator reduce2 = (ReduceOperator) sink.getInputs().get(0); - ReduceOperator reduce1 = (ReduceOperator) reduce2.getInputs().get(0); - CrossOperator cross = (CrossOperator) reduce1.getInputs().get(0); - cross.getParameters().setString(key, value); - } } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/IterativeKMeansITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/IterativeKMeansITCase.java index c3eb1e679dd0a..14e03f62c427a 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/IterativeKMeansITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleScalaPrograms/IterativeKMeansITCase.java @@ -17,7 +17,7 @@ import eu.stratosphere.api.common.Plan; import eu.stratosphere.examples.scala.datamining.KMeans; -import eu.stratosphere.test.exampleRecordPrograms.KMeansStepITCase; +import eu.stratosphere.test.testdata.KMeansData; import eu.stratosphere.test.util.TestBase2; public class IterativeKMeansITCase extends TestBase2 { @@ -33,14 +33,14 @@ public class IterativeKMeansITCase extends TestBase2 { @Override protected void preSubmit() throws Exception { - pointsPath = createTempFile("datapoints.txt", KMeansStepITCase.DATAPOINTS); - clusterPath = 
createTempFile("initial_centers.txt", KMeansStepITCase.CLUSTERCENTERS); + pointsPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS); + clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS); resultPath = getTempDirPath("resulting_centers"); } @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(CENTERS_AFTER_20_ITERATIONS, resultPath); + compareResultsByLinesInMemory(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultPath); } @@ -49,14 +49,4 @@ protected Plan getTestJob() { KMeans kmi = new KMeans(); return kmi.getScalaPlan(4, pointsPath, clusterPath, resultPath, 20); } - - - private static final String CENTERS_AFTER_20_ITERATIONS = - "0|38.3|54.5|19.3|\n" + - "1|32.1|83.0|50.4|\n" + - "2|87.5|56.6|20.3|\n" + - "3|75.4|18.6|67.5|\n" + - "4|24.9|29.2|77.6|\n" + - "5|78.7|66.1|70.8|\n" + - "6|39.5|14.0|18.7|\n"; } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java index 561e147615cae..d30fa3362f794 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/IterativeKMeansITCase.java @@ -13,65 +13,21 @@ package eu.stratosphere.test.iterative; -import java.util.Collection; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - import eu.stratosphere.api.common.Plan; -import eu.stratosphere.configuration.Configuration; import eu.stratosphere.example.java.record.kmeans.KMeansIterative; import eu.stratosphere.test.exampleRecordPrograms.KMeansStepITCase; +import eu.stratosphere.test.testdata.KMeansData; -@RunWith(Parameterized.class) public class IterativeKMeansITCase extends KMeansStepITCase { - public IterativeKMeansITCase(Configuration config) { - 
super(config); - } - @Override protected Plan getTestJob() { - KMeansIterative kmi = new KMeansIterative(); - - Plan plan = kmi.getPlan(config.getString("IterativeKMeansITCase#NoSubtasks", "1"), - dataPath, clusterPath, resultPath, - config.getString("IterativeKMeansITCase#NumIterations", "1")); - - return plan; - } - - - @Parameters - public static Collection getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("IterativeKMeansITCase#NoSubtasks", 4); - config1.setString("IterativeKMeansITCase#NumIterations", "20"); - return toParameterList(config1); + return kmi.getPlan("4", dataPath, clusterPath, resultPath, "20"); } - @Override protected String getNewCenters() { - return CENTERS_AFTER_20_ITERATIONS; - } - - @Override - protected void postSubmit() throws Exception { - super.resultPath = super.resultPath+"/centers"; - super.postSubmit(); + return KMeansData.CENTERS_AFTER_20_ITERATIONS_DOUBLE_DIGIT; } - - - - public static final String CENTERS_AFTER_20_ITERATIONS = - "0|38.25|54.52|19.34|\n" + - "1|32.14|83.04|50.35|\n" + - "2|87.48|56.57|20.27|\n" + - "3|75.40|18.65|67.49|\n" + - "4|24.93|29.25|77.56|\n" + - "5|78.67|66.07|70.82|\n" + - "6|39.51|14.04|18.74|\n"; } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java index 3fea078813eb6..bac36bdc4e74b 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java @@ -15,7 +15,6 @@ import java.io.IOException; -import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper; import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; import eu.stratosphere.api.common.operators.util.UserCodeWrapper; import eu.stratosphere.api.common.io.FileInputFormat; @@ -55,15 +54,10 @@ public static > 
JobInputVertex createInput(T stub, int degreeOfParallelism, int numSubTasksPerInstance) { stub.setFilePath(path); - return createInput(new UserCodeObjectWrapper(stub), null, name, graph, degreeOfParallelism, numSubTasksPerInstance); - } - - public static > JobInputVertex createInput(Class stub, String path, String name, JobGraph graph, - int degreeOfParallelism, int numSubTasksPerInstance) { - return createInput(new UserCodeClassWrapper(stub), path, name, graph, degreeOfParallelism, numSubTasksPerInstance); + return createInput(new UserCodeObjectWrapper(stub), name, graph, degreeOfParallelism, numSubTasksPerInstance); } - private static > JobInputVertex createInput(UserCodeWrapper stub, String configPath, String name, JobGraph graph, + private static > JobInputVertex createInput(UserCodeWrapper stub, String name, JobGraph graph, int degreeOfParallelism, int numSubTasksPerInstance) { JobInputVertex inputVertex = new JobInputVertex(name, graph); @@ -77,9 +71,6 @@ private static > JobInputVertex createInput(UserCodeW TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration()); inputConfig.setStubWrapper(stub); - if (configPath != null) { - inputConfig.setStubParameter("pact.input.file.path", configPath); - } return inputVertex; } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java index 748fddd5ca018..37cb09138a098 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java @@ -132,7 +132,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { // --------------- the inputs 
--------------------- // page rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(CustomImprovedDanglingPageRankInputFormat.class, + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -141,7 +141,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(CustomImprovedAdjacencyListInputFormat.class, + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java index 76a68d57a49f2..126f0b13340e0 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java @@ -132,7 +132,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception 
{ // --------------- the inputs --------------------- // page rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(CustomImprovedDanglingPageRankInputFormat.class, + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -141,7 +141,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(CustomImprovedAdjacencyListInputFormat.class, + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java index 4890be8431a69..eed6bdda1c254 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java @@ -112,7 +112,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { // --------------- the inputs --------------------- // page 
rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(ImprovedDanglingPageRankInputFormat.class, + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -121,7 +121,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(ImprovedAdjacencyListInputFormat.class, + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java index e79863a100c05..8b3aa4ebe53a9 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java @@ -13,11 +13,11 @@ package eu.stratosphere.test.iterative.nephele.danglingpagerank; -import eu.stratosphere.api.java.record.io.TextInputFormat; +import eu.stratosphere.api.java.record.io.DelimitedInputFormat; import 
eu.stratosphere.types.LongValue; import eu.stratosphere.types.Record; -public class ImprovedAdjacencyListInputFormat extends TextInputFormat { +public class ImprovedAdjacencyListInputFormat extends DelimitedInputFormat { private static final long serialVersionUID = 1L; private final LongValue vertexID = new LongValue(); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java index 98caf6c1e26aa..f1d9d4744f82f 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java @@ -13,14 +13,14 @@ package eu.stratosphere.test.iterative.nephele.danglingpagerank; -import eu.stratosphere.api.java.record.io.TextInputFormat; +import eu.stratosphere.api.java.record.io.DelimitedInputFormat; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.test.iterative.nephele.ConfigUtils; import eu.stratosphere.types.DoubleValue; import eu.stratosphere.types.LongValue; import eu.stratosphere.types.Record; -public class ImprovedDanglingPageRankInputFormat extends TextInputFormat { +public class ImprovedDanglingPageRankInputFormat extends DelimitedInputFormat { private static final long serialVersionUID = 1L; private LongValue vertexID = new LongValue(); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndTest.java index 2c4e8e2482716..646e117993132 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndTest.java +++ 
b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndTest.java @@ -22,7 +22,7 @@ import eu.stratosphere.client.RemoteExecutor; import eu.stratosphere.client.localDistributed.LocalDistributedExecutor; -import eu.stratosphere.test.exampleRecordPrograms.KMeansStepITCase; +import eu.stratosphere.test.testdata.KMeansData; // When the API changes WordCountForTest needs to be rebuilt and the WordCountForTest.jar in resources needs // to be replaced with the new one. @@ -43,11 +43,11 @@ public void testEverything() { outFile.delete(); FileWriter fwPoints = new FileWriter(points); - fwPoints.write(KMeansStepITCase.DATAPOINTS); + fwPoints.write(KMeansData.DATAPOINTS); fwPoints.close(); FileWriter fwClusters = new FileWriter(clusters); - fwClusters.write(KMeansStepITCase.CLUSTERCENTERS); + fwClusters.write(KMeansData.INITIAL_CENTERS); fwClusters.close(); URL jarFileURL = getClass().getResource("/KMeansForTest.jar"); @@ -61,9 +61,9 @@ public void testEverything() { ex.executeJar(jarPath, "eu.stratosphere.examples.scala.datamining.KMeansForTest", new String[] {"4", - "file://" + points.getAbsolutePath(), - "file://" + clusters.getAbsolutePath(), - "file://" + outFile.getAbsolutePath(), + points.toURI().toString(), + clusters.toURI().toString(), + outFile.toURI().toString(), "1"}); points.delete(); diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/testdata/KMeansData.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/testdata/KMeansData.java new file mode 100644 index 0000000000000..09c590a6cd6e1 --- /dev/null +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/testdata/KMeansData.java @@ -0,0 +1,82 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 
except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.test.testdata; + + +public class KMeansData { + + public final static String DATAPOINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n" + + "3|1.60|70.11|16.32|\n" + "4|2.43|19.81|89.56|\n" + "5|67.99|9.00|14.48|\n" + "6|87.80|84.49|55.83|\n" + + "7|90.26|42.99|53.29|\n" + "8|51.36|6.16|9.35|\n" + "9|12.43|9.52|12.54|\n" + "10|80.01|8.78|29.74|\n" + + "11|92.76|2.93|80.07|\n" + "12|46.32|100.00|22.98|\n" + "13|34.11|45.61|58.60|\n" + + "14|68.82|16.36|96.60|\n" + "15|81.47|76.45|28.40|\n" + "16|65.55|40.21|43.43|\n" + + "17|84.22|88.56|13.31|\n" + "18|36.99|68.36|57.12|\n" + "19|28.87|37.69|91.04|\n" + + "20|31.56|13.22|86.00|\n" + "21|18.49|34.45|54.52|\n" + "22|13.33|94.02|92.07|\n" + + "23|91.19|81.62|55.06|\n" + "24|85.78|39.02|25.58|\n" + "25|94.41|47.07|78.23|\n" + + "26|90.62|10.43|80.20|\n" + "27|31.52|85.81|39.79|\n" + "28|24.65|77.98|26.35|\n" + + "29|69.34|75.79|63.96|\n" + "30|22.56|78.61|66.66|\n" + "31|91.74|83.82|73.92|\n" + + "32|76.64|89.53|44.66|\n" + "33|36.02|73.01|92.32|\n" + "34|87.86|18.94|10.74|\n" + + "35|91.94|34.61|5.20|\n" + "36|12.52|47.01|95.29|\n" + "37|44.01|26.19|78.50|\n" + + "38|26.20|73.36|10.08|\n" + "39|15.21|17.37|54.33|\n" + "40|27.96|94.81|44.41|\n" + + "41|26.44|44.81|70.88|\n" + "42|53.29|26.69|2.40|\n" + "43|23.94|11.50|1.71|\n" + + "44|19.00|25.48|50.80|\n" + "45|82.26|1.88|58.08|\n" + 
"46|47.56|82.54|82.73|\n" + + "47|51.54|35.10|32.95|\n" + "48|86.71|55.51|19.08|\n" + "49|54.16|23.68|32.41|\n" + + "50|71.81|32.83|46.66|\n" + "51|20.70|14.19|64.96|\n" + "52|57.17|88.56|55.23|\n" + + "53|91.39|49.38|70.55|\n" + "54|47.90|62.07|76.03|\n" + "55|55.70|37.77|30.15|\n" + + "56|87.87|74.62|25.95|\n" + "57|95.70|45.04|15.27|\n" + "58|41.61|89.37|24.45|\n" + + "59|82.19|20.84|11.13|\n" + "60|49.88|2.62|18.62|\n" + "61|16.42|53.30|74.13|\n" + + "62|38.37|72.62|35.16|\n" + "63|43.26|49.59|92.56|\n" + "64|28.96|2.36|78.49|\n" + + "65|88.41|91.43|92.55|\n" + "66|98.61|79.58|33.03|\n" + "67|4.94|18.65|30.78|\n" + + "68|75.89|79.30|63.90|\n" + "69|93.18|76.26|9.50|\n" + "70|73.43|70.50|76.49|\n" + + "71|78.64|90.87|34.49|\n" + "72|58.47|63.07|8.82|\n" + "73|69.74|54.36|64.43|\n" + + "74|38.47|36.60|33.39|\n" + "75|51.07|14.75|2.54|\n" + "76|24.18|16.85|15.00|\n" + + "77|7.56|50.72|93.45|\n" + "78|64.28|97.01|57.31|\n" + "79|85.30|24.13|76.57|\n" + + "80|72.78|30.78|13.11|\n" + "81|18.42|17.45|32.20|\n" + "82|87.44|74.98|87.90|\n" + + "83|38.30|17.77|37.33|\n" + "84|63.62|7.90|34.23|\n" + "85|8.84|67.87|30.65|\n" + + "86|76.12|51.83|80.12|\n" + "87|32.30|74.79|4.39|\n" + "88|41.73|45.34|18.66|\n" + + "89|58.13|18.43|83.38|\n" + "90|98.10|33.46|83.07|\n" + "91|17.76|4.10|88.51|\n" + + "92|60.58|18.15|59.96|\n" + "93|50.11|33.25|85.64|\n" + "94|97.74|60.93|38.97|\n" + + "95|76.31|52.50|95.43|\n" + "96|7.71|85.85|36.26|\n" + "97|9.32|72.21|42.17|\n" + + "98|71.29|51.88|57.62|\n" + "99|31.39|7.27|88.74|"; + + public static final String INITIAL_CENTERS = "0|1.96|65.04|20.82|\n" + "1|53.99|84.23|81.59|\n" + "2|97.28|74.50|40.32|\n" + + "3|63.57|24.53|87.07|\n" + "4|28.10|43.27|86.53|\n" + "5|99.51|62.70|64.48|\n" + "6|30.31|30.36|80.46|"; + + public static final String CENTERS_AFTER_ONE_STEP = "0|28.47|54.80|21.88|\n" + "1|52.74|80.10|73.03|\n" + + "2|83.92|60.45|25.17|\n" + "3|70.73|20.18|67.06|\n" + "4|22.51|47.19|86.23|\n" + "5|82.70|53.79|68.68|\n" + + 
"6|29.74|19.17|59.16|"; + + public static final String CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT = + "0|38.3|54.5|19.3|\n" + + "1|32.1|83.0|50.4|\n" + + "2|87.5|56.6|20.3|\n" + + "3|75.4|18.6|67.5|\n" + + "4|24.9|29.2|77.6|\n" + + "5|78.7|66.1|70.8|\n" + + "6|39.5|14.0|18.7|\n"; + + public static final String CENTERS_AFTER_20_ITERATIONS_DOUBLE_DIGIT = + "0|38.25|54.52|19.34|\n" + + "1|32.14|83.04|50.35|\n" + + "2|87.48|56.57|20.27|\n" + + "3|75.40|18.65|67.49|\n" + + "4|24.93|29.25|77.56|\n" + + "5|78.67|66.07|70.82|\n" + + "6|39.51|14.04|18.74|\n"; + + + + private KMeansData() {} +}