[FLINK-14322][table-api] Add watermark information in TableSchema #9994

Merged
merged 1 commit into from
Nov 1, 2019

Member

@wuchong wuchong commented Oct 25, 2019

What is the purpose of the change

As discussed in FLIP-66, the watermark information should be part of TableSchema and exposed to connectors via CatalogTable#getTableSchema.

We may need to introduce a WatermarkSpec class to describe watermark information.

Brief change log

  • Introduce WatermarkSpec class to describe watermark information.
  • TableSchema#getWatermarkSpecs() returns List<WatermarkSpec> because we may support multiple watermarks in the future; returning a list now keeps the API compatible. Currently, there is at most one watermark spec in the list.
  • Introduce SqlExpression, an internal ResolvedExpression that wraps a SQL expression string. As discussed in the FLIP-80 ML, we are planning to use the SQL format as the string representation of Expression. However, the path Expression -> RexNode -> SqlNode -> String is still not fully supported, and there are many problems that need to be fixed along this path. So SqlExpression is a temporary solution to quickly support FLIP-66, as all the watermark expressions come from SQL DDL.
  • The properties representation of watermark information:
"schema.watermark.0.rowtime" -> "f1.q2"
"schema.watermark.0.strategy.expression" -> "`f1`.`q2` - INTERVAL '5' SECOND"
"schema.watermark.0.strategy.datatype" -> "TIMESTAMP(3)"

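As a rough illustration of the key layout above, the following sketch flattens one watermark spec into string properties. WatermarkPropertiesSketch and toProperties are hypothetical names made up for this example; this is not the DescriptorProperties API, and the data type is passed as a plain string rather than a DataType.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: it only mirrors the key layout shown above.
final class WatermarkPropertiesSketch {

    // Flattens the watermark spec at the given list index into string properties.
    static Map<String, String> toProperties(
            int index, String rowtimeAttribute, String expression, String dataType) {
        String prefix = "schema.watermark." + index + ".";
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put(prefix + "rowtime", rowtimeAttribute);
        properties.put(prefix + "strategy.expression", expression);
        properties.put(prefix + "strategy.datatype", dataType);
        return properties;
    }

    public static void main(String[] args) {
        toProperties(0, "f1.q2", "`f1`.`q2` - INTERVAL '5' SECOND", "TIMESTAMP(3)")
                .forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

The index in the key is what leaves room for more than one watermark spec later, even though only index 0 is used today.
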
Verifying this change

This change added tests:

  • Adds TableSchemaTest in flink-table-common to verify watermark-related logic in TableSchema.
  • Removes TableSchemaTest in flink-table-planner because of a class name conflict, and moves its tests to TableEnvironmentTest and to TableSchemaTest in flink-table-common.
  • Adds testTableSchema in DescriptorPropertiesTest to verify the generated properties key-value strings and the TableSchema derived from properties.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Oct 25, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 1221469 (Wed Dec 04 15:05:40 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@wuchong
Member Author

wuchong commented Oct 25, 2019

cc @danny0405

@flinkbot
Collaborator

flinkbot commented Oct 25, 2019

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

@wuchong
Member Author

wuchong commented Oct 30, 2019

Hi @KurtYoung, do you have time to have a look at this?


  if (fieldNames.length != fieldDataTypes.length) {
-     throw new TableException(
+     throw new ValidationException(
Contributor

Can we move such sanity checks to the Builder?

Member Author

TableSchema still has a public constructor, even though it is deprecated.

Contributor

+1 to moving all the checks to the builder; these sanity checks should not happen in the constructor.

Member Author

As I said above, TableSchema still has a public constructor.
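
The trade-off in this thread can be sketched roughly as follows: as long as a public constructor exists, checks that live only in the builder can be bypassed. SchemaSketch is a hypothetical, heavily simplified stand-in for TableSchema (types are plain strings, and IllegalArgumentException stands in for ValidationException), so treat it as an illustration of the argument rather than the actual class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TableSchema, reduced to names and string types.
final class SchemaSketch {
    private final String[] fieldNames;
    private final String[] fieldTypes;

    // Still public (imagine it marked @Deprecated), so callers can skip the builder.
    // Validating here covers both construction paths.
    public SchemaSketch(String[] fieldNames, String[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException(
                    "Number of field names and field types must be equal.");
        }
        this.fieldNames = fieldNames.clone();
        this.fieldTypes = fieldTypes.clone();
    }

    int getFieldCount() {
        return fieldNames.length;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private final List<String> names = new ArrayList<>();
        private final List<String> types = new ArrayList<>();

        Builder field(String name, String type) {
            names.add(name);
            types.add(type);
            return this;
        }

        // Delegates to the constructor, so the builder gets the same checks for free.
        SchemaSketch build() {
            return new SchemaSketch(names.toArray(new String[0]), types.toArray(new String[0]));
        }
    }
}
```

Once the public constructor is gone, the check could move into build() without changing behavior for builder users.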


// collect indices
fieldNameToIndex.put(fieldName, i);
DataType fieldType = Preconditions.checkNotNull(fieldDataTypes[i]);
Contributor

Ditto. Since we have a builder for TableSchema, I think we can move all these basic checks into the Builder.

Member Author

The same reason as above. I think we can do this once the public constructor is removed.

"List of all fields: " + Arrays.toString(fieldNames));
}

private void validateAndCreateNameTypeMapping(String fieldName, DataType fieldType, List<String> parentNames) {
Contributor

List<String> parentNames can be a simple string indicating the fieldNamePrefix.

} else {
uniqueNames.add(fieldName);
// validate watermark and rowtime attribute
Preconditions.checkState(
Contributor

You already ensure this in the Builder.

Member Author

Yes. I think we can remove this one.

LogicalType watermarkOutputType = watermark.getWatermarkStrategy()
.getOutputDataType().getLogicalType();
if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE &&
watermarkOutputType.getTypeRoot() != BIGINT) {
Contributor

Why relax this to BIGINT?

Member Author

I will remove this. We can introduce this when we can support BIGINT as the watermark generation return type end-to-end.
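
A minimal sketch of the stricter check that results from dropping the BIGINT branch. The TypeRoot enum below is a hypothetical three-value subset defined only for this example, and the error message wording is an assumption, not taken from the PR.

```java
// Hypothetical, self-contained version of the watermark output type check.
final class WatermarkTypeCheckSketch {

    // Illustrative subset of logical type roots; the real enum has many more values.
    enum TypeRoot { TIMESTAMP_WITHOUT_TIME_ZONE, BIGINT, VARCHAR }

    // Accepts only TIMESTAMP; BIGINT is rejected until it is supported end-to-end.
    static void validateWatermarkOutputType(TypeRoot root) {
        if (root != TypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
            throw new IllegalArgumentException(
                    "Watermark strategy must be of type TIMESTAMP but is of type " + root + ".");
        }
    }
}
```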

@@ -224,13 +254,31 @@ public DataType toRowDataType() {
return (TypeInformation<Row>) fromDataTypeToLegacyInfo(toRowDataType());
}

/**
* Returns an optional of the watermark specification which contains rowtime attribute
Contributor

an optional -> a list

* @param rowtimeAttribute the field name as a rowtime attribute, can be a nested field using dot separator.
* @param watermarkStrategy the watermark strategy expression
*/
public Builder watermark(String rowtimeAttribute, ResolvedExpression watermarkStrategy) {
Contributor

If a user uses this Builder to manually specify a watermark, how can they get a ResolvedExpression? AFAIK, the Scala DSL can only generate an UnresolvedCallExpression or UnresolvedReferenceExpression.

Member Author

Yes. You are right. There are 2 reasons here:

  1. We can't get an ExpressionResolver in the builder to resolve the expression.
  2. TableSchema.Builder is going to be used internally only. As we are planning to remove TableSource#getTableSchema, users should not need to create a TableSchema anymore.

Member Author

I added an @Internal annotation on this method.

Contributor

So besides the DDL way, how can a user specify a watermark programmatically?

Member Author

Using the Schema descriptor API.

Contributor

If we agree that Expression would make the interface more uniform (it can be used by the Table API), can we use just Expression as the argument specification? Then we can eliminate the need for SqlExpression in the current implementation.

Contributor

The problem is that there are now two major types of Expression: unresolved and resolved. We will only have an interface (like the Java DSL style) for unresolved expressions. But the issue is that we need some other information that is only accessible from a resolved one, like data types.

Member Author

@wuchong wuchong Nov 1, 2019

As we discussed in FLIP-80, we will serialize the (resolved) expression to a string; this happens when serializing a CatalogTable to properties. We will then deserialize the string to an unresolved expression; this happens when creating a CatalogTable from properties in the Catalog. However, we missed that converting an unresolved expression to a resolved expression requires an ExpressionResolver, which is not held by the Catalog. If the TableSchema builder requires a ResolvedExpression, it is impossible for a Catalog to create a TableSchema or CatalogTable.

To sum up, I agree with @KurtYoung that using ResolvedExpression here may not work in the future, so I would like to change it to:

public Builder watermark(String rowtimeAttribute, String watermarkExpression, DataType watermarkExprOutputType) {

The ResolvedExpression getWatermarkStrategy() in WatermarkSpec will be removed, and WatermarkSpec will instead expose String getWatermarkExpressionString() and DataType getWatermarkExprOutputType().

What do you think?

Contributor

I think using plain strings in Java classes should be avoided in general. This is not very object oriented. I understand that constructing ResolvedExpression might be difficult at certain locations but we could also use just Expression. Actually, a SQL string is also just an unresolved expression until it has been parsed and validated.

Member Author

I'm fine with using Expression here and introducing an unresolved SqlExpression to wrap it.
So does the following signature look good to you?

public Builder watermark(String rowtimeAttribute, Expression watermarkExpression, DataType watermarkExprOutputType) {

@KurtYoung
Contributor

BTW, there are still some testing failures.

@wuchong
Member Author

wuchong commented Oct 30, 2019

@KurtYoung Thanks for reviewing, I updated the pull request.

@@ -54,44 +58,64 @@

private final DataType[] fieldDataTypes;

private final Map<String, Integer> fieldNameToIndex;
/** Mapping from qualified field name to (nested) field type. */
private final Map<String, DataType> typesByName;

Contributor

fieldNameToType ?

String fullFieldName = prefixQualifiedName.isEmpty() ? fieldName : prefixQualifiedName + "." + fieldName;
DataType oldValue = typesByName.put(fullFieldName, fieldType);
if (oldValue != null) {
throw new ValidationException("Field names must be unique. Duplicate field: '" + fullFieldName + "'");
Contributor

oldValue -> oldType

}

private void validateAndCreateNameTypeMapping(String fieldName, DataType fieldType, String prefixQualifiedName) {
String fullFieldName = prefixQualifiedName.isEmpty() ? fieldName : prefixQualifiedName + "." + fieldName;
Contributor

Can we define another method, perhaps named "generatesFieldName", to wrap the prefix name logic? It's hard to infer what "prefixQualifiedName" means.

BTW, I think we can just name this method "createNameToTypeMapping".

Member Author

I think the logic of full name generation is simple (just one line), there is no need to extract a method for it. I can add some javadocs to describe the meaning of prefixQualifiedName.

This method is not just creating a mapping, but also does some validation. I think having a "validate" in the method name will be easier to understand.

@@ -157,11 +181,11 @@ public TableSchema copy() {
/**
* Returns the specified data type for the given field name.
*
- * @param fieldName the name of the field
+ * @param fieldName the name of the field, the field name can be a qualified name, e.g. "f0.q1"
*/
public Optional<DataType> getFieldDataType(String fieldName) {
Contributor

"f0.q1" is a little confusing, maybe "record_name.field_name" ?

Member Author

@wuchong wuchong Nov 1, 2019

But what's the difference between record_name and field name?

Member Author

I changed it to field1.innerField2

Contributor

Thanks.

* The referenced attribute must be present in the {@link TableSchema} and of
* type {@link DataTypes#TIMESTAMP(int)}.
*/
public String getRowtimeAttribute() {
Contributor

DataTypes#TIMESTAMP(int) is just a tool method and not a "type".


/**
* Expression that wraps a SQL expression string, e.g. {@code date_format(`f0`, 'yyyy-MM-dd')}.
* The SQL expression string can be used as serialized string and can be used for storing in Catalog.
Contributor

The example "e.g. {@code date_format(f0, 'yyyy-MM-dd')}" confused me a lot. Is it a format?

Member Author

It is Javadoc syntax, like `` in Markdown.

}

def createRow(innerFieldName: String): TypeInformation[_] = {
Types.ROW(
Contributor

If these 2 methods are only used by this test case, maybe we can eliminate them.

Member Author

These 2 methods are inner methods and are used multiple times. If we remove them, there will be some code duplication here.

@wuchong
Member Author

wuchong commented Nov 1, 2019

Hi @danny0405 @KurtYoung , I updated the PR.

Contributor

@KurtYoung KurtYoung left a comment

LGTM, I left two minor comments.


public WatermarkSpec(String rowtimeAttribute, String watermarkExpressionString, DataType watermarkExprOutputType) {
this.rowtimeAttribute = checkNotNull(rowtimeAttribute);
this.watermarkExpressionString = watermarkExpressionString;
Contributor

checkNotNull?
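
A minimal sketch of the null checks being asked for here. It uses java.util.Objects.requireNonNull in place of Flink's Preconditions.checkNotNull and a plain String where the real constructor takes a DataType; WatermarkSpecSketch is a hypothetical stand-in, not the merged class.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for WatermarkSpec (data type modeled as a String).
final class WatermarkSpecSketch {
    private final String rowtimeAttribute;
    private final String watermarkExpressionString;
    private final String watermarkExprOutputType;

    WatermarkSpecSketch(
            String rowtimeAttribute,
            String watermarkExpressionString,
            String watermarkExprOutputType) {
        // Fail fast on every argument so that a null never reaches the getters.
        this.rowtimeAttribute = Objects.requireNonNull(rowtimeAttribute, "rowtimeAttribute");
        this.watermarkExpressionString =
                Objects.requireNonNull(watermarkExpressionString, "watermarkExpressionString");
        this.watermarkExprOutputType =
                Objects.requireNonNull(watermarkExprOutputType, "watermarkExprOutputType");
    }

    String getRowtimeAttribute() {
        return rowtimeAttribute;
    }

    String getWatermarkExpressionString() {
        return watermarkExpressionString;
    }

    String getWatermarkExprOutputType() {
        return watermarkExprOutputType;
    }
}
```

All fields are final and there are no setters, which is the immutability property the later copy() discussion relies on.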

* from {@link ResolvedExpression} to SQL string.
*/
@Internal
public class SqlExpression implements ResolvedExpression {
Contributor

unnecessary class in this PR?

Member Author

Good catch! I will introduce it in the following PR.

}

/**
* Returns a deep copy of the table schema.
*/
public TableSchema copy() {
- return new TableSchema(fieldNames.clone(), fieldDataTypes.clone());
+ return new TableSchema(fieldNames.clone(), fieldDataTypes.clone(), new ArrayList<>(watermarkSpecs));
}
Contributor

Why not just clone the watermarkSpecs ?

Member Author

Because WatermarkSpec is an immutable object. We don't need to clone an immutable object just like String.
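
To illustrate the point, here is a small sketch of why copying only the list is enough when its elements are immutable. Spec is a hypothetical immutable element type defined for this example and stands in for WatermarkSpec.

```java
import java.util.ArrayList;
import java.util.List;

final class ShallowCopySketch {

    // Hypothetical immutable element type: final field, no setters.
    static final class Spec {
        private final String rowtimeAttribute;

        Spec(String rowtimeAttribute) {
            this.rowtimeAttribute = rowtimeAttribute;
        }

        String getRowtimeAttribute() {
            return rowtimeAttribute;
        }
    }

    // Copies the list structure only; the immutable elements are shared safely.
    static List<Spec> copySpecs(List<Spec> specs) {
        return new ArrayList<>(specs);
    }

    public static void main(String[] args) {
        List<Spec> original = new ArrayList<>();
        original.add(new Spec("f1.q2"));
        List<Spec> copy = copySpecs(original);
        copy.clear(); // mutating the copy leaves the original list untouched
        System.out.println(original.size()); // prints 1
    }
}
```

Sharing the elements would only be a problem if they could be mutated after construction, which an immutable class rules out.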

*/
private void validateAndCreateNameToTypeMapping(String fieldName, DataType fieldType, String parentFieldName) {
String fullFieldName = parentFieldName.isEmpty() ? fieldName : parentFieldName + "." + fieldName;
DataType oldType = fieldNameToType.put(fullFieldName, fieldType);
Contributor

Can we pass null as parentFieldName ?

Member Author

We shouldn't pass null. If it can accept nullable String, then the parameter should be annotated @Nullable, otherwise, it is not null.
Besides, it is only internally used and we guaranteed that.
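
The qualified-name logic in this thread can be sketched as below. This is an illustration under assumptions: types are modeled as either a String (atomic type) or a Map of nested fields (row type), IllegalArgumentException stands in for ValidationException, and the recursion into nested fields is assumed from the "(nested) field type" javadoc rather than copied from the PR. The empty string, never null, marks a top-level field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, self-contained sketch of the name-to-type mapping.
final class NameToTypeMappingSketch {
    private final Map<String, Object> fieldNameToType = new LinkedHashMap<>();

    // fieldType is either a String (atomic) or a Map<String, Object> (row with named fields).
    // parentFieldName must not be null; pass "" for top-level fields.
    void validateAndCreateNameToTypeMapping(
            String fieldName, Object fieldType, String parentFieldName) {
        String fullFieldName =
                parentFieldName.isEmpty() ? fieldName : parentFieldName + "." + fieldName;
        Object oldType = fieldNameToType.put(fullFieldName, fieldType);
        if (oldType != null) {
            throw new IllegalArgumentException(
                    "Field names must be unique. Duplicate field: '" + fullFieldName + "'");
        }
        if (fieldType instanceof Map) {
            // Register nested fields under the qualified name of their parent.
            for (Map.Entry<?, ?> nested : ((Map<?, ?>) fieldType).entrySet()) {
                validateAndCreateNameToTypeMapping(
                        (String) nested.getKey(), nested.getValue(), fullFieldName);
            }
        }
    }

    Map<String, Object> mapping() {
        return fieldNameToType;
    }
}
```

With fields f0 and f1, where f1 is a row containing q1 and q2, the mapping holds the keys f0, f1, f1.q1, and f1.q2, which is what lets a rowtime attribute such as "f1.q2" be looked up directly.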

* not be validated by {@link TableSchema}.
*/
public Builder watermark(String rowtimeAttribute, String watermarkExpressionString, DataType watermarkExprOutputType) {
Preconditions.checkNotNull(rowtimeAttribute);
Contributor

watermarkExpressionString -> watermarkExpr: the type already indicates that it is a String.
watermarkExprOutputType -> watermarkExprType: obviously the type of an expression is its output type.

Member Author

We are not running out of characters. A better field name describes everything. If we use watermarkExpr, some users may guess it is an Expression when it is actually a String.

* Watermark metadata defined in {@link TableSchema}. It mainly contains two parts,
* 1) the rowtime attribute.
* 2) the watermark generation strategy.
*/
Contributor

You can use an <ol> list to list these doc items.

private final String rowtimeAttribute;

private final String watermarkExpressionString;

Contributor

Can we just comment all these members ?

Member Author

I can copy the javadoc to the members here, but I don't think it is necessary, because the javadoc of the getter methods explains them in detail. These private members are invisible to users.

* The string representation is a qualified SQL expression string (UDFs are expanded).
*/
public String getWatermarkExpressionString() {
return watermarkExpressionString;
Contributor

getWatermarkExpression is okay, I think.

Member Author

@wuchong wuchong Nov 1, 2019

Having String in the method name explicitly declares that the return value is a string representation instead of an Expression. It is weird to return a String when the method name says it is an expression.

Contributor

@danny0405 danny0405 left a comment

+1, LGTM.

@wuchong
Member Author

wuchong commented Nov 1, 2019

Travis passed: https://travis-ci.com/flink-ci/flink/builds/134547319
Merging...

@wuchong wuchong merged commit 3011bff into apache:master Nov 1, 2019
@wuchong wuchong deleted the watermark-schema branch November 1, 2019 12:39