Skip to content

Commit

Permalink
[FLINK-14599][table-planner-blink] Use SqlTimestamp as internal repre…
Browse files Browse the repository at this point in the history
…sentation of Timestamp type
  • Loading branch information
docete authored and KurtYoung committed Dec 2, 2019
1 parent 1f9cd8a commit c34de6d
Show file tree
Hide file tree
Showing 41 changed files with 426 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public final class TimestampType extends LogicalType {

private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
java.sql.Timestamp.class.getName(),
java.time.LocalDateTime.class.getName());
java.time.LocalDateTime.class.getName(),
"org.apache.flink.table.dataformat.SqlTimestamp");

private static final Class<?> DEFAULT_CONVERSION = java.time.LocalDateTime.class;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ object CodeGenUtils {

val STRING_UTIL: String = className[BinaryStringUtil]

val SQL_TIMESTAMP: String = className[SqlTimestamp]

// ----------------------------------------------------------------------------------------

private val nameCounter = new AtomicInteger
Expand Down Expand Up @@ -133,7 +135,7 @@ object CodeGenUtils {

case DATE => "int"
case TIME_WITHOUT_TIME_ZONE => "int"
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => "long"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => "long"
case INTERVAL_YEAR_MONTH => "int"
case INTERVAL_DAY_TIME => "long"

Expand All @@ -151,7 +153,7 @@ object CodeGenUtils {

case DATE => className[JInt]
case TIME_WITHOUT_TIME_ZONE => className[JInt]
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[JLong]
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[JLong]
case INTERVAL_YEAR_MONTH => className[JInt]
case INTERVAL_DAY_TIME => className[JLong]

Expand All @@ -162,6 +164,7 @@ object CodeGenUtils {
case ARRAY => className[BaseArray]
case MULTISET | MAP => className[BaseMap]
case ROW => className[BaseRow]
case TIMESTAMP_WITHOUT_TIME_ZONE => className[SqlTimestamp]

case RAW => className[BinaryGeneric[_]]
}
Expand Down Expand Up @@ -190,7 +193,7 @@ object CodeGenUtils {
case VARCHAR | CHAR => s"$BINARY_STRING.EMPTY_UTF8"

case DATE | TIME_WITHOUT_TIME_ZONE => "-1"
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => "-1L"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => "-1L"
case INTERVAL_YEAR_MONTH => "-1"
case INTERVAL_DAY_TIME => "-1L"

Expand Down Expand Up @@ -223,7 +226,8 @@ object CodeGenUtils {
case DECIMAL => s"$term.hashCode()"
case DATE => s"${className[JInt]}.hashCode($term)"
case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)"
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
case TIMESTAMP_WITHOUT_TIME_ZONE => s"$term.hashCode()"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
s"${className[JLong]}.hashCode($term)"
case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)"
case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
Expand Down Expand Up @@ -414,8 +418,10 @@ object CodeGenUtils {
// temporal types
case DATE => s"$rowTerm.getInt($indexTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$rowTerm.getInt($indexTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.getLong($indexTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val dt = t.asInstanceOf[TimestampType]
s"$rowTerm.getTimestamp($indexTerm, ${dt.getPrecision})"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.getLong($indexTerm)"
case INTERVAL_YEAR_MONTH => s"$rowTerm.getInt($indexTerm)"
case INTERVAL_DAY_TIME => s"$rowTerm.getLong($indexTerm)"

Expand Down Expand Up @@ -539,8 +545,10 @@ object CodeGenUtils {
case BOOLEAN => s"$binaryRowTerm.setBoolean($index, $fieldValTerm)"
case DATE => s"$binaryRowTerm.setInt($index, $fieldValTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$binaryRowTerm.setInt($index, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$binaryRowTerm.setLong($index, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val dt = t.asInstanceOf[TimestampType]
s"$binaryRowTerm.setTimestamp($index, $fieldValTerm, ${dt.getPrecision})"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$binaryRowTerm.setLong($index, $fieldValTerm)"
case INTERVAL_YEAR_MONTH => s"$binaryRowTerm.setInt($index, $fieldValTerm)"
case INTERVAL_DAY_TIME => s"$binaryRowTerm.setLong($index, $fieldValTerm)"
case DECIMAL =>
Expand Down Expand Up @@ -568,8 +576,7 @@ object CodeGenUtils {
case BOOLEAN => s"$rowTerm.setBoolean($indexTerm, $fieldTerm)"
case DATE => s"$rowTerm.setInt($indexTerm, $fieldTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$rowTerm.setInt($indexTerm, $fieldTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.setLong($indexTerm, $fieldTerm)"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.setLong($indexTerm, $fieldTerm)"
case INTERVAL_YEAR_MONTH => s"$rowTerm.setInt($indexTerm, $fieldTerm)"
case INTERVAL_DAY_TIME => s"$rowTerm.setLong($indexTerm, $fieldTerm)"
case _ => s"$rowTerm.setNonPrimitiveValue($indexTerm, $fieldTerm)"
Expand All @@ -590,8 +597,7 @@ object CodeGenUtils {
case DOUBLE => s"$arrayTerm.setNullDouble($index)"
case TIME_WITHOUT_TIME_ZONE => s"$arrayTerm.setNullInt($index)"
case DATE => s"$arrayTerm.setNullInt($index)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
case INTERVAL_YEAR_MONTH => s"$arrayTerm.setNullInt($index)"
case INTERVAL_DAY_TIME => s"$arrayTerm.setNullLong($index)"
case _ => s"$arrayTerm.setNullLong($index)"
Expand Down Expand Up @@ -640,8 +646,10 @@ object CodeGenUtils {
s"$writerTerm.writeDecimal($indexTerm, $fieldValTerm, ${dt.getPrecision})"
case DATE => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val dt = t.asInstanceOf[TimestampType]
s"$writerTerm.writeTimestamp($indexTerm, $fieldValTerm, ${dt.getPrecision})"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
case INTERVAL_YEAR_MONTH => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
case INTERVAL_DAY_TIME => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
val fieldTerm = s"timestamp"
val field =
s"""
|final long $fieldTerm = java.lang.System.currentTimeMillis();
|final $SQL_TIMESTAMP $fieldTerm =
| $SQL_TIMESTAMP.fromEpochMillis(java.lang.System.currentTimeMillis());
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -431,7 +432,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
val field =
s"""
|$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
Expand All @@ -449,12 +450,14 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private long $fieldTerm;")
reusableMemberStatements.add(s"private $SQL_TIMESTAMP $fieldTerm;")

// assignment
val field =
s"""
|$fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp);
|$fieldTerm = $SQL_TIMESTAMP.fromEpochMillis(
| $timestamp.getMillisecond() +
| java.util.TimeZone.getDefault().getOffset($timestamp.getMillisecond()));
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -475,7 +478,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
val field =
s"""
|$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($localtimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -497,7 +500,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
val field =
s"""
|$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp.getMillisecond() / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.dataformat.BinaryStringUtil.safeToString
import org.apache.flink.table.dataformat.{BinaryString, Decimal, GenericRow}
import org.apache.flink.table.dataformat.{BinaryString, Decimal, GenericRow, SqlTimestamp}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.FunctionCodeGenerator.generateFunction
import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils
import org.apache.flink.table.types.logical.RowType

import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.commons.lang3.StringEscapeUtils

import java.io.File
import java.util.TimeZone

Expand Down Expand Up @@ -172,6 +170,15 @@ class ExpressionReducer(
}
reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
reducedIdx += 1
case SqlTypeName.TIMESTAMP =>
val reducedValue = reduced.getField(reducedIdx)
val value = if (reducedValue != null) {
Long.box(reducedValue.asInstanceOf[SqlTimestamp].getMillisecond)
} else {
reducedValue
}
reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
reducedIdx += 1
case _ =>
val reducedValue = reduced.getField(reducedIdx)
// RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,15 @@ object GenerateUtils {
generateNonNullLiteral(literalType, literalValue.toString, literalValue)

case TIMESTAMP_WITHOUT_TIME_ZONE =>
// TODO: support Timestamp(3) now
val fieldTerm = newName("timestamp")
val millis = literalValue.asInstanceOf[Long]
generateNonNullLiteral(literalType, millis + "L", millis)
val fieldTimestamp =
s"""
|$SQL_TIMESTAMP $fieldTerm = $SQL_TIMESTAMP.fromEpochMillis(${millis}L);
""".stripMargin
ctx.addReusableMember(fieldTimestamp)
generateNonNullLiteral(literalType, fieldTerm, literalType)

case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
val millis = unixTimestampToLocalDateTime(literalValue.asInstanceOf[Long])
Expand Down Expand Up @@ -438,13 +445,16 @@ object GenerateUtils {
def generateProctimeTimestamp(
ctx: CodeGeneratorContext,
contextTerm: String): GeneratedExpression = {
val resultTerm = ctx.addReusableLocalVariable("long", "result")
val resultType = new TimestampType(3)
val resultTypeTerm = primitiveTypeTermForType(resultType)
val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
val resultCode =
s"""
|$resultTerm = $contextTerm.timerService().currentProcessingTime();
|$resultTerm = $SQL_TIMESTAMP.fromEpochMillis(
| $contextTerm.timerService().currentProcessingTime());
|""".stripMargin.trim
// the proctime has been materialized, so it's TIMESTAMP now, not PROCTIME_INDICATOR
GeneratedExpression(resultTerm, NEVER_NULL, resultCode, new TimestampType(3))
GeneratedExpression(resultTerm, NEVER_NULL, resultCode, resultType)
}

def generateCurrentTimestamp(
Expand All @@ -455,13 +465,15 @@ object GenerateUtils {
def generateRowtimeAccess(
ctx: CodeGeneratorContext,
contextTerm: String): GeneratedExpression = {
val resultType = new TimestampType(true, TimestampKind.ROWTIME, 3)
val resultTypeTerm = primitiveTypeTermForType(resultType)
val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
("Long", "result"),
(resultTypeTerm, "result"),
("boolean", "isNull"))

val accessCode =
s"""
|$resultTerm = $contextTerm.timestamp();
|$resultTerm = $SQL_TIMESTAMP.fromEpochMillis($contextTerm.timestamp());
|if ($resultTerm == null) {
| throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
| "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
Expand All @@ -474,7 +486,7 @@ object GenerateUtils {
resultTerm,
nullTerm,
accessCode,
new TimestampType(true, TimestampKind.ROWTIME, 3))
resultType)
}

/**
Expand Down Expand Up @@ -655,8 +667,7 @@ object GenerateUtils {
leftTerm: String,
rightTerm: String): String = t.getTypeRoot match {
case BOOLEAN => s"($leftTerm == $rightTerm ? 0 : ($leftTerm ? 1 : -1))"
case DATE | TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
case DATE | TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
case _ if PlannerTypeUtils.isPrimitive(t) =>
s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,15 @@ class MatchCodeGenerator(
}

private def generateProctimeTimestamp(): GeneratedExpression = {
val resultTerm = ctx.addReusableLocalVariable("long", "result")
val resultType = new TimestampType(3)
val resultTypeTerm = primitiveTypeTermForType(resultType)
val resultTerm = ctx.addReusableLocalVariable(resultTypeTerm, "result")
val resultCode =
s"""
|$resultTerm = $contextTerm.currentProcessingTime();
|$resultTerm = $SQL_TIMESTAMP.fromEpochMillis($contextTerm.currentProcessingTime());
|""".stripMargin.trim
// the proctime has been materialized, so it's TIMESTAMP now, not PROCTIME_INDICATOR
GeneratedExpression(resultTerm, NEVER_NULL, resultCode, new TimestampType(3))
GeneratedExpression(resultTerm, NEVER_NULL, resultCode, resultType)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object WatermarkGeneratorCodeGenerator {
if (${generatedExpr.nullTerm}) {
return null;
} else {
return ${generatedExpr.resultTerm};
return ${generatedExpr.resultTerm}.getMillisecond();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,20 +941,29 @@ class AggsHandlerCodeGenerator(
windowProperties: Seq[PlannerWindowProperty]): Seq[GeneratedExpression] = {
windowProperties.map {
case w: PlannerWindowStart =>
// return a Timestamp(Internal is long)
// return a Timestamp(Internal is SqlTimestamp)
GeneratedExpression(
s"$NAMESPACE_TERM.getStart()", "false", "", w.resultType)
s"$SQL_TIMESTAMP.fromEpochMillis($NAMESPACE_TERM.getStart())",
"false",
"",
w.resultType)
case w: PlannerWindowEnd =>
// return a Timestamp(Internal is long)
// return a Timestamp(Internal is SqlTimestamp)
GeneratedExpression(
s"$NAMESPACE_TERM.getEnd()", "false", "", w.resultType)
s"$SQL_TIMESTAMP.fromEpochMillis($NAMESPACE_TERM.getEnd())",
"false",
"",
w.resultType)
case r: PlannerRowtimeAttribute =>
// return a rowtime, use long as internal type
// return a rowtime, use SqlTimestamp as internal type
GeneratedExpression(
s"$NAMESPACE_TERM.getEnd() - 1", "false", "", r.resultType)
s"$SQL_TIMESTAMP.fromEpochMillis($NAMESPACE_TERM.getEnd() - 1)",
"false",
"",
r.resultType)
case p: PlannerProctimeAttribute =>
// ignore this property, it will be null at the position later
GeneratedExpression("-1L", "true", "", p.resultType)
GeneratedExpression(s"$SQL_TIMESTAMP.fromEpochMillis(-1L)", "true", "", p.resultType)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
import org.apache.flink.table.planner.JLong
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW, boxedTypeTermForType, newName}
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW, SQL_TIMESTAMP, boxedTypeTermForType, newName}
import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
Expand All @@ -47,7 +47,6 @@ import org.apache.flink.table.runtime.util.RowIterator
import org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME
import org.apache.flink.table.types.logical._
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot

import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
Expand Down Expand Up @@ -632,17 +631,17 @@ abstract class WindowCodeGenerator(
// get assigned window start timestamp
def windowProps(size: Expression) = {
val (startWValue, endWValue, rowTimeValue) = (
s"$currentWindowTerm.getStart()",
s"$currentWindowTerm.getEnd()",
s"$currentWindowTerm.maxTimestamp()")
s"$SQL_TIMESTAMP.fromEpochMillis($currentWindowTerm.getStart())",
s"$SQL_TIMESTAMP.fromEpochMillis($currentWindowTerm.getEnd())",
s"$SQL_TIMESTAMP.fromEpochMillis($currentWindowTerm.maxTimestamp())")
val start = if (startPos.isDefined) {
s"$propTerm.setLong($lastPos + ${startPos.get}, $startWValue);"
s"$propTerm.setTimestamp($lastPos + ${startPos.get}, $startWValue, 3);"
} else ""
val end = if (endPos.isDefined) {
s"$propTerm.setLong($lastPos + ${endPos.get}, $endWValue);"
s"$propTerm.setTimestamp($lastPos + ${endPos.get}, $endWValue, 3);"
} else ""
val rowTime = if (rowTimePos.isDefined) {
s"$propTerm.setLong($lastPos + ${rowTimePos.get}, $rowTimeValue);"
s"$propTerm.setTimestamp($lastPos + ${rowTimePos.get}, $rowTimeValue, 3);"
} else ""
(start, end, rowTime)
}
Expand Down
Loading

0 comments on commit c34de6d

Please sign in to comment.