Skip to content

Commit

Permalink
[FLINK-22099][table-planner-blink] Fix bug about throwing ArrayIndexO…
Browse files Browse the repository at this point in the history
…utOfBoundsException when window join deals with semi/anti queries

This closes apache#15477
  • Loading branch information
beyond1920 authored and godfreyhe committed Apr 19, 2021
1 parent 6dd276e commit f511680
Show file tree
Hide file tree
Showing 4 changed files with 546 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class StreamPhysicalWindowJoin(
.item("joinType", joinSpec.getJoinType)
.item("where",
getExpressionString(
remainingCondition, getRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
remainingCondition, inputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
.item("select", getRowType.getFieldNames.mkString(", "))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ object WindowJoinUtil {
val joinInfo = join.analyzeCondition()
val (remainLeftKeys, remainRightKeys, remainCondition) = if (
windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) {
val joinFieldsType = join.getRowType.getFieldList
val leftChildFieldsType = join.getLeft.getRowType.getFieldList
val rightChildFieldsType = join.getRight.getRowType.getFieldList
val leftFieldCnt = join.getLeft.getRowType.getFieldCount
val rexBuilder = join.getCluster.getRexBuilder
val remainEquals = mutable.ArrayBuffer[RexNode]()
Expand All @@ -106,10 +107,10 @@ object WindowJoinUtil {
joinInfo.pairs().foreach { p =>
if (!windowStartEqualityLeftKeys.contains(p.source) &&
!windowEndEqualityLeftKeys.contains(p.source)) {
val leftFieldType = joinFieldsType.get(p.source).getType
val leftFieldType = leftChildFieldsType.get(p.source).getType
val leftInputRef = new RexInputRef(p.source, leftFieldType)
val rightFieldType = rightChildFieldsType.get(p.target).getType
val rightIndex = leftFieldCnt + p.target
val rightFieldType = joinFieldsType.get(rightIndex).getType
val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
val remainEqual = rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
Expand Down
Loading

0 comments on commit f511680

Please sign in to comment.