[FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

This closes apache#12335
godfreyhe authored and dawidwys committed Jun 9, 2020
1 parent fa46073 commit 8a8d8a9
Showing 31 changed files with 599 additions and 340 deletions.
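
For context, the bug addressed here: time attributes declared in a CREATE TABLE DDL (a computed PROCTIME() column, or a rowtime column covered by a WATERMARK clause) were preserved when the table was queried with SQL, but were lost when the same catalog table was read through the Table API, e.g. via TableEnvironment.from(). The following is a minimal standalone sketch of the affected scenario, not code from this commit; the table name, columns, and the 'datagen' connector are illustrative assumptions. The new tests below exercise the same path against a CSV-backed table.

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class WatermarkDdlTableApiSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // DDL declares a computed processing-time column and a watermark on 'ts'.
        tableEnv.executeSql(
                "CREATE TABLE src (" +
                "  price DOUBLE," +
                "  ts AS LOCALTIMESTAMP," +
                "  l_proctime AS PROCTIME()," +
                "  WATERMARK FOR ts AS ts" +
                ") WITH ('connector' = 'datagen')");

        // Reading the table through the Table API (instead of a SQL query) should keep
        // the time attributes. Before this fix the schema was taken from the catalog
        // without being re-resolved, so 'ts' and 'l_proctime' lost their time-attribute kinds.
        Table result = tableEnv.from("src").select($("price"), $("ts"), $("l_proctime"));
        result.printSchema();
    }
}
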
---- changed file ----
@@ -68,6 +68,7 @@
import java.util.List;
import java.util.Optional;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -281,6 +282,32 @@ public void testStreamReadWriteCsvWithProctime() {
}

private void testReadWriteCsvWithProctime(boolean isStreaming) {
TableEnvironment tableEnv = prepareTable(isStreaming);
ArrayList<Row> rows = Lists.newArrayList(
tableEnv.executeSql("SELECT * FROM proctime_src").collect());
Assert.assertEquals(5, rows.size());
tableEnv.executeSql("DROP TABLE proctime_src");
}

@Test
public void testTableApiWithProctimeForBatch() {
testTableApiWithProctime(false);
}

@Test
public void testTableApiWithProctimeForStreaming() {
testTableApiWithProctime(true);
}

private void testTableApiWithProctime(boolean isStreaming) {
TableEnvironment tableEnv = prepareTable(isStreaming);
ArrayList<Row> rows = Lists.newArrayList(
tableEnv.from("proctime_src").select($("price"), $("ts"), $("l_proctime")).execute().collect());
Assert.assertEquals(5, rows.size());
tableEnv.executeSql("DROP TABLE proctime_src");
}

private TableEnvironment prepareTable(boolean isStreaming) {
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().useBlinkPlanner();
if (isStreaming) {
builder = builder.inStreamingMode();
@@ -308,10 +335,7 @@ private void testReadWriteCsvWithProctime(boolean isStreaming) {
"'connector.path' = 'file://%s'," +
"'format.type' = 'csv')", srcPath));

ArrayList<Row> rows = Lists.newArrayList(
tableEnv.executeSql("SELECT * FROM proctime_src").collect());
Assert.assertEquals(5, rows.size());
tableEnv.executeSql("DROP TABLE proctime_src");
return tableEnv;
}

@Test
---- changed file ----
@@ -57,6 +57,7 @@
import org.apache.flink.table.client.gateway.local.result.DynamicResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
@@ -473,6 +474,11 @@ public List<Operation> parse(String statement) {
public UnresolvedIdentifier parseIdentifier(String identifier) {
return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
}

@Override
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
return context.wrapClassLoader(() -> parser.parseSqlExpression(sqlExpression, inputSchema));
}
};
}

---- new file: CatalogTableSchemaResolver.java ----
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;

/**
* The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
* because the data type of a computed column taken from the catalog table is not trusted.
*
* <p>For example, the type of a `proctime()` column in the given TableSchema is TIMESTAMP(3),
* but its correct type is TIMESTAMP(3) *PROCTIME*.
*/
@Internal
public class CatalogTableSchemaResolver {
private final Parser parser;
// A flag to indicate whether the table environment should work in batch or streaming mode
// TODO remove this once FLINK-18180 is finished
private final boolean isStreamingMode;

public CatalogTableSchemaResolver(Parser parser, boolean isStreamingMode) {
this.parser = parser;
this.isStreamingMode = isStreamingMode;
}

/**
* Resolve the computed column's type for the given schema.
*
* @param tableSchema Table schema to derive table field names and data types
* @return the resolved TableSchema
*/
public TableSchema resolve(TableSchema tableSchema) {
final String rowtime;
if (!tableSchema.getWatermarkSpecs().isEmpty()) {
// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
if (rowtime.contains(".")) {
throw new ValidationException(
String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
}
} else {
rowtime = null;
}

String[] fieldNames = tableSchema.getFieldNames();
DataType[] fieldTypes = tableSchema.getFieldDataTypes();

TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
TableColumn tableColumn = tableSchema.getTableColumns().get(i);
DataType fieldType = fieldTypes[i];
if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
if (fieldNames[i].equals(rowtime)) {
throw new TableException("Watermark can not be defined for a processing time attribute column.");
}
TimestampType originalType = (TimestampType) fieldType.getLogicalType();
LogicalType proctimeType = new TimestampType(
originalType.isNullable(),
TimestampKind.PROCTIME,
originalType.getPrecision());
fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
TimestampType originalType = (TimestampType) fieldType.getLogicalType();
LogicalType rowtimeType = new TimestampType(
originalType.isNullable(),
TimestampKind.ROWTIME,
originalType.getPrecision());
fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
}
if (tableColumn.isGenerated()) {
builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
} else {
builder.field(fieldNames[i], fieldType);
}
}

tableSchema.getWatermarkSpecs().forEach(builder::watermark);
tableSchema.getPrimaryKey().ifPresent(
pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
return builder.build();
}

private boolean isProctimeType(String expr, TableSchema tableSchema) {
ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
if (resolvedExpr == null) {
return false;
}
LogicalType type = resolvedExpr.getOutputDataType().getLogicalType();
return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.PROCTIME;
}
}
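
At its core, the resolver above only re-tags the TIMESTAMP kind of the affected columns and copies everything else from the catalog schema. Below is a tiny standalone sketch of that type adjustment (illustrative only; the class name is hypothetical, while the Flink type classes are the ones the resolver uses):

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.utils.TypeConversions;

public class TimeAttributeTypeSketch {
    public static void main(String[] args) {
        // The catalog stores a computed proctime column as a plain TIMESTAMP(3) ...
        TimestampType catalogType = new TimestampType(true, 3);

        // ... and the resolver replaces it with the same type tagged as a processing-time attribute.
        TimestampType proctimeType = new TimestampType(
                catalogType.isNullable(), TimestampKind.PROCTIME, catalogType.getPrecision());
        DataType resolvedType = TypeConversions.fromLogicalToDataType(proctimeType);

        System.out.println(catalogType.asSummaryString());                   // TIMESTAMP(3)
        System.out.println(resolvedType.getLogicalType().asSummaryString()); // TIMESTAMP(3) *PROCTIME*
    }
}
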
---- changed file: TableEnvironmentImpl.java ----
@@ -199,6 +199,8 @@ protected TableEnvironmentImpl(
Planner planner,
boolean isStreamingMode) {
this.catalogManager = catalogManager;
this.catalogManager.setCatalogTableSchemaResolver(
new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
this.moduleManager = moduleManager;
this.execEnv = executor;

@@ -490,8 +492,9 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);

return catalogManager.getTable(tableIdentifier)
.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
return catalogManager.getTable(tableIdentifier).map(t -> {
return new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema());
});
}

@Override
---- changed file: CatalogManager.java ----
@@ -23,12 +23,18 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
@@ -70,6 +76,8 @@ public final class CatalogManager {

private String currentDatabaseName;

private CatalogTableSchemaResolver schemaResolver;

// The name of the built-in catalog
private final String builtInCatalogName;

@@ -146,6 +154,16 @@ public CatalogManager build() {
}
}

/**
* We do not pass it in the constructor, because we need a {@link Parser} that is constructed in a
* {@link Planner}, while at the same time a {@link Planner} needs a {@link CatalogManager} to
* be constructed. Thus we can't get a {@link Parser} instance when creating a
* {@link CatalogManager}. See {@link TableEnvironmentImpl#create}.
*/
public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {
this.schemaResolver = schemaResolver;
}

/**
* Returns a factory for creating fully resolved data types that can be used for planning.
*/
@@ -336,12 +354,24 @@ public CatalogBaseTable getTable() {
* @return table that the path points to.
*/
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
if (temporaryTable != null) {
return Optional.of(TableLookupResult.temporary(temporaryTable));
return Optional.of(TableLookupResult.temporary(resolveTableSchema(temporaryTable)));
} else {
return getPermanentTable(objectIdentifier);
Optional<TableLookupResult> result = getPermanentTable(objectIdentifier);
return result.map(tableLookupResult ->
TableLookupResult.permanent(resolveTableSchema(tableLookupResult.getTable())));
}
}

private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) {
if (!(table instanceof CatalogTableImpl)) {
return table;
}
CatalogTableImpl catalogTableImpl = (CatalogTableImpl) table;
TableSchema newTableSchema = schemaResolver.resolve(catalogTableImpl.getSchema());
return catalogTableImpl.copy(newTableSchema);
}

/**
---- changed file: CatalogTableImpl.java ----
@@ -88,6 +88,14 @@ public CatalogTable copy(Map<String, String> options) {
return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment());
}

public CatalogTable copy(TableSchema tableSchema) {
return new CatalogTableImpl(
tableSchema.copy(),
new ArrayList<>(getPartitionKeys()),
new HashMap<>(getProperties()),
getComment());
}

/**
* Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
*/
---- changed file: Parser.java ----
@@ -19,7 +19,9 @@
package org.apache.flink.table.delegation;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;

@@ -54,4 +56,14 @@ public interface Parser {
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the identifier
*/
UnresolvedIdentifier parseIdentifier(String identifier);

/**
* Entry point for parsing SQL expressions expressed as a String.
*
* @param sqlExpression the SQL expression to parse
* @param inputSchema the schema of the fields referenced in the SQL expression
* @return resolved expression
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
*/
ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);
}
---- changed file ----
@@ -18,8 +18,10 @@

package org.apache.flink.table.utils;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;

import java.util.List;
@@ -37,4 +39,9 @@ public List<Operation> parse(String statement) {
public UnresolvedIdentifier parseIdentifier(String identifier) {
return UnresolvedIdentifier.of(identifier);
}

@Override
public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
return null;
}
}
---- changed file: CatalogCalciteSchema.java ----
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;

import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.Schema;
@@ -43,18 +42,13 @@ public class CatalogCalciteSchema extends FlinkSchema {
private final CatalogManager catalogManager;
// Flag that tells if the current planner should work in a batch or streaming mode.
private final boolean isStreamingMode;
// The SQL expression converter factory is used to derive correct result type of computed column,
// because the date type of computed column from catalog table is not trusted.
private final SqlExprToRexConverterFactory converterFactory;

public CatalogCalciteSchema(
String catalogName,
CatalogManager catalog,
SqlExprToRexConverterFactory converterFactory,
boolean isStreamingMode) {
this.catalogName = catalogName;
this.catalogManager = catalog;
this.converterFactory = converterFactory;
this.isStreamingMode = isStreamingMode;
}

@@ -67,8 +61,7 @@ public CatalogCalciteSchema(
@Override
public Schema getSubSchema(String schemaName) {
if (catalogManager.schemaExists(catalogName, schemaName)) {
return new DatabaseCalciteSchema(
schemaName, catalogName, catalogManager, converterFactory, isStreamingMode);
return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
} else {
return null;
}
(The remaining changed files are not shown here.)