Skip to content

Commit

Permalink
[FLINK-20885][canal-json] Fix deserialization exception when using 'c…
Browse files Browse the repository at this point in the history
…anal-json.table.include' to filter binlogs of multiple tables

This closes apache#14631
  • Loading branch information
SteNicholas committed Jan 14, 2021
1 parent 500c30c commit 9d674ed
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public RowData deserialize(@Nullable byte[] message) throws IOException {
return null;
}
try {
final JsonNode root = objectMapper.readTree(message);
return (RowData) runtimeConverter.convert(root);
return convertToRowData(deserializeToJsonNode(message));
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
Expand All @@ -113,6 +112,14 @@ public RowData deserialize(@Nullable byte[] message) throws IOException {
}
}

public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
return objectMapper.readTree(message);
}

public RowData convertToRowData(JsonNode message) {
return (RowData) runtimeConverter.convert(message);
}

@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

import javax.annotation.Nullable;

import java.io.IOException;
Expand Down Expand Up @@ -193,19 +195,18 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
return;
}
try {
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
if (database != null) {
String currentDatabase = row.getString(3).toString();
if (!database.equals(currentDatabase)) {
if (!database.equals(root.get(ReadableMetadata.DATABASE.key).asText())) {
return;
}
}
if (table != null) {
String currentTable = row.getString(4).toString();
if (!table.equals(currentTable)) {
if (!table.equals(root.get(ReadableMetadata.TABLE.key).asText())) {
return;
}
}
final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
String type = row.getString(2).toString(); // "type" field
if (OP_INSERT.equals(type)) {
// "data" field is an array of row, contains inserted rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,8 @@ public void testDeserializationMissingField() throws Exception {

// fail on missing field
deserializationSchema =
deserializationSchema =
new JsonRowDataDeserializationSchema(
schema,
InternalTypeInfo.of(schema),
true,
false,
TimestampFormat.ISO_8601);
new JsonRowDataDeserializationSchema(
schema, InternalTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601);

String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'.";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter ","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":9,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337341,"type":"DELETE"}
{"data":[{"id":"101","name":"scooter","description":"Small 2-wheel scooter","weight":"5.17"},{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"}],"database":"mydb","es":1598944337000,"id":10,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":[{"weight":"3.14"},{"weight":"8.1"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944337663,"type":"UPDATE"}
{"data":[{"order_number":"10002","order_date":"2016-01-17","purchaser":"1002","quantity":"2","product_id":"105"}],"database":"mydb","es":1598944374000,"id":11,"isDdl":false,"mysqlType":{"order_number":"INTEGER","order_date":"DATE","purchaser":"INTEGER","quantity":"INTEGER","product_id":"INTEGER"},"old":null,"pkNames":["order_number"],"sql":"","sqlType":{"order_number":4,"order_date":91,"purchaser":4,"quantity":4,"product_id":4},"table":"orders","ts":1598944374999,"type":"DELETE"}
{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"mydb","es":1598944418000,"id":12,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944418418,"type":"DELETE"}
{"data":[{"id":"102","name":"car battery","description":"12V car battery","weight":"5.17"},{"id":"103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"}],"database":"mydb","es":1598944418000,"id":12,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"product","ts":1598944418418,"type":"DELETE"}
{"data":null,"database":"mydb","es":1598944271000,"id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE project (\n id VARCHAR(255) NOT NULL,\n name VARCHAR(255) NOT NULL,\n description VARCHAR(255) NOT NULL,\n weight FLOAT NOT NULL\n)","sqlType":null,"table":"projects","ts":1598944271192,"type":"CREATE"}
{"data":[{"id":"A101","name":"scooter","description":"Small 2-wheel scooter","weight":"3.14"},{"id":"A102","name":"car battery","description":"12V car battery","weight":"8.1"},{"id":"A103","name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":"0.8"},{"id":"A104","name":"hammer","description":"12oz carpenter's hammer","weight":"0.75"},{"id":"A105","name":"hammer","description":"14oz carpenter's hammer","weight":"0.875"},{"id":"A106","name":"hammer","description":"16oz carpenter's hammer","weight":"1.0"},{"id":"A107","name":"rocks","description":"box of assorted rocks","weight":"5.3"},{"id":"A108","name":"jacket","description":"water resistent black wind breaker","weight":"0.1"},{"id":"A109","name":"spare tire","description":"24 inch spare tire","weight":"22.2"}],"database":"mydb","es":1598944132000,"id":14,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","description":"varchar(512)","weight":"float"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"project","ts":1598944146308,"type":"INSERT"}

0 comments on commit 9d674ed

Please sign in to comment.