When rolling back and retrying a transaction in parallel replication, don't
release the domain ownership (for --gtid-ignore-duplicates) as part of the
rollback. Otherwise another master connection could grab the ownership and
double-apply the transaction in parallel with the retry.

Signed-off-by: Kristian Nielsen <kniel...@knielsen-hq.org>
---
 .../gtid_ignore_duplicates.result             | 108 +++++++++++++++++-
 .../multi_source/gtid_ignore_duplicates.test  |  68 ++++++++++-
 sql/rpl_parallel.cc                           |   8 +-
 sql/rpl_rli.cc                                |   4 +-
 sql/rpl_rli.h                                 |   2 +-
 5 files changed, 178 insertions(+), 12 deletions(-)

diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.result b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result
index e142ff8b981..88b525e21ff 100644
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.result
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result
@@ -174,6 +174,105 @@ a
 10
 11
 12
+*** MDEV-33475: --gtid-ignore-duplicate can double-apply event in case of parallel replication retry
+connection server_2;
+STOP SLAVE "c2b";
+SET default_master_connection = "c2b";
+include/wait_for_slave_to_stop.inc
+STOP SLAVE "a2b";
+SET default_master_connection = "a2b";
+include/wait_for_slave_to_stop.inc
+connection server_1;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+BEGIN;
+INSERT INTO t2 VALUES (0, 0);
+INSERT INTO t2 VALUES (1, 0);
+INSERT INTO t2 VALUES (2, 0);
+INSERT INTO t2 VALUES (3, 0);
+INSERT INTO t2 VALUES (4, 0);
+INSERT INTO t2 VALUES (5, 0);
+INSERT INTO t2 VALUES (6, 0);
+INSERT INTO t2 VALUES (7, 0);
+INSERT INTO t2 VALUES (8, 0);
+INSERT INTO t2 VALUES (9, 0);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (0+10, 100);
+UPDATE t2 SET b=0 WHERE a<10;
+INSERT INTO t2 VALUES (0+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (1+10, 100);
+UPDATE t2 SET b=1 WHERE a<10;
+INSERT INTO t2 VALUES (1+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (2+10, 100);
+UPDATE t2 SET b=2 WHERE a<10;
+INSERT INTO t2 VALUES (2+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (3+10, 100);
+UPDATE t2 SET b=3 WHERE a<10;
+INSERT INTO t2 VALUES (3+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (4+10, 100);
+UPDATE t2 SET b=4 WHERE a<10;
+INSERT INTO t2 VALUES (4+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (5+10, 100);
+UPDATE t2 SET b=5 WHERE a<10;
+INSERT INTO t2 VALUES (5+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (6+10, 100);
+UPDATE t2 SET b=6 WHERE a<10;
+INSERT INTO t2 VALUES (6+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (7+10, 100);
+UPDATE t2 SET b=7 WHERE a<10;
+INSERT INTO t2 VALUES (7+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (8+10, 100);
+UPDATE t2 SET b=8 WHERE a<10;
+INSERT INTO t2 VALUES (8+20, 200);
+COMMIT;
+BEGIN;
+INSERT INTO t2 VALUES (9+10, 100);
+UPDATE t2 SET b=9 WHERE a<10;
+INSERT INTO t2 VALUES (9+20, 200);
+COMMIT;
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+COUNT(*)       SUM(a)  SUM(b)
+30     435     3090
+include/save_master_gtid.inc
+connection server_2;
+SET @old_mode= @@GLOBAL.slave_parallel_mode;
+SET GLOBAL slave_parallel_mode=aggressive;
+SET default_master_connection = "a2b";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+SET default_master_connection = "c2b";
+START SLAVE;
+include/wait_for_slave_to_start.inc
+include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+COUNT(*)       SUM(a)  SUM(b)
+30     435     3090
+connection server_3;
+include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+COUNT(*)       SUM(a)  SUM(b)
+30     435     3090
+connection server_4;
+include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+COUNT(*)       SUM(a)  SUM(b)
+30     435     3090
 *** Test also with not using parallel replication.
 connection server_1;
 SET default_master_connection = "b2a";
@@ -474,6 +573,7 @@ Warnings:
 Note   1938    SLAVE 'a2b' stopped
 Note   1938    SLAVE 'c2b' stopped
 SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL slave_parallel_mode= @old_mode;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
 connection server_3;
 SET GLOBAL gtid_domain_id=0;
@@ -491,22 +591,22 @@ Note      1938    SLAVE 'a2d' stopped
 SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
 connection server_1;
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 include/reset_master_slave.inc
 disconnect server_1;
 connection server_2;
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 include/reset_master_slave.inc
 disconnect server_2;
 connection server_3;
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 include/reset_master_slave.inc
 disconnect server_3;
 connection server_4;
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 include/reset_master_slave.inc
 disconnect server_4;
diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.test b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test
index 3d2d151bd0d..cbc06920b41 100644
--- a/mysql-test/suite/multi_source/gtid_ignore_duplicates.test
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test
@@ -173,6 +173,65 @@ SET default_master_connection = "a2b";
 SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
 
 
+--echo *** MDEV-33475: --gtid-ignore-duplicate can double-apply event in case of parallel replication retry
+
+# Create a bunch of transactions that will cause conflicts and retries.
+# The bug was that the retry code was not handling the --gtid-ignore-duplicates
+# option, so events could be doubly-applied.
+
+--connection server_2
+STOP SLAVE "c2b";
+SET default_master_connection = "c2b";
+--source include/wait_for_slave_to_stop.inc
+STOP SLAVE "a2b";
+SET default_master_connection = "a2b";
+--source include/wait_for_slave_to_stop.inc
+
+--connection server_1
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+BEGIN;
+--let $i= 0
+while ($i < 10) {
+  eval INSERT INTO t2 VALUES ($i, 0);
+  inc $i;
+}
+COMMIT;
+
+--let $i= 0
+while ($i < 10) {
+  BEGIN;
+  eval INSERT INTO t2 VALUES ($i+10, 100);
+  eval UPDATE t2 SET b=$i WHERE a<10;
+  eval INSERT INTO t2 VALUES ($i+20, 200);
+  COMMIT;
+  inc $i;
+}
+
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+--source include/save_master_gtid.inc
+
+--connection server_2
+SET @old_mode= @@GLOBAL.slave_parallel_mode;
+SET GLOBAL slave_parallel_mode=aggressive;
+SET default_master_connection = "a2b";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+SET default_master_connection = "c2b";
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+
+--source include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+
+--connection server_3
+--source include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+
+--connection server_4
+--source include/sync_with_master_gtid.inc
+SELECT COUNT(*), SUM(a), SUM(b) FROM t2;
+
+
 --echo *** Test also with not using parallel replication.
 
 --connection server_1
@@ -414,6 +473,7 @@ SET GLOBAL gtid_domain_id=0;
 --sorted_result
 STOP ALL SLAVES;
 SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL slave_parallel_mode= @old_mode;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
 
 --connection server_3
@@ -431,25 +491,25 @@ SET GLOBAL slave_parallel_threads= @old_parallel;
 SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
 
 --connection server_1
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 --source include/reset_master_slave.inc
 --disconnect server_1
 
 --connection server_2
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 --source include/reset_master_slave.inc
 --disconnect server_2
 
 --connection server_3
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 --source include/reset_master_slave.inc
 --disconnect server_3
 
 --connection server_4
-DROP TABLE t1;
+DROP TABLE t1, t2;
 ALTER TABLE mysql.gtid_slave_pos ENGINE=Aria;
 --source include/reset_master_slave.inc
 --disconnect server_4
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ac96d92eb5d..1c7f4dd1f93 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -868,7 +868,13 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
     });
 #endif
 
-  rgi->cleanup_context(thd, 1);
+  /*
+    We are still applying the event group, even though we will roll it back
+    and retry it. So for --gtid-ignore-duplicates, keep ownership of the
+    domain during the retry so another master connection will not try to take
+    over and duplicate apply the same event group (MDEV-33475).
+  */
+  rgi->cleanup_context(thd, 1, 1 /* keep_domain_owner */);
   wait_for_pending_deadlock_kill(thd, rgi);
   thd->reset_killed();
   thd->clear_error();
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 95566b2f6c7..1af38be1787 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2248,7 +2248,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
 }
 
 
-void rpl_group_info::cleanup_context(THD *thd, bool error)
+void rpl_group_info::cleanup_context(THD *thd, bool error, bool keep_domain_owner)
 {
   DBUG_ENTER("rpl_group_info::cleanup_context");
   DBUG_PRINT("enter", ("error: %d", (int) error));
@@ -2298,7 +2298,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
       Ensure we always release the domain for others to process, when using
       --gtid-ignore-duplicates.
     */
-    if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
+    if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL && !keep_domain_owner)
       rpl_global_gtid_slave_state->release_domain_owner(this);
   }
 
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 9fc1a384355..91628bee3c7 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -917,7 +917,7 @@ struct rpl_group_info
   }
 
   void clear_tables_to_lock();
-  void cleanup_context(THD *, bool);
+  void cleanup_context(THD *, bool, bool keep_domain_owner= false);
   void slave_close_thread_tables(THD *);
   void mark_start_commit_no_lock();
   void mark_start_commit();
-- 
2.30.2

_______________________________________________
commits mailing list -- commits@lists.mariadb.org
To unsubscribe send an email to commits-le...@lists.mariadb.org

Reply via email to