Remove Record API dependencies from CC iteration tests
fhueske committed Nov 24, 2015
1 parent 8543dd9 commit b640c01
Showing 4 changed files with 177 additions and 211 deletions.
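For orientation before the diffs: the commit replaces tests written against the deprecated Record API (`RecordAPITestBase` with a `Plan`-returning `getTestJob()`) with tests written against the DataSet API (`JavaProgramTestBase` with an imperative `testProgram()`). Below is a minimal sketch of the post-migration test skeleton, assuming only the hook methods and utilities visible in the diffs; the class and file names are illustrative, not from the commit.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;

// Minimal sketch of the migrated test shape (illustrative names).
public class ExampleMigratedITCase extends JavaProgramTestBase {

	private String inputPath;
	private String resultPath;

	@Override
	protected void preSubmit() throws Exception {
		// materialize the test input and reserve an output location
		inputPath = createTempFile("input.txt", "1 2\n2 3\n");
		resultPath = getTempFilePath("result");
	}

	@Override
	protected void testProgram() throws Exception {
		// build and run the DataSet program directly; no Plan object is returned
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.readCsvFile(inputPath).fieldDelimiter(" ").types(Long.class, Long.class)
			.writeAsCsv(resultPath, "\n", " ");
		env.execute();
	}

	@Override
	protected void postSubmit() throws Exception {
		// verify resultPath here, e.g. with a BufferedReader as the tests below do
	}
}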
@@ -21,13 +21,19 @@

import java.io.BufferedReader;

import org.apache.flink.api.common.Plan;
import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.test.util.JavaProgramTestBase;


public class ConnectedComponentsITCase extends RecordAPITestBase {
public class ConnectedComponentsITCase extends JavaProgramTestBase {

private static final long SEED = 0xBADC0FFEEBEEFL;

@@ -40,22 +46,44 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
protected String edgesPath;
protected String resultPath;

public ConnectedComponentsITCase() {
setTaskManagerNumSlots(parallelism);
}


@Override
protected void preSubmit() throws Exception {
verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
resultPath = getTempFilePath("results");
}

@Override
protected Plan getTestJob() {
WorksetConnectedComponents cc = new WorksetConnectedComponents();
return cc.getPlan(Integer.valueOf(parallelism).toString(), verticesPath, edgesPath, resultPath, "100");
protected void testProgram() throws Exception {
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read vertex and edge data
DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);

DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
.flatMap(new ConnectedComponents.UndirectEdge());

// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
.groupBy(0).aggregate(Aggregations.MIN, 1)
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new ConnectedComponents.ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

result.writeAsCsv(resultPath, "\n", " ");

// execute program
env.execute("Connected Components Example");
}

@Override
@@ -64,4 +92,12 @@ protected void postSubmit() throws Exception {
ConnectedComponentsData.checkOddEvenResult(reader);
}
}

public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {

@Override
public Tuple2<T, T> map(Tuple1<T> vertex) {
return new Tuple2<>(vertex.f0, vertex.f0);
}
}
}
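The `DuplicateValue` mapper added above pairs each vertex id with itself to form the initial component assignment. A short usage sketch (the surrounding names are illustrative, not from the commit):

// Illustrative only: turn a Tuple1<Long> vertex list into (id, id) pairs.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple1<Long>> vertices = env.fromElements(
		new Tuple1<>(1L), new Tuple1<>(2L), new Tuple1<>(3L));

DataSet<Tuple2<Long, Long>> initial =
		vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());
// yields (1,1), (2,2), (3,3): every vertex starts in its own component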
@@ -20,36 +20,28 @@
package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import java.io.Serializable;
import java.util.Collection;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.DeltaIteration;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap;
import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.MinimumComponentIDReduce;
import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@SuppressWarnings("deprecation")
@RunWith(Parameterized.class)
public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBase {
public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTestBase {

private static final long SEED = 0xBADC0FFEEBEEFL;

@@ -65,20 +57,64 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBase {

public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(parallelism);
}

@Override
protected void preSubmit() throws Exception {
verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
resultPath = getTempFilePath("results");
}

@Override
protected Plan getTestJob() {
protected void testProgram() throws Exception {
boolean extraMapper = config.getBoolean("ExtraMapper", false);
return getPlan(parallelism, verticesPath, edgesPath, resultPath, 100, extraMapper);

// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read vertex and edge data
DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);

DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
.flatMap(new ConnectedComponents.UndirectEdge());

// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
.join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
.groupBy(0).aggregate(Aggregations.MIN, 1)
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new UpdateComponentIdMatchNonPreserving());

DataSet<Tuple2<Long, Long>> delta;
if (extraMapper) {
delta = changes.map(
// ID Mapper
new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> map(Tuple2<Long, Long> v) throws Exception {
return v;
}
});
} else {
delta = changes;
}

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, changes);

result.writeAsCsv(resultPath, "\n", " ");

// execute program
env.execute("Connected Components Example");
}

@Override
@@ -98,84 +134,21 @@ public static Collection<Object[]> getConfigurations() {

return toParameterList(config1, config2);
}
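The bodies of config1 and config2 sit in the collapsed region above and are not shown. The sketch below is a hypothetical reconstruction based only on the "ExtraMapper" flag read in testProgram(), not the actual collapsed code.

// Hypothetical (the real setup is collapsed above): run the test twice,
// once without and once with the extra identity mapper.
Configuration config1 = new Configuration();
config1.setBoolean("ExtraMapper", false);

Configuration config2 = new Configuration();
config2.setBoolean("ExtraMapper", true);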

@SuppressWarnings("unchecked")
public static Plan getPlan(int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations, boolean extraMap) {

// data source for initial vertices
FileDataSource initialVertices = new FileDataSource(new CsvInputFormat(' ', LongValue.class), verticesInput, "Vertices");

MapOperator verticesWithId = MapOperator.builder(DuplicateLongMap.class).input(initialVertices).name("Assign Vertex Ids").build();

// the loop takes the vertices as the solution set and changed vertices as the workset
// initially, all vertices are changed
DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
iteration.setInitialSolutionSet(verticesWithId);
iteration.setInitialWorkset(verticesWithId);
iteration.setMaximumNumberOfIterations(maxIterations);

// data source for the edges
FileDataSource edges = new FileDataSource(new CsvInputFormat(' ', LongValue.class, LongValue.class), edgeInput, "Edges");

// join workset (changed vertices) with the edges to propagate changes to neighbors
JoinOperator joinWithNeighbors = JoinOperator.builder(new NeighborWithComponentIDJoin(), LongValue.class, 0, 0)
.input1(iteration.getWorkset())
.input2(edges)
.name("Join Candidate Id With Neighbor")
.build();

// find for each neighbor the smallest of all candidates
ReduceOperator minCandidateId = ReduceOperator.builder(new MinimumComponentIDReduce(), LongValue.class, 0)
.input(joinWithNeighbors)
.name("Find Minimum Candidate Id")
.build();

// join candidates with the solution set and update if the candidate component-id is smaller
JoinOperator updateComponentId = JoinOperator.builder(new UpdateComponentIdMatchNonPreserving(), LongValue.class, 0, 0)
.input1(minCandidateId)
.input2(iteration.getSolutionSet())
.name("Update Component Id")
.build();

if (extraMap) {
MapOperator mapper = MapOperator.builder(IdentityMap.class).input(updateComponentId).name("idmap").build();
iteration.setSolutionSetDelta(mapper);
} else {
iteration.setSolutionSetDelta(updateComponentId);
}

iteration.setNextWorkset(updateComponentId);

// sink is the iteration result
FileDataSink result = new FileDataSink(new CsvOutputFormat("\n", " ", LongValue.class, LongValue.class), output, iteration, "Result");

// return the PACT plan
Plan plan = new Plan(result, "Workset Connected Components");
plan.setDefaultParallelism(numSubTasks);
return plan;
}

public static final class UpdateComponentIdMatchNonPreserving extends JoinFunction implements Serializable {
public static final class UpdateComponentIdMatchNonPreserving
implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;

@Override
public void join(Record newVertexWithComponent, Record currentVertexWithComponent, Collector<Record> out){

long candidateComponentID = newVertexWithComponent.getField(1, LongValue.class).getValue();
long currentComponentID = currentVertexWithComponent.getField(1, LongValue.class).getValue();
if (candidateComponentID < currentComponentID) {
out.collect(newVertexWithComponent);
public void join(
Tuple2<Long, Long> candidate,
Tuple2<Long, Long> current,
Collector<Tuple2<Long, Long>> out) throws Exception {

if (candidate.f1 < current.f1) {
out.collect(candidate);
}
}
}

public static final class IdentityMap extends MapFunction {
private static final long serialVersionUID = 1L;

@Override
public void map(Record record, Collector<Record> out) throws Exception {
out.collect(record);
}
}
}
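One subtlety worth flagging: this test closes the iteration with closeWith(delta, changes), whereas the first test used closeWith(changes, changes). A short annotated restatement of the two-argument contract as used above (not new API):

// closeWith(solutionSetDelta, nextWorkset):
//   1st argument: elements merged into the solution set after each superstep
//                 (here: delta, optionally routed through the identity mapper)
//   2nd argument: the workset handed to the next superstep (here: changes)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, changes);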
@@ -76,7 +76,7 @@ protected void testProgram() throws Exception {
.flatMap(new UndirectEdge());

// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
@@ -98,11 +98,4 @@ protected void testProgram() throws Exception {
env.execute("Connected Components Example");
}

public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {

@Override
public Tuple2<T, T> map(Tuple1<T> vertex) {
return new Tuple2<T, T>(vertex.f0, vertex.f0);
}
}
}