Skip to content

Commit

Permalink
[FLINK-7199] [gelly] Graph simplification does not set parallelism
Browse files Browse the repository at this point in the history
The Simplify parameter should accept and set the parallelism when
calling the Simplify algorithms.

The LocalClusteringCoefficient "count triangles" reduce now uses the
assigned ("little") parallelism as this computation is proportional to
the number of vertices (the combine computation is proportional to the
potentially much larger number of triangles).

The ignored CombineHint on the HITS all-reduces has been removed.

This closes #4346
  • Loading branch information
greghogan committed Sep 15, 2017
1 parent 9437a0f commit 2ac09c0
Show file tree
Hide file tree
Showing 22 changed files with 496 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public class Runner
.addClass(Hash.class)
.addClass(Print.class);

// parameters

private final ParameterTool parameters;

private final BooleanParameter disableObjectReuse = new BooleanParameter(this, "__disable_object_reuse");
Expand All @@ -133,6 +135,18 @@ public class Runner
private StringParameter jobName = new StringParameter(this, "__job_name")
.setDefaultValue(null);

// state

private ExecutionEnvironment env;

private DataSet result;

private String executionName;

private Driver algorithm;

private Output output;

/**
* Create an algorithm runner from the given arguments.
*
Expand All @@ -147,6 +161,26 @@ public String getName() {
return this.getClass().getSimpleName();
}

/**
 * Returns the ExecutionEnvironment used by this runner.
 *
 * <p>The environment is assigned inside {@link Runner#run()}, so it is only
 * available after that method has been called.
 *
 * @return the ExecutionEnvironment
 */
public ExecutionEnvironment getExecutionEnvironment() {
    return this.env;
}

/**
 * Returns the result DataSet planned by the algorithm.
 *
 * <p>The result is assigned inside {@link Runner#run()}, so it is only
 * available after that method has been called. The value may be
 * {@code null} when the algorithm's plan did not return a DataSet.
 *
 * @return the result DataSet
 */
public DataSet getResult() {
    return this.result;
}

/**
* List available algorithms. This is displayed to the user when no valid
* algorithm is given in the program parameterization.
Expand Down Expand Up @@ -246,9 +280,17 @@ private void parameterize(Parameterized parameterized) {
}
}

public void run() throws Exception {
/**
* Setup the Flink job with the graph input, algorithm, and output.
*
* <p>To then execute the job call {@link #execute}.
*
* @return this
* @throws Exception on error
*/
public Runner run() throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env = ExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();

// should not have any non-Flink data types
Expand Down Expand Up @@ -282,7 +324,7 @@ public void run() throws Exception {
}

String algorithmName = parameters.get(ALGORITHM);
Driver algorithm = driverFactory.get(algorithmName);
algorithm = driverFactory.get(algorithmName);

if (algorithm == null) {
throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName);
Expand Down Expand Up @@ -314,7 +356,7 @@ public void run() throws Exception {
}

String outputName = parameters.get(OUTPUT);
Output output = outputFactory.get(outputName);
output = outputFactory.get(outputName);

if (output == null) {
throw new ProgramParametrizationException("Unknown output type: " + outputName);
Expand Down Expand Up @@ -358,10 +400,10 @@ public void run() throws Exception {
}

// Run algorithm
DataSet results = algorithm.plan(graph);
result = algorithm.plan(graph);

// Output
String executionName = jobName.getValue() != null ? jobName.getValue() + ": " : "";
executionName = jobName.getValue() != null ? jobName.getValue() + ": " : "";

executionName += input.getIdentity() + " ⇨ " + algorithmName + " ⇨ " + output.getName();

Expand All @@ -386,18 +428,29 @@ public void run() throws Exception {
throw new ProgramParametrizationException(ex.getMessage());
}

if (results == null) {
env.execute(executionName);
} else {
if (result != null) {
// Transform output if algorithm returned result DataSet
if (transforms.size() > 0) {
Collections.reverse(transforms);
for (Transform transform : transforms) {
results = (DataSet) transform.transformResult(results);
result = (DataSet) transform.transformResult(result);
}
}
}

return this;
}

output.write(executionName.toString(), System.out, results);
/**
* Execute the Flink job.
*
* @throws Exception on error
*/
private void execute() throws Exception {
if (result == null) {
env.execute(executionName);
} else {
output.write(executionName.toString(), System.out, result);
}

System.out.println();
Expand Down Expand Up @@ -450,7 +503,7 @@ private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsP
}

public static void main(String[] args) throws Exception {
new Runner(args).run();
new Runner(args).run().execute();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Ex
throw new ProgramParametrizationException("Unknown type '" + type.getValue() + "'");
}

return simplify.simplify(graph);
return simplify.simplify(graph, parallelism.getValue().intValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.graph.drivers.input;

import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.transform.GraphKeyTypeTransform;
import org.apache.flink.graph.drivers.transform.Transform;
import org.apache.flink.graph.drivers.transform.Transformable;
Expand All @@ -27,8 +26,6 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Base class for generated graphs.
*
Expand All @@ -43,9 +40,6 @@ public List<Transform> getTransformers() {
return Arrays.<Transform>asList(new GraphKeyTypeTransform(vertexCount()));
}

protected LongParameter parallelism = new LongParameter(this, "__parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

/**
* The vertex count is verified to be no greater than the capacity of the
* selected data type. All vertices must be counted even if skipped or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env)

// simplify after the translation to improve the performance of the
// simplify operators by processing smaller data types
return simplify.simplify(graph);
return simplify.simplify(graph, parallelism.getValue().intValue());
}

public abstract Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.apache.flink.graph.drivers.input;

import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Base class for inputs.
*
Expand All @@ -31,4 +34,14 @@
public abstract class InputBase<K, VV, EV>
extends ParameterizedBase
implements Input<K, VV, EV> {

    // Parallelism requested for the input. A user-supplied value is bounded
    // to [1, Integer.MAX_VALUE]; the default is PARALLELISM_DEFAULT.
    protected LongParameter parallelism = new LongParameter(this, "__parallelism")
        .setDefaultValue(PARALLELISM_DEFAULT)
        .setMinimumValue(1)
        .setMaximumValue(Integer.MAX_VALUE);

    /**
     * Names the input after its concrete class.
     *
     * @return the simple class name of this input
     */
    @Override
    public String getName() {
        return getClass().getSimpleName();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.flink.api.java.utils.ParameterTool;

/**
* A {@link Parameter} storing a {@link Long}.
* A {@link Parameter} storing a {@link Long} within <tt>min</tt> and
* <tt>max</tt> bounds (inclusive).
*
* <p>Note that the default value may be outside of these bounds.
*/
public class LongParameter
extends SimpleParameter<Long> {
Expand All @@ -46,36 +49,29 @@ public LongParameter(ParameterizedBase owner, String name) {
/**
* Set the default value.
*
* <p>The default may be set to any value and is not restricted by setting the
* minimum or maximum values.
*
* @param defaultValue the default value.
* @return this
*/
public LongParameter setDefaultValue(long defaultValue) {
super.setDefaultValue(defaultValue);

if (hasMinimumValue) {
Util.checkParameter(defaultValue >= minimumValue,
"Default value (" + defaultValue + ") must be greater than or equal to minimum (" + minimumValue + ")");
}

if (hasMaximumValue) {
Util.checkParameter(defaultValue <= maximumValue,
"Default value (" + defaultValue + ") must be less than or equal to maximum (" + maximumValue + ")");
}

return this;
}

/**
* Set the minimum value.
*
* <p>If a maximum value has been set then the minimum value must not be
* greater than the maximum value.
*
* @param minimumValue the minimum value
* @return this
*/
public LongParameter setMinimumValue(long minimumValue) {
if (hasDefaultValue) {
Util.checkParameter(minimumValue <= defaultValue,
"Minimum value (" + minimumValue + ") must be less than or equal to default (" + defaultValue + ")");
} else if (hasMaximumValue) {
if (hasMaximumValue) {
Util.checkParameter(minimumValue <= maximumValue,
"Minimum value (" + minimumValue + ") must be less than or equal to maximum (" + maximumValue + ")");
}
Expand All @@ -89,14 +85,14 @@ public LongParameter setMinimumValue(long minimumValue) {
/**
* Set the maximum value.
*
* <p>If a minimum value has been set then the maximum value must not be
* less than the minimum value.
*
* @param maximumValue the maximum value
* @return this
*/
public LongParameter setMaximumValue(long maximumValue) {
if (hasDefaultValue) {
Util.checkParameter(maximumValue >= defaultValue,
"Maximum value (" + maximumValue + ") must be greater than or equal to default (" + defaultValue + ")");
} else if (hasMinimumValue) {
if (hasMinimumValue) {
Util.checkParameter(maximumValue >= minimumValue,
"Maximum value (" + maximumValue + ") must be greater than or equal to minimum (" + minimumValue + ")");
}
Expand All @@ -109,16 +105,21 @@ public LongParameter setMaximumValue(long maximumValue) {

@Override
public void configure(ParameterTool parameterTool) {
value = hasDefaultValue ? parameterTool.getLong(name, defaultValue) : parameterTool.getLong(name);

if (hasMinimumValue) {
Util.checkParameter(value >= minimumValue,
name + " must be greater than or equal to " + minimumValue);
}

if (hasMaximumValue) {
Util.checkParameter(value <= maximumValue,
name + " must be less than or equal to " + maximumValue);
if (hasDefaultValue && !parameterTool.has(name)) {
// skip checks for min and max when using default value
value = defaultValue;
} else {
value = parameterTool.getLong(name);

if (hasMinimumValue) {
Util.checkParameter(value >= minimumValue,
name + " must be greater than or equal to " + minimumValue);
}

if (hasMaximumValue) {
Util.checkParameter(value <= maximumValue,
name + " must be less than or equal to " + maximumValue);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,23 @@ public Ordering getValue() {
* @return output graph
* @throws Exception on error
*/
public <T extends Comparable<T>> Graph<T, NullValue, NullValue> simplify(Graph<T, NullValue, NullValue> graph)
public <T extends Comparable<T>> Graph<T, NullValue, NullValue> simplify(Graph<T, NullValue, NullValue> graph, int parallelism)
throws Exception {

switch (value) {
case DIRECTED:
graph = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<>());
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<T, NullValue, NullValue>()
.setParallelism(parallelism));
break;
case UNDIRECTED:
graph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<>(false));
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(false)
.setParallelism(parallelism));
break;
case UNDIRECTED_CLIP_AND_FLIP:
graph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<>(true));
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<T, NullValue, NullValue>(true)
.setParallelism(parallelism));
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,12 @@ public void testPrintWithRMatGraph() throws Exception {

expectedCount(parameters(8, "print"), 39276);
}

@Test
public void testParallelism() throws Exception {
    // Operator name patterns handed to the parallelism verification helper.
    String[] operatorPatterns = {
        "FlatMap \\(Mirror results\\)",
        "GroupReduce \\(Compute scores\\)",
        "GroupReduce \\(Generate group pairs\\)"};

    TestUtils.verifyParallelism(parameters(8, "print"), operatorPatterns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,18 @@ public void testHashWithUndirectedRMatGraph() throws Exception {
expectedOutput(parameters(8, "undirected", "undirected", "hash"),
"\n" + new Checksum(233, 0x000000743ef6d14bL) + expected);
}

@Test
public void testParallelism() throws Exception {
    // Operator name patterns handed to the parallelism verification helper.
    String[] operatorPatterns = {
        "Combine \\(Count triangles\\)",
        "FlatMap \\(Split triangle vertices\\)",
        "Join \\(Triangle listing\\)",
        "GroupReduce \\(Generate triplets\\)",
        "DataSink \\(Count\\)"};

    // Graph-type argument pairs, verified in this order.
    String[][] typePairs = {
        {"directed", "directed"},
        {"directed", "undirected"},
        {"undirected", "undirected"}};

    for (String[] pair : typePairs) {
        TestUtils.verifyParallelism(parameters(8, pair[0], pair[1], "print"), operatorPatterns);
    }
}
}
Loading

0 comments on commit 2ac09c0

Please sign in to comment.