Skip to content

Commit

Permalink
[FLINK-333] [apis] Forward crossWithSmall and crossWithLarge hints to…
Browse files Browse the repository at this point in the history
… optimizer.
  • Loading branch information
StephanEwen committed Jan 8, 2015
1 parent 39fb7c9 commit 0954c4e
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;

import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
Expand All @@ -48,21 +49,16 @@ public class CrossNode extends TwoInputNode {
public CrossNode(CrossOperatorBase<?, ?, ?, ?> operation) {
super(operation);

// check small / large hints to decide upon which side is to be broadcasted
boolean allowBCfirst = true;
boolean allowBCsecond = true;

if (operation instanceof CrossOperatorBase.CrossWithSmall) {
allowBCfirst = false;
}
else if (operation instanceof CrossOperatorBase.CrossWithLarge) {
allowBCsecond = false;
}

Configuration conf = operation.getParameters();
String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);

CrossHint hint = operation.getCrossHint();

if (localStrategy != null) {

final boolean allowBCfirst = hint != CrossHint.SECOND_IS_SMALL;
final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL;

final OperatorDescriptorDual fixedDriverStrat;
if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
Expand All @@ -78,13 +74,13 @@ else if (operation instanceof CrossOperatorBase.CrossWithLarge) {

this.dataProperties = Collections.singletonList(fixedDriverStrat);
}
else if (operation instanceof CrossOperatorBase.CrossWithSmall) {
else if (hint == CrossHint.SECOND_IS_SMALL) {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterSecondDescriptor(false, true));
list.add(new CrossStreamOuterFirstDescriptor(false, true));
this.dataProperties = list;
}
else if (operation instanceof CrossOperatorBase.CrossWithLarge) {
else if (hint == CrossHint.FIRST_IS_SMALL) {
ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
list.add(new CrossBlockOuterFirstDescriptor(true, false));
list.add(new CrossStreamOuterSecondDescriptor(true, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,32 @@
*/
public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {

/**
* The cross hint tells the system which sizes to expect from the data sets
*/
public static enum CrossHint {

OPTIMIZER_CHOOSES,

FIRST_IS_SMALL,

SECOND_IS_SMALL
}

// --------------------------------------------------------------------------------------------

private CrossHint hint = CrossHint.OPTIMIZER_CHOOSES;


public CrossOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) {
super(udf, operatorInfo, name);

if (this instanceof CrossWithSmall) {
setCrossHint(CrossHint.SECOND_IS_SMALL);
}
else if (this instanceof CrossWithLarge) {
setCrossHint(CrossHint.FIRST_IS_SMALL);
}
}

public CrossOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) {
Expand All @@ -48,6 +72,15 @@ public CrossOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1,
this(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
}


public void setCrossHint(CrossHint hint) {
this.hint = hint == null ? CrossHint.OPTIMIZER_CHOOSES : hint;
}

public CrossHint getCrossHint() {
return hint;
}

// --------------------------------------------------------------------------------------------

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
import org.apache.flink.api.java.aggregation.Aggregations;
Expand Down Expand Up @@ -791,7 +792,7 @@ public <R> CoGroupOperator.CoGroupOperatorSets<T, R> coGroup(DataSet<R> other) {
* @see Tuple2
*/
public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other) {
return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
return new CrossOperator.DefaultCross<T, R>(this, other, CrossHint.OPTIMIZER_CHOOSES, Utils.getCallLocationName());
}

/**
Expand Down Expand Up @@ -821,7 +822,7 @@ public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other) {
* @see Tuple2
*/
public <R> CrossOperator.DefaultCross<T, R> crossWithTiny(DataSet<R> other) {
return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
return new CrossOperator.DefaultCross<T, R>(this, other, CrossHint.SECOND_IS_SMALL, Utils.getCallLocationName());
}

/**
Expand Down Expand Up @@ -851,7 +852,7 @@ public <R> CrossOperator.DefaultCross<T, R> crossWithTiny(DataSet<R> other) {
* @see Tuple2
*/
public <R> CrossOperator.DefaultCross<T, R> crossWithHuge(DataSet<R> other) {
return new CrossOperator.DefaultCross<T, R>(this, other, Utils.getCallLocationName());
return new CrossOperator.DefaultCross<T, R>(this, other, CrossHint.FIRST_IS_SMALL, Utils.getCallLocationName());
}

// --------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 0954c4e

Please sign in to comment.