Skip to content

Commit

Permalink
[FLINK-23054][table] TemporalJoinRewrite should based on upsert key
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jun 29, 2021
1 parent 9d449ff commit e1117f0
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,21 @@ class TemporalJoinRewriteWithUniqueKeyRule extends RelOptRule(
val rightFields = snapshot.getRowType.getFieldList
val fmq = FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)

val uniqueKeySet = fmq.getUniqueKeys(snapshot.getInput())
val upsertKeySet = fmq.getUpsertKeys(snapshot.getInput())
val fields = snapshot.getRowType.getFieldList

if (uniqueKeySet != null && uniqueKeySet.size() > 0) {
if (upsertKeySet != null && upsertKeySet.size() > 0) {
val leftFieldCnt = leftInput.getRowType.getFieldCount
val uniqueKeySetInputRefs = uniqueKeySet.filter(_.nonEmpty)
val upsertKeySetInputRefs = upsertKeySet.filter(_.nonEmpty)
.map(_.toArray
.map(fields)
// build InputRef of unique key in snapshot
// build InputRef of upsert key in snapshot
.map(f => rexBuilder.makeInputRef(
f.getType,
leftFieldCnt + rightFields.indexOf(f)))
.toSeq)
// select shortest unique key as primary key
uniqueKeySetInputRefs
// select shortest upsert key as primary key
upsertKeySetInputRefs
.toArray
.sortBy(_.length)
.headOption
Expand Down

0 comments on commit e1117f0

Please sign in to comment.