Skip to content

Commit

Permalink
[FLINK-3943] [table] Add support for EXCEPT operator
Browse files Browse the repository at this point in the history
This closes apache#2169.
  • Loading branch information
mushketyk authored and twalthr committed Jul 11, 2016
1 parent 5df0b27 commit 9753393
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 13 deletions.
51 changes: 49 additions & 2 deletions docs/apis/table.md
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,30 @@ Table result = left.intersectAll(right);
</td>
</tr>

<tr>
<td><strong>Minus</strong></td>
<td>
<p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p>
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>MinusAll</strong></td>
<td>
<p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records from the left table that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times (and not at all if m &gt;= n), i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p>
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Distinct</strong></td>
<td>
Expand Down Expand Up @@ -731,7 +755,7 @@ val result = left.intersect(right);
</td>
</tr>

<tr>
<tr>
<td><strong>IntersectAll</strong></td>
<td>
<p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p>
Expand All @@ -743,6 +767,30 @@ val result = left.intersectAll(right);
</td>
</tr>

<tr>
<td><strong>Minus</strong></td>
<td>
<p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p>
{% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.minus(right);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>MinusAll</strong></td>
<td>
<p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records from the left table that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times (and not at all if m &gt;= n), i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p>
{% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.minusAll(right);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Distinct</strong></td>
<td>
Expand Down Expand Up @@ -884,7 +932,6 @@ Among others, the following SQL features are not supported, yet:
- Non-equi joins and Cartesian products
- Result selection by order position (`ORDER BY OFFSET FETCH`)
- Grouping sets
- `EXCEPT` set operation

*Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,37 @@ case class Aggregate(
}
}

/**
  * Logical plan node for a set difference (SQL `EXCEPT` / `EXCEPT ALL`) of two inputs.
  *
  * @param left  input whose records are candidates for the result
  * @param right input whose records are subtracted from `left`
  * @param all   flag passed through to `RelBuilder.minus`
  */
case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {

  /** The result exposes the attributes of the left input. */
  override def output: Seq[Attribute] = left.output

  /** Constructs both inputs on the builder (left first, then right) and combines them. */
  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.minus(all)
  }

  /**
    * Rejects stream environments, then checks that both inputs have the same number of
    * columns and pairwise identical column types.
    *
    * @return the node produced by the parent validation
    */
  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    tableEnv match {
      case _: StreamTableEnvironment =>
        failValidation("Minus on stream tables is currently not supported.")
      case _ => // batch environments are fine
    }

    val validated = super.validate(tableEnv).asInstanceOf[Minus]

    // arity check comes first so that the pairwise type comparison below is meaningful
    if (left.output.length != right.output.length) {
      failValidation(
        s"Minus two table of different column sizes: ${left.output.size} and ${right.output.size}")
    }

    // renders a schema as a comma separated list of (name, type) pairs for the error message
    def describe(attributes: Seq[Attribute]): String =
      attributes.map(a => (a.name, a.resultType)).mkString(", ")

    val schemaMismatch = left.output.zip(right.output).exists { case (l, r) =>
      l.resultType != r.resultType
    }
    if (schemaMismatch) {
      failValidation(
        s"Minus two table of different schema: [${describe(left.output)}] and" +
          s" [${describe(right.output)}]")
    }

    validated
  }
}

case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
override def output: Seq[Attribute] = left.output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ class DataSetIntersect(
left: RelNode,
right: RelNode,
rowType: RelDataType,
all: Boolean,
ruleDescription: String)
all: Boolean)
extends BiRel(cluster, traitSet, left, right)
with DataSetRel {

Expand All @@ -55,8 +54,7 @@ class DataSetIntersect(
inputs.get(0),
inputs.get(1),
rowType,
all,
ruleDescription
all
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table.plan.nodes.dataset

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.flink.api.table.runtime.MinusCoGroupFunction
import org.apache.flink.api.table.typeutils.TypeConverter._

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
  * Flink RelNode which implements the set minus operation on two DataSet inputs.
  *
  * The operation is translated into a co-group of both inputs over all fields whose
  * records are processed by a [[MinusCoGroupFunction]].
  *
  * @param cluster  Calcite cluster this node belongs to
  * @param traitSet traits of this node
  * @param left     left input; the emitted records originate from this side
  * @param right    right input
  * @param rowType  row type of the result
  * @param all      flag handed to [[MinusCoGroupFunction]]
  *                 (presumably `true` for EXCEPT ALL semantics -- confirm against that class)
  */
class DataSetMinus(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    left: RelNode,
    right: RelNode,
    rowType: RelDataType,
    all: Boolean)
  extends BiRel(cluster, traitSet, left, right)
  with DataSetRel {

  override def deriveRowType() = rowType

  /** Creates a copy of this node with the given traits and the first two given inputs. */
  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    new DataSetMinus(
      cluster,
      traitSet,
      inputs.get(0),
      inputs.get(1),
      rowType,
      all
    )
  }

  /** Renders the node as `Minus(minus: (<field names>))`. */
  override def toString: String = {
    // fixed: the original literal contained a stray '}' after the interpolated value
    s"Minus(minus: ($minusSelectionToString))"
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw).item("minus", minusSelectionToString)
  }

  /**
    * Sums, over all inputs, a cost built from the input's estimated row count and the
    * estimated total size of its rows (row count times row size).
    */
  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    // explicit conversion instead of relying on the deprecated implicit JavaConversions
    val children = this.getInputs.asScala
    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
      val rowCnt = metadata.getRowCount(child)
      val rowSize = this.estimateRowSize(child.getRowType)
      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
    }
  }

  /**
    * Translates this node into a DataSet program.
    *
    * Both inputs are translated first and then co-grouped on all fields (`"*"`). If the
    * determined return type differs from the type of the left input, a conversion map
    * operator is appended.
    *
    * @param tableEnv     batch table environment providing the configuration
    * @param expectedType optional type requested for the result
    * @return DataSet holding the result of the minus operation
    */
  override def translateToPlan(
      tableEnv: BatchTableEnvironment,
      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

    val coGroupedDs = leftDataSet.coGroup(rightDataSet)

    val coGroupOpName = s"minus: ($minusSelectionToString)"
    val coGroupFunction = new MinusCoGroupFunction[Any](all)

    // group on the complete record of both sides
    val minusDs = coGroupedDs.where("*").equalTo("*")
      .`with`(coGroupFunction).name(coGroupOpName)

    val config = tableEnv.getConfig
    val leftType = leftDataSet.getType

    // here we only care about left type information, because we emit records from left DataSet
    expectedType match {
      case None if config.getEfficientTypeUsage =>
        minusDs

      case _ =>
        val determinedType = determineReturnType(
          getRowType,
          expectedType,
          config.getNullCheck,
          config.getEfficientTypeUsage)

        // conversion
        if (determinedType != leftType) {
          val mapFunc = getConversionMapper(
            config,
            false,
            leftType,
            determinedType,
            "DataSetMinusConversion",
            getRowType.getFieldNames)

          val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"

          minusDs.map(mapFunc).name(opName)
        }
        // no conversion necessary, forward
        else {
          minusDs
        }
    }
  }

  /** Comma separated list of the result's field names, used for operator naming. */
  private def minusSelectionToString: String = {
    rowType.getFieldNames.asScala.toList.mkString(", ")
  }

}

Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ object FlinkRuleSets {
DataSetScanRule.INSTANCE,
DataSetUnionRule.INSTANCE,
DataSetIntersectRule.INSTANCE,
DataSetMinusRule.INSTANCE,
DataSetSortRule.INSTANCE,
DataSetValuesRule.INSTANCE,
BatchTableSourceScanRule.INSTANCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ class DataSetIntersectRule
convLeft,
convRight,
rel.getRowType,
intersect.all,
description)
intersect.all)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table.plan.rules.dataSet

import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalMinus
import org.apache.calcite.rel.rules.UnionToDistinctRule
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}

/**
  * Converter rule that turns a [[LogicalMinus]] in `Convention.NONE` into a
  * [[DataSetMinus]] in the [[DataSetConvention]].
  */
class DataSetMinusRule
  extends ConverterRule(
    classOf[LogicalMinus],
    Convention.NONE,
    DataSetConvention.INSTANCE,
    "DataSetMinusRule") {

  /**
    * Builds the [[DataSetMinus]] for the given logical minus node.
    *
    * @param rel node to convert; cast to [[LogicalMinus]]
    * @return the DataSet node with both inputs converted to the DataSet convention
    */
  def convert(rel: RelNode): RelNode = {
    val logicalMinus = rel.asInstanceOf[LogicalMinus]
    val targetTraits: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)

    // request both inputs in the DataSet convention, left input first
    val leftInput: RelNode =
      RelOptRule.convert(logicalMinus.getInput(0), DataSetConvention.INSTANCE)
    val rightInput: RelNode =
      RelOptRule.convert(logicalMinus.getInput(1), DataSetConvention.INSTANCE)

    new DataSetMinus(
      rel.getCluster,
      targetTraits,
      leftInput,
      rightInput,
      rel.getRowType,
      logicalMinus.all)
  }
}

/** Holds the single shared instance of [[DataSetMinusRule]]. */
object DataSetMinusRule {

  /** Rule instance, typed as the general [[RelOptRule]]. */
  val INSTANCE: RelOptRule = new DataSetMinusRule()
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.lang.{Iterable => JIterable}
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector


class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T]{
override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
if (first == null || second == null) return
Expand Down
Loading

0 comments on commit 9753393

Please sign in to comment.