Skip to content

Commit

Permalink
[FLINK-22737][table-common][table-planner-blink] Introduce CURRENT_WA…
Browse files Browse the repository at this point in the history
…TERMARK()

This closes apache#16046.
  • Loading branch information
Airblader authored and twalthr committed Jun 17, 2021
1 parent 0764893 commit 8bd215d
Show file tree
Hide file tree
Showing 14 changed files with 443 additions and 18 deletions.
10 changes: 9 additions & 1 deletion docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,14 @@ temporal:
description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3)."
- sql: TO_TIMESTAMP(string1[, string2])
description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by TableConfig) to a timestamp."
- sql: CURRENT_WATERMARK(rowtime)
description: |
Returns the current watermark for the given rowtime attribute, or NULL if no common watermark of all upstream operations is available at the current operation in the pipeline.
The return type of the function is inferred to match that of the provided rowtime attribute, but with an adjusted precision of 3. For example, if the rowtime attribute is TIMESTAMP_LTZ(9), the function will return TIMESTAMP_LTZ(3).
Note that this function can return NULL, and you may have to consider this case. For example, if you want to filter out late data you can use:
WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
conditional:
- sql: |
Expand Down Expand Up @@ -682,4 +690,4 @@ aggregate:
- sql: LAST_VALUE(expression)
description: Returns the last value in an ordered set of values.
- sql: LISTAGG(expression [, separator])
description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','.
description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','.
28 changes: 24 additions & 4 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@

__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'UNBOUNDED_ROW',
'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 'current_time',
'current_timestamp', 'local_time', 'local_timestamp', 'temporal_overlaps',
'date_format', 'timestamp_diff', 'array', 'row', 'map_', 'row_interval', 'pi', 'e',
'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 'uuid', 'null_of',
'log', 'with_columns', 'without_columns', 'call', 'call_sql']
'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp',
'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'call',
'call_sql']


def _leaf_op(op_name: str) -> Expression:
Expand Down Expand Up @@ -203,6 +204,25 @@ def current_timestamp() -> Expression:
return _leaf_op("currentTimestamp")


def current_watermark(rowtimeAttribute) -> Expression:
"""
Returns the current watermark for the given rowtime attribute, or NULL if no common watermark of
all upstream operations is available at the current operation in the pipeline.
The function returns the watermark with the same type as the rowtime attribute, but with an
adjusted precision of 3. For example, if the rowtime attribute is `TIMESTAMP_LTZ(9)`, the
function will return `TIMESTAMP_LTZ(3)`.
If no watermark has been emitted yet, the function will return `NULL`. Users must take care of
this when comparing against it, e.g. in order to filter out late data you can use
::
WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
"""
return _unary_op("currentWatermark", rowtimeAttribute)


def local_time() -> Expression:
"""
Returns the current SQL time in local time zone.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,27 @@ public static ApiExpression currentTimestamp() {
return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
}

/**
* Returns the current watermark for the given rowtime attribute, or {@code NULL} if no common
* watermark of all upstream operations is available at the current operation in the pipeline.
*
* <p>The function returns the watermark with the same type as the rowtime attribute, but with
* an adjusted precision of 3. For example, if the rowtime attribute is {@link
* DataTypes#TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(9)}, the function will return {@link
* DataTypes#TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(3)}.
*
* <p>If no watermark has been emitted yet, the function will return {@code NULL}. Users must
* take care of this when comparing against it, e.g. in order to filter out late data you can
* use
*
* <pre>{@code
* WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
* }</pre>
*/
public static ApiExpression currentWatermark(Object rowtimeAttribute) {
return apiCall(BuiltInFunctionDefinitions.CURRENT_WATERMARK, rowtimeAttribute);
}

/**
* Returns the current SQL time in local time zone, the return type of this expression is {@link
* DataTypes#TIME()}, this is a synonym for {@link Expressions#currentTime()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,26 @@ trait ImplicitExpressionConversions {
Expressions.currentTimestamp()
}

/**
* Returns the current watermark for the given rowtime attribute, or `NULL` if no common watermark
* of all upstream operations is available at the current operation in the pipeline.
*
* The function returns the watermark with the same type as the rowtime attribute, but with
* an adjusted precision of 3. For example, if the rowtime attribute is
* [[DataTypes.TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(9)]], the function will return
* [[DataTypes.TIMESTAMP_LTZ(int) TIMESTAMP_LTZ(3)]].
*
* If no watermark has been emitted yet, the function will return `NULL`. Users must take care of
* this when comparing against it, e.g. in order to filter out late data you can use
*
* {{{
* WHERE CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
* }}}
*/
def currentWatermark(rowtimeAttribute: Expression): Expression = {
Expressions.currentWatermark(rowtimeAttribute)
}

/**
* Returns the current SQL time in local time zone,
* the return type of this expression is [[DataTypes.TIME]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(TypeStrategies.MISSING)
.build();

public static final BuiltInFunctionDefinition CURRENT_WATERMARK =
BuiltInFunctionDefinition.newBuilder()
.name("CURRENT_WATERMARK")
.kind(SCALAR)
.inputTypeStrategy(InputTypeStrategies.SPECIFIC_FOR_CURRENT_WATERMARK)
.outputTypeStrategy(TypeStrategies.CURRENT_WATERMARK)
.notDeterministic()
.runtimeProvided()
.build();

// --------------------------------------------------------------------------------------------
// Over window
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CompositeArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CurrentWatermarkInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy;
Expand Down Expand Up @@ -338,6 +339,10 @@ public static InputTypeStrategy commonType(int count) {
public static final InputTypeStrategy TWO_EQUALS_COMPARABLE =
comparable(ConstantArgumentCount.of(2), StructuredComparision.EQUALS);

/** Strategy specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}. */
public static final InputTypeStrategy SPECIFIC_FOR_CURRENT_WATERMARK =
new CurrentWatermarkInputTypeStrategy();

// --------------------------------------------------------------------------------------------

private InputTypeStrategies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.strategies.CommonTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ExplicitTypeStrategy;
Expand Down Expand Up @@ -434,6 +435,24 @@ public static TypeStrategy varyingString(TypeStrategy initialStrategy) {
return Optional.of(timestampDataType);
};

/**
* Type strategy for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK} which mirrors the type
* of the passed rowtime column, but removes the rowtime kind and enforces the correct precision
* for watermarks.
*/
public static final TypeStrategy CURRENT_WATERMARK =
callContext -> {
final LogicalType inputType =
callContext.getArgumentDataTypes().get(0).getLogicalType();
if (hasRoot(inputType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
return Optional.of(DataTypes.TIMESTAMP(3));
} else if (hasRoot(inputType, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
return Optional.of(DataTypes.TIMESTAMP_LTZ(3));
}

return Optional.empty();
};

/**
* Type strategy specific for aggregations that partially produce different nullability
* depending whether the result is grouped or not.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#CURRENT_WATERMARK}.
*
* <p>It expects a single argument representing a rowtime attribute.
*/
@Internal
public class CurrentWatermarkInputTypeStrategy implements InputTypeStrategy {

@Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.of(1);
}

@Override
public Optional<List<DataType>> inferInputTypes(
CallContext callContext, boolean throwOnFailure) {
final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();

final DataType dataType = argumentDataTypes.get(0);
if (!LogicalTypeChecks.canBeTimeAttributeType(dataType.getLogicalType())) {
if (throwOnFailure) {
throw callContext.newValidationError(
"CURRENT_WATERMARK() must be called with a single rowtime attribute argument, but '%s' cannot be a time attribute.",
dataType.getLogicalType().asSummaryString());
}

return Optional.empty();
}

if (!LogicalTypeChecks.isRowtimeAttribute(dataType.getLogicalType())) {
if (throwOnFailure) {
throw callContext.newValidationError(
"The argument of CURRENT_WATERMARK() must be a rowtime attribute, but was '%s'.",
dataType.getLogicalType().asSummaryString());
}

return Optional.empty();
}

return Optional.of(Collections.singletonList(dataType));
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
final List<Signature> signatures = new ArrayList<>();
signatures.add(createExpectedSignature(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE));
signatures.add(createExpectedSignature(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE));
return signatures;
}

private static Signature createExpectedSignature(LogicalTypeRoot typeRoot) {
final String argument = String.format("<%s *%s*>", typeRoot, TimestampKind.ROWTIME);
return Signature.of(Signature.Argument.of(argument));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,56 +178,56 @@ private TestSpec(@Nullable String description, InputTypeStrategy strategy) {
this.strategy = strategy;
}

static TestSpec forStrategy(InputTypeStrategy strategy) {
public static TestSpec forStrategy(InputTypeStrategy strategy) {
return new TestSpec(null, strategy);
}

static TestSpec forStrategy(String description, InputTypeStrategy strategy) {
public static TestSpec forStrategy(String description, InputTypeStrategy strategy) {
return new TestSpec(description, strategy);
}

TestSpec namedArguments(String... names) {
public TestSpec namedArguments(String... names) {
this.namedArguments = Arrays.asList(names);
return this;
}

TestSpec typedArguments(DataType... dataTypes) {
public TestSpec typedArguments(DataType... dataTypes) {
this.typedArguments = Arrays.asList(dataTypes);
return this;
}

TestSpec surroundingStrategy(InputTypeStrategy surroundingStrategy) {
public TestSpec surroundingStrategy(InputTypeStrategy surroundingStrategy) {
this.surroundingStrategy = surroundingStrategy;
return this;
}

TestSpec calledWithArgumentTypes(AbstractDataType<?>... dataTypes) {
public TestSpec calledWithArgumentTypes(AbstractDataType<?>... dataTypes) {
this.actualArgumentTypes.add(resolveDataTypes(dataTypes));
return this;
}

TestSpec calledWithLiteralAt(int pos) {
public TestSpec calledWithLiteralAt(int pos) {
this.literalPos = pos;
return this;
}

TestSpec calledWithLiteralAt(int pos, Object value) {
public TestSpec calledWithLiteralAt(int pos, Object value) {
this.literalPos = pos;
this.literalValue = value;
return this;
}

TestSpec expectSignature(String signature) {
public TestSpec expectSignature(String signature) {
this.expectedSignature = signature;
return this;
}

TestSpec expectArgumentTypes(AbstractDataType<?>... dataTypes) {
public TestSpec expectArgumentTypes(AbstractDataType<?>... dataTypes) {
this.expectedArgumentTypes = resolveDataTypes(dataTypes);
return this;
}

TestSpec expectErrorMessage(String expectedErrorMessage) {
public TestSpec expectErrorMessage(String expectedErrorMessage) {
this.expectedErrorMessage = expectedErrorMessage;
return this;
}
Expand Down
Loading

0 comments on commit 8bd215d

Please sign in to comment.