Skip to content

Commit

Permalink
[FLINK-15421][table-planner-blink] Fix cast exception for timestamp M…
Browse files Browse the repository at this point in the history
…AX/MIN retract aggregate functions

This makes TimestampMaxAggFunction and TimestampMinAggFunction to accept SqlTimestamp values and also support high precision for timestamp types. 

This closes apache#10722
  • Loading branch information
docete authored and wuchong committed Dec 31, 2019
1 parent 9dc2528 commit ba44335
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.SqlTimestamp;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo;
import org.apache.flink.table.runtime.typeutils.SqlTimestampTypeInfo;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -367,13 +368,27 @@ protected TypeInformation<BinaryString> getValueTypeInfo() {
/**
* Built-in Timestamp Max with retraction aggregate function.
*/
public static class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction<Timestamp> {
public static class TimestampMaxWithRetractAggFunction extends MaxWithRetractAggFunction<SqlTimestamp> {

private static final long serialVersionUID = -7096481949093142944L;

private final int precision;

public TimestampMaxWithRetractAggFunction(int precision) {
this.precision = precision;
}

public void accumulate(MaxWithRetractAccumulator<SqlTimestamp> acc, SqlTimestamp value) throws Exception {
super.accumulate(acc, value);
}

public void retract(MaxWithRetractAccumulator<SqlTimestamp> acc, SqlTimestamp value) throws Exception {
super.retract(acc, value);
}

@Override
protected TypeInformation<Timestamp> getValueTypeInfo() {
return Types.SQL_TIMESTAMP;
protected TypeInformation<SqlTimestamp> getValueTypeInfo() {
return new SqlTimestampTypeInfo(precision);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.SqlTimestamp;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo;
import org.apache.flink.table.runtime.typeutils.SqlTimestampTypeInfo;

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -367,13 +368,27 @@ protected TypeInformation<BinaryString> getValueTypeInfo() {
/**
* Built-in Timestamp Min with retraction aggregate function.
*/
public static class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction<Timestamp> {
public static class TimestampMinWithRetractAggFunction extends MinWithRetractAggFunction<SqlTimestamp> {

private static final long serialVersionUID = -7494198823345305907L;

private final int precision;

public TimestampMinWithRetractAggFunction(int precision) {
this.precision = precision;
}

public void accumulate(MinWithRetractAccumulator<SqlTimestamp> acc, SqlTimestamp value) throws Exception {
super.accumulate(acc, value);
}

public void retract(MinWithRetractAccumulator<SqlTimestamp> acc, SqlTimestamp value) throws Exception {
super.retract(acc, value);
}

@Override
protected TypeInformation<Timestamp> getValueTypeInfo() {
return Types.SQL_TIMESTAMP;
protected TypeInformation<SqlTimestamp> getValueTypeInfo() {
return new SqlTimestampTypeInfo(precision);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ import org.apache.calcite.sql.fun._
import org.apache.calcite.sql.{SqlAggFunction, SqlKind, SqlRankFunction}
import java.util

import org.apache.flink.table.dataformat.SqlTimestamp

import scala.collection.JavaConversions._

/**
Expand Down Expand Up @@ -308,10 +306,9 @@ class AggFunctionFactory(
new TimeMinWithRetractAggFunction
case DATE =>
new DateMinWithRetractAggFunction
case TIMESTAMP_WITHOUT_TIME_ZONE
if SqlTimestamp.isCompact(argTypes(0).asInstanceOf[TimestampType].getPrecision) =>
// TODO: support TimestampMinWithRetractAggFunction for high precision timestamp
new TimestampMinWithRetractAggFunction
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val d = argTypes(0).asInstanceOf[TimestampType]
new TimestampMinWithRetractAggFunction(d.getPrecision)
case t =>
throw new TableException(s"Min with retract aggregate function does not " +
s"support type: ''$t''.\nPlease re-check the data type.")
Expand Down Expand Up @@ -413,10 +410,9 @@ class AggFunctionFactory(
new TimeMaxWithRetractAggFunction
case DATE =>
new DateMaxWithRetractAggFunction
case TIMESTAMP_WITHOUT_TIME_ZONE
if SqlTimestamp.isCompact(argTypes(0).asInstanceOf[TimestampType].getPrecision) =>
// TODO: support TimestampMaxWithRetractAggFunction for high precision timestamp
new TimestampMaxWithRetractAggFunction
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val d = argTypes(0).asInstanceOf[TimestampType]
new TimestampMaxWithRetractAggFunction(d.getPrecision)
case t =>
throw new TableException(s"Max with retract aggregate function does not " +
s"support type: ''$t''.\nPlease re-check the data type.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.SqlTimestamp;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.BooleanMaxWithRetractAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.ByteMaxWithRetractAggFunction;
Expand All @@ -42,7 +43,6 @@
import java.lang.reflect.Method;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -251,17 +251,17 @@ public static List<AggFunctionTestSpec> testData() {
)
),
/**
* Test for TimestampMaxWithRetractAggFunction.
* Test for TimestampMaxWithRetractAggFunction with millisecond's precision.
*/
new AggFunctionTestSpec<>(
new TimestampMaxWithRetractAggFunction(),
new TimestampMaxWithRetractAggFunction(3),
Arrays.asList(
Arrays.asList(
new Timestamp(0),
new Timestamp(1000),
new Timestamp(100),
SqlTimestamp.fromEpochMillis(0),
SqlTimestamp.fromEpochMillis(1000),
SqlTimestamp.fromEpochMillis(100),
null,
new Timestamp(10)
SqlTimestamp.fromEpochMillis(10)
),
Arrays.asList(
null,
Expand All @@ -272,13 +272,46 @@ public static List<AggFunctionTestSpec> testData() {
),
Arrays.asList(
null,
new Timestamp(1)
SqlTimestamp.fromEpochMillis(1)
)
),
Arrays.asList(
new Timestamp(1000),
SqlTimestamp.fromEpochMillis(1000),
null,
new Timestamp(1)
SqlTimestamp.fromEpochMillis(1)
)
),
/**
* Test for TimestampMaxWithRetractAggFunction with nanosecond's precision.
*/
new AggFunctionTestSpec<>(
new TimestampMaxWithRetractAggFunction(9),
Arrays.asList(
Arrays.asList(
SqlTimestamp.fromEpochMillis(0, 0),
SqlTimestamp.fromEpochMillis(1000, 0),
SqlTimestamp.fromEpochMillis(1000, 1),
SqlTimestamp.fromEpochMillis(100, 0),
null,
SqlTimestamp.fromEpochMillis(10, 0)
),
Arrays.asList(
null,
null,
null,
null,
null
),
Arrays.asList(
null,
SqlTimestamp.fromEpochMillis(1, 0),
SqlTimestamp.fromEpochMillis(1, 1)
)
),
Arrays.asList(
SqlTimestamp.fromEpochMillis(1000, 1),
null,
SqlTimestamp.fromEpochMillis(1, 1)
)
),
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.SqlTimestamp;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFunction.BooleanMinWithRetractAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFunction.ByteMinWithRetractAggFunction;
Expand All @@ -42,7 +43,6 @@
import java.lang.reflect.Method;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -251,17 +251,17 @@ public static List<AggFunctionTestSpec> testData() {
)
),
/**
* Test for TimestampMinWithRetractAggFunction.
* Test for TimestampMinWithRetractAggFunction with millisecond's precision.
*/
new AggFunctionTestSpec<>(
new TimestampMinWithRetractAggFunction(),
new TimestampMinWithRetractAggFunction(3),
Arrays.asList(
Arrays.asList(
new Timestamp(0),
new Timestamp(1000),
new Timestamp(100),
SqlTimestamp.fromEpochMillis(0),
SqlTimestamp.fromEpochMillis(1000),
SqlTimestamp.fromEpochMillis(100),
null,
new Timestamp(10)
SqlTimestamp.fromEpochMillis(10)
),
Arrays.asList(
null,
Expand All @@ -272,15 +272,49 @@ public static List<AggFunctionTestSpec> testData() {
),
Arrays.asList(
null,
new Timestamp(1)
SqlTimestamp.fromEpochMillis(1)
)
),
Arrays.asList(
new Timestamp(0),
SqlTimestamp.fromEpochMillis(0),
null,
new Timestamp(1)
SqlTimestamp.fromEpochMillis(1)
)
),
/**
* Test for TimestampMinWithRetractAggFunction with nanosecond's precision.
*/
new AggFunctionTestSpec<>(
new TimestampMinWithRetractAggFunction(9),
Arrays.asList(
Arrays.asList(
SqlTimestamp.fromEpochMillis(0, 1),
SqlTimestamp.fromEpochMillis(0, 2),
SqlTimestamp.fromEpochMillis(1000, 0),
SqlTimestamp.fromEpochMillis(100, 0),
null,
SqlTimestamp.fromEpochMillis(10, 0)
),
Arrays.asList(
null,
null,
null,
null,
null
),
Arrays.asList(
null,
SqlTimestamp.fromEpochMillis(1, 1),
SqlTimestamp.fromEpochMillis(1, 2)
)
),
Arrays.asList(
SqlTimestamp.fromEpochMillis(0, 1),
null,
SqlTimestamp.fromEpochMillis(1, 1)
)
),

/**
* Test for DateMinWithRetractAggFunction.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,25 @@ class TimestampITCase extends StreamingTestBase{
)
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

@Test
def testMaxMinWithRetractOnTimestamp(): Unit = {
val sink = new TestingRetractSink()
tEnv.sqlQuery(
s"""
|SELECT MAX(y), MIN(x)
|FROM
| (SELECT b, MAX(c) AS x, MIN(c) AS y FROM T GROUP BY b, c)
|GROUP BY b
""".stripMargin)
.toRetractStream[Row].addSink(sink)
env.execute()
val expected = Seq(
"1969-01-01T00:00:00.123456789,1969-01-01T00:00:00.123456789",
"1970-01-01T00:00:00.123,1970-01-01T00:00:00.123",
"1970-01-01T00:00:00.123456,1970-01-01T00:00:00.123456",
"null,null"
)
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,46 @@ class WindowAggregateITCase(mode: StateBackendMode)
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}

@Test
def testMinMaxWithTumblingWindow(): Unit = {
val stream = failingDataSource(data)
.assignTimestampsAndWatermarks(
new TimestampAndWatermarkWithOffset[(
Long, Int, Double, Float, BigDecimal, String, String)](10L))
val table =
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
tEnv.registerTable("T1", table)
tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")

val sql =
"""
|SELECT
| MAX(max_ts),
| MIN(min_ts),
| `string`
|FROM(
| SELECT
| `string`,
| `int`,
| MAX(rowtime) as max_ts,
| MIN(rowtime) as min_ts
| FROM T1
| GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
|GROUP BY `string`
""".stripMargin
val sink = new TestingRetractSink
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
env.execute()
val expected = Seq(
"1970-01-01T00:00:00.001,1970-01-01T00:00:00.001,Hi",
"1970-01-01T00:00:00.002,1970-01-01T00:00:00.002,Hallo",
"1970-01-01T00:00:00.007,1970-01-01T00:00:00.003,Hello",
"1970-01-01T00:00:00.016,1970-01-01T00:00:00.008,Hello world",
"1970-01-01T00:00:00.032,1970-01-01T00:00:00.032,null")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

private def withLateFireDelay(tableConfig: TableConfig, interval: Time): Unit = {
val intervalInMillis = interval.toMilliseconds
val preLateFireInterval = getMillisecondFromConfigDuration(tableConfig,
Expand Down
Loading

0 comments on commit ba44335

Please sign in to comment.