Skip to content

Commit

Permalink
[FLINK-3208] [gelly] rename vertex-centric iteration model to scatter…
Browse files Browse the repository at this point in the history
…-gather

This closes apache#1514
  • Loading branch information
vasia committed Feb 2, 2016
1 parent 233c014 commit f867746
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 174 deletions.
104 changes: 52 additions & 52 deletions docs/libs/gelly_guide.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.graph._
import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
import org.apache.flink.{graph => jg}
import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag
Expand Down Expand Up @@ -1027,39 +1027,39 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
}

/**
* Runs a Vertex-Centric iteration on the graph.
* Runs a scatter-gather iteration on the graph.
* No configuration options are provided.
*
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maxIterations maximum number of iterations to perform
*
* @return the updated Graph after the vertex-centric iteration has converged or
* @return the updated Graph after the scatter-gather iteration has converged or
* after the maximum number of iterations (maxIterations) has been reached.
*/
def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
messagingFunction: MessagingFunction[K, VV, M, EV],
maxIterations: Int): Graph[K, VV, EV] = {
wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
maxIterations))
}

/**
* Runs a Vertex-Centric iteration on the graph with configuration options.
* Runs a scatter-gather iteration on the graph with configuration options.
*
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maxIterations maximum number of iterations to perform
* @param parameters the iteration configuration parameters
*
* @return the updated Graph after the vertex-centric iteration has converged or
* @return the updated Graph after the scatter-gather iteration has converged or
* after the maximum number of iterations (maxIterations) has been reached.
*/
def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
def runScatterGatherIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
messagingFunction: MessagingFunction[K, VV, M, EV],
maxIterations: Int, parameters: VertexCentricConfiguration):
maxIterations: Int, parameters: ScatterGatherConfiguration):
Graph[K, VV, EV] = {
wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
wrapGraph(jgraph.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
maxIterations, parameters))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData

/**
* This example shows how to use Gelly's vertex-centric iterations.
* This example shows how to use Gelly's scatter-gather iterations.
*
* It is an implementation of the Single-Source-Shortest-Paths algorithm.
*
Expand All @@ -54,8 +54,8 @@ object SingleSourceShortestPaths {
val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)

// Execute the vertex-centric iteration
val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
// Execute the scatter-gather iteration
val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
new MinDistanceMessenger, maxIterations)

// Extract the vertices as the result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.ScatterGatherIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
Expand Down Expand Up @@ -1585,42 +1585,42 @@ public void coGroup(Iterable<Edge<K, EV>> edgesLeft, Iterable<Edge<K, EV>> edges
}

/**
* Runs a Vertex-Centric iteration on the graph.
* Runs a scatter-gather iteration on the graph.
* No configuration options are provided.
*
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform
*
* @return the updated Graph after the vertex-centric iteration has converged or
* @return the updated Graph after the scatter-gather iteration has converged or
* after maximumNumberOfIterations.
*/
public <M> Graph<K, VV, EV> runVertexCentricIteration(
public <M> Graph<K, VV, EV> runScatterGatherIteration(
VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
MessagingFunction<K, VV, M, EV> messagingFunction,
int maximumNumberOfIterations) {

return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
return this.runScatterGatherIteration(vertexUpdateFunction, messagingFunction,
maximumNumberOfIterations, null);
}

/**
* Runs a Vertex-Centric iteration on the graph with configuration options.
* Runs a scatter-gather iteration on the graph with configuration options.
*
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform
* @param parameters the iteration configuration parameters
*
* @return the updated Graph after the vertex-centric iteration has converged or
* @return the updated Graph after the scatter-gather iteration has converged or
* after maximumNumberOfIterations.
*/
public <M> Graph<K, VV, EV> runVertexCentricIteration(
public <M> Graph<K, VV, EV> runScatterGatherIteration(
VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
MessagingFunction<K, VV, M, EV> messagingFunction,
int maximumNumberOfIterations, VertexCentricConfiguration parameters) {
int maximumNumberOfIterations, ScatterGatherConfiguration parameters) {

VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
ScatterGatherIteration<K, VV, M, EV> iteration = ScatterGatherIteration.withEdges(
edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);

iteration.configure(parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
throw new RuntimeException("The edges input file cannot be null!");
}

@SuppressWarnings("serial")
DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import org.apache.flink.graph.example.utils.IncrementalSSSPData;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;

/**
* This example illustrates how to
* <ul>
* <li> create a Graph directly from CSV files
* <li> use the vertex-centric iteration's messaging direction configuration option
* <li> use the scatter-gather iteration's messaging direction configuration option
* </ul>
*
* Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
Expand Down Expand Up @@ -89,15 +89,15 @@ public static void main(String [] args) throws Exception {
graph.removeEdge(edgeToBeRemoved);

// configure the iteration
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();

if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {

parameters.setDirection(EdgeDirection.IN);
parameters.setOptDegrees(true);

// run the vertex centric iteration to propagate info
Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(),
// run the scatter-gather iteration to propagate info
Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(),
new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);

DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;

/**
* This example shows how to use Gelly's vertex-centric iterations.
* This example shows how to use Gelly's scatter-gather iterations.
*
* It is an implementation of the Single-Source-Shortest-Paths algorithm.
* For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}.
Expand Down Expand Up @@ -60,8 +60,8 @@ public static void main(String[] args) throws Exception {

Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);

// Execute the vertex-centric iteration
Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
// Execute the scatter-gather iteration
Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);

// Extract the vertices as the result
Expand Down Expand Up @@ -196,6 +196,6 @@ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment

@Override
public String getDescription() {
return "Vertex-centric Single Source Shortest Paths";
return "Scatter-gather Single Source Shortest Paths";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();

return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
return graphWithScoredVertices.runScatterGatherIteration(new VertexLabelUpdater<K>(delta),
new LabelMessenger<K>(), maxIterations)
.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.types.NullValue;

/**
* A vertex-centric implementation of the Weakly Connected Components algorithm.
* A scatter-gather implementation of the Weakly Connected Components algorithm.
*
* This implementation assumes that the vertex values of the input Graph are initialized with Long component IDs.
* The vertices propagate their current component ID in iterations.
Expand Down Expand Up @@ -66,8 +66,8 @@ public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
.getUndirected();

// initialize vertex values and run the Vertex Centric Iteration
return undirectedGraph.runVertexCentricIteration(
// initialize vertex values and run the Scatter-Gather Iteration
return undirectedGraph.runScatterGatherIteration(
new CCUpdater<K>(), new CCMessenger<K>(), maxIterations)
.getVertices();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public DataSet<Vertex<K, VV>> run(Graph<K, VV, EV> input) {
// iteratively adopt the most frequent label among the neighbors of each vertex
return input
.mapEdges(new NullValueEdgeMapper<K, EV>())
.runVertexCentricIteration(
.runScatterGatherIteration(
new UpdateVertexLabel<K, VV>(), new SendNewLabelToNeighbors<K, VV>(valueType), maxIterations)
.getVertices();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.graph.spargel.VertexUpdateFunction;

/**
* This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
* This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
* The user can define the damping factor and the maximum number of iterations.
* If the number of vertices of the input graph is known, it should be provided as a parameter
* to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
Expand Down Expand Up @@ -88,7 +88,7 @@ public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws E
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());

return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
new RankMessenger<K>(numberOfVertices), maxIterations)
.getVertices();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.graph.spargel.VertexUpdateFunction;

/**
* This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
* This is an implementation of the Single-Source-Shortest Paths algorithm, using a scatter-gather iteration.
*/
@SuppressWarnings("serial")
public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
Expand All @@ -52,7 +52,7 @@ public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {

return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
.runScatterGatherIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
maxIterations).getVertices();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.util.Collector;

/**
* The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
* The base class for functions that produce messages between vertices as a part of a {@link ScatterGatherIteration}.
*
* @param <K> The type of the vertex key (the vertex identifier).
* @param <VV> The type of the vertex value (the state of the vertex).
Expand Down Expand Up @@ -66,7 +66,7 @@ void setNumberOfVertices(long numberOfVertices) {

// --------------------------------------------------------------------------------------------
// Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
// the vertex centric iteration.
// the scatter-gather iteration.
// --------------------------------------------------------------------------------------------

private EdgeDirection direction;
Expand Down Expand Up @@ -233,7 +233,7 @@ public <T extends Value> T getPreviousIterationAggregate(String name) {
/**
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
* {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
* {@link org.apache.flink.graph.spargel.ScatterGatherConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
*
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
import java.util.List;

/**
* A VertexCentricConfiguration object can be used to set the iteration name and
* A ScatterGatherConfiguration object can be used to set the iteration name and
* degree of parallelism, to register aggregators and use broadcast sets in
* the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}
*
* The ScatterGatherConfiguration object is passed as an argument to
* {@link org.apache.flink.graph.Graph#runVertexCentricIteration (
* {@link org.apache.flink.graph.Graph#runScatterGatherIteration (
* org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
* VertexCentricConfiguration)}.
* ScatterGatherConfiguration)}.
*/
public class VertexCentricConfiguration extends IterationConfiguration {
public class ScatterGatherConfiguration extends IterationConfiguration {

/** the broadcast variables for the update function **/
private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
Expand All @@ -50,7 +50,7 @@ public class VertexCentricConfiguration extends IterationConfiguration {
/** the direction in which the messages should be sent **/
private EdgeDirection direction = EdgeDirection.OUT;

public VertexCentricConfiguration() {}
public ScatterGatherConfiguration() {}

/**
* Adds a data set as a broadcast set to the messaging function.
Expand Down
Loading

0 comments on commit f867746

Please sign in to comment.