Skip to content

Commit

Permalink
[FLINK-6393] [gelly] Add Circulant and Echo graph generators
Browse files Browse the repository at this point in the history
This closes apache#3802
  • Loading branch information
fanzhidongyzby authored and greghogan committed May 11, 2017
1 parent 54c8826 commit 3ee8c69
Show file tree
Hide file tree
Showing 12 changed files with 926 additions and 70 deletions.
91 changes: 81 additions & 10 deletions docs/dev/libs/gelly/graph_generators.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,42 @@ val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDime
</div>
</div>

## Circulant Graph

A [circulant graph](http://mathworld.wolfram.com/CirculantGraph.html) is an
[oriented graph](http://mathworld.wolfram.com/OrientedGraph.html) configured
with one or more contiguous ranges of offsets. Edges connect integer vertex IDs
whose difference equals a configured offset. The circulant graph with no offsets
is the [empty graph](#empty-graph) and the graph with the maximum range is the
[complete graph](#complete-graph).

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 5;

Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, vertexCount)
.addRange(1, 2)
.generate();
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.graph.generator.CirculantGraph

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

val vertexCount = 5

val graph = new CirculantGraph(env.getJavaEnv, vertexCount).addRange(1, 2).generate()
{% endhighlight %}
</div>
</div>

## Complete Graph

An undirected graph connecting every distinct pair of vertices.
Expand All @@ -83,7 +119,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 5;

Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -148,7 +184,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 5;

Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, vertexCount)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -193,6 +229,41 @@ val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate()
<text x="51" y="199">4</text>
</svg>

## Echo Graph

An [echo graph](http://mathworld.wolfram.com/EchoGraph.html) is a
[circulant graph](#circulant-graph) with `n` vertices defined by the width of a
single range of offsets centered at `n/2`. A vertex is connected to 'far'
vertices, which connect to 'near' vertices, which connect to 'far' vertices, ....

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 5;
long vertexDegree = 2;

Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree)
.generate();
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.graph.generator.EchoGraph

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

val vertexCount = 5
val vertexDegree = 2

val graph = new EchoGraph(env.getJavaEnv, vertexCount, vertexDegree).generate()
{% endhighlight %}
</div>
</div>

## Empty Graph

A graph containing no edges.
Expand All @@ -204,7 +275,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 5;

Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, vertexCount)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -257,7 +328,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

boolean wrapEndpoints = false;

Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
.addDimension(2, wrapEndpoints)
.addDimension(4, wrapEndpoints)
.generate();
Expand Down Expand Up @@ -327,7 +398,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long dimensions = 3;

Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, dimensions)
Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, dimensions)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -403,7 +474,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 5

Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -464,7 +535,7 @@ RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory()
int vertexCount = 1 << scale;
int edgeCount = edgeFactor * vertexCount;

Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -505,7 +576,7 @@ int edgeCount = edgeFactor * vertexCount;

boolean clipAndFlip = false;

Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.setConstants(0.57f, 0.19f, 0.19f)
.setNoise(true, 0.10f)
.generate();
Expand Down Expand Up @@ -542,7 +613,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
long vertexPairCount = 4

// note: configured with the number of vertex pairs
Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
.generate();
{% endhighlight %}
</div>
Expand Down Expand Up @@ -607,7 +678,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

long vertexCount = 6;

Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, vertexCount)
.generate();
{% endhighlight %}
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.flink.graph.drivers.JaccardIndex;
import org.apache.flink.graph.drivers.PageRank;
import org.apache.flink.graph.drivers.TriangleListing;
import org.apache.flink.graph.drivers.input.CirculantGraph;
import org.apache.flink.graph.drivers.input.CompleteGraph;
import org.apache.flink.graph.drivers.input.CycleGraph;
import org.apache.flink.graph.drivers.input.EchoGraph;
import org.apache.flink.graph.drivers.input.EmptyGraph;
import org.apache.flink.graph.drivers.input.GridGraph;
import org.apache.flink.graph.drivers.input.HypercubeGraph;
Expand Down Expand Up @@ -76,9 +78,11 @@ public class Runner {
private static final String OUTPUT = "output";

private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>()
.addClass(CirculantGraph.class)
.addClass(CompleteGraph.class)
.addClass(org.apache.flink.graph.drivers.input.CSV.class)
.addClass(CycleGraph.class)
.addClass(EchoGraph.class)
.addClass(EmptyGraph.class)
.addClass(GridGraph.class)
.addClass(HypercubeGraph.class)
Expand Down Expand Up @@ -236,7 +240,12 @@ public static void main(String[] args) throws Exception {

// Input

input.configure(parameters);
try {
input.configure(parameters);
} catch (RuntimeException ex) {
throw new ProgramParametrizationException(ex.getMessage());
}

Graph graph = input.create(env);

// Algorithm
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.drivers.input;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.generator.CirculantGraph.OffsetRange;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT;

/**
 * Generate a {@link org.apache.flink.graph.generator.CirculantGraph}.
 *
 * <p>Offset ranges are read from every parameter whose key starts with
 * {@code range} followed by an integer ID ({@code range0}, {@code range1},
 * ...). Each value must have the form {@code offset:length}, where both parts
 * are integers. Ranges are added to the generator in ascending order of ID.
 */
public class CirculantGraph
extends GeneratedGraph<LongValue> {

	private static final String PREFIX = "range";

	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
		.setMinimumValue(MINIMUM_VERTEX_COUNT);

	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
		.setDefaultValue(PARALLELISM_DEFAULT);

	private List<OffsetRange> offsetRanges = new ArrayList<>();

	@Override
	public String getName() {
		return CirculantGraph.class.getSimpleName();
	}

	@Override
	public String getUsage() {
		return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]]"
			+ super.getUsage();
	}

	/**
	 * Read the configured parameters and parse all offset ranges.
	 *
	 * @param parameterTool the command-line parameters
	 * @throws ProgramParametrizationException if a range key does not end in
	 *         an integer ID, or if a range value is not of the form
	 *         {@code offset:length} with integer offset and length
	 */
	@Override
	public void configure(ParameterTool parameterTool) throws ProgramParametrizationException {
		super.configure(parameterTool);

		// add offset ranges as ordered by offset ID (range0, range1, range2, ...)

		Map<Integer, String> offsetRangeMap = new TreeMap<>();

		// first parse all offset ranges into a sorted map
		for (String key : parameterTool.toMap().keySet()) {
			if (key.startsWith(PREFIX)) {
				int offsetId;
				try {
					offsetId = Integer.parseInt(key.substring(PREFIX.length()));
				} catch (NumberFormatException ex) {
					// report the offending key rather than leaking a bare NumberFormatException
					throw new ProgramParametrizationException("Circulant offset range key must be '" + PREFIX +
						"' followed by an integer ID: '" + key + "'");
				}
				offsetRangeMap.put(offsetId, parameterTool.get(key));
			}
		}

		// then store offset ranges in order
		for (String field : offsetRangeMap.values()) {
			// a negative limit keeps trailing empty strings so that malformed
			// values such as "1:2:" are rejected instead of being read as "1:2";
			// a value without any colon yields a single part and is rejected too
			String[] parts = field.split(":", -1);

			if (parts.length != 2) {
				throw malformedRange(field);
			}

			try {
				long offset = Long.parseLong(parts[0]);
				long length = Long.parseLong(parts[1]);
				offsetRanges.add(new OffsetRange(offset, length));
			} catch (NumberFormatException ex) {
				throw malformedRange(field);
			}
		}
	}

	/**
	 * Create the exception reported for a range value that is not of the
	 * form {@code offset:length}.
	 *
	 * @param field the raw parameter value
	 * @return the exception to throw
	 */
	private static ProgramParametrizationException malformedRange(String field) {
		return new ProgramParametrizationException("Circulant offset range" +
			" must use a colon to separate the integer offset and integer length: '" + field + "'");
	}

	@Override
	public String getIdentity() {
		return getTypeName() + " " + getName() + " (" + offsetRanges + ")";
	}

	@Override
	protected long vertexCount() {
		return vertexCount.getValue();
	}

	/**
	 * Build the generator with the configured vertex count, add each parsed
	 * offset range in order, and generate the graph using the configured
	 * "little" parallelism.
	 *
	 * @param env the execution environment
	 * @return the generated circulant graph
	 */
	@Override
	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
		org.apache.flink.graph.generator.CirculantGraph graph = new org.apache.flink.graph.generator.CirculantGraph(env,
			vertexCount.getValue());

		for (OffsetRange offsetRange : offsetRanges) {
			graph.addRange(offsetRange.getOffset(), offsetRange.getLength());
		}

		return graph
			.setParallelism(littleParallelism.getValue().intValue())
			.generate();
	}
}
Loading

0 comments on commit 3ee8c69

Please sign in to comment.