Skip to content

Commit

Permalink
[FLINK-18774][debezium-avro] Improve debezium-avro format implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wuchong committed Nov 8, 2020
1 parent edec6aa commit bff7da1
Show file tree
Hide file tree
Showing 18 changed files with 443 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public SerializationSchema<RowData> createRuntimeEncoder(
subject.get(),
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL),
RowDataToAvroConverters.createRowConverter(rowType));
RowDataToAvroConverters.createConverter(rowType));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.formats.avro.registry.confluent.debezium;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
Expand Down Expand Up @@ -82,38 +83,37 @@ public final class DebeziumAvroDeserializationSchema implements DeserializationS
/**
* TypeInformation of the produced {@link RowData}.
**/
private final TypeInformation<RowData> resultTypeInfo;

/**
* Debezium Avro data rowType.
*/
private final RowType rowType;
private final TypeInformation<RowData> producedTypeInfo;

public DebeziumAvroDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
TypeInformation<RowData> producedTypeInfo,
String schemaRegistryUrl) {
this.resultTypeInfo = resultTypeInfo;
this.rowType = rowType;
RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));
this.producedTypeInfo = producedTypeInfo;
RowType debeziumAvroRowType = createDebeziumAvroRowType(
fromLogicalToDataType(rowType));

this.avroDeserializer = new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(debeziumAvroRowType),
schemaRegistryUrl),
AvroToRowDataConverters.createRowConverter(debeziumAvroRowType),
resultTypeInfo);
producedTypeInfo);
}

public DebeziumAvroDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
@VisibleForTesting
DebeziumAvroDeserializationSchema(
TypeInformation<RowData> producedTypeInfo,
AvroRowDataDeserializationSchema avroDeserializer) {
this.rowType = rowType;
this.resultTypeInfo = resultTypeInfo;
this.producedTypeInfo = producedTypeInfo;
this.avroDeserializer = avroDeserializer;
}

@Override
public void open(InitializationContext context) throws Exception {
avroDeserializer.open(context);
}

@Override
public RowData deserialize(byte[] message) throws IOException {
throw new RuntimeException(
Expand Down Expand Up @@ -156,8 +156,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
}
} catch (Throwable t) {
// a big try catch to protect the processing.
throw new IOException(format(
"Corrupt Debezium Avro message '%s'.", new String(message)), t);
throw new IOException("Can't deserialize Debezium Avro message.", t);
}
}

Expand All @@ -168,7 +167,7 @@ public boolean isEndOfStream(RowData nextElement) {

@Override
public TypeInformation<RowData> getProducedType() {
return resultTypeInfo;
return producedTypeInfo;
}

@Override
Expand All @@ -181,20 +180,20 @@ public boolean equals(Object o) {
}
DebeziumAvroDeserializationSchema that = (DebeziumAvroDeserializationSchema) o;
return Objects.equals(avroDeserializer, that.avroDeserializer) &&
Objects.equals(resultTypeInfo, that.resultTypeInfo);
Objects.equals(producedTypeInfo, that.producedTypeInfo);
}

@Override
public int hashCode() {
return Objects.hash(avroDeserializer, resultTypeInfo);
return Objects.hash(avroDeserializer, producedTypeInfo);
}

public static RowType createDebeziumAvroRowType(DataType dataType) {
public static RowType createDebeziumAvroRowType(DataType databaseSchema) {
// Debezium Avro contains other information, e.g. "source", "ts_ms"
// but we don't need them
return (RowType) DataTypes.ROW(
DataTypes.FIELD("before", dataType.nullable()),
DataTypes.FIELD("after", dataType.nullable()),
DataTypes.FIELD("before", databaseSchema.nullable()),
DataTypes.FIELD("after", databaseSchema.nullable()),
DataTypes.FIELD("op", DataTypes.STRING())).getLogicalType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class DebeziumAvroFormatFactory implements DeserializationFormatFactory,

public static final String IDENTIFIER = "debezium-avro-confluent";

@SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
Expand All @@ -67,11 +66,11 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context,
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
return new DebeziumAvroDeserializationSchema(
rowType,
rowDataTypeInfo,
producedTypeInfo,
schemaRegistryURL);
}

Expand All @@ -89,16 +88,19 @@ public ChangelogMode getChangelogMode() {

@Override
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {

FactoryUtil.validateFactoryOptions(this, formatOptions);
String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL);
Optional<String> subject = formatOptions.getOptional(SCHEMA_REGISTRY_SUBJECT);
if (!subject.isPresent()) {
throw new ValidationException(String.format("Option %s.%s is required for serialization",
IDENTIFIER, SCHEMA_REGISTRY_SUBJECT.key()));
throw new ValidationException(String.format(
"Option '%s.%s' is required for serialization",
IDENTIFIER,
SCHEMA_REGISTRY_SUBJECT.key()));
}

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public ChangelogMode getChangelogMode() {
Expand All @@ -115,7 +117,10 @@ public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context,
DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new DebeziumAvroSerializationSchema(rowType, schemaRegistryURL, subject.get());
return new DebeziumAvroSerializationSchema(
rowType,
schemaRegistryURL,
subject.get());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.formats.avro.registry.confluent.debezium;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
Expand All @@ -29,10 +30,10 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import java.util.Objects;

import static java.lang.String.format;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;

/**
Expand All @@ -44,7 +45,7 @@ public class DebeziumAvroSerializationSchema implements SerializationSchema<RowD
/**
* insert operation.
*/
private static final StringData OP_CREATE = StringData.fromString("c");
private static final StringData OP_INSERT = StringData.fromString("c");
/**
* delete operation.
*/
Expand All @@ -55,6 +56,8 @@ public class DebeziumAvroSerializationSchema implements SerializationSchema<RowD
*/
private final AvroRowDataSerializationSchema avroSerializer;

private transient GenericRowData outputReuse;

public DebeziumAvroSerializationSchema(
RowType rowType,
String schemaRegistryUrl,
Expand All @@ -67,44 +70,41 @@ public DebeziumAvroSerializationSchema(
schemaRegistrySubject,
AvroSchemaConverter.convertToSchema(debeziumAvroRowType),
schemaRegistryUrl),
RowDataToAvroConverters.createRowConverter(debeziumAvroRowType));
RowDataToAvroConverters.createConverter(debeziumAvroRowType));
}

public DebeziumAvroSerializationSchema(AvroRowDataSerializationSchema avroSerializer) {
@VisibleForTesting
DebeziumAvroSerializationSchema(AvroRowDataSerializationSchema avroSerializer) {
this.avroSerializer = avroSerializer;
}

@Override
public void open(InitializationContext context) throws Exception {
avroSerializer.open(context);
outputReuse = new GenericRowData(3);
}

@Override
public byte[] serialize(RowData element) {
GenericRowData reuse = new GenericRowData(3);
switch (element.getRowKind()) {
case INSERT:
reuse.setField(1, element);
break;
case DELETE:
reuse.setField(0, element);
break;
default:
throw new UnsupportedOperationException("Unsupported operation '" + element.getRowKind() + "' for row kind.");
}
reuse.setField(2, rowKind2String(element.getRowKind()));
return avroSerializer.serialize(reuse);
}

private StringData rowKind2String(RowKind rowKind) {
switch (rowKind) {
case INSERT:
case UPDATE_AFTER:
return OP_CREATE;
case UPDATE_BEFORE:
case DELETE:
return OP_DELETE;
default:
throw new UnsupportedOperationException("Unsupported operation '" + rowKind + "' for row kind.");
public byte[] serialize(RowData rowData) {
try {
switch (rowData.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
outputReuse.setField(0, null);
outputReuse.setField(1, rowData);
outputReuse.setField(2, OP_INSERT);
return avroSerializer.serialize(outputReuse);
case UPDATE_BEFORE:
case DELETE:
outputReuse.setField(0, rowData);
outputReuse.setField(1, null);
outputReuse.setField(2, OP_DELETE);
return avroSerializer.serialize(outputReuse);
default:
throw new UnsupportedOperationException(format("Unsupported operation '%s' for row kind.", rowData.getRowKind()));
}
} catch (Throwable t) {
throw new RuntimeException(format("Could not serialize row '%s'.", rowData), t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testSerializationSchema() {
SUBJECT,
AvroSchemaConverter.convertToSchema(ROW_TYPE),
REGISTRY_URL),
RowDataToAvroConverters.createRowConverter(ROW_TYPE));
RowDataToAvroConverters.createConverter(ROW_TYPE));

final DynamicTableSink actualSink = createTableSink(getDefaultOptions());
assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private static AvroRowDataSerializationSchema getSerializationSchema(
GenericRecord.class,
avroSchema,
() -> registryCoder),
RowDataToAvroConverters.createRowConverter(rowType));
RowDataToAvroConverters.createConverter(rowType));
}

private static AvroRowDataDeserializationSchema getDeserializationSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.Map;

import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link DebeziumAvroFormatFactory}.
Expand All @@ -68,35 +70,19 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
public void testSeDeSchema() {
final Map<String, String> options = getAllOptions();

final DebeziumAvroDeserializationSchema expectedDeser = new DebeziumAvroDeserializationSchema(
DebeziumAvroDeserializationSchema expectedDeser = new DebeziumAvroDeserializationSchema(
ROW_TYPE,
InternalTypeInfo.of(ROW_TYPE),
REGISTRY_URL);

final DynamicTableSource actualSource = createTableSource(options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE,
SCHEMA.toRowDataType());

DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
assertEquals(expectedDeser, actualDeser);
final DebeziumAvroSerializationSchema expectedSer = new DebeziumAvroSerializationSchema(

DebeziumAvroSerializationSchema expectedSer = new DebeziumAvroSerializationSchema(
ROW_TYPE,
REGISTRY_URL,
SUBJECT
);
final DynamicTableSink actualSink = createTableSink(options);
assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
TestDynamicTableFactory.DynamicTableSinkMock sinkMock = (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

SerializationSchema<RowData> actualSer = sinkMock.valueFormat
.createRuntimeEncoder(
new SinkRuntimeProviderContext(false),
SCHEMA.toRowDataType());

SerializationSchema<RowData> actualSer = createSerializationSchema(options);
Assert.assertEquals(expectedSer, actualSer);
}

Expand All @@ -112,6 +98,30 @@ private Map<String, String> getAllOptions() {
return options;
}

private static DeserializationSchema<RowData> createDeserializationSchema(Map<String, String> options) {
final DynamicTableSource actualSource = createTableSource(options);
assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

return scanSourceMock.valueFormat
.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE,
SCHEMA.toRowDataType());
}

private static SerializationSchema<RowData> createSerializationSchema(Map<String, String> options) {
final DynamicTableSink actualSink = createTableSink(options);
assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

return sinkMock.valueFormat
.createRuntimeEncoder(
new SinkRuntimeProviderContext(false),
SCHEMA.toRowDataType());
}

private static DynamicTableSource createTableSource(Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
Expand Down
Loading

0 comments on commit bff7da1

Please sign in to comment.