Skip to content

Commit

Permalink
Simplified and fixed estimate computation.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 27, 2014
1 parent aa3d14d commit fe2a2ab
Show file tree
Hide file tree
Showing 33 changed files with 264 additions and 1,090 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;

import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.costs.CostEstimator;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.util.Visitor;
Expand All @@ -34,7 +35,6 @@ protected AbstractPartialSolutionNode(Operator contract) {
// --------------------------------------------------------------------------------------------

/**
 * Copies the estimate fields of the given node into this node.
 *
 * @param node The node whose estimates are taken over.
 */
protected void copyEstimates(OptimizerNode node) {
this.estimatedCardinality = node.estimatedCardinality;
this.estimatedNumRecords = node.estimatedNumRecords;
this.estimatedOutputSize = node.estimatedOutputSize;
}
Expand Down Expand Up @@ -65,6 +65,11 @@ public List<PactConnection> getIncomingConnections() {
/**
 * Intentionally a no-op: this implementation does not connect any inputs.
 *
 * @param contractToNode The mapping from operators to optimizer nodes; not used here.
 */
@Override
public void setInputs(Map<Operator, OptimizerNode> contractToNode) {}

/**
 * Deliberately computes no default estimates for this node.
 *
 * @param statistics The statistics object; not used by this implementation.
 */
@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
// we do nothing here, because the estimates can only be copied from the iteration input
}

@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
// no children, so nothing to compute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ protected List<OperatorDescriptorDual> getPossibleProperties() {
return new ArrayList<OperatorDescriptorDual>();
}

/**
 * Derives the default estimates of this two-input node from its predecessors: both the
 * record count and the output size are the sum of the corresponding input estimates.
 * If either input estimate is negative (unknown), the respective result is set to -1.
 *
 * @param statistics The statistics object; not consulted by this implementation.
 */
@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
	final long firstRecords = getFirstPredecessorNode().getEstimatedNumRecords();
	final long secondRecords = getSecondPredecessorNode().getEstimatedNumRecords();
	if (firstRecords >= 0 && secondRecords >= 0) {
		this.estimatedNumRecords = firstRecords + secondRecords;
	} else {
		this.estimatedNumRecords = -1;
	}

	final long firstSize = getFirstPredecessorNode().getEstimatedOutputSize();
	final long secondSize = getSecondPredecessorNode().getEstimatedOutputSize();
	if (firstSize >= 0 && secondSize >= 0) {
		this.estimatedOutputSize = firstSize + secondSize;
	} else {
		this.estimatedOutputSize = -1;
	}
}

@Override
public void computeUnionOfInterestingPropertiesFromSuccessors() {
super.computeUnionOfInterestingPropertiesFromSuccessors();
Expand Down Expand Up @@ -297,25 +308,6 @@ public void computeOutputEstimates(DataStatistics statistics) {
in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
}

/**
 * Returns the number of stub calls, which this node equates with its estimated
 * number of records.
 *
 * @return The current value of {@code estimatedNumRecords}.
 */
@Override
protected long computeNumberOfStubCalls() {
return this.estimatedNumRecords;
}

/**
 * Computes the average width of an output record as the estimated output size divided
 * by the estimated number of records.
 *
 * @return The average record width, where widths below 1 are raised to 1, or -1 if the
 *         number of records or the output size is unknown (-1).
 */
@Override
protected double computeAverageRecordWidth() {
	final boolean recordsKnown = this.estimatedNumRecords != -1;
	final boolean sizeKnown = this.estimatedOutputSize != -1;
	if (!(recordsKnown && sizeKnown)) {
		return -1;
	}

	final double avgWidth = this.estimatedOutputSize / (double) this.estimatedNumRecords;

	// a record must have at least one byte...
	return avgWidth < 1 ? 1 : avgWidth;
}

// ------------------------------------------------------------------------
// Mock classes that represents a contract without behavior.
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,12 @@ public boolean isFieldConstant(int input, int fieldNumber) {
return false;
}

protected void readStubAnnotations() {
}
protected void readStubAnnotations() {}

public void computeOutputEstimates(DataStatistics statistics) {
// simply copy from the inputs
final OptimizerNode n = this.inConn.getSource();
this.estimatedCardinality = n.estimatedCardinality;
this.estimatedOutputSize = n.estimatedOutputSize;
this.estimatedNumRecords = n.estimatedNumRecords;
/**
 * Takes over the output size and record count estimates of the predecessor unchanged.
 *
 * @param statistics The statistics object; not consulted by this implementation.
 */
@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
	final long inputSize = getPredecessorNode().getEstimatedOutputSize();
	final long inputRecords = getPredecessorNode().getEstimatedNumRecords();

	this.estimatedOutputSize = inputSize;
	this.estimatedNumRecords = inputRecords;
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import java.util.ArrayList;
import java.util.List;

import eu.stratosphere.api.common.operators.CompilerHints;
import eu.stratosphere.api.common.operators.Ordering;
import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.api.java.record.operators.CoGroupOperator;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.operators.CoGroupDescriptor;
Expand All @@ -28,26 +26,20 @@
import eu.stratosphere.compiler.operators.OperatorDescriptorDual;

/**
* The Optimizer representation of a <i>CoGroup</i> contract node.
* The Optimizer representation of a <i>CoGroup</i> operator.
*/
public class CoGroupNode extends TwoInputNode {

/**
* Creates a new CoGroupNode for the given contract.
*
* @param pactContract
* The CoGroup contract object.
*/
public CoGroupNode(CoGroupOperatorBase<?> pactContract) {
super(pactContract);
}

// --------------------------------------------------------------------------------------------

/**
* Gets the contract object for this CoGroup node.
* Gets the operator for this CoGroup node.
*
* @return The contract.
* @return The CoGroup operator.
*/
@Override
public CoGroupOperatorBase<?> getPactContract() {
Expand Down Expand Up @@ -95,123 +87,9 @@ public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) {
this.possibleProperties.add(op);
}

// --------------------------------------------------------------------------------------------
// Estimates
// --------------------------------------------------------------------------------------------

/**
 * Estimates how many keys this CoGroup processes, based on the key cardinality
 * estimates of its two inputs.
 *
 * @return The larger of the two key cardinalities if both are known, the known one if
 *         only one is available, or -1 if neither is known.
 */
protected long computeNumberOfProcessedKeys() {
	final long firstKeys = this.getFirstPredecessorNode().getEstimatedCardinality(new FieldSet(this.keys1));
	final long secondKeys = this.getSecondPredecessorNode().getEstimatedCardinality(new FieldSet(this.keys2));

	final boolean firstUnknown = firstKeys == -1;
	final boolean secondUnknown = secondKeys == -1;

	if (firstUnknown) {
		// both unknown yields -1; otherwise the second input's key count serves as lower bound
		return secondUnknown ? -1 : secondKeys;
	}
	if (secondUnknown) {
		// only the first input's key count is known; use it as lower bound
		return firstKeys;
	}

	// both known: the maximum is the lower bound
	return Math.max(firstKeys, secondKeys);
}

/**
 * Computes the number of stub calls for one processed key.
 *
 * @return the number of stub calls for one processed key; always 1 in this implementation.
 */
protected double computeStubCallsPerProcessedKey() {
// the stub is called once for each key.
return 1;
}

/**
 * Computes the number of stub calls.
 *
 * @return the number of stub calls, which is exactly the value returned by
 *         {@code computeNumberOfProcessedKeys()}.
 */
protected long computeNumberOfStubCalls() {
// the stub is called once per key
return this.computeNumberOfProcessedKeys();
}

/**
 * Aligns two compiler hints for each input's constant key set before delegating to the
 * superclass: if exactly one of "average records per distinct key fields" and
 * "average records emitted per stub call" is set, the missing one is filled in from the
 * other. The hints object of the operator is modified in place.
 *
 * @param statistics The statistics object, passed on to the superclass implementation.
 */
@Override
public void computeOutputEstimates(DataStatistics statistics) {
	final CompilerHints compilerHints = getPactContract().getCompilerHints();

	// special hint handling for CoGroup:
	// In case of SameKey OutputContract, avgNumValuesPerKey and avgRecordsEmittedPerStubCall are identical,
	// since the stub is called once per key
	for (int inputIndex = 0; inputIndex <= 1; inputIndex++) {
		final int[] constantKeys = getConstantKeySet(inputIndex);
		if (constantKeys == null) {
			continue;
		}

		final FieldSet keyFields = new FieldSet(constantKeys);

		// per-key hint present, per-call hint missing: derive the per-call hint
		if (compilerHints.getAvgNumRecordsPerDistinctFields(keyFields) != -1
				&& compilerHints.getAvgRecordsEmittedPerStubCall() == -1) {
			compilerHints.setAvgRecordsEmittedPerStubCall(
					compilerHints.getAvgNumRecordsPerDistinctFields(keyFields));
		}

		// per-call hint present, per-key hint missing: derive the per-key hint
		if (compilerHints.getAvgRecordsEmittedPerStubCall() != -1
				&& compilerHints.getAvgNumRecordsPerDistinctFields(keyFields) == -1) {
			compilerHints.setAvgNumRecordsPerDistinctFields(keyFields,
					compilerHints.getAvgRecordsEmittedPerStubCall());
		}
	}

	super.computeOutputEstimates(statistics);
}

@Override
public List<FieldSet> createUniqueFieldsForNode() {
List<FieldSet> uniqueFields = null;
if (keys1 != null) {
boolean isKept = true;
for (int keyField : keys1) {
if (!isFieldConstant(0, keyField)) {
isKept = false;
break;
}
}

if (isKept) {
uniqueFields = new ArrayList<FieldSet>();
uniqueFields.add(new FieldSet(keys1));
}
}

if (keys2 != null) {
boolean isKept = true;
for (int keyField : keys2) {
if (!isFieldConstant(1, keyField)) {
isKept = false;
break;
}
}

if (isKept) {
if (uniqueFields == null) {
uniqueFields = new ArrayList<FieldSet>();
}
uniqueFields.add(new FieldSet(keys2));
}
}

return uniqueFields;
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
// for CoGroup, we currently make no reasonable default estimates
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.List;

import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.operators.CrossBlockOuterFirstDescriptor;
import eu.stratosphere.compiler.operators.CrossBlockOuterSecondDescriptor;
Expand All @@ -29,12 +29,12 @@
import eu.stratosphere.configuration.Configuration;

/**
* The Optimizer representation of a <i>Cross</i> contract node.
* The Optimizer representation of a <i>Cross</i> (Cartesian product) operator.
*/
public class CrossNode extends TwoInputNode {

/**
* Creates a new CrossNode for the given contract.
* Creates a new CrossNode for the given operator.
*
* @param pactContract The Cross contract object.
*/
Expand All @@ -44,11 +44,6 @@ public CrossNode(CrossOperatorBase<?> pactContract) {

// ------------------------------------------------------------------------

/**
* Gets the contract object for this Cross node.
*
* @return The contract.
*/
@Override
public CrossOperatorBase<?> getPactContract() {
return (CrossOperatorBase<?>) super.getPactContract();
Expand Down Expand Up @@ -117,45 +112,25 @@ else if (operation instanceof CrossOperatorBase.CrossWithLarge) {
}

/**
* Computes the number of keys that are processed by the PACT.
* We assume that the cardinality is the product of the input cardinalities
* and that the result width is the sum of the input widths.
*
* @return the number of keys processed by the PACT.
* @param statistics The statistics object to optionally access.
*/
protected long computeNumberOfProcessedKeys() {
	// build the key field sets of both inputs from the operator's key columns
	final FieldSet firstKeyFields = new FieldSet(getPactContract().getKeyColumns(0));
	final FieldSet secondKeyFields = new FieldSet(getPactContract().getKeyColumns(1));

	final long firstKeys = this.getFirstPredecessorNode().getEstimatedCardinality(firstKeyFields);
	final long secondKeys = this.getSecondPredecessorNode().getEstimatedCardinality(secondKeyFields);

	// the product is only meaningful if both key cardinalities are known
	final boolean anyUnknown = firstKeys == -1 || secondKeys == -1;
	return anyUnknown ? -1 : firstKeys * secondKeys;
}

/**
* Computes the number of stub calls.
*
* @return the number of stub calls.
*/
protected long computeNumberOfStubCalls() {
@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2;

long numRecords1 = this.getFirstPredecessorNode().estimatedNumRecords;
if(numRecords1 == -1) {
return -1;
}

long numRecords2 = this.getSecondPredecessorNode().estimatedNumRecords;
if(numRecords2 == -1) {
return -1;
if (this.estimatedNumRecords >= 0) {
float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
if (width > 0) {
this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
}
}

return numRecords1 * numRecords2;
}

/**
 * Always returns {@code false}, regardless of the arguments.
 *
 * @param uniqueSet The field set in question; not examined by this implementation.
 * @param input The input selector; not examined by this implementation.
 * @return Always {@code false}.
 */
public boolean keepsUniqueProperty(FieldSet uniqueSet, int input) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,10 @@ public void setInputs(Map<Operator, OptimizerNode> contractToNode) {

/**
* Computes the estimated outputs for the data sink. Since the sink does not modify anything, it simply
* copies the output estimates from its direct predecessor. Any compiler hints on the data sink are
* ignored.
*
* @param statistics
* The statistics wrapper to be used to obtain additional knowledge. Ignored.
* @see eu.stratosphere.compiler.dag.OptimizerNode#computeOutputEstimates(eu.stratosphere.compiler.DataStatistics)
* copies the output estimates from its direct predecessor.
*/
@Override
public void computeOutputEstimates(DataStatistics statistics) {
// we copy the output estimates from the input
if (this.getPredecessorNode() == null) {
throw new CompilerException();
}

if (this.estimatedCardinality.size() > 0)
this.estimatedCardinality.clear();

this.estimatedCardinality.putAll(getPredecessorNode().getEstimatedCardinalities());
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
}
Expand Down
Loading

0 comments on commit fe2a2ab

Please sign in to comment.