Skip to content

Commit

Permalink
[FLINK-1254] [compiler] Fix compiler bug for pipeline breaker placement
Browse files Browse the repository at this point in the history
This closes apache#216
  • Loading branch information
StephanEwen committed Nov 19, 2014
1 parent 54aa41b commit ce822bf
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.compiler.plan;

import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
Expand All @@ -36,7 +35,7 @@
import org.apache.flink.util.Visitor;

/**
*
* A node in the execution, representing a workset iteration (delta iteration).
*/
public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {

Expand Down Expand Up @@ -66,7 +65,7 @@ public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName,
SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
{
super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.NONE);
super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
this.solutionSetPlanNode = solutionSetPlanNode;
this.worksetPlanNode = worksetPlanNode;
this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -227,6 +228,40 @@ public void testIterationPushingWorkOut() throws Exception {
}
}

@Test
public void testWorksetIterationPipelineBreakerPlacement() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);

// the workset (input two of the delta iteration) is the same as what is consumed be the successive join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

// trivial iteration, since we are interested in the inputs to the iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);

DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());

DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

initialWorkset
.join(result, JoinHint.REPARTITION_HASH_FIRST)
.where(0).equalTo(0)
.print();

Plan p = env.createProgramPlan();
compileNoStats(p);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

// --------------------------------------------------------------------------------------------

public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {

// open a bulk iteration
Expand Down Expand Up @@ -270,6 +305,8 @@ public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long,

}

// --------------------------------------------------------------------------------------------

public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

import com.google.common.base.Preconditions;

/**
* A DataSet represents a collection of elements of the same type.<br/>
* A DataSet can be transformed into another DataSet by applying a transformation as for example
Expand Down Expand Up @@ -847,6 +849,9 @@ public IterativeDataSet<T> iterate(int maxIterations) {
* @see org.apache.flink.api.java.operators.DeltaIteration
*/
public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions) {
Preconditions.checkNotNull(workset);
Preconditions.checkNotNull(keyPositions);

Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<T>(keyPositions, getType(), false);
return new DeltaIteration<T, R>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations);
}
Expand Down

0 comments on commit ce822bf

Please sign in to comment.