Skip to content

Commit

Permalink
[FLINK-22168][table] Partition insert cannot work with union all
Browse files Browse the repository at this point in the history
This closes apache#15608
  • Loading branch information
JingsongLi committed Apr 22, 2021
1 parent e3cfa40 commit 04ce600
Show file tree
Hide file tree
Showing 3 changed files with 549 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.PreValidateReWriter.appendPartitionAndNullsProjects
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
import org.apache.flink.util.Preconditions.checkArgument

import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
Expand All @@ -33,7 +34,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlSelect, SqlUtil}
import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.util.Static.RESOURCE

import java.util
Expand All @@ -50,24 +51,26 @@ class PreValidateReWriter(
call match {
case r: RichSqlInsert
if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null => r.getSource match {
case select: SqlSelect =>
appendPartitionAndNullsProjects(r, validator, typeFactory, select, r.getStaticPartitions)
case values: SqlCall if values.getKind == SqlKind.VALUES =>
val newSource = appendPartitionAndNullsProjects(r, validator, typeFactory, values,
r.getStaticPartitions)
case call: SqlCall =>
val newSource = appendPartitionAndNullsProjects(
r, validator, typeFactory, call, r.getStaticPartitions)
r.setOperand(2, newSource)
case source =>
throw new ValidationException(
s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only support "
+ s"SELECT and VALUES clause for now, '$source' is not supported yet.")
case source => throw new ValidationException(notSupported(source))
}
case _ =>
}
}
}

object PreValidateReWriter {

//~ Tools ------------------------------------------------------------------

/**
 * Builds the [[ValidationException]] message for an INSERT ... PARTITION statement
 * whose source clause kind is not handled by the rewriter.
 *
 * @param source the offending source node, rendered into the message
 * @return the human-readable error message
 */
private def notSupported(source: SqlNode): String =
  s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only support SELECT, " +
    s"VALUES, SET_QUERY AND ORDER BY clause for now, '$source' is not supported yet."

/**
* Append the static partitions and unspecified columns to the data source projection list.
* The columns are appended to the corresponding positions.
Expand Down Expand Up @@ -108,7 +111,6 @@ object PreValidateReWriter {
typeFactory: RelDataTypeFactory,
source: SqlCall,
partitions: SqlNodeList): SqlCall = {
assert(source.getKind == SqlKind.SELECT || source.getKind == SqlKind.VALUES)
val calciteCatalogReader = validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val table = calciteCatalogReader.getTable(names)
Expand Down Expand Up @@ -185,11 +187,49 @@ object PreValidateReWriter {
}
}

source match {
case select: SqlSelect =>
rewriteSelect(validator, select, targetRowType, assignedFields, targetPosition)
case values: SqlCall if values.getKind == SqlKind.VALUES =>
rewriteValues(values, targetRowType, assignedFields, targetPosition)
rewriteSqlCall(validator, source, targetRowType, assignedFields, targetPosition)
}

/**
 * Recursively rewrites the source call of an INSERT statement so that static partition
 * values and unspecified (null) columns are appended to the projection, dispatching on
 * the kind of the source node (SELECT, VALUES, set queries such as UNION ALL, ORDER BY).
 *
 * @param validator       the Flink SQL validator used for expression validation
 * @param call            the source call of the INSERT to rewrite
 * @param targetRowType   the row type of the target table
 * @param assignedFields  mapping from target field position to the static value to append
 * @param targetPosition  positions of the explicitly specified target columns
 * @return the rewritten call (may be the same instance, mutated in place)
 * @throws ValidationException if the call's kind is not supported
 */
private def rewriteSqlCall(
validator: FlinkCalciteSqlValidator,
call: SqlCall,
targetRowType: RelDataType,
assignedFields: util.LinkedHashMap[Integer, SqlNode],
targetPosition: util.List[Int]): SqlCall = {

// Recursive helper: every operand we descend into must itself be a SqlCall
// (e.g. a branch of a UNION or the query under an ORDER BY).
def rewrite(node: SqlNode): SqlCall = {
checkArgument(node.isInstanceOf[SqlCall], node)
rewriteSqlCall(
validator,
node.asInstanceOf[SqlCall],
targetRowType,
assignedFields,
targetPosition)
}

call.getKind match {
case SqlKind.SELECT =>
// Leaf case: append partitions/nulls to the SELECT projection list.
rewriteSelect(
validator, call.asInstanceOf[SqlSelect], targetRowType, assignedFields, targetPosition)
case SqlKind.VALUES =>
// Leaf case: append partitions/nulls to each VALUES row.
rewriteValues(call, targetRowType, assignedFields, targetPosition)
case kind if SqlKind.SET_QUERY.contains(kind) =>
// Set queries (UNION/INTERSECT/EXCEPT): rewrite every branch and splice the
// result back into the same call via setOperand (in-place mutation).
call.getOperandList.zipWithIndex.foreach {
case (operand, index) => call.setOperand(index, rewrite(operand))
}
call
case SqlKind.ORDER_BY =>
// Rebuild the ORDER BY node around the rewritten inner query; the remaining
// operands appear to be (orderList, offset, fetch) — matches the SqlOrderBy
// constructor signature, confirm against the Calcite version in use.
val operands = call.getOperandList
new SqlOrderBy(
call.getParserPosition,
rewrite(operands.get(0)),
operands.get(1).asInstanceOf[SqlNodeList],
operands.get(2),
operands.get(3))
// Not support:
// case SqlKind.WITH =>
// case SqlKind.EXPLICIT_TABLE =>
case _ => throw new ValidationException(notSupported(call))
}
}

Expand Down
Loading

0 comments on commit 04ce600

Please sign in to comment.