Skip to content

Commit

Permalink
MDEV-7850: Extend GTID Binlog Events with Thread Id
Browse files Browse the repository at this point in the history
This patch augments Gtid_log_event with the user thread-id.
In particular that compensates for the loss of this info in
Rows_log_events.

Gtid_log_event::thread_id gets visible in mysqlbinlog output like

  #231025 16:21:45 server id 1  end_log_pos 537 CRC32 0x1cf1d963  GTID 0-1-2 ddl thread_id=10

as a 32 bit unsigned integer. Note this is a 32-bit value, as
the connection id can only be 32 bits (see MDEV-15089 for
details).

While the size of Gtid event has grown by 4 bytes
replication from OLD <-> NEW is not affected by it. This patch
also slightly changes the logic to convert Gtid events to Query
events for older replicas which don't support Gtid. Instead of
hard-coding the padding of the sys var section of the generated
Query event, the length to pad is dynamically calculated based
on the length of the Gtid event.

This work was started by the late Sujatha Sivakumar.
Brandon Nesterenko took it over, reviewed initial patches and
extended the work.

Also thanks to Andrei for his help in finalizing the fixes for
MDEV-33924, which were squashed into this patch.

Reviewed-by:
=============
Andrei Elkin <[email protected]>
Kristian Nielsen <[email protected]>
  • Loading branch information
bnestere committed May 3, 2024
1 parent f151c5f commit e4afa61
Show file tree
Hide file tree
Showing 28 changed files with 1,072 additions and 769 deletions.
1 change: 1 addition & 0 deletions mysql-test/include/binlog_parallel_replication_marks.test
Expand Up @@ -80,6 +80,7 @@ while (<F>) {
s/table id \d+/table id #/;
s/mapped to number \d+/mapped to number #/;
s/CRC32 0x[0-9a-f]+/CRC32 0x########/;
s/thread_id=\d+/thread_id=#/;
print if /\b(GTID|START TRANSACTION|COMMIT|Table_map|Write_rows|Update_rows|Delete_rows|generated by server|40005 TEMPORARY)\b/;
}
close F;
Expand Down
98 changes: 49 additions & 49 deletions mysql-test/main/mysqlbinlog_row_compressed.result
Expand Up @@ -16,23 +16,23 @@ FLUSH BINARY LOGS;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#<date> server id 1 end_log_pos 256 CRC32 XXX Start: xxx
#<date> server id 1 end_log_pos # CRC32 XXX Start: xxx
ROLLBACK/*!*/;
# at 256
#<date> server id 1 end_log_pos 285 CRC32 XXX Gtid list []
#<date> server id 1 end_log_pos # CRC32 XXX Gtid list []
# at 285
#<date> server id 1 end_log_pos 329 CRC32 XXX Binlog checkpoint master-bin.000001
#<date> server id 1 end_log_pos # CRC32 XXX Binlog checkpoint master-bin.000001
# at 329
#<date> server id 1 end_log_pos 371 CRC32 XXX GTID 0-1-1 ddl
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-1 ddl thread_id=TID
/*!100101 SET @@session.skip_parallel_replication=0*//*!*/;
/*!100001 SET @@session.gtid_domain_id=0*//*!*/;
/*!100001 SET @@session.server_id=1*//*!*/;
/*!100001 SET @@session.gtid_seq_no=1*//*!*/;
# at 371
#<date> server id 1 end_log_pos 542 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query_compressed thread_id=TID exec_time=x error_code=0 xid=<xid>
use `test`/*!*/;
SET TIMESTAMP=X/*!*/;
SET @@session.pseudo_thread_id=5/*!*/;
SET @@session.pseudo_thread_id=TID/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1, @@session.check_constraint_checks=1, @@session.sql_if_exists=0, @@session.explicit_defaults_for_timestamp=1, @@session.system_versioning_insert_history=0/*!*/;
SET @@session.sql_mode=#/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
Expand All @@ -43,25 +43,25 @@ SET @@session.collation_database=DEFAULT/*!*/;
CREATE TABLE t1 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 TINYINT, f4 MEDIUMINT, f5 BIGINT, f6 INT, f7 INT, f8 char(1))
/*!*/;
# at 542
#<date> server id 1 end_log_pos 584 CRC32 XXX GTID 0-1-2 ddl
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-2 ddl thread_id=TID
/*!100001 SET @@session.gtid_seq_no=2*//*!*/;
# at 584
#<date> server id 1 end_log_pos 745 CRC32 XXX Query_compressed thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query_compressed thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
CREATE TABLE t2 (pk INT PRIMARY KEY, f1 INT, f2 INT, f3 INT, f4 INT, f5 MEDIUMINT, f6 INT, f7 INT, f8 char(1))
/*!*/;
# at 745
#<date> server id 1 end_log_pos 787 CRC32 XXX GTID 0-1-3
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-3 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=3*//*!*/;
START TRANSACTION
/*!*/;
# at 787
# at 861
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> INSERT INTO t1 VALUES (10, 1, 2, 3, 4, 5, 6, 7, "")
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number num
# at 917
#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
### INSERT INTO `test`.`t1`
### SET
### @1=10 /* INT meta=0 nullable=0 is_null=0 */
Expand All @@ -75,22 +75,22 @@ START TRANSACTION
### @9='' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 1
# at 985
#<date> server id 1 end_log_pos 1058 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 1058
#<date> server id 1 end_log_pos 1100 CRC32 XXX GTID 0-1-4
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-4 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=4*//*!*/;
START TRANSACTION
/*!*/;
# at 1100
# at 1176
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> INSERT INTO t1 VALUES (11, 1, 2, 3, 4, 5, 6, 7, NULL)
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number num
# at 1232
#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
### INSERT INTO `test`.`t1`
### SET
### @1=11 /* INT meta=0 nullable=0 is_null=0 */
Expand All @@ -104,22 +104,22 @@ START TRANSACTION
### @9=NULL /* STRING(1) meta=65025 nullable=1 is_null=1 */
# Number of rows: 1
# at 1299
#<date> server id 1 end_log_pos 1372 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 1372
#<date> server id 1 end_log_pos 1414 CRC32 XXX GTID 0-1-5
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-5 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=5*//*!*/;
START TRANSACTION
/*!*/;
# at 1414
# at 1492
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> INSERT INTO t1 VALUES (12, 1, 2, 3, NULL, 5, 6, 7, "A")
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number num
# at 1548
#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
### INSERT INTO `test`.`t1`
### SET
### @1=12 /* INT meta=0 nullable=0 is_null=0 */
Expand All @@ -133,22 +133,22 @@ START TRANSACTION
### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 1
# at 1614
#<date> server id 1 end_log_pos 1687 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 1687
#<date> server id 1 end_log_pos 1729 CRC32 XXX GTID 0-1-6
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-6 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=6*//*!*/;
START TRANSACTION
/*!*/;
# at 1729
# at 1804
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> INSERT INTO t1 VALUES (13, 1, 2, 3, 0, 5, 6, 7, "A")
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number num
# at 1860
#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Write_compressed_rows: table id 32 flags: STMT_END_F
### INSERT INTO `test`.`t1`
### SET
### @1=13 /* INT meta=0 nullable=0 is_null=0 */
Expand All @@ -162,22 +162,22 @@ START TRANSACTION
### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 1
# at 1927
#<date> server id 1 end_log_pos 2000 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 2000
#<date> server id 1 end_log_pos 2042 CRC32 XXX GTID 0-1-7
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-7 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=7*//*!*/;
START TRANSACTION
/*!*/;
# at 2042
# at 2096
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> INSERT INTO t2 SELECT * FROM t1
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number num
# at 2152
#<date> server id 1 end_log_pos 0 CRC32 XXX Write_compressed_rows: table id 33 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Write_compressed_rows: table id 33 flags: STMT_END_F
### INSERT INTO `test`.`t2`
### SET
### @1=10 /* INT meta=0 nullable=0 is_null=0 */
Expand Down Expand Up @@ -224,22 +224,22 @@ START TRANSACTION
### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 4
# at 2243
#<date> server id 1 end_log_pos 2316 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 2316
#<date> server id 1 end_log_pos 2358 CRC32 XXX GTID 0-1-8
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-8 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=8*//*!*/;
START TRANSACTION
/*!*/;
# at 2358
# at 2424
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> UPDATE t2 SET f4=5 WHERE f4>0 or f4 is NULL
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number num
# at 2480
#<date> server id 1 end_log_pos 0 CRC32 XXX Update_compressed_rows: table id 33 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Update_compressed_rows: table id 33 flags: STMT_END_F
### UPDATE `test`.`t2`
### WHERE
### @1=10 /* INT meta=0 nullable=0 is_null=0 */
Expand Down Expand Up @@ -305,22 +305,22 @@ START TRANSACTION
### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 3
# at 2579
#<date> server id 1 end_log_pos 2652 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 2652
#<date> server id 1 end_log_pos 2694 CRC32 XXX GTID 0-1-9
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-9 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=9*//*!*/;
START TRANSACTION
/*!*/;
# at 2694
# at 2731
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> DELETE FROM t1
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t1` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t1` mapped to number num
# at 2787
#<date> server id 1 end_log_pos 0 CRC32 XXX Delete_compressed_rows: table id 32 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Delete_compressed_rows: table id 32 flags: STMT_END_F
### DELETE FROM `test`.`t1`
### WHERE
### @1=10 /* INT meta=0 nullable=0 is_null=0 */
Expand Down Expand Up @@ -367,22 +367,22 @@ START TRANSACTION
### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 4
# at 2879
#<date> server id 1 end_log_pos 2952 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 2952
#<date> server id 1 end_log_pos 2994 CRC32 XXX GTID 0-1-10
#<date> server id 1 end_log_pos # CRC32 XXX GTID 0-1-10 thread_id=TID
/*!100001 SET @@session.gtid_seq_no=10*//*!*/;
START TRANSACTION
/*!*/;
# at 2994
# at 3031
#<date> server id 1 end_log_pos 0 CRC32 XXX Annotate_rows:
#<date> server id 1 end_log_pos # CRC32 XXX Annotate_rows:
#Q> DELETE FROM t2
#<date> server id 1 end_log_pos 0 CRC32 XXX Table_map: `test`.`t2` mapped to number num
#<date> server id 1 end_log_pos # CRC32 XXX Table_map: `test`.`t2` mapped to number num
# at 3087
#<date> server id 1 end_log_pos 0 CRC32 XXX Delete_compressed_rows: table id 33 flags: STMT_END_F
#<date> server id 1 end_log_pos # CRC32 XXX Delete_compressed_rows: table id 33 flags: STMT_END_F
### DELETE FROM `test`.`t2`
### WHERE
### @1=10 /* INT meta=0 nullable=0 is_null=0 */
Expand Down Expand Up @@ -429,12 +429,12 @@ START TRANSACTION
### @9='A' /* STRING(1) meta=65025 nullable=1 is_null=0 */
# Number of rows: 4
# at 3172
#<date> server id 1 end_log_pos 3245 CRC32 XXX Query thread_id=5 exec_time=x error_code=0 xid=<xid>
#<date> server id 1 end_log_pos # CRC32 XXX Query thread_id=TID exec_time=x error_code=0 xid=<xid>
SET TIMESTAMP=X/*!*/;
COMMIT
/*!*/;
# at 3245
#<date> server id 1 end_log_pos 3293 CRC32 XXX Rotate to master-bin.000002 pos: 4
#<date> server id 1 end_log_pos # CRC32 XXX Rotate to master-bin.000002 pos: 4
DELIMITER ;
# End of log file
ROLLBACK /* added by mysqlbinlog */;
Expand Down
2 changes: 1 addition & 1 deletion mysql-test/main/mysqlbinlog_row_compressed.test
Expand Up @@ -30,7 +30,7 @@ DELETE FROM t2;

FLUSH BINARY LOGS;
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
--replace_regex /\d{6} *\d*:\d\d:\d\d/<date>/ /Start:.*at startup/Start: xxx/ /SET TIMESTAMP=\d*/SET TIMESTAMP=X/ /exec_time=\d*/exec_time=x/ /mapped to number \d*/mapped to number num/ /CRC32 0x[0-9a-f]+/CRC32 XXX/ /@@session.sql_mode=\d+/@@session.sql_mode=#/ /collation_server=\d+/collation_server=#/ /xid=\d*/xid=<xid>/
--replace_regex /\d{6} *\d*:\d\d:\d\d/<date>/ /Start:.*at startup/Start: xxx/ /SET TIMESTAMP=\d*/SET TIMESTAMP=X/ /exec_time=\d*/exec_time=x/ /mapped to number \d*/mapped to number num/ /CRC32 0x[0-9a-f]+/CRC32 XXX/ /@@session.sql_mode=\d+/@@session.sql_mode=#/ /collation_server=\d+/collation_server=#/ /xid=\d*/xid=<xid>/ /thread_id=\d*/thread_id=TID/ /end_log_pos [0-9]*/end_log_pos #/
--exec $MYSQL_BINLOG --verbose --verbose --base64-output=DECODE-ROWS $datadir/$binlog

--echo
Expand Down

0 comments on commit e4afa61

Please sign in to comment.