Skip to content

Commit

Permalink
[FLINK-14599][table-planner-blink] Support precision of Timestamp typ…
Browse files Browse the repository at this point in the history
…e in blink planner

This closes apache#10268
  • Loading branch information
docete authored and KurtYoung committed Dec 2, 2019
1 parent c34de6d commit 6256b11
Show file tree
Hide file tree
Showing 59 changed files with 1,253 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule.ConvertContext;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;

import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnit;
Expand All @@ -53,6 +54,7 @@
import org.apache.calcite.util.TimestampWithTimeZoneString;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
Expand Down Expand Up @@ -134,8 +136,16 @@ public RexNode visit(ValueLiteralExpression valueLiteral) {
return relBuilder.getRexBuilder().makeTimeLiteral(TimeString.fromCalendarFields(
valueAsCalendar(extractValue(valueLiteral, java.sql.Time.class))), 0);
case TIMESTAMP_WITHOUT_TIME_ZONE:
return relBuilder.getRexBuilder().makeTimestampLiteral(TimestampString.fromCalendarFields(
valueAsCalendar(extractValue(valueLiteral, java.sql.Timestamp.class))), 3);
TimestampType timestampType = (TimestampType) type;
LocalDateTime datetime = extractValue(valueLiteral, LocalDateTime.class);
return relBuilder.getRexBuilder().makeTimestampLiteral(
new TimestampString(
datetime.getYear(),
datetime.getMonthValue(),
datetime.getDayOfMonth(),
datetime.getHour(),
datetime.getMinute(),
datetime.getSecond()).withNanos(datetime.getNano()), timestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
TimeZone timeZone = TimeZone.getTimeZone(this.relBuilder.getCluster()
.getPlanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;

import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
Expand Down Expand Up @@ -279,13 +280,16 @@ public DataType getResultType() {
*/
public static class TimestampLeadLagAggFunction extends LeadLagAggFunction {

public TimestampLeadLagAggFunction(int operandCount) {
private final TimestampType type;

public TimestampLeadLagAggFunction(int operandCount, TimestampType type) {
super(operandCount);
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIMESTAMP(3);
return DataTypes.TIMESTAMP(type.getPrecision());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;

import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.greaterThan;
Expand Down Expand Up @@ -212,9 +213,16 @@ public DataType getResultType() {
* Built-in Timestamp Max aggregate function.
*/
public static class TimestampMaxAggFunction extends MaxAggFunction {

private final TimestampType type;

public TimestampMaxAggFunction(TimestampType type) {
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIMESTAMP(3);
return DataTypes.TIMESTAMP(type.getPrecision());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;

import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
Expand Down Expand Up @@ -212,9 +213,16 @@ public DataType getResultType() {
* Built-in Timestamp Min aggregate function.
*/
public static class TimestampMinAggFunction extends MinAggFunction {

private final TimestampType type;

public TimestampMinAggFunction(TimestampType type) {
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIMESTAMP(3);
return DataTypes.TIMESTAMP(type.getPrecision());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;

import java.util.Arrays;

Expand Down Expand Up @@ -284,9 +285,15 @@ public static final class TimestampSingleValueAggFunction extends SingleValueAgg

private static final long serialVersionUID = 320495723666949978L;

private final TimestampType type;

public TimestampSingleValueAggFunction(TimestampType type) {
this.type = type;
}

@Override
public DataType getResultType() {
return DataTypes.TIMESTAMP(3);
return DataTypes.TIMESTAMP(type.getPrecision());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,14 @@ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);

// TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
// but conversion of DataType and TypeInformation only support TIMESTAMP(3) now.
// change to TIMESTAMP(9) when FLINK-14645 is fixed.
// https://issues.apache.org/jira/browse/FLINK-14925
public static final SqlFunction TO_TIMESTAMP = new SqlFunction(
"TO_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.CHARACTER),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ProctimeMaterializeSqlFunction() {
"PROCTIME_MATERIALIZE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
SqlTypeTransforms.TO_NULLABLE),
InferTypes.RETURN_TYPE,
OperandTypes.family(SqlTypeFamily.TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
timestampType.getKind match {
case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP, timestampType.getPrecision)
}
case _ =>
seenTypes.get(t) match {
Expand Down Expand Up @@ -436,11 +436,7 @@ object FlinkTypeFactory {
// blink runner support precision 3, but for consistent with flink runner, we set to 0.
new TimeType()
case TIMESTAMP =>
if (relDataType.getPrecision > 3) {
throw new TableException(
s"TIMESTAMP precision is not supported: ${relDataType.getPrecision}")
}
new TimestampType(3)
new TimestampType(relDataType.getPrecision)
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
if (relDataType.getPrecision > 3) {
throw new TableException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
import org.apache.flink.table.types.logical.{DecimalType, LogicalType}

import org.apache.flink.table.types.logical.{DecimalType, LogicalType, TimestampType}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeSystemImpl}
import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}

Expand All @@ -41,8 +40,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
case SqlTypeName.VARCHAR | SqlTypeName.VARBINARY =>
Int.MaxValue

// we currently support only timestamps with milliseconds precision
case SqlTypeName.TIMESTAMP | SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
// by default we support timestamp with microseconds precision (Timestamp(6))
case SqlTypeName.TIMESTAMP =>
TimestampType.DEFAULT_PRECISION

// we currently support only timestamp with local time zone with milliseconds precision
case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
3

case _ =>
Expand All @@ -53,6 +56,10 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
case SqlTypeName.VARCHAR | SqlTypeName.CHAR | SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
Int.MaxValue

// The maximum precision of TIMESTAMP is 3 in Calcite,
// change it to 9 to support nanoseconds precision
case SqlTypeName.TIMESTAMP => TimestampType.MAX_PRECISION

case _ =>
super.getMaxPrecision(typeName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ object CodeGenUtils {
def binaryRowSetNull(indexTerm: String, rowTerm: String, t: LogicalType): String = t match {
case d: DecimalType if !Decimal.isCompact(d.getPrecision) =>
s"$rowTerm.setDecimal($indexTerm, null, ${d.getPrecision})"
case d: TimestampType if !SqlTimestamp.isCompact(d.getPrecision) =>
s"$rowTerm.setTimestamp($indexTerm, null, ${d.getPrecision})"
case _ => s"$rowTerm.setNullAt($indexTerm)"
}

Expand Down Expand Up @@ -592,14 +594,11 @@ object CodeGenUtils {
case TINYINT => s"$arrayTerm.setNullByte($index)"
case SMALLINT => s"$arrayTerm.setNullShort($index)"
case INTEGER => s"$arrayTerm.setNullInt($index)"
case BIGINT => s"$arrayTerm.setNullLong($index)"
case FLOAT => s"$arrayTerm.setNullFloat($index)"
case DOUBLE => s"$arrayTerm.setNullDouble($index)"
case TIME_WITHOUT_TIME_ZONE => s"$arrayTerm.setNullInt($index)"
case DATE => s"$arrayTerm.setNullInt($index)"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
case INTERVAL_YEAR_MONTH => s"$arrayTerm.setNullInt($index)"
case INTERVAL_DAY_TIME => s"$arrayTerm.setNullLong($index)"
case _ => s"$arrayTerm.setNullLong($index)"
}

Expand All @@ -614,6 +613,8 @@ object CodeGenUtils {
t: LogicalType): String = t match {
case d: DecimalType if !Decimal.isCompact(d.getPrecision) =>
s"$writerTerm.writeDecimal($indexTerm, null, ${d.getPrecision})"
case d: TimestampType if !SqlTimestamp.isCompact(d.getPrecision) =>
s"$writerTerm.writeTimestamp($indexTerm, null, ${d.getPrecision})"
case _ => s"$writerTerm.setNullAt($indexTerm)"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTemporal, isTimeInterval}
import org.apache.flink.table.types.logical._
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
import org.apache.calcite.util.TimestampString

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -388,7 +388,12 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)

override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
val resultType = FlinkTypeFactory.toLogicalType(literal.getType)
val value = literal.getValue3
val value = resultType.getTypeRoot match {
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
literal.getValueAs(classOf[TimestampString])
case _ =>
literal.getValue3
}
generateLiteral(ctx, resultType, value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType
import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.util.TimestampString
import org.apache.commons.lang3.StringEscapeUtils
import java.io.File
import java.util.TimeZone
Expand Down Expand Up @@ -173,7 +174,16 @@ class ExpressionReducer(
case SqlTypeName.TIMESTAMP =>
val reducedValue = reduced.getField(reducedIdx)
val value = if (reducedValue != null) {
Long.box(reducedValue.asInstanceOf[SqlTimestamp].getMillisecond)
val dt = reducedValue.asInstanceOf[SqlTimestamp].toLocalDateTime
val timestampString =
new TimestampString(
dt.getYear,
dt.getMonthValue,
dt.getDayOfMonth,
dt.getHour,
dt.getMinute,
dt.getSecond)
timestampString.withNanos(dt.getNano)
} else {
reducedValue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
import org.apache.flink.table.planner.codegen.calls.CurrentTimePointCallGen
import org.apache.flink.table.planner.plan.utils.SortUtil
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils.unixTimestampToLocalDateTime
import org.apache.flink.table.runtime.types.PlannerTypeUtils
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, isReference, isTemporal}
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical._

import org.apache.calcite.avatica.util.ByteString
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.calcite.util.TimestampString

import org.apache.commons.lang3.StringEscapeUtils
import java.math.{BigDecimal => JBigDecimal}


import scala.collection.mutable

/**
Expand Down Expand Up @@ -370,12 +373,14 @@ object GenerateUtils {
generateNonNullLiteral(literalType, literalValue.toString, literalValue)

case TIMESTAMP_WITHOUT_TIME_ZONE =>
// TODO: support Timestamp(3) now
val fieldTerm = newName("timestamp")
val millis = literalValue.asInstanceOf[Long]
val millis = literalValue.asInstanceOf[TimestampString].getMillisSinceEpoch
val nanoOfMillis = SqlDateTimeUtils.getNanoOfMillisSinceEpoch(
literalValue.asInstanceOf[TimestampString].toString)
val fieldTimestamp =
s"""
|$SQL_TIMESTAMP $fieldTerm = $SQL_TIMESTAMP.fromEpochMillis(${millis}L);
|$SQL_TIMESTAMP $fieldTerm =
| $SQL_TIMESTAMP.fromEpochMillis(${millis}L, $nanoOfMillis);
""".stripMargin
ctx.addReusableMember(fieldTimestamp)
generateNonNullLiteral(literalType, fieldTerm, literalType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ package org.apache.flink.table.planner.codegen
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Gauge
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.dataformat.{BaseRow, JoinedRow}
import org.apache.flink.table.dataformat.{BaseRow, JoinedRow, SqlTimestamp}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BASE_ROW, BINARY_ROW, baseRowFieldReadAccess, className, newName}
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{INPUT_SELECTION, generateCollect}
import org.apache.flink.table.runtime.generated.{GeneratedJoinCondition, GeneratedProjection}
import org.apache.flink.table.runtime.hashtable.{LongHashPartition, LongHybridHashTable}
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.operators.join.HashJoinType
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical.LogicalTypeRoot.{TIMESTAMP_WITHOUT_TIME_ZONE, _}
import org.apache.flink.table.types.logical._

/**
Expand All @@ -49,8 +49,10 @@ object LongHashJoinGenerator {
keyType.getFieldCount == 1 && {
keyType.getTypeAt(0).getTypeRoot match {
case BIGINT | INTEGER | SMALLINT | TINYINT | FLOAT | DOUBLE | DATE |
TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => true
TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => true
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val timestampType = keyType.getTypeAt(0).asInstanceOf[TimestampType]
if (SqlTimestamp.isCompact(timestampType.getPrecision)) true else false
case _ => false
}
// TODO decimal and multiKeys support.
Expand Down
Loading

0 comments on commit 6256b11

Please sign in to comment.