[FLINK-21172][canal][json] Support 'event-timestamp' metadata for canal-json format to include es field (apache#14792)
SteNicholas committed Feb 2, 2021
1 parent a334b92 commit 195056b
Showing 5 changed files with 47 additions and 10 deletions.
docs/dev/table/connectors/formats/canal.md (10 changes: 9 additions & 1 deletion)
@@ -190,6 +190,12 @@ metadata fields for its value format.
<td>The timestamp at which the connector processed the event. Corresponds to the <code>ts</code>
field in the Canal record.</td>
</tr>
<tr>
<td><code>event-timestamp</code></td>
<td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NULL</code></td>
<td>The timestamp when the corresponding change was executed in MySQL server. Corresponds to the <code>es</code>
field in the Canal record.</td>
</tr>
</tbody>
</table>

@@ -204,9 +210,11 @@ CREATE TABLE KafkaTable (
origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
behavior STRING,
WATERMARK FOR origin_es AS origin_es - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
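As a sketch of what the new metadata column enables (not part of this commit; it assumes the KafkaTable example above with its origin_ts and origin_es columns):

```sql
-- origin_ts is Canal's ts field (when Canal processed the change);
-- origin_es is the new es field (when MySQL executed the change), so
-- the difference approximates Canal's capture lag.
SELECT
  user_id,
  origin_es,
  origin_ts,
  TIMESTAMPDIFF(SECOND, origin_es, origin_ts) AS capture_lag_seconds
FROM KafkaTable;
```

The watermark declared on origin_es makes the column an event-time attribute, so downstream event-time operations such as interval joins can use it directly.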
docs/dev/table/connectors/formats/canal.zh.md (12 changes: 10 additions & 2 deletions)
@@ -186,7 +186,13 @@ metadata fields for its value format.
<tr>
<td><code>ingestion-timestamp</code></td>
<td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NULL</code></td>
<td>The timestamp when the corresponding change was executed in MySQL server. Corresponds to the <code>ts</code>
<td>The timestamp at which the connector processed the event. Corresponds to the <code>ts</code>
field in the Canal record.</td>
</tr>
<tr>
<td><code>event-timestamp</code></td>
<td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NULL</code></td>
<td>The timestamp when the corresponding change was executed in MySQL server. Corresponds to the <code>es</code>
field in the Canal record.</td>
</tr>
</tbody>
@@ -203,9 +209,11 @@ CREATE TABLE KafkaTable (
origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
behavior STRING,
WATERMARK FOR origin_es AS origin_es - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
@@ -248,13 +248,15 @@ public void testKafkaCanalChangelogSource() throws Exception {
+ " origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,"
+ " origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,"
+ " origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,"
+ " origin_es TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,"
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
// test connector metadata
+ " origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
+ " origin_partition STRING METADATA FROM 'partition' VIRTUAL" // unused
+ " origin_partition STRING METADATA FROM 'partition' VIRTUAL," // unused
+ " WATERMARK FOR origin_es AS origin_es - INTERVAL '5' SECOND"
+ ") WITH ("
+ " 'connector' = 'kafka',"
+ " 'topic' = '%s',"
@@ -271,6 +273,7 @@ public void testKafkaCanalChangelogSource() throws Exception {
+ " origin_sql_type MAP<STRING, INT>,"
+ " origin_pk_names ARRAY<STRING>,"
+ " origin_ts TIMESTAMP(3),"
+ " origin_es TIMESTAMP(3),"
+ " name STRING,"
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
@@ -283,7 +286,7 @@ public void testKafkaCanalChangelogSource() throws Exception {
tEnv.executeSql(
"INSERT INTO sink "
+ "SELECT origin_topic, origin_database, origin_table, origin_sql_type, "
+ "origin_pk_names, origin_ts, name "
+ "origin_pk_names, origin_ts, origin_es, name "
+ "FROM canal_source");

/*
@@ -336,11 +339,11 @@ public void testKafkaCanalChangelogSource() throws Exception {

List<String> expected =
Arrays.asList(
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, spare tire]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, hammer]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, rocks]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, jacket]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, scooter]");
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");

waitingExpectedResults("sink", expected, Duration.ofSeconds(10));

@@ -194,6 +194,22 @@ public Object convert(GenericRowData row, int pos) {
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(GenericRowData row, int pos) {
if (row.isNullAt(pos)) {
return null;
}
return TimestampData.fromEpochMillis(row.getLong(pos));
}
}),

EVENT_TIMESTAMP(
"event-timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
DataTypes.FIELD("es", DataTypes.BIGINT()),
new MetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(GenericRowData row, int pos) {
if (row.isNullAt(pos)) {
return null;
}
// es arrives as epoch milliseconds, same as ts above
return TimestampData.fromEpochMillis(row.getLong(pos));
}
});
@@ -109,6 +109,7 @@ public void testDeserializationWithMetadata() throws Exception {
assertThat(row.getMap(6).size(), equalTo(4));
assertThat(row.getArray(7).getString(0).toString(), equalTo("id"));
assertThat(row.getTimestamp(8, 3).getMillisecond(), equalTo(1589373515477L));
assertThat(row.getTimestamp(9, 3).getMillisecond(), equalTo(1589373515000L));
});
testDeserializationWithMetadata(
"canal-data-filter-table.txt",
@@ -124,6 +125,7 @@
assertThat(row.getMap(6).size(), equalTo(4));
assertThat(row.getArray(7).getString(0).toString(), equalTo("id"));
assertThat(row.getTimestamp(8, 3).getMillisecond(), equalTo(1598944146308L));
assertThat(row.getTimestamp(9, 3).getMillisecond(), equalTo(1598944132000L));
});
}
