Skip to content

Commit

Permalink
Adjusted kmeans examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 13, 2014
1 parent 3c43e58 commit 4353134
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 453 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,37 @@
import eu.stratosphere.api.common.accumulators.Histogram;
import eu.stratosphere.api.common.accumulators.IntCounter;
import eu.stratosphere.api.common.accumulators.LongCounter;
import eu.stratosphere.types.Record;

/**
*
* A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
* of the function will have a context through which it can access static contextual information (such as
* the current degree of parallelism) and other constructs like accumulators and broadcast variables.
* <p>
* A function can, during runtime, obtain the RuntimeContext via a call to
* {@link eu.stratosphere.api.common.functions.AbstractFunction#getRuntimeContext()}.
*/
public interface RuntimeContext {

/**
* Returns the name of the task in which the UDF runs, as assigned during plan construction.
*
* @return The name of the task in which the UDF runs.
*/
String getTaskName();

/**
* Gets the degree of parallelism with which the parallel task runs.
*
* @return The degree of parallelism with which the parallel task runs.
*/
int getNumberOfParallelSubtasks();

/**
* Gets the number of the parallel subtask. The numbering starts from 1 and goes up to the degree-of-parallelism,
* as returned by {@link #getNumberOfParallelSubtasks()}.
*
* @return The number of the parallel subtask.
*/
int getIndexOfThisSubtask();

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -109,12 +129,6 @@ public interface RuntimeContext {

// --------------------------------------------------------------------------------------------

/**
* Sets the value of the broadcast variable identified by the given
* {@code name}.
*/
void setBroadcastVariable(String name, Collection<?> value);

/**
* Returns the result bound to the broadcast variable identified by the
* given {@code name}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public UserCodeWrapper<T> getUserCodeWrapper() {

// --------------------------------------------------------------------------------------------

// TODO: add delegates for the parameter input setters to the Operator builders

/**
* Returns the input, or null, if none is set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,97 +13,64 @@

package eu.stratosphere.example.java.record.kmeans;

import java.util.ArrayList;
import java.util.List;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.operators.BulkIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.java.record.operators.CrossOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.example.java.record.kmeans.udfs.ComputeDistance;
import eu.stratosphere.example.java.record.kmeans.udfs.FindNearestCenter;
import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.RecomputeClusterCenter;
import eu.stratosphere.example.java.record.kmeans.KMeansSingleStep.SelectNearestCenter;
import eu.stratosphere.example.java.record.kmeans.udfs.PointInFormat;
import eu.stratosphere.example.java.record.kmeans.udfs.PointOutFormat;
import eu.stratosphere.example.java.record.kmeans.udfs.RecomputeClusterCenter;
import eu.stratosphere.types.IntValue;


public class KMeansIterative implements Program, ProgramDescription {


@Override
public Plan getPlan(String... args) {
// parse job parameters
final int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
final String dataPointInput = (args.length > 1 ? args[1] : "");
final String clusterInput = (args.length > 2 ? args[2] : "");
final String output = (args.length > 3 ? args[3] : "");
final int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 1);
int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
String dataPointInput = (args.length > 1 ? args[1] : "");
String clusterInput = (args.length > 2 ? args[2] : "");
String output = (args.length > 3 ? args[3] : "");
int numIterations = (args.length > 4 ? Integer.parseInt(args[4]) : 2);

// create DataSourceContract for cluster center input
FileDataSource initialClusterPoints = new FileDataSource(new PointInFormat(), clusterInput, "Centers");
initialClusterPoints.setDegreeOfParallelism(1);

BulkIteration iteration = new BulkIteration("K-Means Loop");
iteration.setInput(initialClusterPoints);
iteration.setMaximumNumberOfIterations(numIterations);

// create DataSourceContract for data point input
FileDataSource dataPoints = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points");

// create CrossOperator for distance computation
CrossOperator computeDistance = CrossOperator.builder(new ComputeDistance())
.input1(dataPoints)
.input2(iteration.getPartialSolution())
.name("Compute Distances")
.build();
// create DataSourceContract for cluster center input
FileDataSource clusterPoints = new FileDataSource(new PointInFormat(), clusterInput, "Centers");

BulkIteration iter = new BulkIteration("k-means loop");
iter.setInput(clusterPoints);
iter.setMaximumNumberOfIterations(numIterations);

// create ReduceOperator for finding the nearest cluster centers
ReduceOperator findNearestClusterCenters = ReduceOperator.builder(new FindNearestCenter(), IntValue.class, 0)
.input(computeDistance)
.name("Find Nearest Centers")
.build();
// create MapOperator that selects the nearest center for each data point (centers are passed as a broadcast variable)
MapOperator findNearestClusterCenters = MapOperator.builder(new SelectNearestCenter())
.setBroadcastVariable("centers", iter.getPartialSolution())
.input(dataPoints)
.name("Find Nearest Centers")
.build();

// create ReduceOperator for computing new cluster positions
ReduceOperator recomputeClusterCenter = ReduceOperator.builder(new RecomputeClusterCenter(), IntValue.class, 0)
.input(findNearestClusterCenters)
.name("Recompute Center Positions")
.build();
iteration.setNextPartialSolution(recomputeClusterCenter);

// create DataSourceContract for data point input
FileDataSource dataPoints2 = new FileDataSource(new PointInFormat(), dataPointInput, "Data Points 2");
.input(findNearestClusterCenters)
.name("Recompute Center Positions")
.build();

// compute distance of points to final clusters
CrossOperator computeFinalDistance = CrossOperator.builder(new ComputeDistance())
.input1(dataPoints2)
.input2(iteration)
.name("Compute Final Distances")
.build();

// find nearest final cluster for point
ReduceOperator findNearestFinalCluster = ReduceOperator.builder(new FindNearestCenter(), IntValue.class, 0)
.input(computeFinalDistance)
.name("Find Nearest Final Centers")
.build();
iter.setNextPartialSolution(recomputeClusterCenter);

// create DataSinkContract for writing the new cluster positions
FileDataSink finalClusters = new FileDataSink(new PointOutFormat(), output+"/centers", iteration, "Cluster Positions");
FileDataSink newClusterPoints = new FileDataSink(new PointOutFormat(), output, recomputeClusterCenter, "New Center Positions");

// write assigned clusters
FileDataSink clusterAssignments = new FileDataSink(new PointOutFormat(), output+"/points", findNearestFinalCluster, "Cluster Assignments");

List<GenericDataSink> sinks = new ArrayList<GenericDataSink>();
sinks.add(finalClusters);
sinks.add(clusterAssignments);

// return the PACT plan
Plan plan = new Plan(sinks, "Iterative KMeans");
Plan plan = new Plan(newClusterPoints, "KMeans Iteration");
plan.setDefaultParallelism(numSubTasks);
return plan;
}
Expand All @@ -112,19 +79,4 @@ public Plan getPlan(String... args) {
public String getDescription() {
	// The parameter order mirrors the positional arguments parsed in getPlan(String...):
	// parallelism, data point input, cluster center input, output path, iteration count.
	return "Parameters: <numSubTasks> <dataPoints> <clusterCenters> <output> <numIterations>";
}

public static void main(String[] args) throws Exception {
	final KMeansIterative job = new KMeansIterative();

	// All five parameters are required on the command line; otherwise print the usage and abort.
	if (args.length < 5) {
		System.err.println(job.getDescription());
		System.exit(1);
	}

	// Build the k-means plan and run it embedded in a local context.
	LocalExecutor.execute(job.getPlan(args));
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 4353134

Please sign in to comment.