Skip to content

Commit

Permalink
[FLINK-1883] [gelly] Connected Components example
Browse files Browse the repository at this point in the history
This is a squash of the following commits:

[FLINK-1883][gelly] Added Min Vertex Id Propagation library and example

[FLINK-1883][gelly] Renamed algorithm to Connected Components

[FLINK-1883][gelly] Made the CC library method match Spargel

[FLINK-1883][gelly] Polished the CC with Randomised Edges test

Closes apache#596
  • Loading branch information
andralungu authored and vasia committed May 1, 2015
1 parent 42acf5d commit f2e6fd3
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 47 deletions.
1 change: 1 addition & 0 deletions docs/libs/gelly_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
* Single-Source Shortest Paths
* Label Propagation
* Simple Community Detection
* Connected Components

Gelly's library methods can be used by simply calling the `run()` method on the input graph:

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.example;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData;
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.types.NullValue;

/**
* This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponents}
* library method:
* <ul>
* <li> with the edge data set given as a parameter
* <li> with default data
* </ul>
*
* The input file is a plain text file and must be formatted as follows:
* Edges are represented by tuples of srcVertexId, trgVertexId which are
* separated by tabs. Edges themselves are separated by newlines.
* For example: <code>1\t2\n1\t3\n</code> defines two edges,
* 1-2 with and 1-3.
*
* Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
* &lt;number of iterations&gt; </code><br>
* If no parameters are provided, the program is run with default data from
* {@link org.apache.flink.graph.example.utils.ConnectedComponentsExampleData}
*/
public class ConnectedComponentsExample implements ProgramDescription {

@SuppressWarnings("serial")
public static void main(String [] args) throws Exception {

if(!parseParameters(args)) {
return;
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);

Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value;
}
}, env);

DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
.run(new ConnectedComponents(maxIterations)).getVertices();

// emit result
if (fileOutput) {
verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
} else {
verticesWithMinIds.print();
}

env.execute("Connected Components Example");
}

@Override
public String getDescription() {
return "Connected Components Example";
}

// *************************************************************************
// UTIL METHODS
// *************************************************************************

private static boolean fileOutput = false;
private static String edgeInputPath = null;
private static String outputPath = null;
private static Integer maxIterations = ConnectedComponentsExampleData.MAX_ITERATIONS;

private static boolean parseParameters(String [] args) {
if(args.length > 0) {
if(args.length != 3) {
System.err.println("Usage ConnectedComponents <edge path> <output path> " +
"<num iterations>");
return false;
}

fileOutput = true;
edgeInputPath = args[0];
outputPath = args[1];
maxIterations = Integer.parseInt(args[2]);

} else {
System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
System.out.println("Provide parameters to read input data from files.");
System.out.println("Usage ConnectedComponents <edge path> <output path> " +
"<num iterations>");
}

return true;
}

private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {

if(fileOutput) {
return env.readCsvFile(edgeInputPath)
.ignoreComments("#")
.fieldDelimiter("\t")
.lineDelimiter("\n")
.types(Long.class, Long.class)
.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
@Override
public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
}
});
} else {
return ConnectedComponentsExampleData.getDefaultEdgeDataSet(env);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* <li> with default data
* </ul>
*
* The input files is a plain text file and must be formatted as follows:
* The input file is a plain text file and must be formatted as follows:
* Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
* separated by tabs. Edges themselves are separated by newlines.
* For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* using a vertex-centric iteration.
*
* The input file is expected to contain one edge per line, with long IDs
* and double weights, in CSV format:
* and double weights, separated by tabs, in the following format:
* "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
*
* If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.example.utils;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.types.NullValue;

import java.util.ArrayList;
import java.util.List;

/**
* Provides the default data sets used for the connected components example program.
* If no parameters are given to the program, the default data sets are used.
*/
public class ConnectedComponentsExampleData {

public static final Integer MAX_ITERATIONS = 4;

public static final String EDGES = "1 2\n" + "2 3\n" + "2 4\n" + "3 4";

public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(2L, 4L, NullValue.getInstance()));
edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));

return env.fromCollection(edges);
}

public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";

private ConnectedComponentsExampleData() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.library;

import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue;

/**
* Connected components algorithm.
*
* Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their
* current component ID in iterations, each time adopting a new value from the received neighbor IDs,
* provided that the value is less than the current minimum.
*
* The algorithm converges when vertices no longer update their value or when the maximum number of iterations
* is reached.
*/
@SuppressWarnings("serial")
public class ConnectedComponents implements GraphAlgorithm<Long, Long, NullValue>{

private Integer maxIterations;

public ConnectedComponents(Integer maxIterations) {
this.maxIterations = maxIterations;
}

@Override
public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {

Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();

// initialize vertex values and run the Vertex Centric Iteration
return undirectedGraph.runVertexCentricIteration(new CCUpdater(),
new CCMessenger(), maxIterations);
}

/**
* Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
*/
public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {

@Override
public void updateVertex(Long id, Long currentMin, MessageIterator<Long> messages) throws Exception {
long min = Long.MAX_VALUE;

for (long msg : messages) {
min = Math.min(min, msg);
}

// update vertex value, if new minimum
if (min < currentMin) {
setNewVertexValue(min);
}
}
}

/**
* Distributes the minimum ID associated with a given vertex among all the target vertices.
*/
public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {

@Override
public void sendMessages(Long id, Long currentMin) throws Exception {
// send current minimum to neighbors
sendMessageToAllNeighbors(currentMin);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.test.example;

import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.graph.example.ConnectedComponentsExample;
import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;

@RunWith(Parameterized.class)
public class ConnectedComponentsITCase extends MultipleProgramsTestBase {

private String edgesPath;

private String resultPath;

private String expected;

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();

public ConnectedComponentsITCase(TestExecutionMode mode) {
super(mode);
}

@Before
public void before() throws Exception {
resultPath = tempFolder.newFile().toURI().toString();

File edgesFile = tempFolder.newFile();
Files.write(ConnectedComponentsExampleData.EDGES, edgesFile, Charsets.UTF_8);
edgesPath = edgesFile.toURI().toString();
}

@Test
public void testConnectedComponentsExample() throws Exception {
ConnectedComponentsExample.main(new String[]{edgesPath, resultPath, ConnectedComponentsExampleData.MAX_ITERATIONS + ""});
expected = ConnectedComponentsExampleData.VERTICES_WITH_MIN_ID;
}

@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected, resultPath);
}
}
Loading

0 comments on commit f2e6fd3

Please sign in to comment.