Skip to content

Commit

Permalink
[FLINK-1249] [APIs] [compiler] Add custom partitioner for CoGroup
Browse files Browse the repository at this point in the history
This closes apache#228.
  • Loading branch information
StephanEwen authored and tillrohrmann committed Nov 25, 2014
1 parent 4838efe commit bcdd167
Show file tree
Hide file tree
Showing 13 changed files with 844 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.compiler.DataStatistics;
Expand All @@ -36,9 +37,9 @@ public class CoGroupNode extends TwoInputNode {

private List<OperatorDescriptorDual> dataProperties;

public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> pactContract) {
super(pactContract);
this.dataProperties = initializeDataProperties();
public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> operator) {
super(operator);
this.dataProperties = initializeDataProperties(operator.getCustomPartitioner());
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -80,7 +81,7 @@ protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics
// for CoGroup, we currently make no reasonable default estimates
}

private List<OperatorDescriptorDual> initializeDataProperties() {
private List<OperatorDescriptorDual> initializeDataProperties(Partitioner<?> customPartitioner) {
Ordering groupOrder1 = null;
Ordering groupOrder2 = null;

Expand All @@ -95,6 +96,11 @@ private List<OperatorDescriptorDual> initializeDataProperties() {
groupOrder2 = null;
}

return Collections.<OperatorDescriptorDual>singletonList(new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2));
CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2);
if (customPartitioner != null) {
descr.setCustomPartitioner(customPartitioner);
}

return Collections.<OperatorDescriptorDual>singletonList(descr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
Expand All @@ -41,6 +42,8 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
private final Ordering ordering1; // ordering on the first input
private final Ordering ordering2; // ordering on the second input

private Partitioner<?> customPartitioner;


public CoGroupDescriptor(FieldList keys1, FieldList keys2) {
this(keys1, keys2, null, null);
Expand Down Expand Up @@ -84,6 +87,10 @@ public CoGroupDescriptor(FieldList keys1, FieldList keys2, Ordering additionalOr
}
}

/**
 * Stores the custom partitioner for this descriptor.
 *
 * <p>The value is assigned as given, without validation. The field is read by
 * {@code createPossibleGlobalProperties()}, which branches on whether it is {@code null}.
 *
 * @param customPartitioner the partitioner to store in this descriptor
 */
public void setCustomPartitioner(Partitioner<?> customPartitioner) {
this.customPartitioner = customPartitioner;
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.CO_GROUP;
Expand All @@ -92,9 +99,19 @@ public DriverStrategy getStrategy() {
/**
 * Builds the single pair of requested global properties for the two inputs of this descriptor.
 *
 * <p>One {@link RequestedGlobalProperties} object is created per input and configured on that
 * input's key fields ({@code keys1} for the first input, {@code keys2} for the second). When no
 * custom partitioner is stored, {@code setAnyPartitioning} is called with the keys; otherwise
 * {@code setCustomPartitioned} is called with the keys and the stored partitioner. The same
 * partitioner instance is passed for both inputs.
 *
 * @return a singleton list containing the pair (first-input properties, second-input properties)
 */
@Override
protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
// NOTE(review): this listing appears to be a diff rendered without +/- markers; the call below
// is presumably the pre-change line that the conditional after it replaces — confirm against
// the repository before relying on it.
partitioned1.setHashPartitioned(this.keys1);
if (this.customPartitioner == null) {
partitioned1.setAnyPartitioning(this.keys1);
} else {
partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
}

RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
// NOTE(review): presumably also a pre-change line (see the note on the first input) — confirm.
partitioned2.setHashPartitioned(this.keys2);
if (this.customPartitioner == null) {
partitioned2.setAnyPartitioning(this.keys2);
} else {
partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
}

return Collections.singletonList(new GlobalPropertiesPair(partitioned1, partitioned2));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;

@SuppressWarnings({"serial","unchecked"})
public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase {

@Test
public void testCompatiblePartitioning() {
public void testCompatiblePartitioningJoin() {
try {
final Partitioner<Long> partitioner = new Partitioner<Long>() {
@Override
Expand Down Expand Up @@ -81,4 +82,49 @@ public int partition(Long key, int numPartitions) {
fail(e.getMessage());
}
}

/**
 * Compiles a program in which both CoGroup inputs are custom-partitioned with the same
 * partitioner on the CoGroup key fields, then checks the ship strategies in the optimized plan:
 * both CoGroup inputs must be FORWARD, and the inputs of the two nodes feeding the CoGroup must
 * be PARTITION_CUSTOM with the partitioner defined in this test. Finally the optimized plan is
 * handed to the job graph generator. Any exception fails the test.
 */
@Test
public void testCompatiblePartitioningCoGroup() {
try {
// partitioner that returns partition 0 for every key; the same instance is used on both inputs
final Partitioner<Long> partitioner = new Partitioner<Long>() {
@Override
public int partition(Long key, int numPartitions) {
return 0;
}
};

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));

// the custom partitioning fields match the CoGroup keys: field 1 of input1, field 0 of input2
input1.partitionCustom(partitioner, 1)
.coGroup(input2.partitionCustom(partitioner, 0))
.where(1).equalTo(0)
.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

// walk the plan backwards: sink -> CoGroup -> the node feeding each CoGroup input
SinkPlanNode sink = op.getDataSinks().iterator().next();
DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
// the casts expect single-input nodes here, presumably the ones created by partitionCustom
SingleInputPlanNode partitioner1 = (SingleInputPlanNode) coGroup.getInput1().getSource();
SingleInputPlanNode partitioner2 = (SingleInputPlanNode) coGroup.getInput2().getSource();

// the CoGroup itself must forward both inputs
assertEquals(ShipStrategyType.FORWARD, coGroup.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, coGroup.getInput2().getShipStrategy());

// the custom partitioning, with this test's partitioner, must sit on the inputs of the preceding nodes
assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy());
assertEquals(partitioner, partitioner1.getInput().getPartitioner());
assertEquals(partitioner, partitioner2.getInput().getPartitioner());

// the optimized plan is also run through job graph generation; an exception here fails the test
new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
Loading

0 comments on commit bcdd167

Please sign in to comment.