-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-14322][table-api] Add watermark information in TableSchema #9994
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 1221469 (Wed Dec 04 15:05:40 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
cc @danny0405 |
CI report:
Bot commandsThe @flinkbot bot supports the following commands:
|
Hi @KurtYoung , do you have time to have a look at this? |
|
||
if (fieldNames.length != fieldDataTypes.length) { | ||
throw new TableException( | ||
throw new ValidationException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can move such sanity check to Builder
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TableSchema
still have a public constructor, even though it is deprecated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to move all the checks to builder, these sanity check should not happen in the constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I said above, TableSchema
still have a public constructor.
|
||
// collect indices | ||
fieldNameToIndex.put(fieldName, i); | ||
DataType fieldType = Preconditions.checkNotNull(fieldDataTypes[i]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, since we have a builder for TableSchema
, I think we can move all these basic check into Builder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same reasone as above. I think we can do this once the public constructor is removed.
"List of all fields: " + Arrays.toString(fieldNames)); | ||
} | ||
|
||
private void validateAndCreateNameTypeMapping(String fieldName, DataType fieldType, List<String> parentNames) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<String> parentNames
can be a simple string indicating the fieldNamePrefix.
} else { | ||
uniqueNames.add(fieldName); | ||
// validate watermark and rowtime attribute | ||
Preconditions.checkState( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You already make sure about this in Builder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I think we can remove this one.
LogicalType watermarkOutputType = watermark.getWatermarkStrategy() | ||
.getOutputDataType().getLogicalType(); | ||
if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE && | ||
watermarkOutputType.getTypeRoot() != BIGINT) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why relaxing BIGINT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove this. We can introduce this when we can support BIGINT as the watermark generation return type end-to-end.
@@ -224,13 +254,31 @@ public DataType toRowDataType() { | |||
return (TypeInformation<Row>) fromDataTypeToLegacyInfo(toRowDataType()); | |||
} | |||
|
|||
/** | |||
* Returns an optional of the watermark specification which contains rowtime attribute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
an optional
-> a list
* @param rowtimeAttribute the field name as a rowtime attribute, can be a nested field using dot separator. | ||
* @param watermarkStrategy the watermark strategy expression | ||
*/ | ||
public Builder watermark(String rowtimeAttribute, ResolvedExpression watermarkStrategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If user using this Builder
to manually specify watermark, how can he get ResolvedExpression
. AFAIK if he uses scala dsl, it can only generate UnresolvedCallExpression
or UnresolvedReferenceExpression
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. You are right. There are 2 reasons here:
- we can't get
ExpressionResolver
in builder to resolve expression. TableSchema.Builder
is going to be internally used only. As we are planning to remove {{TableSource#getTableSchema}}, users should don't need to createTableSchema
anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an @Internal
annotation on this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So besides from DDL way, how to user specify watermark programmatically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the Schema
descriptor API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we agree that Expression
would make the interface more uniform(can be used by table api), can we use just Expression
as the argument specification ? Then we can eliminate use to use the SqlExpression
for current implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is there are two major types of Expression
now, one is unresolved and another one is resolved. We will only have interface (like Java DSL style) for unresolved expression. But the issue is we need some other information which only accessible from resolved one, like data types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed in FLIP-80, we will use the serialize (resolved) expression to string, this happens when serialize CatalogTable to properties. And deserialize string to unresolved expression, this happens when create CatalogTable
from properties in Catalog
. However, we missed that converting unresovled expression to resolved expression requires ExpressionResolver
which is not held by Catalog
. If the TableSchema builder requires ResolvedExpression
, it's impossible to create a TableSchema
or CatalogTable
for Catalog
.
Let me sum up, I agree with @KurtYoung that using ResolveExpression
here may can't work in the future, so I would like change it to :
public Builder watermark(String rowtimeAttribute, String watermarkExpression, DataType watermarkExprOutputType) {
The ResolvedExpression getWatermarkStrategy()
in WatermarkSpec
will be removed, and will expose String getWatermarkExpressionString()
and DataType getWatermarkExprOutputType()
.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think using plain strings in Java classes should be avoided in general. This is not very object oriented. I understand that constructing ResolvedExpression
might be difficult at certain locations but we could also use just Expression
. Actually, a SQL string is also just an unresolved expression until it has been parsed and validated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine to use Expression
here and introduce an unresolved SqlExpression
to wrap it.
So does the following signature looks good to you?
public Builder watermark(String rowtimeAttribute, Expression watermarkExpression, DataType watermarkExprOutputType) {
BTW, there are still some testing failures. |
@KurtYoung Thanks for reviewing , I updated the pull request. |
@@ -54,44 +58,64 @@ | |||
|
|||
private final DataType[] fieldDataTypes; | |||
|
|||
private final Map<String, Integer> fieldNameToIndex; | |||
/** Mapping from qualified field name to (nested) field type. */ | |||
private final Map<String, DataType> typesByName; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fieldNameToType
?
|
||
if (fieldNames.length != fieldDataTypes.length) { | ||
throw new TableException( | ||
throw new ValidationException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to move all the checks to builder, these sanity check should not happen in the constructor.
String fullFieldName = prefixQualifiedName.isEmpty() ? fieldName : prefixQualifiedName + "." + fieldName; | ||
DataType oldValue = typesByName.put(fullFieldName, fieldType); | ||
if (oldValue != null) { | ||
throw new ValidationException("Field names must be unique. Duplicate field: '" + fullFieldName + "'"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oldValue -> oldType
} | ||
|
||
private void validateAndCreateNameTypeMapping(String fieldName, DataType fieldType, String prefixQualifiedName) { | ||
String fullFieldName = prefixQualifiedName.isEmpty() ? fieldName : prefixQualifiedName + "." + fieldName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define another method probably named "generatesFieldName" to wrap the prefix name logic ? It's hard to infer that a "prefixQualifiedName" means.
BTW, i think we can just name this method as "createNameToTypeMapping"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the logic of full name generation is simple (just one line), there is no need to extract a method for it. I can add some javadocs to describe the meaning of prefixQualifiedName
.
This method is not just creating a mapping, but also does some validation. I think having a "validate" in the method name will be easier to understand.
@@ -157,11 +181,11 @@ public TableSchema copy() { | |||
/** | |||
* Returns the specified data type for the given field name. | |||
* | |||
* @param fieldName the name of the field | |||
* @param fieldName the name of the field, the field name can be a qualified name, e.g. "f0.q1" | |||
*/ | |||
public Optional<DataType> getFieldDataType(String fieldName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"f0.q1" is a little confusing, maybe "record_name.field_name" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But what's the difference between record_name and field name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to field1.innerField2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
* @param rowtimeAttribute the field name as a rowtime attribute, can be a nested field using dot separator. | ||
* @param watermarkStrategy the watermark strategy expression | ||
*/ | ||
public Builder watermark(String rowtimeAttribute, ResolvedExpression watermarkStrategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we agree that Expression
would make the interface more uniform(can be used by table api), can we use just Expression
as the argument specification ? Then we can eliminate use to use the SqlExpression
for current implementation.
* The referenced attribute must be present in the {@link TableSchema} and of | ||
* type {@link DataTypes#TIMESTAMP(int)}. | ||
*/ | ||
public String getRowtimeAttribute() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataTypes#TIMESTAMP(int)
is just a tool method and not a "type".
|
||
/** | ||
* Expression that wraps a SQL expression string, e.g. {@code date_format(`f0`, 'yyyy-MM-dd')}. | ||
* The SQL expression string can be used as serialized string and can be used for storing in Catalog. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"e.g. {@code date_format(f0
, 'yyyy-MM-dd')}" confused me a lot ? Is it a format ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a javadoc DSL like `` in markdown.
} | ||
|
||
def createRow(innerFieldName: String): TypeInformation[_] = { | ||
Types.ROW( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If these 2 methods are only used by this test case, maybe we can eliminate them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 2 methods are inner methods and used by multiple times. If we remove them, there will be some code duplication here.
Hi @danny0405 @KurtYoung , I updated the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, i left two minor comments.
|
||
public WatermarkSpec(String rowtimeAttribute, String watermarkExpressionString, DataType watermarkExprOutputType) { | ||
this.rowtimeAttribute = checkNotNull(rowtimeAttribute); | ||
this.watermarkExpressionString = watermarkExpressionString; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkNotNull?
* from {@link ResolvedExpression} to SQL string. | ||
*/ | ||
@Internal | ||
public class SqlExpression implements ResolvedExpression { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary class in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! I will introduce it in the following PR.
} | ||
|
||
/** | ||
* Returns a deep copy of the table schema. | ||
*/ | ||
public TableSchema copy() { | ||
return new TableSchema(fieldNames.clone(), fieldDataTypes.clone()); | ||
return new TableSchema(fieldNames.clone(), fieldDataTypes.clone(), new ArrayList<>(watermarkSpecs)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just clone the watermarkSpecs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because WatermarkSpec is an immutable object. We don't need to clone an immutable object just like String
.
@@ -157,11 +181,11 @@ public TableSchema copy() { | |||
/** | |||
* Returns the specified data type for the given field name. | |||
* | |||
* @param fieldName the name of the field | |||
* @param fieldName the name of the field, the field name can be a qualified name, e.g. "f0.q1" | |||
*/ | |||
public Optional<DataType> getFieldDataType(String fieldName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
*/ | ||
private void validateAndCreateNameToTypeMapping(String fieldName, DataType fieldType, String parentFieldName) { | ||
String fullFieldName = parentFieldName.isEmpty() ? fieldName : parentFieldName + "." + fieldName; | ||
DataType oldType = fieldNameToType.put(fullFieldName, fieldType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we pass null as parentFieldName
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't pass null
. If it can accept nullable String, then the parameter should be annotated @Nullable
, otherwise, it is not null.
Besides, it is only internally used and we guaranteed that.
* not be validated by {@link TableSchema}. | ||
*/ | ||
public Builder watermark(String rowtimeAttribute, String watermarkExpressionString, DataType watermarkExprOutputType) { | ||
Preconditions.checkNotNull(rowtimeAttribute); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
watermarkExpressionString
-> watermarkExpr
, the type already indicates that it is a String;
watermarkExprOutputType
-> watermarkExprType
obviously the type of a expression is it's output type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't run out of characters. A better field name describes everything. If we use watermarkExpr
, some users may guess it is an Expression
but is a String
.
* Watermark metadata defined in {@link TableSchema}. It mainly contains two parts, | ||
* 1) the rowtime attribute. | ||
* 2) the watermark generation strategy. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use a ol
list to list these doc items.
private final String rowtimeAttribute; | ||
|
||
private final String watermarkExpressionString; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just comment all these members ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can copy the javadoc to the members here, but I think it is not necessary. Because the javadoc of the getter methods explain them in detail. These private members are invisible to users.
* The string representation is a qualified SQL expression string (UDFs are expanded). | ||
*/ | ||
public String getWatermarkExpressionString() { | ||
return watermarkExpressionString; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getWatermarkExpression
is okey i think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A String
in the method name explicitly declares that the return value is a string representation instead of Expression
. It is weird to return String but the method name says it is a expression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
d244b4f
to
1221469
Compare
Travis passed: https://travis-ci.com/flink-ci/flink/builds/134547319 |
What is the purpose of the change
As discussed in FLIP-66, the watermark information should be part of
TableSchema
, and expose to connectors visCatalogTable#getTableSchema
.We may need to introduce a WatermarkSpec class to describe watermark information.
Brief change log
WatermarkSpec
class to describe watermark information.TableSchema#getWatermarkSpecs()
returnsList<WatermarkSpec>
, because we may support multiple watermarks in the future. This is for compatibility purpose. Currently, there is at most one watermark spec in the list.SqlExpression
which is an internalResolvedExpression
and wraps a SQL expression string. As discussed in FLIP-80 ML, we are planning to use SQL format as the string representation ofExpression
. However, the path ofExpression
->RexNode
->SqlNode
->String
still is not fully supported, and there are manay problems that need to be fixed in this path. SoSqlExpression
is a temporary solution to quickly support FLIP-66 as all the watermark expressions come from SQL DDL.Verifying this change
This change added tests:
TableSchemaTest
inflink-table-common
to verify watermark relative logic inTableSchema
.TableSchemaTest
inflink-table-planner
because of class name conflict, and move the tests from it toTableEnvironmentTest
andTableSchemaTest
inflink-table-common
.testTableSchema
inDescriptorPropertiesTest
to verify the generated properties key-value string, and the derivedTableSchema
from properteis.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation