Skip to content

Commit

Permalink
[FLINK-19873][canal-json] Skip DDL change events for Canal data
Browse files Browse the repository at this point in the history
This closes apache#13872
  • Loading branch information
pyscala committed Nov 4, 2020
1 parent 6c1b3b6 commit 25701e1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
private static final String OP_DELETE = "DELETE";
private static final String OP_CREATE = "CREATE";

/** The deserializer to deserialize Debezium JSON data. */
private final JsonRowDataDeserializationSchema jsonDeserializer;
Expand Down Expand Up @@ -223,6 +224,10 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
insert.setRowKind(RowKind.DELETE);
out.collect(insert);
}
} else if (OP_CREATE.equals(type)){
// "data" field is null and "type" is "CREATE" which means
// this is a DDL change event, and we should skip it.
return;
} else {
if (!ignoreParseErrors) {
throw new IOException(format(
Expand Down
3 changes: 2 additions & 1 deletion flink-formats/flink-json/src/test/resources/canal-data.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"inventory","es":1589373560000,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.18"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373560798,"type":"UPDATE"}
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"inventory","es":1589373563000,"id":10,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373563798,"type":"DELETE"}
{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"5.17"},{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"}],"database":"inventory","es":1589373753000,"id":11,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"3.14"},{"weight":"8.1"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589373753939,"type":"UPDATE"}
{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"inventory","es":1589374013000,"id":12,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589374013680,"type":"DELETE"}
{"data":null,"database":"inventory","es":1589373566000,"id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `xj_`.`user02` (`uid` int(0) NOT NULL,`uname` varchar(255) NULL, PRIMARY KEY (`uid`))","sqlType":null,"table":"user02","ts":1589373566000,"type":"CREATE"}
{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"inventory","es":1589374013000,"id":12,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products2","ts":1589374013680,"type":"DELETE"}

0 comments on commit 25701e1

Please sign in to comment.