Skip to content

Commit

Permalink
[FLINK-21396][table-common] Improve usability of new schema hierarchy
Browse files Browse the repository at this point in the history
This closes apache#15096.
  • Loading branch information
twalthr committed Mar 9, 2021
1 parent b25ccd0 commit adaf020
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ public void testSchemaResolution() {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
Column.metadata("topic", DataTypes.STRING(), true),
Column.metadata("topic", DataTypes.STRING(), null, true),
Column.computed("ts", COMPUTED_COLUMN_RESOLVED),
Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp"),
Column.metadata(
"orig_ts", DataTypes.TIMESTAMP(3), "timestamp", false),
Column.computed("proctime", PROCTIME_RESOLVED)),
Collections.singletonList(new WatermarkSpec("ts", WATERMARK_RESOLVED)),
UniqueConstraint.primaryKey(
Expand Down Expand Up @@ -291,7 +292,7 @@ public void testPhysicalRowDataType() {

@Test
public void testSourceRowDataType() {
final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA, true, true);
final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
final DataType expectedDataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT().notNull()),
Expand All @@ -310,6 +311,14 @@ public void testSourceRowDataType() {
assertThat(resolvedSchema.toSourceRowDataType(), equalTo(expectedDataType));
}

@Test
public void testLegacySchemaCompatibility() {
final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
final ResolvedSchema resolvedSchemaFromLegacy =
resolveSchema(TableSchema.fromResolvedSchema(resolvedSchema).toSchema());
assertThat(resolvedSchemaFromLegacy, equalTo(resolvedSchema));
}

// --------------------------------------------------------------------------------------------

private static void testError(Schema schema, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,23 @@ public Builder columnByMetadata(
return this;
}

/**
* Declares a metadata column that is appended to this schema.
*
* <p>See {@link #columnByMetadata(String, AbstractDataType, boolean)} for a detailed
* explanation.
*
* <p>This method uses a type string that can be easily persisted in a durable catalog.
*
* @param columnName column name
* @param serializableTypeString data type of the column
* @param isVirtual whether the column should be persisted or not
*/
public Builder columnByMetadata(
String columnName, String serializableTypeString, boolean isVirtual) {
return columnByMetadata(columnName, DataTypes.of(serializableTypeString), isVirtual);
}

/**
* Declares a metadata column that is appended to this schema.
*
Expand Down Expand Up @@ -329,6 +346,24 @@ public Builder columnByMetadata(
return this;
}

/**
* Declares a metadata column that is appended to this schema.
*
* <p>See {@link #columnByMetadata(String, AbstractDataType, String)} for a detailed
* explanation.
*
* <p>This method uses a type string that can be easily persisted in a durable catalog.
*
* @param columnName column name
* @param serializableTypeString data type of the column
* @param metadataKey identifying metadata key, if null the column name will be used as
* metadata key
*/
public Builder columnByMetadata(
String columnName, String serializableTypeString, @Nullable String metadataKey) {
return columnByMetadata(columnName, DataTypes.of(serializableTypeString), metadataKey);
}

/**
* Declares a metadata column that is appended to this schema.
*
Expand Down Expand Up @@ -364,6 +399,29 @@ public Builder columnByMetadata(
return this;
}

/**
* Declares a metadata column that is appended to this schema.
*
* <p>See {@link #columnByMetadata(String, AbstractDataType, String, boolean)} for a
* detailed explanation.
*
* <p>This method uses a type string that can be easily persisted in a durable catalog.
*
* @param columnName column name
* @param serializableTypeString data type of the column
* @param metadataKey identifying metadata key, if null the column name will be used as
* metadata key
* @param isVirtual whether the column should be persisted or not
*/
public Builder columnByMetadata(
String columnName,
String serializableTypeString,
@Nullable String metadataKey,
boolean isVirtual) {
return columnByMetadata(
columnName, DataTypes.of(serializableTypeString), metadataKey, isVirtual);
}

/**
* Declares that the given column should serve as an event-time (i.e. rowtime) attribute and
* specifies a corresponding watermark strategy as an expression.
Expand Down Expand Up @@ -502,7 +560,7 @@ private void addResolvedColumns(List<Column> columns) {
columnByMetadata(
metadataColumn.getName(),
metadataColumn.getDataType(),
metadataColumn.getMetadataAlias().orElse(null),
metadataColumn.getMetadataKey().orElse(null),
metadataColumn.isVirtual());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableColumn.ComputedColumn;
import org.apache.flink.table.api.TableColumn.MetadataColumn;
import org.apache.flink.table.api.TableColumn.PhysicalColumn;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
Expand Down Expand Up @@ -298,6 +303,40 @@ public Optional<UniqueConstraint> getPrimaryKey() {
return Optional.ofNullable(primaryKey);
}

/** Helps to migrate to the new {@link Schema} class. */
public Schema toSchema() {
final Schema.Builder builder = Schema.newBuilder();

columns.forEach(
column -> {
if (column instanceof PhysicalColumn) {
final PhysicalColumn c = (PhysicalColumn) column;
builder.column(c.getName(), c.getType());
} else if (column instanceof MetadataColumn) {
final MetadataColumn c = (MetadataColumn) column;
builder.columnByMetadata(
c.getName(),
c.getType(),
c.getMetadataAlias().orElse(null),
c.isVirtual());
} else if (column instanceof ComputedColumn) {
final ComputedColumn c = (ComputedColumn) column;
builder.columnByExpression(c.getName(), c.getExpression());
} else {
throw new IllegalArgumentException("Unsupported column type: " + column);
}
});

watermarkSpecs.forEach(
spec -> builder.watermark(spec.getRowtimeAttribute(), spec.getWatermarkExpr()));

if (primaryKey != null) {
builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns());
}

return builder.build();
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -369,6 +408,54 @@ public static TableSchema fromTypeInfo(TypeInformation<?> typeInfo) {
}
}

/** Helps to migrate to the new {@link ResolvedSchema} to old API methods. */
public static TableSchema fromResolvedSchema(ResolvedSchema resolvedSchema) {
final TableSchema.Builder builder = TableSchema.builder();

resolvedSchema.getColumns().stream()
.map(
column -> {
if (column instanceof Column.PhysicalColumn) {
final Column.PhysicalColumn c = (Column.PhysicalColumn) column;
return TableColumn.physical(c.getName(), c.getDataType());
} else if (column instanceof Column.MetadataColumn) {
final Column.MetadataColumn c = (Column.MetadataColumn) column;
return TableColumn.metadata(
c.getName(),
c.getDataType(),
c.getMetadataKey().orElse(null),
c.isVirtual());
} else if (column instanceof Column.ComputedColumn) {
final Column.ComputedColumn c = (Column.ComputedColumn) column;
return TableColumn.computed(
c.getName(),
c.getDataType(),
c.getExpression().asSerializableString());
}
throw new IllegalArgumentException(
"Unsupported column type: " + column);
})
.forEach(builder::add);

resolvedSchema
.getWatermarkSpecs()
.forEach(
spec ->
builder.watermark(
spec.getRowtimeAttribute(),
spec.getWatermarkExpression().asSerializableString(),
spec.getWatermarkExpression().getOutputDataType()));

resolvedSchema
.getPrimaryKey()
.ifPresent(
pk ->
builder.primaryKey(
pk.getName(), pk.getColumns().toArray(new String[0])));

return builder.build();
}

public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,45 +64,17 @@ public static ComputedColumn computed(String name, ResolvedExpression expression
return new ComputedColumn(name, expression.getOutputDataType(), expression);
}

/**
* Creates a metadata column from metadata of the given column name.
*
* <p>The column is not virtual by default.
*/
public static MetadataColumn metadata(String name, DataType dataType) {
return metadata(name, dataType, null, false);
}

/**
* Creates a metadata column from metadata of the given column name.
*
* <p>Allows to specify whether the column is virtual or not.
*/
public static MetadataColumn metadata(String name, DataType type, boolean isVirtual) {
return metadata(name, type, null, isVirtual);
}

/**
* Creates a metadata column from metadata of the given alias.
*
* <p>The column is not virtual by default.
*/
public static MetadataColumn metadata(String name, DataType type, String metadataAlias) {
Preconditions.checkNotNull(metadataAlias, "Metadata alias can not be null.");
return metadata(name, type, metadataAlias, false);
}

/**
* Creates a metadata column from metadata of the given column name or from metadata of the
* given alias (if not null).
* given key (if not null).
*
* <p>Allows to specify whether the column is virtual or not.
*/
public static MetadataColumn metadata(
String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
String name, DataType dataType, @Nullable String metadataKey, boolean isVirtual) {
Preconditions.checkNotNull(name, "Column name can not be null.");
Preconditions.checkNotNull(dataType, "Column data type can not be null.");
return new MetadataColumn(name, dataType, metadataAlias, isVirtual);
return new MetadataColumn(name, dataType, metadataKey, isVirtual);
}

/**
Expand Down Expand Up @@ -257,23 +229,23 @@ public int hashCode() {
/** Representation of a metadata column. */
public static final class MetadataColumn extends Column {

private final @Nullable String metadataAlias;
private final @Nullable String metadataKey;

private final boolean isVirtual;

private MetadataColumn(
String name, DataType dataType, @Nullable String metadataAlias, boolean isVirtual) {
String name, DataType dataType, @Nullable String metadataKey, boolean isVirtual) {
super(name, dataType);
this.metadataAlias = metadataAlias;
this.metadataKey = metadataKey;
this.isVirtual = isVirtual;
}

public boolean isVirtual() {
return isVirtual;
}

public Optional<String> getMetadataAlias() {
return Optional.ofNullable(metadataAlias);
public Optional<String> getMetadataKey() {
return Optional.ofNullable(metadataKey);
}

@Override
Expand All @@ -290,10 +262,10 @@ public boolean isPersisted() {
public Optional<String> explainExtras() {
final StringBuilder sb = new StringBuilder();
sb.append("METADATA");
if (metadataAlias != null) {
if (metadataKey != null) {
sb.append(" FROM ");
sb.append("'");
sb.append(EncodingUtils.escapeSingleQuotes(metadataAlias));
sb.append(EncodingUtils.escapeSingleQuotes(metadataKey));
sb.append("'");
}
if (isVirtual) {
Expand All @@ -304,7 +276,7 @@ public Optional<String> explainExtras() {

@Override
public Column copy(DataType newDataType) {
return new MetadataColumn(name, newDataType, metadataAlias, isVirtual);
return new MetadataColumn(name, newDataType, metadataKey, isVirtual);
}

@Override
Expand All @@ -319,12 +291,12 @@ public boolean equals(Object o) {
return false;
}
MetadataColumn that = (MetadataColumn) o;
return isVirtual == that.isVirtual && Objects.equals(metadataAlias, that.metadataAlias);
return isVirtual == that.isVirtual && Objects.equals(metadataKey, that.metadataKey);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), metadataAlias, isVirtual);
return Objects.hash(super.hashCode(), metadataKey, isVirtual);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class ResolvedSchema {
private final List<WatermarkSpec> watermarkSpecs;
private final @Nullable UniqueConstraint primaryKey;

ResolvedSchema(
public ResolvedSchema(
List<Column> columns,
List<WatermarkSpec> watermarkSpecs,
@Nullable UniqueConstraint primaryKey) {
Expand Down

0 comments on commit adaf020

Please sign in to comment.