Commit

[FLINK-27959][TableSQL/API][avro] Remove usage of deprecated TableSchema from flink-avro
snuyanzin authored and alpinegizmo committed Jun 9, 2022
1 parent 9411785 commit 6017f99
Showing 1 changed file with 43 additions and 39 deletions.
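In short, the diff replaces the deprecated TableSchema builder with ResolvedSchema/Column when deriving the row type handed to AvroSchemaConverter. Below is a minimal, self-contained sketch of that pattern; the class name ResolvedSchemaToAvroExample and its main method are illustrative only and not part of the commit, and it assumes a Flink 1.15-era classpath with flink-table-api-java and flink-avro (the same classes the diff imports).

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;

// Illustrative example class, not part of the commit.
public class ResolvedSchemaToAvroExample {
    public static void main(String[] args) {
        // Deprecated pattern removed by this commit:
        //     TableSchema.builder()
        //             .field("category_id", DataTypes.BIGINT().notNull())
        //             .build()
        //             .toRowDataType()
        //             .getLogicalType()

        // Replacement pattern: build a ResolvedSchema from physical columns,
        // take its source row data type, and convert that logical type to Avro.
        Schema avroSchema =
                AvroSchemaConverter.convertToSchema(
                        ResolvedSchema.of(
                                        Column.physical("category_id", DataTypes.BIGINT().notNull()),
                                        Column.physical("name", DataTypes.STRING().nullable()))
                                .toSourceRowDataType()
                                .getLogicalType());

        System.out.println(avroSchema.toString(true));
    }
}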
@@ -25,7 +25,8 @@
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -76,12 +77,13 @@ void testAddingOptionalField() throws IOException {

         Schema newSchema =
                 AvroSchemaConverter.convertToSchema(
-                        TableSchema.builder()
-                                .field("category_id", DataTypes.BIGINT().notNull())
-                                .field("name", DataTypes.STRING().nullable())
-                                .field("description", DataTypes.STRING().nullable())
-                                .build()
-                                .toRowDataType()
+                        ResolvedSchema.of(
+                                        Column.physical(
+                                                "category_id", DataTypes.BIGINT().notNull()),
+                                        Column.physical("name", DataTypes.STRING().nullable()),
+                                        Column.physical(
+                                                "description", DataTypes.STRING().nullable()))
+                                .toSourceRowDataType()
                                 .getLogicalType());

         byte[] serializedRecord =
@@ -111,11 +113,12 @@ void testAddingOptionalField() throws IOException {
     void testInvalidRawTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING())
-                                .field("b", DataTypes.RAW(Void.class, VoidSerializer.INSTANCE))
-                                .build()
-                                .toRowDataType()
+                        ResolvedSchema.of(
+                                        Column.physical("a", DataTypes.STRING()),
+                                        Column.physical(
+                                                "b",
+                                                DataTypes.RAW(Void.class, VoidSerializer.INSTANCE)))
+                                .toSourceRowDataType()
                                 .getLogicalType();

         assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
@@ -127,11 +130,10 @@ void testInvalidRawTypeAvroSchemaConversion() {
     void testInvalidTimestampTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING())
-                                .field("b", DataTypes.TIMESTAMP(9))
-                                .build()
-                                .toRowDataType()
+                        ResolvedSchema.of(
+                                        Column.physical("a", DataTypes.STRING()),
+                                        Column.physical("b", DataTypes.TIMESTAMP(9)))
+                                .toSourceRowDataType()
                                 .getLogicalType();

         assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
@@ -145,11 +147,10 @@ void testInvalidTimestampTypeAvroSchemaConversion() {
     void testInvalidTimeTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING())
-                                .field("b", DataTypes.TIME(6))
-                                .build()
-                                .toRowDataType()
+                        ResolvedSchema.of(
+                                        Column.physical("a", DataTypes.STRING()),
+                                        Column.physical("b", DataTypes.TIME(6)))
+                                .toSourceRowDataType()
                                 .getLogicalType();

         assertThatThrownBy(() -> AvroSchemaConverter.convertToSchema(rowType))
@@ -162,23 +163,26 @@ void testInvalidTimeTypeAvroSchemaConversion() {
     void testRowTypeAvroSchemaConversion() {
         RowType rowType =
                 (RowType)
-                        TableSchema.builder()
-                                .field(
-                                        "row1",
-                                        DataTypes.ROW(DataTypes.FIELD("a", DataTypes.STRING())))
-                                .field(
-                                        "row2",
-                                        DataTypes.ROW(DataTypes.FIELD("b", DataTypes.STRING())))
-                                .field(
-                                        "row3",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD(
-                                                        "row3",
-                                                        DataTypes.ROW(
-                                                                DataTypes.FIELD(
-                                                                        "c", DataTypes.STRING())))))
-                                .build()
-                                .toRowDataType()
+                        ResolvedSchema.of(
+                                        Column.physical(
+                                                "row1",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD("a", DataTypes.STRING()))),
+                                        Column.physical(
+                                                "row2",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD("b", DataTypes.STRING()))),
+                                        Column.physical(
+                                                "row3",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "row3",
+                                                                DataTypes.ROW(
+                                                                        DataTypes.FIELD(
+                                                                                "c",
+                                                                                DataTypes
+                                                                                        .STRING()))))))
+                                .toSourceRowDataType()
                                 .getLogicalType();
         Schema schema = AvroSchemaConverter.convertToSchema(rowType);
         assertThat(schema.toString(true))
