[FLINK-20926][maxwell][json] Allow to read metadata for maxwell-json format

This closes apache#16040
SteNicholas authored and lirui-apache committed Jun 10, 2021
1 parent 1455028 commit 410b8c4
Showing 10 changed files with 726 additions and 109 deletions.
docs/content.zh/docs/connectors/table/formats/maxwell.md (68 additions, 0 deletions)

@@ -113,6 +113,74 @@ INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
```

Available Metadata
------------------

The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.

{{< hint info >}}
Format metadata fields are only available if the
corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose
metadata fields for its value format.
{{< /hint >}}

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Key</th>
<th class="text-center" style="width: 40%">Data Type</th>
<th class="text-center" style="width: 40%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>database</code></td>
<td><code>STRING NULL</code></td>
<td>The originating database. Corresponds to the <code>database</code> field in the
Maxwell record if available.</td>
</tr>
<tr>
<td><code>table</code></td>
<td><code>STRING NULL</code></td>
<td>The originating database table. Corresponds to the <code>table</code> field in the
Maxwell record if available.</td>
</tr>
<tr>
<td><code>primary-key-columns</code></td>
<td><code>ARRAY&lt;STRING&gt; NULL</code></td>
<td>Array of primary key names. Corresponds to the <code>primary_key_columns</code> field in the
Maxwell record if available.</td>
</tr>
<tr>
<td><code>ingestion-timestamp</code></td>
<td><code>TIMESTAMP_LTZ(3) NULL</code></td>
<td>The timestamp at which the connector processed the event. Corresponds to the <code>ts</code>
field in the Maxwell record.</td>
</tr>
</tbody>
</table>

The following example shows how to access Maxwell metadata fields in Kafka:

```sql
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'maxwell-json'
);
```
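
For illustration, a minimal query sketch over the `KafkaTable` defined above; it only uses the columns declared in that DDL:

```sql
-- Select the Maxwell metadata columns next to the payload columns.
-- `origin_ts` carries the `ts` field of each Maxwell record.
SELECT
  origin_database,
  origin_table,
  origin_primary_key_columns,
  origin_ts,
  user_id,
  item_id,
  behavior
FROM KafkaTable;
```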

Format Options
----------------

docs/content/docs/connectors/table/formats/maxwell.md (68 additions, 0 deletions)

@@ -113,6 +113,74 @@ INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
```

Available Metadata
------------------

The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.

{{< hint info >}}
Format metadata fields are only available if the
corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose
metadata fields for its value format.
{{< /hint >}}

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Key</th>
<th class="text-center" style="width: 40%">Data Type</th>
<th class="text-center" style="width: 40%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>database</code></td>
<td><code>STRING NULL</code></td>
<td>The originating database. Corresponds to the <code>database</code> field in the
Maxwell record if available.</td>
</tr>
<tr>
<td><code>table</code></td>
<td><code>STRING NULL</code></td>
<td>The originating database table. Corresponds to the <code>table</code> field in the
Maxwell record if available.</td>
</tr>
<tr>
<td><code>primary-key-columns</code></td>
<td><code>ARRAY&lt;STRING&gt; NULL</code></td>
<td>Array of primary key names. Corresponds to the <code>primary_key_columns</code> field in the
Maxwell record if available.</td>
</tr>
<tr>
<td><code>ingestion-timestamp</code></td>
<td><code>TIMESTAMP_LTZ(3) NULL</code></td>
<td>The timestamp at which the connector processed the event. Corresponds to the <code>ts</code>
field in the Maxwell record.</td>
</tr>
</tbody>
</table>

The following example shows how to access Maxwell metadata fields in Kafka:

```sql
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'maxwell-json'
);
```

Format Options
----------------

@@ -344,4 +344,152 @@ public void testKafkaCanalChangelogSource() throws Exception {
tableResult.getJobClient().get().cancel().get(); // stop the job
deleteTestTopic(topic);
}

@Test
public void testKafkaMaxwellChangelogSource() throws Exception {
final String topic = "changelog_maxwell";
createTestTopic(topic, 1, 1);

// configure time zone of the Maxwell Json metadata "ingestion-timestamp"
tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
// enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
Configuration tableConf = tEnv.getConfig().getConfiguration();
tableConf.setString("table.exec.mini-batch.enabled", "true");
tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
tableConf.setString("table.exec.mini-batch.size", "5000");
tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

// ---------- Write the Maxwell json into Kafka -------------------
List<String> lines = readLines("maxwell-data.txt");
DataStreamSource<String> stream = env.fromCollection(lines);
SerializationSchema<String> serSchema = new SimpleStringSchema();
FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();

// the producer must not produce duplicates
Properties producerProperties = getStandardProps();
producerProperties.setProperty("retries", "0");
try {
stream.addSink(
new FlinkKafkaProducer<>(
topic,
serSchema,
producerProperties,
partitioner,
EXACTLY_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
env.execute("Write sequence");
} catch (Exception e) {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}

// ---------- Consume the Maxwell changelog with Flink SQL -------------------
String bootstraps = getBootstrapServers();
String sourceDDL =
String.format(
"CREATE TABLE maxwell_source ("
// test format metadata
+ " origin_database STRING METADATA FROM 'value.database' VIRTUAL,"
+ " origin_table STRING METADATA FROM 'value.table' VIRTUAL,"
+ " origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,"
+ " origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,"
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
// test connector metadata
+ " origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
+ " origin_partition STRING METADATA FROM 'partition' VIRTUAL" // unused
+ ") WITH ("
+ " 'connector' = 'kafka',"
+ " 'topic' = '%s',"
+ " 'properties.bootstrap.servers' = '%s',"
+ " 'scan.startup.mode' = 'earliest-offset',"
+ " 'value.format' = 'maxwell-json'"
+ ")",
topic, bootstraps);
String sinkDDL =
"CREATE TABLE sink ("
+ " origin_topic STRING,"
+ " origin_database STRING,"
+ " origin_table STRING,"
+ " origin_primary_key_columns ARRAY<STRING>,"
+ " origin_ts TIMESTAMP(3),"
+ " name STRING,"
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult tableResult =
tEnv.executeSql(
"INSERT INTO sink "
+ "SELECT origin_topic, origin_database, origin_table, origin_primary_key_columns, "
+ "origin_ts, name "
+ "FROM maxwell_source");

/*
* Maxwell captures change data on the `products` table:
*
* <pre>
* CREATE TABLE products (
* id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
* name VARCHAR(255),
* description VARCHAR(512),
* weight FLOAT
* );
* ALTER TABLE products AUTO_INCREMENT = 101;
*
* INSERT INTO products
* VALUES (default,"scooter","Small 2-wheel scooter",3.14),
* (default,"car battery","12V car battery",8.1),
* (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
* (default,"hammer","12oz carpenter's hammer",0.75),
* (default,"hammer","14oz carpenter's hammer",0.875),
* (default,"hammer","16oz carpenter's hammer",1.0),
* (default,"rocks","box of assorted rocks",5.3),
* (default,"jacket","water resistent black wind breaker",0.1),
* (default,"spare tire","24 inch spare tire",22.2);
* UPDATE products SET description='18oz carpenter hammer' WHERE id=106;
* UPDATE products SET weight='5.1' WHERE id=107;
* INSERT INTO products VALUES (default,"jacket","water resistent white wind breaker",0.2);
* INSERT INTO products VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
* UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
* UPDATE products SET weight='5.17' WHERE id=111;
* DELETE FROM products WHERE id=111;
*
* > SELECT * FROM products;
* +-----+--------------------+---------------------------------------------------------+--------+
* | id | name | description | weight |
* +-----+--------------------+---------------------------------------------------------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+
* </pre>
*/

List<String> expected =
Arrays.asList(
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");

waitingExpectedResults("sink", expected, Duration.ofSeconds(10));

// ------------- cleanup -------------------

tableResult.getJobClient().get().cancel().get(); // stop the job
deleteTestTopic(topic);
}
}
@@ -0,0 +1,20 @@
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":1,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":2,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":3,"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":4,"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":5,"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":6,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"xoffset":7,"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1}}
{"database":"test","table":"product","type":"insert","ts":1596684883,"xid":7125,"commit":true,"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2}}
{"database":"test","table":"product","type":"update","ts":1596684893,"xid":7152,"commit":true,"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0},"old":{"description":"16oz carpenter's hammer"}}
{"database":"test","table":"product","type":"update","ts":1596684897,"xid":7169,"commit":true,"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1},"old":{"weight":5.3}}
{"database":"test","table":"product","type":"insert","ts":1596684900,"xid":7186,"commit":true,"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2}}
{"database":"test","table":"product","type":"insert","ts":1596684904,"xid":7201,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18}}
{"database":"test","table":"product","type":"update","ts":1596684906,"xid":7216,"commit":true,"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"old":{"description":"water resistent white wind breaker","weight":0.2}}
{"database":"test","table":"product","type":"update","ts":1596684912,"xid":7235,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17},"old":{"weight":5.18}}
{"database":"test","table":"product","type":"delete","ts":1596684914,"xid":7250,"commit":true,"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17}}
{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"xoffset":0,"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":5.17},"old":{"weight":3.14}}
{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}
{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"xoffset":0,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17}}
{"database":"test","table":"product","type":"delete","ts":1596684938,"xid":7322,"commit":true,"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8}}