[FLINK-20175] Avro Confluent Registry SQL format does not support adding nullable columns

We set null as the default value for nullable logical types. This makes it possible to add nullable columns without breaking backwards compatibility.
dawidwys committed Nov 25, 2020
1 parent 9abe38e commit efc12ca
Showing 2 changed files with 207 additions and 129 deletions.
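As the commit message explains, nullable columns in the row type are now mapped to Avro unions with null as the default value. Below is a minimal sketch of the effect (not part of this commit; the class name and column names are illustrative), using only the AvroSchemaConverter and Table API calls that appear in the diff:

import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import org.apache.avro.Schema;

public class NullableColumnSketch {
    public static void main(String[] args) {
        // Hypothetical table: one required column plus one nullable column.
        Schema schema = AvroSchemaConverter.convertToSchema(
                TableSchema.builder()
                        .field("category_id", DataTypes.BIGINT().notNull())
                        .field("name", DataTypes.STRING().nullable())
                        .build()
                        .toRowDataType()
                        .getLogicalType());

        // With this commit the nullable column becomes
        //   "type" : [ "null", "string" ], "default" : null
        // so a reader schema that adds such a column can still decode
        // records written before the column existed.
        System.out.println(schema.toString(true));
    }
}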
AvroSchemaConverter.java
@@ -391,10 +391,16 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
                 .fields();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
             String fieldName = fieldNames.get(i);
-            builder = builder
-                    .name(fieldName)
-                    .type(convertToSchema(rowType.getTypeAt(i), rowName + "_" + fieldName))
-                    .noDefault();
+            LogicalType fieldType = rowType.getTypeAt(i);
+            SchemaBuilder.GenericDefault<Schema> fieldBuilder = builder
+                    .name(fieldName)
+                    .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+            if (fieldType.isNullable()) {
+                builder = fieldBuilder.withDefault(null);
+            } else {
+                builder = fieldBuilder.noDefault();
+            }
         }
         Schema record = builder.endRecord();
         return nullable ? nullableSchema(record) : record;
@@ -441,6 +447,6 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
     private static Schema nullableSchema(Schema schema) {
         return schema.isNullable()
                 ? schema
-                : Schema.createUnion(schema, SchemaBuilder.builder().nullType());
+                : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
     }
 }
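Note the reordering in nullableSchema: the null branch now comes first in the union. Per the Avro specification, a field's default value must match the first branch of its union type, so "default" : null is only legal when the union starts with "null". A small illustration of that rule (not from the commit; the record and field names are made up), built from the same Schema and SchemaBuilder calls the diff already uses:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullFirstUnionSketch {
    public static void main(String[] args) {
        // Union with "null" first, as nullableSchema now produces it.
        Schema nullableString = Schema.createUnion(
                SchemaBuilder.builder().nullType(),
                SchemaBuilder.builder().stringType());

        // A null default is accepted because "null" is the first union
        // branch; per the Avro spec, the old ["string", "null"] order
        // would not allow a null default.
        Schema record = SchemaBuilder.record("record")
                .fields()
                .name("name").type(nullableString).withDefault(null)
                .endRecord();

        System.out.println(record.toString(true));
    }
}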
AvroSchemaConverterTest.java
@@ -22,18 +22,28 @@
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.DecoderFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -61,6 +71,44 @@ public void testConvertAvroSchemaToDataType() {
validateUserSchema(AvroSchemaConverter.convertToDataType(schema));
}

@Test
public void testAddingOptionalField() throws IOException {
Schema oldSchema = SchemaBuilder.record("record")
.fields()
.requiredLong("category_id")
.optionalString("name")
.endRecord();

Schema newSchema = AvroSchemaConverter.convertToSchema(
TableSchema.builder()
.field("category_id", DataTypes.BIGINT().notNull())
.field("name", DataTypes.STRING().nullable())
.field("description", DataTypes.STRING().nullable())
.build().toRowDataType().getLogicalType()
);

byte[] serializedRecord = AvroTestUtils.writeRecord(
new GenericRecordBuilder(oldSchema)
.set("category_id", 1L)
.set("name", "test")
.build(),
oldSchema
);
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(
oldSchema,
newSchema);
GenericRecord newRecord = datumReader.read(
null,
DecoderFactory.get().binaryDecoder(serializedRecord, 0, serializedRecord.length, null));
assertThat(
newRecord,
equalTo(new GenericRecordBuilder(newSchema)
.set("category_id", 1L)
.set("name", "test")
.set("description", null)
.build()));
}

@Test
public void testInvalidRawTypeAvroSchemaConversion() {
RowType rowType = (RowType) TableSchema.builder()
@@ -104,48 +152,55 @@ public void testRowTypeAvroSchemaConversion() {
DataTypes.FIELD("row3", DataTypes.ROW(DataTypes.FIELD("c", DataTypes.STRING())))))
.build().toRowDataType().getLogicalType();
Schema schema = AvroSchemaConverter.convertToSchema(rowType);
assertEquals("{\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"row1\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record_row1\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"a\",\n" +
" \"type\" : [ \"string\", \"null\" ]\n" +
" } ]\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"row2\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record_row2\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"b\",\n" +
" \"type\" : [ \"string\", \"null\" ]\n" +
" } ]\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"row3\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record_row3\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"row3\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record_row3_row3\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"c\",\n" +
" \"type\" : [ \"string\", \"null\" ]\n" +
" } ]\n" +
" }, \"null\" ]\n" +
" } ]\n" +
" }, \"null\" ]\n" +
" } ]\n" +
"}", schema.toString(true));
assertEquals("{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"row1\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record_row1\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"a\",\n"
+ " \"type\" : [ \"null\", \"string\" ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"row2\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record_row2\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"b\",\n"
+ " \"type\" : [ \"null\", \"string\" ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"row3\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record_row3\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"row3\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record_row3_row3\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"c\",\n"
+ " \"type\" : [ \"null\", \"string\" ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ "}", schema.toString(true));
}

/**
@@ -227,89 +282,106 @@ public void testDataTypeToSchemaToDataTypeNonNullable() {
*/
@Test
public void testSchemaToDataTypeToSchemaNullable() {
String schemaStr = "{\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"f_null\",\n" +
" \"type\" : \"null\"\n" +
" }, {\n" +
" \"name\" : \"f_boolean\",\n" +
" \"type\" : [ \"boolean\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_int\",\n" +
" \"type\" : [ \"int\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_bigint\",\n" +
" \"type\" : [ \"long\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_float\",\n" +
" \"type\" : [ \"float\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_double\",\n" +
" \"type\" : [ \"double\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_string\",\n" +
" \"type\" : [ \"string\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_varbinary\",\n" +
" \"type\" : [ \"bytes\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_timestamp\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"long\",\n" +
" \"logicalType\" : \"timestamp-millis\"\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_date\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"int\",\n" +
" \"logicalType\" : \"date\"\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_time\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"int\",\n" +
" \"logicalType\" : \"time-millis\"\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_decimal\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"bytes\",\n" +
" \"logicalType\" : \"decimal\",\n" +
" \"precision\" : 10,\n" +
" \"scale\" : 0\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_row\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"record\",\n" +
" \"name\" : \"record_f_row\",\n" +
" \"fields\" : [ {\n" +
" \"name\" : \"f0\",\n" +
" \"type\" : [ \"int\", \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f1\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"long\",\n" +
" \"logicalType\" : \"timestamp-millis\"\n" +
" }, \"null\" ]\n" +
" } ]\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_map\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"map\",\n" +
" \"values\" : [ \"int\", \"null\" ]\n" +
" }, \"null\" ]\n" +
" }, {\n" +
" \"name\" : \"f_array\",\n" +
" \"type\" : [ {\n" +
" \"type\" : \"array\",\n" +
" \"items\" : [ \"int\", \"null\" ]\n" +
" }, \"null\" ]\n" +
" } ]\n" +
"}";
String schemaStr = "{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_null\",\n"
+ " \"type\" : \"null\",\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_boolean\",\n"
+ " \"type\" : [ \"null\", \"boolean\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_int\",\n"
+ " \"type\" : [ \"null\", \"int\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_bigint\",\n"
+ " \"type\" : [ \"null\", \"long\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_float\",\n"
+ " \"type\" : [ \"null\", \"float\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_double\",\n"
+ " \"type\" : [ \"null\", \"double\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_string\",\n"
+ " \"type\" : [ \"null\", \"string\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_varbinary\",\n"
+ " \"type\" : [ \"null\", \"bytes\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_timestamp\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"long\",\n"
+ " \"logicalType\" : \"timestamp-millis\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_date\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"int\",\n"
+ " \"logicalType\" : \"date\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_time\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"int\",\n"
+ " \"logicalType\" : \"time-millis\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_decimal\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"bytes\",\n"
+ " \"logicalType\" : \"decimal\",\n"
+ " \"precision\" : 10,\n"
+ " \"scale\" : 0\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_row\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record_f_row\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f0\",\n"
+ " \"type\" : [ \"null\", \"int\" ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f1\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"long\",\n"
+ " \"logicalType\" : \"timestamp-millis\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_map\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"map\",\n"
+ " \"values\" : [ \"null\", \"int\" ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_array\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"array\",\n"
+ " \"items\" : [ \"null\", \"int\" ]\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ "}";
DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr);
Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
assertEquals(new Schema.Parser().parse(schemaStr), schema);