Skip to content

Commit

Permalink
[FLINK-19446][canal-json] Fix canal-json format parse UPDATE record w…
Browse files Browse the repository at this point in the history
…ith null value will get wrong result

This closes apache#14693
  • Loading branch information
SteNicholas committed Jan 22, 2021
1 parent 1678b60 commit 6ee32ee
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
public final class CanalJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;

private static final String FIELD_OLD = "old";
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
private static final String OP_DELETE = "DELETE";
Expand All @@ -87,6 +88,9 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;

/** Names of fields. */
private final List<String> fieldNames;

/** Number of fields. */
private final int fieldCount;

Expand Down Expand Up @@ -121,7 +125,9 @@ private CanalJsonDeserializationSchema(
this.database = database;
this.table = table;
this.ignoreParseErrors = ignoreParseErrors;
this.fieldCount = ((RowType) physicalDataType.getLogicalType()).getFieldCount();
final RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
this.fieldNames = physicalRowType.getFieldNames();
this.fieldCount = physicalRowType.getFieldCount();
this.databasePattern = database == null ? null : Pattern.compile(database);
this.tablePattern = table == null ? null : Pattern.compile(table);
}
Expand Down Expand Up @@ -238,10 +244,11 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
// the underlying JSON deserialization schema always produce GenericRowData.
GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
final JsonNode oldField = root.get(FIELD_OLD);
for (int f = 0; f < fieldCount; f++) {
if (before.isNullAt(f)) {
// not null fields in "old" (before) means the fields are changed
// null/empty fields in "old" (before) means the fields are not changed
if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
// fields in "old" (before) means the fields are changed
// fields not in "old" (before) means the fields are not changed
// so we just copy the not changed fields into before
before.setField(f, after.getField(f));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ public void runTest(List<String> lines, CanalJsonDeserializationSchema deseriali
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
"+I(104,hammer,12oz carpenter's hammer,0.75)",
"+I(105,hammer,14oz carpenter's hammer,0.875)",
"+I(106,hammer,16oz carpenter's hammer,1.0)",
"+I(106,hammer,null,1.0)",
"+I(107,rocks,box of assorted rocks,5.3)",
"+I(108,jacket,water resistent black wind breaker,0.1)",
"+I(109,spare tire,24 inch spare tire,22.2)",
"-U(106,hammer,16oz carpenter's hammer,1.0)",
"-U(106,hammer,null,1.0)",
"+U(106,hammer,18oz carpenter hammer,1.0)",
"-U(107,rocks,box of assorted rocks,5.3)",
"+U(107,rocks,box of assorted rocks,5.1)",
Expand Down Expand Up @@ -233,11 +233,11 @@ public void runTest(List<String> lines, CanalJsonDeserializationSchema deseriali
"{\"data\":[{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0}],\"type\":\"DELETE\"}",
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":null,\"weight\":1.0}],\"type\":\"DELETE\"}",
"{\"data\":[{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0}],\"type\":\"INSERT\"}",
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3}],\"type\":\"DELETE\"}",
"{\"data\":[{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1}],\"type\":\"INSERT\"}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"}
{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"mydb","es":1598944202000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"description":"16oz carpenter's hammer"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944202218,"type":"UPDATE"}
{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944146308,"type":"INSERT"}
{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"mydb","es":1598944202000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"description":null}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944202218,"type":"UPDATE"}
{"data":null,"database":"mydb","es":1598944271000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE orders (\n order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n order_date DATE NOT NULL,\n purchaser INTEGER NOT NULL,\n quantity INTEGER NOT NULL,\n product_id INTEGER NOT NULL\n) AUTO_INCREMENT = 10001","sqlType":null,"table":"orders","ts":1598944271192,"type":"CREATE"}
{"data":[{"order_number":"10001","order_date":"2016-01-16","purchaser":"1001","quantity":"1","product_id":"102"},{"order_number":"10002","order_date":"2016-01-17","purchaser":"1002","quantity":"2","product_id":"105"},{"order_number":"10003","order_date":"2016-02-19","purchaser":"1002","quantity":"2","product_id":"106"},{"order_number":"10004","order_date":"2016-02-21","purchaser":"1003","quantity":"1","product_id":"107"}],"database":"mydb","es":1598944275000,"id":4,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":null,"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944275018,"type":"INSERT"}
{"data":[{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.1"}],"database":"mydb","es":1598944279000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"5.3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944279665,"type":"UPDATE"}
Expand Down
4 changes: 2 additions & 2 deletions flink-formats/flink-json/src/test/resources/canal-data.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"inventory","es":1589373515000,"id":3,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373515477,"type":"INSERT"}
{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"inventory","es":1589373546000,"id":4,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"description":"16oz carpenter's hammer"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373546301,"type":"UPDATE"}
{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"106","name":"hammer","description":null,"weight":"1.0"},{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"inventory","es":1589373515000,"id":3,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373515477,"type":"INSERT"}
{"data":[{"id":"106","name":"hammer","description":"18oz carpenter hammer","weight":"1.0"}],"database":"inventory","es":1589373546000,"id":4,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"description":null}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373546301,"type":"UPDATE"}
{"data":[{"id":"107","name":"rocks","description":"box of assorted rocks","weight":"5.1"}],"database":"inventory","es":1589373549000,"id":5,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373549489,"type":"UPDATE"}
{"data":[{"id":"110","name":"jacket","description":"water resistent white wind breaker","weight":"0.2"}],"database":"inventory","es":1589373552000,"id":6,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373552882,"type":"INSERT"}
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.18"}],"database":"inventory","es":1589373555000,"id":7,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373555457,"type":"INSERT"}
Expand Down

0 comments on commit 6ee32ee

Please sign in to comment.