From: Andrei <andrei.el...@mariadb.com>

XA-Prepare group of events

  XA START xid
  ...
  XA END xid
  XA PREPARE xid

and its XA-"complete" terminator

  XA COMMIT or
  XA ROLLBACK

are made distributed Round-Robin across slave parallel workers.
The former hash-based policy was proven to attribute to execution
latency through creating a big - many times larger than the size
of the worker pool - queue of binlog-ordered transactions
to commit.

Acronyms and notations used below:

  XAP := XA-Prepare event or the whole prepared XA group of events
  XAC := XA-"complete", which is a solitary group of events
  |W| := the size of the slave worker pool
  Subscripts like `_k' denote order in a corresponding sequence
     (e.g binlog file).

KEY CHANGES:

The parallel slave
------------------
driver thread now maintains a list XAP:s currently
in processing. It's purpose is to avoid "wild" parallel execution of XA:s
with duplicate xids (unlikely, but that's the user's right).
The list is arranged as a sliding window with the size of 2*|W| to account
a possibility of XAP_k -> XAP_k+2|W|-1 the largest (in the group-of-events
count sense) dependency.
Say k=1, and |W| the # of Workers is 4. As transactions are distributed
Round-Robin, it's possible to have T^*_1 -> T^*_8 as the largest
dependency ('*' marks the dependents) in runtime.
It can be seen from worker queues, like in the picture below.
Let Q_i worker queues  develop downward:

  Q1 ...  Q4
  1^* 2 3 4
  5   6 7 8^*

Worker # 1 has assigned with T_1 and T_5.
Worker #4 can take on its T_8 when T_1 is yet at the
beginning of its processing, so even before XA START of that XAP.

XA related
----------
XID_cache_element is extended with two pointers to resolve
two types of dependencies: the duplicate xid XAP_k -> XAP_k+i
and the ordinary completion on the prepare XAP_k -> XAC_k+j.
The former is handled by a wait-for-xid protocol conducted by
xid_cache_delete() and xid_cache_insert_maybe_wait().
The later is done analogously by xid_cache_search_maybe_wait() and
slave_applier_reset_xa_trans().

XA-"complete" are allowed to go forward before its XAP parent
has released the xid (all recovery concerns are covered in MDEV-21496,
MDEV-21777).
Yet XAC is going to wait for it at a critical
point of execution which is at "complete" the work in Engine.

CAVEAT: storage/innobase/trx/trx0undo.cc changes are due to possibly
        fixed MDEV-32144,
        TODO: to be verified.

Thanks to Brandon Nesterenko at mariadb.com for initial review and
a lot of creative efforts to advance with this work!
---
 mysql-test/include/show_binlog_events2.inc    |  11 +
 .../binlog/r/binlog_xa_prepared_bugs.result   |  53 +
 .../binlog/t/binlog_xa_prepared_bugs.test     |  30 +
 .../rpl/include/rpl_xa_concurrent_2pc.inc     | 442 ++++++++
 .../suite/rpl/r/rpl_xa_concurrent_2pc.result  | 953 ++++++++++++++++++
 .../rpl/r/rpl_xa_empty_transaction.result     |  51 +
 .../rpl/r/rpl_xa_prepare_gtid_fail.result     |   2 +-
 .../suite/rpl/t/rpl_xa_concurrent_2pc.test    | 111 ++
 .../suite/rpl/t/rpl_xa_empty_transaction.test |  89 ++
 .../suite/rpl/t/rpl_xa_prepare_gtid_fail.test |   2 +-
 sql/handler.cc                                |  23 +-
 sql/log.cc                                    | 219 +++-
 sql/log_event_server.cc                       |  17 +-
 sql/mysqld.cc                                 |   4 +-
 sql/mysqld.h                                  |   4 +-
 sql/rpl_parallel.cc                           | 128 ++-
 sql/rpl_parallel.h                            |  16 +
 sql/rpl_rli.cc                                |   6 +-
 sql/rpl_rli.h                                 |   8 +
 sql/sql_array.h                               |   4 +-
 sql/sql_class.cc                              |   2 +
 sql/sql_class.h                               |   9 +
 sql/xa.cc                                     | 333 +++++-
 storage/innobase/trx/trx0undo.cc              |   3 +-
 24 files changed, 2419 insertions(+), 101 deletions(-)
 create mode 100644 mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
 create mode 100644 mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
 create mode 100644 mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
 create mode 100644 mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
 create mode 100644 mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test

diff --git a/mysql-test/include/show_binlog_events2.inc 
b/mysql-test/include/show_binlog_events2.inc
index 84c62cced66..416514faea4 100644
--- a/mysql-test/include/show_binlog_events2.inc
+++ b/mysql-test/include/show_binlog_events2.inc
@@ -1,3 +1,9 @@
+# ==== Usage ====
+#
+# [--let $binlog_file= [<FILENAME> | LAST]]
+# [--let $binlog_start= <POSITION> ]
+# [--let $filter_cid=  [0 | 1]
+
 if ($binlog_start)
 {
   --let $_binlog_start=$binlog_start
@@ -14,4 +20,9 @@ if ($binlog_file)
 --replace_result "$_from_binlog_start" "from <binlog_start>" $MYSQLTEST_VARDIR 
MYSQLTEST_VARDIR
 --replace_column 2 # 5 #
 --replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ 
/file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/
+if ($filter_cid)
+{
+--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ 
/file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/ / cid=[0-9]+//
+
+}
 --eval show binlog events $_in_binlog_file from $_binlog_start
diff --git a/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result 
b/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
new file mode 100644
index 00000000000..35b7accfb24
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_xa_prepared_bugs.result
@@ -0,0 +1,53 @@
+CREATE TABLE ta (c INT KEY) engine=Aria;
+XA START 'xid_a';
+INSERT INTO ta VALUES (1);
+XA END 'xid_a';
+XA PREPARE 'xid_a';
+Warnings:
+Warning        1030    Got error 131 "Command not supported by the engine" 
from storage engine Aria
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+Table  Op      Msg_type        Msg_text
+test.c preload_keys    Error   XAER_RMFAIL: The command cannot be executed 
when global transaction is in the  PREPARED state
+test.c preload_keys    Error   XAER_RMFAIL: The command cannot be executed 
when global transaction is in the  PREPARED state
+test.c preload_keys    error   Corrupt
+Warnings:
+Warning        1196    Some non-transactional changed tables couldn't be 
rolled back
+XA ROLLBACK 'xid_a';
+CREATE TABLE ti (c INT KEY) engine=Innodb;
+XA START 'xid_i';
+INSERT INTO ti VALUES (1);
+XA END 'xid_i';
+XA PREPARE 'xid_i';
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+Table  Op      Msg_type        Msg_text
+test.c preload_keys    Error   XAER_RMFAIL: The command cannot be executed 
when global transaction is in the  PREPARED state
+test.c preload_keys    Error   XAER_RMFAIL: The command cannot be executed 
when global transaction is in the  PREPARED state
+test.c preload_keys    error   Corrupt
+XA COMMIT 'xid_i';
+SELECT * FROM ti;
+c
+include/show_binlog_events.inc
+Log_name       Pos     Event_type      Server_id       End_log_pos     Info
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; CREATE 
TABLE ta (c INT KEY) engine=Aria
+master-bin.000001      #       Gtid    #       #       BEGIN GTID #-#-#
+master-bin.000001      #       Annotate_rows   #       #       INSERT INTO ta 
VALUES (1)
+master-bin.000001      #       Table_map       #       #       table_id: # 
(test.ta)
+master-bin.000001      #       Write_rows_v1   #       #       table_id: # 
flags: STMT_END_F
+master-bin.000001      #       Query   #       #       COMMIT
+master-bin.000001      #       Gtid    #       #       XA START 
X'7869645f61',X'',1 GTID #-#-#
+master-bin.000001      #       Query   #       #       XA END 
X'7869645f61',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7869645f61',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK 
X'7869645f61',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; CREATE 
TABLE ti (c INT KEY) engine=Innodb
+master-bin.000001      #       Gtid    #       #       XA START 
X'7869645f69',X'',1 GTID #-#-#
+master-bin.000001      #       Annotate_rows   #       #       INSERT INTO ti 
VALUES (1)
+master-bin.000001      #       Table_map       #       #       table_id: # 
(test.ti)
+master-bin.000001      #       Write_rows_v1   #       #       table_id: # 
flags: STMT_END_F
+master-bin.000001      #       Query   #       #       XA END 
X'7869645f69',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7869645f69',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK 
X'7869645f69',X'',1
+drop table ta,ti;
diff --git a/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test 
b/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
new file mode 100644
index 00000000000..3a5bb15968e
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_xa_prepared_bugs.test
@@ -0,0 +1,30 @@
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+
+CREATE TABLE ta (c INT KEY) engine=Aria;
+XA START 'xid_a';
+INSERT INTO ta VALUES (1);
+XA END 'xid_a';
+XA PREPARE 'xid_a';
+
+#--error ER_XAER_RMFAIL
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+
+XA ROLLBACK 'xid_a';
+
+CREATE TABLE ti (c INT KEY) engine=Innodb;
+XA START 'xid_i';
+INSERT INTO ti VALUES (1);
+XA END 'xid_i';
+XA PREPARE 'xid_i';
+
+# --error ER_XAER_RMFAIL
+LOAD INDEX INTO CACHE c KEY(PRIMARY);
+
+XA COMMIT 'xid_i';
+SELECT * FROM ti;
+
+#
+--source include/show_binlog_events.inc
+
+drop table ta,ti;
diff --git a/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc 
b/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
new file mode 100644
index 00000000000..d1f1868d2c9
--- /dev/null
+++ b/mysql-test/suite/rpl/include/rpl_xa_concurrent_2pc.inc
@@ -0,0 +1,442 @@
+#
+# Helper file to run the 1-4(a,b) test cases for rpl_xa_concurrent_2pc,
+# with either XA COMMIT or XA ROLLBACK used to complete XA transactions.
+#
+# Parameters
+#   $xa_complete_sym (string) : COMMIT or ROLLBACK, the action used to complete
+#                               a prepared XA transaction
+#
+
+if (!$xa_complete_sym)
+{
+    die MTR variable xa_complete_sym not specified, must be either COMMIT or 
ROLLBACK;
+}
+
+--let $is_xac= 0
+--let $is_xar= 0
+
+if (`SELECT strcmp("COMMIT", "$xa_complete_sym") = 0`)
+{
+    --let $is_xac= 1
+}
+
+if (`SELECT strcmp("ROLLBACK", "$xa_complete_sym") = 0`)
+{
+    --let $is_xar= 1
+}
+
+if (`SELECT !$is_xar && !$is_xac`)
+{
+    die MTR variable xa_complete_sym invalid, must be either COMMIT or 
ROLLBACK;
+}
+
+
+--echo #
+--echo # Initialize test data
+--connection slave
+--source include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+
+if ($is_xac)
+{
+--connection slave
+RESET MASTER;
+
+--connection master
+RESET MASTER;
+}
+
+--connection master
+create table t1 (a int primary key, b int) engine=innodb;
+
+# Slave locks this row before updates to pause transaction progress
+--let $hold_row= -1
+--let $t1_ctr= 0
+--eval insert into t1 values ($hold_row, 0)
+
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+--source include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to 
failure of an earlier commit on which this one depends");
+
+
+--echo #
+--echo # Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+--echo # and XA $xa_complete_sym run concurrently. That is, the
+--echo # XA $xa_complete_sym will wait at group commit until the XA PREPARE
+--echo # binlogs, and then it will wait again until the XA PREPARE finishes
+--echo # preparing in all engines. At this point, the XA $xa_complete_sym will
+--echo # run to completion.
+--connection master
+# For worker thread to hold XAP at dequeue time via debug_sync through
+# `hold_worker_on_schedule`.
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave
+# For worker to stop at dequeue event time and after binlogging XA PREPARE
+set @@global.debug_dbug= 
"+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+--source include/start_slave.inc
+
+--echo # Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+
+--echo # Before the XA PREPARE executes, the XA $xa_complete_sym should wait 
in group commit..
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior transaction to 
commit"
+--source include/wait_condition.inc
+--echo # ..done
+
+
+--echo # Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+
+--echo # Wait for XA PREPARE to have binlogged, but hold it before it prepares 
in engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # The XA $xa_complete_sym should move on from binlog to wait for the XA 
PREPARE to complete in engines
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+--echo # ..done
+
+--echo # Signal the XAP to complete in engines (which will automatically 
signal XAC)
+set debug_sync= "now signal continue_xap";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 2: If two XA $xa_complete_sym transactions have different
+--echo # XIDs, ensure both phases of both transactions all execute 
concurrently.
+--echo #
+
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+
+# Stop both XAP after their binlogging and before their engine changing
+set @@global.debug_dbug= 
"+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+
+--connection master
+XA START 'x1';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x1';
+XA PREPARE 'x1';
+--eval XA $xa_complete_sym 'x1'
+
+XA START 'x2';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x2';
+XA PREPARE 'x2';
+--eval XA $xa_complete_sym 'x2'
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+
+# This stage is necessary to avoid XAP_1 <-register-> XAC_2 race in that
+# XAC_2 may get stuck in the below WFPT2C state all time until XAP_1 has 
finished.
+# Prove the workers' status are like the following:
+--let $count_wait= 2
+--let $wait_condition=SELECT count(*) = $count_wait FROM 
information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE 
"Waiting for prior transaction to commit"
+--source include/wait_condition.inc
+
+--let $count_wait= 2
+--let $wait_condition=SELECT count(*) = $count_wait FROM 
information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE 
"debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+# wait for two XAP:s arrive at their stations (XAP_1 to next one after binlog 
is done)
+--let $wait_condition=SELECT count(*) >= 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+--echo # Ensuring both phases of both transactions all execute concurrently
+# Waiting for both XA "COMPLETE"s to binlog proves this, as they would not pass
+# group commit if their preceding XA PREPAREs had not also binlogged
+--let $count_xa_wait_workers= 2
+
+--let $wait_condition=SELECT count(*) = $count_xa_wait_workers FROM 
information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE 
"Waiting for prior xa transaction"
+--source include/wait_condition.inc
+--echo # ..done
+
+--echo # Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Signal the XAPs to complete in engines (which will automatically 
signal XACs)
+set debug_sync= "now signal continue_xap";
+
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal continue_xap";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 3: Two current 2-phase XA transactions with matching XIDs
+--echo # should run one after the other, while each transaction still allows
+--echo # its XA PREPARE and XA $xa_complete_sym to run concurrently
+
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+
+# Stop both XAP after their binlogging and before their engine changing
+set @@global.debug_dbug= 
"+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+
+--connection master
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/start_slave.inc
+
+# This stage is necessary to avoid XAP_1 <-register-> XAC_2 race in that
+# XAC_2 may get stuck in the below WFPT2C state all time until XAP_1 has 
finished.
+--let $count_wait= 3
+--let $wait_condition=SELECT count(*) = $count_wait FROM 
information_schema.processlist WHERE command = 'Slave_worker' AND state LIKE 
"Waiting for prior transaction to commit"
+
+--source include/wait_condition.inc
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+
+--echo # Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Ensure first XA transaction is running concurrently
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa 
transaction" AND info LIKE "XA $xa_complete_sym%"
+--source include/wait_condition.inc
+
+--echo # Ensure second XA transaction's XAP waits for the first transaction
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa 
transaction" AND info LIKE "XA START%"
+--source include/wait_condition.inc
+
+--echo # Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+
+--echo # Wait for first XA $xa_complete_sym to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+
+--echo # Ensure second XA PREPARE doesn't begin yet because the XAC hadn't 
released its XID
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+
+--echo # Signal first XA $xa_complete_sym to complete
+set debug_sync= "now signal continue_xa_cor";
+
+--echo # Wait for second XA PREPARE to binlogged
+--echo # First pass through binlog_xap
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "debug sync point: now"
+--source include/wait_condition.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+
+--echo # Ensure second XA $xa_complete_sym is concurrent with XAP
+--let $wait_condition=SELECT count(*) = 1 FROM information_schema.processlist 
WHERE command = 'Slave_worker' AND state LIKE "Waiting for prior xa transaction"
+--source include/wait_condition.inc
+
+--echo # Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+
+--source include/sync_with_master_gtid.inc
+
+--let $diff_tables=master:test.t1, slave:test.t1
+--source include/diff_tables.inc
+
+--connection slave
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo #
+--echo # Test Case 4 (Error Case): If an XA PREPARE errors while its
+--echo # XA $xa_complete_sym is waiting on it, both phases should rollback
+--echo # successfully. Note this tests both:
+--echo #    a) XA $xa_complete_sym is waiting in group commit (first phase
+--echo #       times out in DMLs)
+--echo #    b) XA $xa_complete_sym is waiting in group commit, with another XAP
+--echo #       with a duplicate XID waiting on it.
+
+--echo # Case a)
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+
+--connection master
+XA START 'x';
+--eval update t1 set b=b+1 where a=$hold_row
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+--source include/save_master_gtid.inc
+
+--connection slave1
+BEGIN;
+--eval select * from t1 where a=$hold_row for update;
+
+--connection slave
+--source include/start_slave.inc
+
+--let $slave_sql_errno= 1205
+--source include/wait_for_slave_sql_error.inc
+
+--connection slave1
+ROLLBACK;
+
+--connection slave
+# Stop the IO thread too
+--source include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+
+--echo # Ensure on slave restart, we can re-execute the XA transaction
+--source include/start_slave.inc
+--source include/save_master_gtid.inc
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+
+
+--echo # Case b)
+--echo # Ensure slave is stopped
+--connection slave
+--source include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+
+--connection master
+XA START 'x';
+--eval update t1 set b=b+1 where a=$hold_row
+XA END 'x';
+XA PREPARE 'x';
+--eval XA $xa_complete_sym 'x'
+
+XA START 'x';
+--eval insert into t1 values ($t1_ctr, 0)
+--let $new_row_idx= $t1_ctr
+--inc $t1_ctr
+XA END 'x';
+XA PREPARE 'x';
+--source include/save_master_gtid.inc
+--eval XA $xa_complete_sym 'x'
+
+--connection slave1
+BEGIN;
+--eval select * from t1 where a=$hold_row for update;
+
+--connection slave
+--source include/start_slave.inc
+
+--let $slave_sql_errno= 1205
+--source include/wait_for_slave_sql_error.inc
+
+--connection slave1
+ROLLBACK;
+
+--echo # There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+
+--echo # Ensuring data from second XAP isn't visible..
+if (`select count(*) from t1 where a=$new_row_idx`)
+{
+    --die Failed, row exists
+}
+--echo # ..done
+
+--connection slave
+--source include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+
+--echo # Ensure on slave restart, we can re-execute the XA transaction
+--source include/start_slave.inc
+--source include/save_master_gtid.inc
+--source include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+--source include/start_slave.inc
+
+--echo # Ensuring data from second XAP is visible..
+if ($is_xac)
+{
+    --let $expected_row_count= 1
+}
+if ($is_xar)
+{
+    --let $expected_row_count= 0
+}
+if (`select count(*) != $expected_row_count from t1 where a=$new_row_idx`)
+{
+    --die Failed, XA $xa_complete_sym was not observed
+}
+--echo # ..done
+
+--echo #
+--echo # Cleanup
+--connection master
+DROP TABLE t1;
+--source include/save_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+--source include/show_binlog_events.inc
+
+--connection slave
+--source include/sync_with_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+--let $filter_cid=1
+--source include/show_binlog_events2.inc
+
+--source include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+--source include/start_slave.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result 
b/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
new file mode 100644
index 00000000000..d30943b23da
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_concurrent_2pc.result
@@ -0,0 +1,953 @@
+include/master-slave.inc
+[connection master]
+#
+# Initialize test data
+connection slave;
+include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+connection slave;
+RESET MASTER;
+connection master;
+RESET MASTER;
+connection master;
+create table t1 (a int primary key, b int) engine=innodb;
+insert into t1 values (-1, 0);
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to 
failure of an earlier commit on which this one depends");
+#
+# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+# and XA COMMIT run concurrently. That is, the
+# XA COMMIT will wait at group commit until the XA PREPARE
+# binlogs, and then it will wait again until the XA PREPARE finishes
+# preparing in all engines. At this point, the XA COMMIT will
+# run to completion.
+connection master;
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+insert into t1 values (0, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave;
+set @@global.debug_dbug= 
"+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+include/start_slave.inc
+# Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+# Before the XA PREPARE executes, the XA COMMIT should wait in group commit..
+# ..done
+# Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+# Wait for XA PREPARE to have binlogged, but hold it before it prepares in 
engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# The XA COMMIT should move on from binlog to wait for the XA PREPARE to 
complete in engines
+# ..done
+# Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 2: If two XA COMMIT transactions have different
+# XIDs, ensure both phases of both transactions all execute concurrently.
+#
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= 
"+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+connection master;
+XA START 'x1';
+insert into t1 values (1, 0);
+XA END 'x1';
+XA PREPARE 'x1';
+XA COMMIT 'x1';
+XA START 'x2';
+insert into t1 values (2, 0);
+XA END 'x2';
+XA PREPARE 'x2';
+XA COMMIT 'x2';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now signal binlog_xap";
+# Ensuring both phases of both transactions all execute concurrently
+# ..done
+# Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 3: Two current 2-phase XA transactions with matching XIDs
+# should run one after the other, while each transaction still allows
+# its XA PREPARE and XA COMMIT to run concurrently
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= 
"+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+connection master;
+XA START 'x';
+insert into t1 values (3, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+XA START 'x';
+insert into t1 values (4, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+# Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure first XA transaction is running concurrently
+# Ensure second XA transaction's XAP waits for the first transaction
+# Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+# Wait for first XA COMMIT to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+# Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released 
its XID
+# Signal first XA COMMIT to complete
+set debug_sync= "now signal continue_xa_cor";
+# Wait for second XA PREPARE to binlogged
+# First pass through binlog_xap
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure second XA COMMIT is concurrent with XAP
+# Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 4 (Error Case): If an XA PREPARE errors while its
+# XA COMMIT is waiting on it, both phases should rollback
+# successfully. Note this tests both:
+#    a) XA COMMIT is waiting in group commit (first phase
+#       times out in DMLs)
+#    b) XA COMMIT is waiting in group commit, with another XAP
+#       with a duplicate XID waiting on it.
+# Case a)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+include/save_master_gtid.inc
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a      b
+-1     0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+# Case b)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA COMMIT 'x';
+XA START 'x';
+insert into t1 values (5, 0);
+XA END 'x';
+XA PREPARE 'x';
+include/save_master_gtid.inc
+XA COMMIT 'x';
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a      b
+-1     1
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+# There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+formatID       gtrid_length    bqual_length    data
+# Ensuring data from second XAP isn't visible..
+# ..done
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+include/start_slave.inc
+# Ensuring data from second XAP is visible..
+# ..done
+#
+# Cleanup
+connection master;
+DROP TABLE t1;
+include/save_master_gtid.inc
+include/show_binlog_events.inc
+Log_name       Pos     Event_type      Server_id       End_log_pos     Info
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+master-bin.000001      #       Gtid    #       #       BEGIN GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (-1, 0)
+master-bin.000001      #       Xid     #       #       COMMIT /* XID */
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (0, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'7831',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (1, 0)
+master-bin.000001      #       Query   #       #       XA END X'7831',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7831',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'7831',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'7832',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (2, 0)
+master-bin.000001      #       Query   #       #       XA END X'7832',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7832',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'7832',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (3, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (4, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; update t1 
set b=b+1 where a=-1
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; update t1 
set b=b+1 where a=-1
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (5, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; DROP TABLE 
`t1` /* generated by server */
+connection slave;
+include/sync_with_master_gtid.inc
+show binlog events in 'slave-bin.000001' from <binlog_start>;
+Log_name       Pos     Event_type      Server_id       End_log_pos     Info
+slave-bin.000001       #       Gtid_list       2       #       []
+slave-bin.000001       #       Binlog_checkpoint       2       #       
slave-bin.000001
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001       #       Gtid    1       #       BEGIN GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (-1, 0)
+slave-bin.000001       #       Xid     1       #       COMMIT /* XID */
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (0, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7831',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (1, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7831',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7832',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (2, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7832',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (3, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (4, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (5, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; DROP TABLE 
IF EXISTS `t1` /* generated by server */
+include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+include/start_slave.inc
+#
+# Initialize test data
+connection slave;
+include/stop_slave.inc
+RESET SLAVE;
+set @@global.gtid_slave_pos= "";
+connection master;
+create table t1 (a int primary key, b int) engine=innodb;
+insert into t1 values (-1, 0);
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+set @save_debug= @@GLOBAL.debug_dbug;
+set @save_par_thds= @@GLOBAL.slave_parallel_threads;
+set @save_par_mode= @@GLOBAL.slave_parallel_mode;
+set @@GLOBAL.slave_parallel_threads= 4;
+set @@GLOBAL.slave_parallel_mode= optimistic;
+set statement sql_log_bin=0 for call mtr.add_suppression("Commit failed due to 
failure of an earlier commit on which this one depends");
+#
+# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE
+# and XA ROLLBACK run concurrently. That is, the
+# XA ROLLBACK will wait at group commit until the XA PREPARE
+# binlogs, and then it will wait again until the XA PREPARE finishes
+# preparing in all engines. At this point, the XA ROLLBACK will
+# run to completion.
+connection master;
+set @@session.gtid_seq_no= 100;
+XA START 'x';
+insert into t1 values (0, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+include/save_master_gtid.inc
+connection slave;
+set @@global.debug_dbug= 
"+d,hold_worker_on_schedule,stop_after_binlog_prepare";
+include/start_slave.inc
+# Waiting for XAP to pause when it is pulled from the queue
+set debug_sync= "now wait_for reached_pause";
+# Before the XA PREPARE executes, the XA ROLLBACK should wait in group commit..
+# ..done
+# Execute the XA PREPARE
+set debug_sync= "now signal continue_worker";
+# Wait for XA PREPARE to have binlogged, but hold it before it prepares in 
engines
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# The XA ROLLBACK should move on from binlog to wait for the XA PREPARE to 
complete in engines
+# ..done
+# Signal the XAP to complete in engines (which will automatically signal XAC)
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 2: If two XA ROLLBACK transactions have different
+# XIDs, ensure both phases of both transactions all execute concurrently.
+#
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= 
"+d,stop_before_binlog_prepare,stop_after_binlog_prepare";
+connection master;
+XA START 'x1';
+insert into t1 values (1, 0);
+XA END 'x1';
+XA PREPARE 'x1';
+XA ROLLBACK 'x1';
+XA START 'x2';
+insert into t1 values (2, 0);
+XA END 'x2';
+XA PREPARE 'x2';
+XA ROLLBACK 'x2';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now signal binlog_xap";
+# Ensuring both phases of both transactions all execute concurrently
+# ..done
+# Verify XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Signal the XAPs to complete in engines (which will automatically signal XACs)
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now signal continue_xap";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 3: Two current 2-phase XA transactions with matching XIDs
+# should run one after the other, while each transaction still allows
+# its XA PREPARE and XA ROLLBACK to run concurrently
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @@global.debug_dbug= 
"+d,stop_before_binlog_prepare,stop_after_binlog_prepare,stop_after_binlog_cor_by_xid";
+connection master;
+XA START 'x';
+insert into t1 values (3, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+XA START 'x';
+insert into t1 values (4, 0);
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+include/save_master_gtid.inc
+connection slave;
+include/start_slave.inc
+set debug_sync= "now signal binlog_xap";
+# Verify first XA PREPARE has binlogged
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure first XA transaction is running concurrently
+# Ensure second XA transaction's XAP waits for the first transaction
+# Signal first XA PREPARE to complete
+set debug_sync= "now signal continue_xap";
+# Wait for first XA ROLLBACK to binlog
+set debug_sync= "now wait_for xa_cor_binlogged";
+# Ensure second XA PREPARE doesn't begin yet because the XAC hadn't released 
its XID
+# Signal first XA ROLLBACK to complete
+set debug_sync= "now signal continue_xa_cor";
+# Wait for second XA PREPARE to binlogged
+# First pass through binlog_xap
+set debug_sync= "now signal binlog_xap";
+set debug_sync= "now wait_for xa_prepare_binlogged";
+# Ensure second XA ROLLBACK is concurrent with XAP
+# Signal second XA transaction to complete
+set debug_sync= "now signal continue_xap";
+set debug_sync= "now wait_for xa_cor_binlogged";
+set debug_sync= "now signal continue_xa_cor";
+include/sync_with_master_gtid.inc
+include/diff_tables.inc [master:test.t1, slave:test.t1]
+connection slave;
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+#
+# Test Case 4 (Error Case): If an XA PREPARE errors while its
+# XA ROLLBACK is waiting on it, both phases should rollback
+# successfully. Note this tests both:
+#    a) XA ROLLBACK is waiting in group commit (first phase
+#       times out in DMLs)
+#    b) XA ROLLBACK is waiting in group commit, with another XAP
+#       with a duplicate XID waiting on it.
+# Case a)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+include/save_master_gtid.inc
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a      b
+-1     0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+# Case b)
+# Ensure slave is stopped
+connection slave;
+include/wait_for_slave_to_stop.inc
+set @save_lock_wait_timeout= @@GLOBAL.innodb_lock_wait_timeout;
+set @save_trans_retries= @@GLOBAL.slave_transaction_retries;
+set @@global.innodb_lock_wait_timeout= 1;
+set @@global.slave_transaction_retries= 0;
+connection master;
+XA START 'x';
+update t1 set b=b+1 where a=-1;
+XA END 'x';
+XA PREPARE 'x';
+XA ROLLBACK 'x';
+XA START 'x';
+insert into t1 values (5, 0);
+XA END 'x';
+XA PREPARE 'x';
+include/save_master_gtid.inc
+XA ROLLBACK 'x';
+connection slave1;
+BEGIN;
+select * from t1 where a=-1 for update;;
+a      b
+-1     0
+connection slave;
+include/start_slave.inc
+include/wait_for_slave_sql_error.inc [errno=1205]
+connection slave1;
+ROLLBACK;
+# There should not be any prepared rows seen by XA RECOVER
+XA RECOVER;
+formatID       gtrid_length    bqual_length    data
+# Ensuring data from second XAP isn't visible..
+# ..done
+connection slave;
+include/stop_slave_io.inc
+set @@global.innodb_lock_wait_timeout= @save_lock_wait_timeout;
+set @@global.slave_transaction_retries= @save_trans_retries;
+# Ensure on slave restart, we can re-execute the XA transaction
+include/start_slave.inc
+include/save_master_gtid.inc
+include/stop_slave.inc
+set @@global.debug_dbug= @save_debug;
+include/start_slave.inc
+# Ensuring data from second XAP is visible..
+# ..done
+#
+# Cleanup
+connection master;
+DROP TABLE t1;
+include/save_master_gtid.inc
+include/show_binlog_events.inc
+Log_name       Pos     Event_type      Server_id       End_log_pos     Info
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+master-bin.000001      #       Gtid    #       #       BEGIN GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (-1, 0)
+master-bin.000001      #       Xid     #       #       COMMIT /* XID */
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (0, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'7831',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (1, 0)
+master-bin.000001      #       Query   #       #       XA END X'7831',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7831',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'7831',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'7832',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (2, 0)
+master-bin.000001      #       Query   #       #       XA END X'7832',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7832',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'7832',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (3, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (4, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; update t1 
set b=b+1 where a=-1
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; update t1 
set b=b+1 where a=-1
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (5, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA COMMIT X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; DROP TABLE 
`t1` /* generated by server */
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+master-bin.000001      #       Gtid    #       #       BEGIN GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (-1, 0)
+master-bin.000001      #       Xid     #       #       COMMIT /* XID */
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (0, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'7831',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (1, 0)
+master-bin.000001      #       Query   #       #       XA END X'7831',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7831',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK 
X'7831',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'7832',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (2, 0)
+master-bin.000001      #       Query   #       #       XA END X'7832',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'7832',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK 
X'7832',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (3, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (4, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; update t1 
set b=b+1 where a=-1
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; update t1 
set b=b+1 where a=-1
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK X'78',X'',1
+master-bin.000001      #       Gtid    #       #       XA START X'78',X'',1 
GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; insert into 
t1 values (5, 0)
+master-bin.000001      #       Query   #       #       XA END X'78',X'',1
+master-bin.000001      #       XA_prepare      #       #       XA PREPARE 
X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       XA ROLLBACK X'78',X'',1
+master-bin.000001      #       Gtid    #       #       GTID #-#-#
+master-bin.000001      #       Query   #       #       use `test`; DROP TABLE 
`t1` /* generated by server */
+connection slave;
+include/sync_with_master_gtid.inc
+show binlog events in 'slave-bin.000001' from <binlog_start>;
+Log_name       Pos     Event_type      Server_id       End_log_pos     Info
+slave-bin.000001       #       Gtid_list       2       #       []
+slave-bin.000001       #       Binlog_checkpoint       2       #       
slave-bin.000001
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001       #       Gtid    1       #       BEGIN GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (-1, 0)
+slave-bin.000001       #       Xid     1       #       COMMIT /* XID */
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (0, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7831',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (1, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7831',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7832',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (2, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7832',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (3, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (4, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (5, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; DROP TABLE 
IF EXISTS `t1` /* generated by server */
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001       #       Gtid    1       #       BEGIN GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (-1, 0)
+slave-bin.000001       #       Xid     1       #       COMMIT /* XID */
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (0, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7831',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (1, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7831',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7832',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (2, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7832',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (3, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (4, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (5, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA COMMIT X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; DROP TABLE 
IF EXISTS `t1` /* generated by server */
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; create 
table t1 (a int primary key, b int) engine=innodb
+slave-bin.000001       #       Gtid    1       #       BEGIN GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (-1, 0)
+slave-bin.000001       #       Xid     1       #       COMMIT /* XID */
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (0, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7831',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (1, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7831',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK 
X'7831',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'7832',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (2, 0)
+slave-bin.000001       #       Query   1       #       XA END X'7832',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK 
X'7832',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (3, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (4, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; update t1 
set b=b+1 where a=-1
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       XA START X'78',X'',1 
GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; insert into 
t1 values (5, 0)
+slave-bin.000001       #       Query   1       #       XA END X'78',X'',1
+slave-bin.000001       #       XA_prepare      1       #       XA PREPARE 
X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       XA ROLLBACK X'78',X'',1
+slave-bin.000001       #       Gtid    1       #       GTID #-#-#
+slave-bin.000001       #       Query   1       #       use `test`; DROP TABLE 
IF EXISTS `t1` /* generated by server */
+include/stop_slave.inc
+set @@GLOBAL.slave_parallel_threads= @save_par_thds;
+set @@GLOBAL.slave_parallel_mode= @save_par_mode;
+include/start_slave.inc
+#
+# Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
+# setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
+# replica should report ER_XAER_NOTA.
+connection master;
+create table t1 (a int) engine=innodb;
+include/save_master_gtid.inc
+connection slave;
+include/sync_with_master_gtid.inc
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+include/stop_slave.inc
+change master to master_use_gtid = slave_pos;
+connection master;
+xa start '1';
+insert into t1 set a=1;
+xa end '1';
+xa prepare '1';
+xa rollback '1';
+insert into t1 set a=2;
+include/save_master_gtid.inc
+connection slave;
+set @save_gtid_slave_pos= @@global.gtid_slave_pos;
+SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
+into @gtid_skip
+FROM mysql.gtid_slave_pos
+WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1;
+set @@global.gtid_slave_pos = @gtid_skip;
+start slave;
+include/wait_for_slave_sql_error.inc [errno=1397]
+select count(*) = 2 % 2 as 'must be true' from t1;;
+must be true
+1
+include/stop_slave.inc
+set @@global.gtid_slave_pos = @save_gtid_slave_pos;
+show warnings;
+Level  Code    Message
+Warning        1947    Specified GTID <value> conflicts with the binary log 
which contains a more recent GTID <value>. If MASTER_GTID_POS=CURRENT_POS is 
used, the binlog position will override the new value of @@gtid_slave_pos
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+change master to master_use_gtid = slave_pos;
+connection master;
+xa start '1';
+insert into t1 set a=1;
+xa end '1';
+xa prepare '1';
+xa commit '1';
+insert into t1 set a=2;
+include/save_master_gtid.inc
+connection slave;
+set @save_gtid_slave_pos= @@global.gtid_slave_pos;
+SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
+into @gtid_skip
+FROM mysql.gtid_slave_pos
+WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1;
+set @@global.gtid_slave_pos = @gtid_skip;
+start slave;
+include/wait_for_slave_sql_error.inc [errno=1397]
+select count(*) = 1 % 2 as 'must be true' from t1;;
+must be true
+1
+include/stop_slave.inc
+set @@global.gtid_slave_pos = @save_gtid_slave_pos;
+show warnings;
+Level  Code    Message
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+connection master;
+drop table t1;
+connection slave;
+include/rpl_end.inc
+# End of rpl_xa_concurrent_2pc.test
diff --git a/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result 
b/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result
index f3ea53c219a..92a820d6753 100644
--- a/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result
+++ b/mysql-test/suite/rpl/r/rpl_xa_empty_transaction.result
@@ -1165,5 +1165,56 @@ connection server_1;
 set @@binlog_format = @sav_binlog_format;
 set @@global.binlog_format = @sav_binlog_format;
 connection server_1;
+create table t_not_in_binlog (a int) engine=innodb;
+flush logs;
+include/save_master_gtid.inc
+connect con1,localhost,root,,;
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+disconnect con1;
+connection server_1;
+xa recover;
+formatID       gtrid_length    bqual_length    data
+1      1       0       a
+XA ROLLBACK 'a';
+drop table t_not_in_binlog;
+include/save_master_gtid.inc
+connection server_2;
+XAER_NOTA: Unknown XID
+include/wait_for_slave_sql_error.inc [errno=1397]
+connect  con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+disconnect con2;
+connection server_2;
+xa recover;
+formatID       gtrid_length    bqual_length    data
+1      1       0       a
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+connection server_3;
+XAER_NOTA: Unknown XID
+include/wait_for_slave_sql_error.inc [errno=1397]
+connect  con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,;
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+disconnect con3;
+connection server_3;
+xa recover;
+formatID       gtrid_length    bqual_length    data
+1      1       0       a
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+connection server_1;
 include/rpl_end.inc
 # End of rpl_xa_empty_transaction.test
diff --git a/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result 
b/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result
index dd0d132471e..a3f5414b9da 100644
--- a/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result
+++ b/mysql-test/suite/rpl/r/rpl_xa_prepare_gtid_fail.result
@@ -42,7 +42,7 @@ connection master;
 drop table t1;
 connection slave;
 # TODO: Remove after fixing MDEV-21777
-set @@global.gtid_slave_pos= "0-1-100";
+set @@global.gtid_slave_pos= "0-1-101";
 set @@global.slave_parallel_threads= @save_par_thds;
 set @@global.gtid_strict_mode= @save_strict_mode;
 set @@global.innodb_lock_wait_timeout= @save_innodb_lock_wait_timeout;
diff --git a/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test 
b/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test
new file mode 100644
index 00000000000..d762114ff26
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_concurrent_2pc.test
@@ -0,0 +1,111 @@
+#
+#   This test ensures that two-phase XA transactions have their first and
+# second phases parallelized for both XA COMMIT and XA ROLLBACK. It ensures the
+# following behaviors:
+#
+#   Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE and
+# XA COMMIT/ROLLBACK run concurrently. That is, the XA COMMIT/ROLLBACK will
+# wait at group commit until the XA PREPARE binlogs, and then it will wait
+# again until the XA PREPARE finishes preparing in all engines. At this point,
+# the XA COMMIT/ROLLBACK will run to completion.
+#
+#   Test Case 2: If two XA transactions have different XIDs, if XA COMMIT ends
+# a transaction, ensure both phases of both transactions can all execute
+# concurrently.
+#
+#   Test Case 3: Two current 2-phase XA transactions with matching XIDs should
+# run one after the other, while each transaction still allows both phases of
+# its own transaction to run concurrently.
+#
+#   Test Case 4: Error Case. If an XAP errors while its XAC/R is waiting on it,
+# both the XAP and XAC/R should rollback successfully. Note this tests both:
+#    a) XAC/R is waiting in group commit (first phase times out in DMLs)
+#    b) XAC/R is waiting in group commit, with another XAP with a duplicate XID
+#       waiting on it.
+#
+#   Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
+# setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
+# replica should report ER_XAER_NOTA.
+#
+#
+# References:
+#   MDEV-31949: slow parallel replication of user xa
+#
+--source include/have_debug.inc
+--source include/have_innodb.inc
+--source include/have_binlog_format_mixed.inc
+--source include/master-slave.inc
+
+--let $xa_complete_sym= COMMIT
+--source include/rpl_xa_concurrent_2pc.inc
+
+--let $xa_complete_sym= ROLLBACK
+--source include/rpl_xa_concurrent_2pc.inc
+
+
+--echo #
+--echo # Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
+--echo # setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
+--echo # replica should report ER_XAER_NOTA.
+
+--connection master
+create table t1 (a int) engine=innodb;
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/sync_with_master_gtid.inc
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+
+--let $i=2
+while ($i)
+{
+       --source include/stop_slave.inc
+       --replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/
+       change master to master_use_gtid = slave_pos;
+
+  --connection master
+       --let $complete=rollback
+       if ($i == 1)
+       {
+               --let $complete=commit
+       }
+       xa start '1'; insert into t1 set a=1; xa end '1'; xa prepare '1';
+       --eval xa $complete '1'
+       insert into t1 set a=2;
+       --source include/save_master_gtid.inc
+
+       --connection slave
+
+       # reposition the slave to skip one transaction from master
+       set @save_gtid_slave_pos= @@global.gtid_slave_pos;
+       SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
+                                into @gtid_skip
+                FROM mysql.gtid_slave_pos
+                                WHERE seq_no = (SELECT DISTINCT max(seq_no) 
FROM mysql.gtid_slave_pos) limit 1;
+       set @@global.gtid_slave_pos = @gtid_skip;
+
+       start slave;
+       let $slave_sql_errno= 1397; # ER_XAER_NOTA
+       source include/wait_for_slave_sql_error.inc;
+       --eval select count(*) = $i % 2 as 'must be true' from t1;
+       --source include/stop_slave.inc
+
+       --disable_warnings
+       set @@global.gtid_slave_pos = @save_gtid_slave_pos;
+       --enable_warnings
+       --replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/
+       show warnings;
+       --source include/start_slave.inc
+       --source include/sync_with_master_gtid.inc
+
+       --dec $i
+}
+
+# MDEV-31949 cleanup
+--connection master
+drop table t1;
+
+--sync_slave_with_master
+
+--source include/rpl_end.inc
+--echo # End of rpl_xa_concurrent_2pc.test
diff --git a/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test 
b/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test
index 61cc0621d5a..f43af653ace 100644
--- a/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test
+++ b/mysql-test/suite/rpl/t/rpl_xa_empty_transaction.test
@@ -167,6 +167,95 @@ set @@global.binlog_format = row;
 set @@binlog_format = @sav_binlog_format;
 set @@global.binlog_format = @sav_binlog_format;
 
+
+# MDEV-32257 dangling XA-rollback in binlog from emtpy XA
+# create a case of XA ROLLBACK gets to binlog while its XAP was not and
+# try replicate it.
+# Expected result is both slaves error out.
+--connection server_1
+--let $binlog_start = query_get_value(SHOW MASTER STATUS, Position, 1)
+--let $binlog_file =  query_get_value(SHOW MASTER STATUS, File, 1)
+create table t_not_in_binlog (a int) engine=innodb;
+flush logs;
+--source include/save_master_gtid.inc
+--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
+
+# External connection XID access after disconnect is subject to race.
+# "(" open parenthesis to remember # of connection before ...
+--source include/count_sessions.inc
+
+--connect(con1,localhost,root,,)
+call mtr.add_suppression("XAER_NOTA: Unknown XID");
+
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+--disconnect con1
+
+--connection server_1
+# .. ")" close parenthesis, to wait until con1 fully releases access to xid.
+--source include/wait_until_count_sessions.inc
+xa recover;
+#
+# replicate orphan XAR to server 2,3 and expect the error first
+# after that compensate it.
+
+--error 0
+XA ROLLBACK 'a';
+# cleanup at once
+drop table t_not_in_binlog;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--echo  XAER_NOTA: Unknown XID
+--let $slave_sql_errno= 1397
+--source include/wait_for_slave_sql_error.inc
+
+# "("
+--source include/count_sessions.inc
+
+--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+--disconnect con2
+
+--connection server_2
+# ")"
+--source include/wait_until_count_sessions.inc
+
+xa recover;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
+--connection server_3
+--echo  XAER_NOTA: Unknown XID
+--let $slave_sql_errno= 1397
+--source include/wait_for_slave_sql_error.inc
+
+# "("
+--source include/count_sessions.inc
+
+--connect (con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,)
+SET sql_log_bin=0;
+XA START 'a';
+insert into t_not_in_binlog set a=1;
+XA END 'a';
+XA PREPARE 'a';
+--disconnect con3
+
+--connection server_3
+# ")"
+--source include/wait_until_count_sessions.inc
+
+xa recover;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
 #
 # Cleanup
 --connection server_1
diff --git a/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test 
b/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test
index aa1b088ed23..72589953ac0 100644
--- a/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test
+++ b/mysql-test/suite/rpl/t/rpl_xa_prepare_gtid_fail.test
@@ -56,8 +56,8 @@ xa start '1';
 update t1 set b=b+10 where a=1;
 xa end '1';
 xa prepare '1';
---let $new_gtid= `SELECT @@global.gtid_binlog_pos`
 xa commit '1';
+--let $new_gtid= `SELECT @@global.gtid_binlog_pos`
 --source include/save_master_gtid.inc
 
 --connection slave1
diff --git a/sql/handler.cc b/sql/handler.cc
index 1ea1818749c..7fa78e4d9b2 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -2180,13 +2180,13 @@ int ha_rollback_trans(THD *thd, bool all)
       rollback without signalling following transactions. And in release
       builds, we explicitly do the signalling before rolling back.
     */
-    DBUG_ASSERT(
-        !(thd->rgi_slave &&
-          !thd->rgi_slave->worker_error &&
-          thd->rgi_slave->did_mark_start_commit) ||
-        (thd->transaction->xid_state.is_explicit_XA() ||
-         (thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_PREPARED_XA)));
-
+    DBUG_ASSERT(!(thd->rgi_slave &&
+                  !thd->rgi_slave->worker_error &&
+                  thd->rgi_slave->did_mark_start_commit) ||
+                (thd->transaction->xid_state.is_explicit_XA() ||
+                 (thd->rgi_slave->gtid_ev_flags2 &
+                  (Gtid_log_event::FL_PREPARED_XA |
+                   Gtid_log_event::FL_COMPLETED_XA))));
     if (thd->rgi_slave &&
         !thd->rgi_slave->worker_error &&
         thd->rgi_slave->did_mark_start_commit)
@@ -2343,6 +2343,15 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
   else
     binlog_rollback_by_xid(binlog_hton, xid);
 
+#ifdef ENABLED_DEBUG_SYNC
+    DBUG_EXECUTE_IF(
+        "stop_after_binlog_cor_by_xid",
+        DBUG_ASSERT(!debug_sync_set_action(
+            current_thd,
+            STRING_WITH_LEN(
+                "now SIGNAL xa_cor_binlogged WAIT_FOR continue_xa_cor"))););
+#endif
+
   plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
                  MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
 
diff --git a/sql/log.cc b/sql/log.cc
index eef8d86e4da..78a9419fb15 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -101,6 +101,8 @@ static int binlog_flush_cache(THD *thd, binlog_cache_mngr 
*cache_mngr,
                               Log_event *end_ev, bool all, bool using_stmt,
                               bool using_trx, bool is_ro_1pc);
 
+XID_cache_element *xid_cache_search_maybe_wait(THD *thd);
+
 static const LEX_CSTRING write_error_msg=
     { STRING_WITH_LEN("error writing to the binary log") };
 
@@ -1751,7 +1753,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr 
*cache_mngr,
 
   if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
       (using_trx && !cache_mngr->trx_cache.empty())   ||
-      thd->transaction->xid_state.is_explicit_XA())
+      (thd->transaction->xid_state.is_explicit_XA() ||
+       (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
   {
     if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE))
       DBUG_RETURN(1);
@@ -1858,11 +1861,19 @@ binlog_commit_flush_trx_cache(THD *thd, bool all, 
binlog_cache_mngr *cache_mngr,
   if (thd->lex->sql_command == SQLCOM_XA_COMMIT &&
       thd->lex->xa_opt != XA_ONE_PHASE)
   {
-    DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA());
+    bool is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac);
+    DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA() || is_async_xac);
     DBUG_ASSERT(thd->transaction->xid_state.get_state_code() ==
-                XA_PREPARED);
-
-    buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(),
+                XA_PREPARED || is_async_xac);
+    DBUG_ASSERT(is_async_xac ||
+                thd->lex->xid->eq(thd->transaction->xid_state.get_xid()));
+    /*
+      While xid_state.get_xid() is a robust method to access `xid`
+      it can't be used on slave by the asynchronously running XA-"complete".
+      In the latter case thd->lex->xid is safely accessible.
+    */
+    buflen= serialize_with_xid(is_async_xac? thd->lex->xid :
+                               thd->transaction->xid_state.get_xid(),
                                buf, query, q_len);
   }
   Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0);
@@ -1888,13 +1899,19 @@ binlog_rollback_flush_trx_cache(THD *thd, bool all,
   const size_t q_len= sizeof(query) - 1; // do not count trailing 0
   char buf[q_len + ser_buf_size]= "ROLLBACK";
   size_t buflen= sizeof("ROLLBACK") - 1;
+  bool is_async_xac= false;
 
-  if (thd->transaction->xid_state.is_explicit_XA())
+  if (thd->transaction->xid_state.is_explicit_XA() ||
+      (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
   {
     /* for not prepared use plain ROLLBACK */
-    if (thd->transaction->xid_state.get_state_code() == XA_PREPARED)
-      buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(),
+    if (thd->transaction->xid_state.get_state_code() == XA_PREPARED ||
+        is_async_xac)
+    {
+      buflen= serialize_with_xid(is_async_xac? thd->lex->xid :
+                                 thd->transaction->xid_state.get_xid(),
                                  buf, query, q_len);
+    }
   }
   Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0);
 
@@ -1985,11 +2002,71 @@ inline bool is_preparing_xa(THD *thd)
 
 static int binlog_prepare(handlerton *hton, THD *thd, bool all)
 {
+  int rc;
+
   /* Do nothing unless the transaction is a user XA. */
-  return is_preparing_xa(thd) ? binlog_commit(thd, all, FALSE) : 0;
+  if (is_preparing_xa(thd))
+  {
+    DBUG_EXECUTE_IF(
+        "stop_before_binlog_prepare",
+        DBUG_ASSERT(!debug_sync_set_action(
+            thd, STRING_WITH_LEN("now WAIT_FOR binlog_xap"))););
+
+    rc= binlog_commit(thd, all, FALSE);
+
+#ifdef ENABLED_DEBUG_SYNC
+    DBUG_EXECUTE_IF(
+        "stop_after_binlog_prepare",
+        DBUG_ASSERT(!debug_sync_set_action(
+            thd,
+            STRING_WITH_LEN(
+                "now SIGNAL xa_prepare_binlogged WAIT_FOR continue_xap"))););
+#endif
+  }
+  else
+  {
+    rc= 0;
+  }
+
+  return rc;
 }
 
 
+/**
+  @c acquire_xid takes control by slave worker's THD over a xid record the 
system
+  xid cache. Implicitly provided @c xid corresponds to a being asynchronously
+  handled XA-"complete".
+
+  @param  thd   the thread handler
+  @return false as success, true otherwise
+*/
+static bool acquire_xid(THD *thd)
+{
+  bool rc= false;
+
+  if (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+      thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_COMPLETED_XA)
+  {
+    XID_STATE &xid_state= thd->transaction->xid_state;
+
+    auto xs= xid_cache_search_maybe_wait(thd);
+    xid_state.xid_cache_element= xs;
+    if (!xs)
+    {
+      DBUG_ASSERT(thd->is_killed());
+
+      rpl_gtid *gtid= &thd->rgi_slave->current_gtid;
+      my_error(ER_XAER_RMERR, MYF(0));
+      sql_print_error("XA COMMIT of GTID %u-%u-%ll could not complete "
+                      "after having been logged into binary log",
+                      gtid->domain_id, gtid->server_id, gtid->seq_no);
+      rc= true;
+    }
+  }
+
+  return rc;
+}
+
 int binlog_commit_by_xid(handlerton *hton, XID *xid)
 {
   int rc= 0;
@@ -1997,28 +2074,38 @@ int binlog_commit_by_xid(handlerton *hton, XID *xid)
 
   if (thd->is_current_stmt_binlog_disabled())
   {
-    return thd->wait_for_prior_commit();
+    rc= thd->wait_for_prior_commit();
   }
+  else
+  {
+    /* the asserted state can't be reachable with xa commit */
+    DBUG_ASSERT(!thd->get_stmt_da()->is_error() ||
+                thd->get_stmt_da()->sql_errno() != ER_XA_RBROLLBACK);
+    /*
+      This is a recovered user xa transaction commit.
+      Create a "temporary" binlog transaction to write the commit record
+      into binlog.
+    */
+    THD_TRANS trans;
+    trans.ha_list= NULL;
 
-  /* the asserted state can't be reachable with xa commit */
-  DBUG_ASSERT(!thd->get_stmt_da()->is_error() ||
-              thd->get_stmt_da()->sql_errno() != ER_XA_RBROLLBACK);
-  /*
-    This is a recovered user xa transaction commit.
-    Create a "temporary" binlog transaction to write the commit record
-    into binlog.
-  */
-  THD_TRANS trans;
-  trans.ha_list= NULL;
-
-  thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
-  thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write();
-  (void) thd->binlog_setup_trx_data();
+    thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
+    thd->ha_data[binlog_hton->slot].ha_info[1].set_trx_read_write();
+    (void) thd->binlog_setup_trx_data();
 
-  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
+    DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
 
-  rc= binlog_commit(thd, TRUE, FALSE);
-  thd->ha_data[binlog_hton->slot].ha_info[1].reset();
+    rc= binlog_commit(thd, TRUE, FALSE);
+    thd->ha_data[binlog_hton->slot].ha_info[1].reset();
+  }
+  if (!rc)
+  {
+    rc= acquire_xid(thd);
+  }
+  if (thd->is_current_stmt_binlog_disabled())
+  {
+    thd->wakeup_subsequent_commits(rc);
+  }
 
   return rc;
 }
@@ -2031,33 +2118,54 @@ int binlog_rollback_by_xid(handlerton *hton, XID *xid)
 
   if (thd->is_current_stmt_binlog_disabled())
   {
-    return thd->wait_for_prior_commit();
+    rc= thd->wait_for_prior_commit();
   }
+  else if (thd->get_stmt_da()->is_error() &&
+           thd->get_stmt_da()->sql_errno() == ER_XA_RBROLLBACK)
+    rc= true;
+  else
+  {
+    THD_TRANS trans;
+    trans.ha_list= NULL;
 
-  if (thd->get_stmt_da()->is_error() &&
-      thd->get_stmt_da()->sql_errno() == ER_XA_RBROLLBACK)
-    return rc;
-
-  THD_TRANS trans;
-  trans.ha_list= NULL;
-
-  thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
-  thd->ha_data[hton->slot].ha_info[1].set_trx_read_write();
-  (void) thd->binlog_setup_trx_data();
+    thd->ha_data[hton->slot].ha_info[1].register_ha(&trans, hton);
+    thd->ha_data[hton->slot].ha_info[1].set_trx_read_write();
+    (void) thd->binlog_setup_trx_data();
 
-  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
-              (thd->transaction->xid_state.get_state_code() == 
XA_ROLLBACK_ONLY));
+    DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
+                (thd->transaction->xid_state.get_state_code() == 
XA_ROLLBACK_ONLY));
 
-  rc= binlog_rollback(hton, thd, TRUE);
-  thd->ha_data[hton->slot].ha_info[1].reset();
+    rc= binlog_rollback(hton, thd, TRUE);
+    thd->ha_data[hton->slot].ha_info[1].reset();
+  }
+  if (!rc)
+  {
+    rc= acquire_xid(thd);
+  }
+  if (thd->is_current_stmt_binlog_disabled())
+  {
+    thd->wakeup_subsequent_commits(rc);
+  }
 
   return rc;
 }
 
-
+/**
+  @param  thd    thread handler
+  @return true   when thd carries an XA transaction in prepared state,
+                 or the XA transaction is being completed by
+                 asynchronously running "COMPLETE" by slave parallel thread;
+          false  otherwise
+*/
 inline bool is_prepared_xa(THD *thd)
 {
-  return thd->transaction->xid_state.is_explicit_XA() &&
+  bool is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac);
+  DBUG_ASSERT(!is_async_xac ||
+              thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
+              thd->lex->sql_command == SQLCOM_XA_COMMIT);
+
+  return is_async_xac ? true :
+    thd->transaction->xid_state.is_explicit_XA() &&
     thd->transaction->xid_state.get_state_code() == XA_PREPARED;
 }
 
@@ -2185,7 +2293,9 @@ int binlog_commit(THD *thd, bool all, bool ro_1pc)
   }
 
   if (cache_mngr->trx_cache.empty() &&
-      (thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
+      ((thd->transaction->xid_state.get_state_code() != XA_PREPARED &&
+       !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+         thd->lex->sql_command == SQLCOM_XA_COMMIT)) ||
        !(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() &&
          thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write())))
   {
@@ -2279,7 +2389,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, 
bool all)
   }
 
   if (!cache_mngr->trx_cache.has_incident() && cache_mngr->trx_cache.empty() &&
-      (thd->transaction->xid_state.get_state_code() != XA_PREPARED ||
+      ((thd->transaction->xid_state.get_state_code() != XA_PREPARED &&
+        !(thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+          thd->lex->sql_command == SQLCOM_XA_ROLLBACK)) ||
        !(thd->ha_data[binlog_hton->slot].ha_info[1].is_started() &&
          thd->ha_data[binlog_hton->slot].ha_info[1].is_trx_read_write())))
   {
@@ -8382,7 +8494,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry 
*leader)
       */
       DBUG_ASSERT(!cache_mngr->stmt_cache.empty() ||
                   !cache_mngr->trx_cache.empty()  ||
-                  current->thd->transaction->xid_state.is_explicit_XA());
+                  (current->thd->transaction->xid_state.is_explicit_XA() ||
+                   (current->thd->rgi_slave &&
+                    current->thd->rgi_slave->is_async_xac)));
 
       if (unlikely((current->error= write_transaction_or_stmt(current,
                                                               commit_id))))
@@ -10510,13 +10624,20 @@ int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool 
all)
 
   binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
   int cookie= 0;
+  int rc= 0;
+
+  if (thd->rgi_slave && thd->is_current_stmt_binlog_disabled())
+  {
+    rc= thd->wait_for_prior_commit();
+    if (rc == 0)
+      thd->wakeup_subsequent_commits(rc);
+    return rc;
+  }
 
   if (!cache_mngr->need_unlog)
   {
     Ha_trx_info *ha_info;
     uint rw_count= ha_count_rw_all(thd, &ha_info);
-    bool rc= false;
-
     /*
       This transaction has not been binlogged as indicated by need_unlog.
       Such exceptional cases include transactions with no effect to engines,
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 92ff401a260..230a7a4667f 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -3314,16 +3314,22 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 
seq_no_arg,
   XID_STATE &xid_state= thd->transaction->xid_state;
   if (is_transactional)
   {
-    if (xid_state.is_explicit_XA() &&
-        (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
-         xid_state.get_state_code() == XA_PREPARED))
+    bool is_async_xac= false;
+    if ((xid_state.is_explicit_XA() &&
+         (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
+          xid_state.get_state_code() == XA_PREPARED)) ||
+        (is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
     {
       DBUG_ASSERT(!(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
                     thd->lex->xa_opt == XA_ONE_PHASE));
+      DBUG_ASSERT(!is_async_xac ||
+                  thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
+                  thd->lex->sql_command == SQLCOM_XA_COMMIT);
 
       flags2|= thd->lex->sql_command == SQLCOM_XA_PREPARE ?
         FL_PREPARED_XA : FL_COMPLETED_XA;
-      xid.set(xid_state.get_xid());
+      xid.set(is_async_xac? thd->lex->xid :
+              thd->transaction->xid_state.get_xid());
     }
     /* count non-zero extra recoverable engines; total = extra + 1 */
     if (has_xid)
@@ -4172,9 +4178,6 @@ int XA_prepare_log_event::do_commit()
   thd->lex->xid= &xid;
   if (!one_phase)
   {
-    if ((res= thd->wait_for_prior_commit()))
-      return res;
-
     thd->lex->sql_command= SQLCOM_XA_PREPARE;
     res= trans_xa_prepare(thd);
   }
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c0dd56ab3d5..0f76dcc2351 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -1055,7 +1055,7 @@ PSI_cond_key key_BINLOG_COND_xid_list,
   key_BINLOG_COND_queue_busy;
 PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
   key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
-  key_COND_wait_commit;
+  key_COND_wait_commit, key_COND_wait_commit_dep;
 PSI_cond_key key_RELAYLOG_COND_queue_busy;
 PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
 PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
@@ -1083,6 +1083,7 @@ static PSI_cond_info all_server_conds[]=
   { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
   { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
   { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
+  { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit_dep", 0},
   { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 
0},
   { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
   { &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL},
@@ -9224,6 +9225,7 @@ PSI_stage_info stage_waiting_for_deadlock_kill= { 0, 
"Waiting for parallel repli
 PSI_stage_info stage_starting= { 0, "starting", 0};
 PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to 
be flushed", 0};
 PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0};
+PSI_stage_info stage_waiting_for_prior_xa_transaction= { 0, "Waiting for prior 
xa transaction", 0};
 
 PSI_memory_key key_memory_DATE_TIME_FORMAT;
 PSI_memory_key key_memory_DDL_LOG_MEMORY_ENTRY;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index fc8afa06638..4245a1a5e10 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -376,7 +376,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, 
key_BINLOG_update_cond,
   key_COND_start_thread;
 extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
   key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
-  key_COND_wait_commit;
+  key_COND_wait_commit, key_COND_wait_commit_dep;
 extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
 extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
 extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
@@ -679,7 +679,7 @@ extern PSI_stage_info 
stage_slave_background_process_request;
 extern PSI_stage_info stage_slave_background_wait_request;
 extern PSI_stage_info stage_waiting_for_deadlock_kill;
 extern PSI_stage_info stage_starting;
-
+extern PSI_stage_info stage_waiting_for_prior_xa_transaction;
 #ifdef HAVE_PSI_STATEMENT_INTERFACE
 /**
   Statement instrumentation keys (sql).
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index c26263401b8..2653e294590 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -9,6 +9,8 @@
 #ifdef WITH_WSREP
 #include "wsrep_trans_observer.h"
 #endif
+#include <algorithm>
+using std::max;
 
 /*
   Code for optional parallel execution of replicated events on the slave.
@@ -760,7 +762,8 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
     return;
   err_code= thd->get_stmt_da()->sql_errno();
   if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
-       err_code != ER_PRIOR_COMMIT_FAILED) ||
+       (err_code != ER_PRIOR_COMMIT_FAILED &&
+        err_code != ER_XAER_NOTA)) ||
       ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) 
&&
        rgi->killed_for_retry))
   {
@@ -2364,16 +2367,9 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, 
bool *did_enter_cond,
   idx= rpl_thread_idx;
   if (gtid_ev)
   {
-    if (gtid_ev->flags2 &
-        (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
-      idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
-                        gtid_ev->xid.key_length()) % rpl_thread_max;
-    else
-    {
-      ++idx;
-      if (idx >= rpl_thread_max)
-        idx= 0;
-    }
+    ++idx;
+    if (idx >= rpl_thread_max)
+      idx= 0;
     rpl_thread_idx= idx;
   }
   thr= rpl_threads[idx];
@@ -2467,6 +2463,7 @@ free_rpl_parallel_entry(void *element)
   }
   mysql_cond_destroy(&e->COND_parallel_entry);
   mysql_mutex_destroy(&e->LOCK_parallel_entry);
+  e->concurrent_xaps_window.~Dynamic_array();
   my_free(e);
 }
 
@@ -2521,6 +2518,19 @@ rpl_parallel::find(uint32 domain_id)
     e->domain_id= domain_id;
     e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
     e->pause_sub_id= (uint64)ULONGLONG_MAX;
+
+    e->concurrent_xaps_window.init((PSI_memory_key) PSI_INSTRUMENT_ME,
+                                   max((decltype(e->rpl_thread_max)) 2,
+                                        2*e->rpl_thread_max));
+    e->cxap_lhs= e->cxap_rhs= 0;
+
+    /*
+      0 initialize each element
+    */
+    for (size_t i= 0; i < e->concurrent_xaps_window.max_size(); i++)
+    {
+      e->concurrent_xaps_window.at(i)= {0, 0};
+    }
     mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
                      MY_MUTEX_INIT_FAST);
     mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
@@ -2798,6 +2808,90 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread 
*cur_thread,
   mysql_cond_signal(&cur_thread->COND_rpl_thread);
 }
 
+/**
+  Check the concurrency status of @c xid with ones in progress.
+  Any new @c xid of XA-prepare (@c is_xap is true then) is appended to
+  a sliding window designed as circular buffer. Through search in the window
+  the return result is computed.
+
+  @param  e      parallel entry pointer
+  @param  xid    a pointer to the xid of either XA-prepare of XA-"complete"
+  @param  is_xap
+                 true when xid belongs to XA-prepare
+  @return true   when there exists a duplicate xid hash value,
+          false  otherwise.
+*/
+static bool
+handle_xa_prepera_duplicate_xid(rpl_parallel_entry *e, XID *xid, bool is_xap)
+{
+  DBUG_ASSERT(e->current_group_info ||
+              (e->count_queued_event_groups == 0 &&
+               e->cxap_lhs == e->cxap_rhs && e->cxap_lhs == 0));
+  DBUG_ASSERT(xid);
+  DBUG_ASSERT(!xid->is_null());
+  DBUG_ASSERT(xid->key());
+  DBUG_ASSERT(xid->key_length());
+
+  uint64 curr_event_count= e->count_queued_event_groups;
+  uint32 i;
+  bool rc= false;
+  /*
+    We've seen XAP's before, so move the LHS up to a relevant spot.
+    LHS = RHS indicates the empty buffer (which implies RHS is exclusive "edge"
+    of the window.
+    Otherwise RHS always points to a free cell of which one at least must
+    exist at this point.
+    While transaction disribution is Round-robin, potential conflicts with
+    the current input xid can come only from
+    the preceeding 2*|W| - 1 xids, the 2*|W|th in the past is safe.
+  */
+  for (i= e->cxap_lhs; i != e->cxap_rhs;
+       i= (i+1) % (e->concurrent_xaps_window.max_size()))
+  {
+    uint64 old_event_count= e->concurrent_xaps_window.at(i).second;
+    uint64 queued_event_diff= curr_event_count - old_event_count;
+    if (queued_event_diff >= e->rpl_thread_max)
+    {
+      /*
+        Squeeze the window from the left
+        as this XAP can't run in parallel with us.
+      */
+      e->cxap_lhs= (i+1) % (e->concurrent_xaps_window.max_size());
+    }
+    else
+    {
+      // new LHS is determined
+      DBUG_ASSERT(e->cxap_lhs != e->cxap_rhs);
+      break;
+    }
+  }
+
+  std::size_t xid_hash=  std::hash<XID>{}(*xid);
+  for (; i != e->cxap_rhs; i= (i+1) % (e->concurrent_xaps_window.max_size()))
+  {
+    std::size_t old_xid_hash= e->concurrent_xaps_window.at(i).first;
+    if (old_xid_hash == xid_hash)
+    {
+      rc= true;
+      break;
+    }
+  }
+
+  // Add the XAP to the sliding window
+  if (is_xap)
+  {
+    e->concurrent_xaps_window.at(e->cxap_rhs).first= xid_hash;
+    e->concurrent_xaps_window.at(e->cxap_rhs).second= curr_event_count;
+    e->cxap_rhs= (e->cxap_rhs + 1) % (e->concurrent_xaps_window.max_size());
+    if (e->cxap_rhs == e->cxap_lhs)
+    {
+      // the entire array is full therefore the lhs has become stale
+      e->cxap_lhs= (e->cxap_lhs + 1) % (e->concurrent_xaps_window.max_size());
+    }
+  }
+
+  return rc;
+}
 
 /*
   do_event() is executed by the sql_driver_thd thread.
@@ -3046,6 +3140,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, 
Log_event *ev,
     new_gco= true;
     force_switch_flag= 0;
     gco= e->current_gco;
+    /*
+      Take care of duplicate xids in XA-prepare, XA-"complete" should not
+      race its XA-prepare parent either. When the current transaction's xid
+      was seen and its transaction may still be in process this event group
+      gets flagged to wait for prior commits at the start of execution.
+    */
+    if ((gtid_flags & (Gtid_log_event::FL_PREPARED_XA |
+                       Gtid_log_event::FL_COMPLETED_XA)) &&
+        handle_xa_prepera_duplicate_xid(e, &gtid_ev->xid,
+                                        gtid_flags &
+                                        Gtid_log_event::FL_PREPARED_XA))
+      gtid_flags &= ~Gtid_log_event::FL_ALLOW_PARALLEL;
     if (likely(gco))
     {
       uint8 flags= gco->flags;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 0b1d3cf9d80..b76d6749f6d 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -324,6 +324,14 @@ struct rpl_parallel_thread_pool {
   void release_thread(rpl_parallel_thread *rpt);
 };
 
+template <>
+struct std::hash<XID>
+{
+  std::size_t operator()(const XID& xid) const
+  {
+    return my_hash_sort(&my_charset_bin, xid.key(), xid.key_length());
+  }
+};
 
 struct rpl_parallel_entry {
   mysql_mutex_t LOCK_parallel_entry;
@@ -419,6 +427,14 @@ struct rpl_parallel_entry {
   /* The group_commit_orderer object for the events currently being queued. */
   group_commit_orderer *current_gco;
 
+  /*
+    Circular buffer of size slave_parallel_threads to hold XIDs of XA-prepare
+    group of events which may be processed concurrently.
+    See how handle_xa_prepera_duplicate_xid operates on it.
+  */
+  Dynamic_array<std::pair<std::size_t, uint32>> concurrent_xaps_window;
+  uint32 cxap_lhs, cxap_rhs;
+
   rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool 
*did_enter_cond,
                                       PSI_stage_info *old_stage,
                                       Gtid_log_event *gtid_ev);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 375fb1d1c58..fdeb0d67872 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2155,12 +2155,14 @@ rpl_group_info::reinit(Relay_log_info *rli)
   gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
   speculation= SPECULATE_NO;
   commit_orderer.reinit();
+  is_async_xac= false;
 }
 
 rpl_group_info::rpl_group_info(Relay_log_info *rli)
   : thd(0), wait_commit_sub_id(0),
     wait_commit_group_info(0), parallel_entry(0),
-    deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
+    deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
+    is_async_xac(false)
 {
   reinit(rli);
   bzero(&current_gtid, sizeof(current_gtid));
@@ -2291,7 +2293,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
     if (thd->transaction->xid_state.is_explicit_XA() &&
         thd->transaction->xid_state.get_state_code() != XA_PREPARED)
       xa_trans_force_rollback(thd);
-
+    is_async_xac= false;
     thd->release_transactional_locks();
 
     if (thd == rli->sql_driver_thd)
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index cc807852bf2..06eef910de4 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -835,6 +835,14 @@ struct rpl_group_info
   };
   uchar killed_for_retry;
 
+  /*
+    When true indicates that the user xa transaction is going to
+    complete (with COMMIT or ROLLBACK) by the worker thread,
+    *while* another worker is still preparing it. Once the latter is done
+    the xid will be acquired and the flag gets reset.
+  */
+  bool is_async_xac;
+
   rpl_group_info(Relay_log_info *rli_);
   ~rpl_group_info();
   void reinit(Relay_log_info *rli);
diff --git a/sql/sql_array.h b/sql/sql_array.h
index 8610e971016..c79c0c257a0 100644
--- a/sql/sql_array.h
+++ b/sql/sql_array.h
@@ -137,7 +137,7 @@ template <class Elem> class Dynamic_array
   */
   Elem& at(size_t idx)
   {
-    DBUG_ASSERT(idx < array.elements);
+    DBUG_ASSERT(idx < max_size());
     return *(((Elem*)array.buffer) + idx);
   }
   /// Const variant of at(), which cannot change data
@@ -172,6 +172,8 @@ template <class Elem> class Dynamic_array
 
   size_t size() const { return array.elements; }
 
+  size_t max_size() const { return array.max_element; }
+
   const Elem *end() const
   {
     return back() + 1;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 1b78f88bd3c..04338831cd7 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -7960,6 +7960,7 @@ wait_for_commit::wait_for_commit()
 {
   mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, 
MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+  mysql_cond_init(key_COND_wait_commit_dep, &COND_wait_commit_dep, 0);
   reinit();
 }
 
@@ -7989,6 +7990,7 @@ wait_for_commit::~wait_for_commit()
 
   mysql_mutex_destroy(&LOCK_wait_commit);
   mysql_cond_destroy(&COND_wait_commit);
+  mysql_cond_destroy(&COND_wait_commit_dep);
 }
 
 
diff --git a/sql/sql_class.h b/sql/sql_class.h
index e6580d2432c..72ca39c7710 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2367,6 +2367,15 @@ struct wait_for_commit
     event group is fully done.
   */
   bool wakeup_blocked;
+  /*
+    The condition variable servers as a part of facilities to handle various
+    commit time additional dependency between groups of replication events, e.g
+    XA-Prepare -> XA-Commit, or XA-Prepare -> XA-Prepare all with the same xid.
+  */
+  mysql_cond_t COND_wait_commit_dep;
+#ifndef DBUG_OFF
+  bool debug_done;
+#endif
 
   void register_wait_for_prior_commit(wait_for_commit *waitee);
   int wait_for_prior_commit(THD *thd, bool allow_kill=true)
diff --git a/sql/xa.cc b/sql/xa.cc
index 9df9da7acf1..3d1a7a2360a 100644
--- a/sql/xa.cc
+++ b/sql/xa.cc
@@ -22,7 +22,7 @@
 #include "my_cpu.h"
 #include <pfs_transaction_provider.h>
 #include <mysql/psi/mysql_transaction.h>
-
+#include "rpl_rli.h"  // rpl_group_info
 static bool slave_applier_reset_xa_trans(THD *thd);
 
 /***************************************************************************
@@ -79,6 +79,10 @@ class XID_cache_element
   uint rm_error;
   enum xa_states xa_state;
   XID xid;
+  /* parallel slave worker waiters. `c` stands for complete, `p` prepare */
+  std::atomic<wait_for_commit *> c_waiter; // set by asynch run xa-"complete"
+  std::atomic<wait_for_commit *> p_waiter; // set by duplicate xid xa-start
+
   bool is_set(int32_t flag)
   { return m_state.load(std::memory_order_relaxed) & flag; }
   void set(int32_t flag)
@@ -134,6 +138,7 @@ class XID_cache_element
     element->rm_error= 0;
     element->xa_state= new_element->xa_state;
     element->xid.set(new_element->xid);
+    element->c_waiter= element->p_waiter= NULL;
     new_element->xid_cache_element= element;
   }
   static void lf_alloc_constructor(uchar *ptr)
@@ -243,7 +248,7 @@ void xid_cache_free()
   Find recovered XA transaction by XID.
 */
 
-static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
+XID_cache_element *xid_cache_search(THD *thd, XID *xid)
 {
   DBUG_ASSERT(thd->xid_hash_pins);
   XID_cache_element *element=
@@ -254,16 +259,221 @@ static XID_cache_element *xid_cache_search(THD *thd, XID 
*xid)
     /* The element can be removed from lf_hash by other thread, but
     element->acquire_recovered() will return false in this case. */
     if (!element->acquire_recovered())
+    {
       element= 0;
+      if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+      {
+        DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT ||
+                    thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
+        thd->rgi_slave->is_async_xac= true;
+      }
+    }
     lf_hash_search_unpin(thd->xid_hash_pins);
     /* Once the element is acquired (i.e. got the ACQUIRED bit) by this thread,
     only this thread can delete it. The deletion happens in xid_cache_delete().
     See also the XID_cache_element documentation. */
     DEBUG_SYNC(thd, "xa_after_search");
   }
+  else if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+  {
+    DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT ||
+                thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
+  }
+
   return element;
 }
 
+const int SPIN_MAX= 20;
+/**
+  The function tries inserting a xid into the system hash until succeeds.
+  Re-trying can be caused solely by existence of an earlier transaction with
+  a duplicate xid.
+  Analogously to @c xid_cache_search_maybe_wait, it is expecting, here the 
duplicate,
+  xid in the way will be eventually deleted from the hash.
+
+  @param  thd    thread handler
+  @return false  as success,
+          true   otherwise.
+*/
+bool xid_cache_insert_maybe_wait(THD* thd)
+{
+  int i= 0;
+  bool rc;
+
+  do
+  {
+    if ((rc= xid_cache_insert(thd, &thd->transaction->xid_state, 
thd->lex->xid)))
+      ut_delay(1 + i++);
+  }
+  while (rc &&  i < SPIN_MAX);
+
+  if (rc)
+  {
+    wait_for_commit *waiter= NULL;
+    XID *xid= thd->lex->xid;
+    XID_cache_element *element=
+      (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+                                          xid->key(), xid->key_length());
+    if (element)
+    {
+      PSI_stage_info old_stage;
+      wait_for_commit *exp= NULL, *waiter= thd->wait_for_commit_ptr;
+#ifndef DBUG_OFF
+      waiter->debug_done= false;
+#endif
+
+      while (unlikely(!element->
+                      p_waiter.compare_exchange_weak(exp, waiter,
+                                                     
std::memory_order_acq_rel)))
+      {
+        if (exp)
+        {
+          DBUG_ASSERT(exp != waiter);
+          waiter= NULL; // notifier is seen
+
+          break;
+        }
+        else
+        {
+          (void) LF_BACKOFF();
+        }
+      }
+      lf_hash_search_unpin(thd->xid_hash_pins);
+
+      if (waiter) // notifier was not seen
+      {
+        mysql_mutex_lock(&waiter->LOCK_wait_commit);
+        thd->ENTER_COND(&waiter->COND_wait_commit_dep, 
&waiter->LOCK_wait_commit,
+                        &stage_waiting_for_prior_xa_transaction,
+                        &old_stage);
+        if ((element=
+             (XID_cache_element*) lf_hash_search(&xid_cache, 
thd->xid_hash_pins,
+                                                 xid->key(), 
xid->key_length())))
+        {
+          lf_hash_search_unpin(thd->xid_hash_pins);
+          mysql_cond_wait(&waiter->COND_wait_commit_dep,
+                          &waiter->LOCK_wait_commit);
+
+          DBUG_ASSERT(waiter->debug_done || thd->check_killed(1));
+        }
+        thd->EXIT_COND(&old_stage);
+      }
+    }
+    if (!(rc= thd->check_killed(1)))
+    {
+      // (element && waiter = NULL) indicates the duplicate xid is coming
+      do
+        rc= xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid);
+      while (rc && element && !waiter && (ut_delay(1), true));
+    }
+  }
+
+  return rc;
+}
+
+/**
+  XA-"complete" run by parallel slave gets access to its xid.
+  Analogously to @c xid_cache_insert_maybe_wait, it is expecting, here its, xid
+  supplied through the THD argument, will be soon (the parent XAP has already
+  waken up transactions before the current one) released for acquisition.
+
+  @param  thd               thread handler
+  @return XID_cache_element pointer or NULL when the search is interruped
+                                      by kill.
+*/
+XID_cache_element * xid_cache_search_maybe_wait(THD* thd)
+{
+  if (thd->fix_xid_hash_pins())
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    return NULL;
+  }
+
+  XID_cache_element *xs;
+  XID *xid= thd->lex->xid;
+  int i= 0;
+  do
+  {
+    if (!(xs= xid_cache_search(thd, thd->lex->xid)))
+      ut_delay(1 + i++);
+  }
+  while (!xs && i < SPIN_MAX);
+
+  if (!xs)
+  {
+    XID_cache_element *element=
+      (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+                                          xid->key(), xid->key_length());
+    if (element)
+    {
+      lf_hash_search_unpin(thd->xid_hash_pins);
+      if (!element->acquire_recovered())
+      {
+        wait_for_commit  *exp= NULL, *waiter= thd->wait_for_commit_ptr;
+        bool waiter_done= true;  // assumption
+
+
+        /*
+          Set itself to wait for xid owner while taking care of race with it on
+          marking the xid element. When the element is found to be marked not
+          by us that indicates xid has been released.
+        */
+        while (unlikely(!element->
+                        c_waiter.
+                        compare_exchange_weak(exp, waiter,
+                                              std::memory_order_acq_rel)))
+        {
+          if (exp)
+          {
+            DBUG_ASSERT(exp != waiter);
+            waiter= NULL; // notifier is seen
+
+            break;
+          }
+          else
+          {
+            (void) LF_BACKOFF();
+          }
+        }
+
+        if (waiter) // notifier was not seen
+        {
+          PSI_stage_info old_stage;
+          mysql_mutex_lock(&waiter->LOCK_wait_commit);
+          thd->ENTER_COND(&waiter->COND_wait_commit_dep, 
&waiter->LOCK_wait_commit,
+                          &stage_waiting_for_prior_xa_transaction,
+                          &old_stage);
+          if (element->c_waiter.load(std::memory_order_relaxed) &&
+              likely(!thd->check_killed(1)))
+            mysql_cond_wait(&waiter->COND_wait_commit_dep,
+                            &waiter->LOCK_wait_commit);
+
+          if (element->c_waiter.load(std::memory_order_relaxed))
+          {
+            waiter_done= false;
+            DBUG_ASSERT(thd->check_killed(1));
+          }
+          thd->EXIT_COND(&old_stage);
+        }
+
+        if (waiter_done &&
+            likely(element->is_set(XID_cache_element::RECOVERED |
+                                   XID_cache_element::ACQUIRED)))
+          xs= element;
+        else
+          goto end;
+      }
+      else
+      {
+        xs= element;
+      }
+    }
+  }
+
+end:
+  return xs;
+}
+
 
 bool xid_cache_insert(XID *xid)
 {
@@ -302,7 +512,8 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID 
*xid)
     xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
     break;
   case 1:
-    my_error(ER_XAER_DUPID, MYF(0));
+    if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec))
+      my_error(ER_XAER_DUPID, MYF(0));
   }
   return res;
 }
@@ -311,9 +522,39 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID 
*xid)
 static void xid_cache_delete(THD *thd, XID_cache_element *&element)
 {
   DBUG_ASSERT(thd->xid_hash_pins);
+
   element->mark_uninitialized();
+  wait_for_commit *waiter= NULL;
+  if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+  {
+    wait_for_commit *notifier= &thd->rgi_slave->commit_orderer;
+    while (unlikely(!element->
+                    p_waiter.compare_exchange_weak(waiter, notifier,
+                                                   std::memory_order_acq_rel)))
+    {
+      if (waiter)
+      {
+        DBUG_ASSERT(notifier != waiter);
+
+        break;
+      }
+      else
+      {
+        (void) LF_BACKOFF();
+      }
+    }
+  }
   lf_hash_delete(&xid_cache, thd->xid_hash_pins,
                  element->xid.key(), element->xid.key_length());
+  if (waiter)
+  {
+    mysql_mutex_lock(&waiter->LOCK_wait_commit);
+#ifndef DBUG_OFF
+    waiter->debug_done= true;
+#endif
+    mysql_cond_signal(&waiter->COND_wait_commit_dep);
+    mysql_mutex_unlock(&waiter->LOCK_wait_commit);
+  }
 }
 
 
@@ -456,7 +697,23 @@ bool trans_xa_start(THD *thd)
   else if (!trans_begin(thd))
   {
     MYSQL_SET_TRANSACTION_XID(thd->m_transaction_psi, thd->lex->xid, 
XA_ACTIVE);
-    if (xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid))
+
+    bool parallel_slave_xap_status= true; // `true` presumes ordinary XA START.
+    if (thd->rgi_slave &&
+        thd->rgi_slave->is_parallel_exec)
+    {
+      DBUG_ASSERT(thd->rgi_slave->gtid_ev_flags2 |
+                  Gtid_log_event::FL_PREPARED_XA);
+      /*
+        The status gets refined below normally to flip in which case `false`
+        designates the xid insert is done.
+        Possibly incurred wait is when xid is duplicate.
+      */
+      parallel_slave_xap_status= xid_cache_insert_maybe_wait(thd);
+    }
+
+    if (parallel_slave_xap_status &&
+        xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid))
     {
       trans_rollback(thd);
       DBUG_RETURN(true);
@@ -602,12 +859,19 @@ bool trans_xa_commit(THD *thd)
       my_error(ER_OUT_OF_RESOURCES, MYF(0));
       DBUG_RETURN(TRUE);
     }
+    DBUG_ASSERT(!thd->rgi_slave || !thd->rgi_slave->is_async_xac);
 
-    if (auto xs= xid_cache_search(thd, thd->lex->xid))
+    /*
+      Parallel slave may not succeed acquiring xid, in which case
+      @c is_async_xac is @c true, it will do that later.
+    */
+    XID_cache_element *xs;
+    if ((xs= xid_cache_search(thd, thd->lex->xid)) ||
+        (thd->rgi_slave && thd->rgi_slave->is_async_xac))
     {
       bool xid_deleted= false;
       MDL_request mdl_request;
-      bool rw_trans= (xs->rm_error != ER_XA_RBROLLBACK);
+      bool rw_trans= (xs && xs->rm_error != ER_XA_RBROLLBACK);
 
       if (rw_trans && thd->is_read_only_ctx())
       {
@@ -615,8 +879,7 @@ bool trans_xa_commit(THD *thd)
         res= 1;
         goto _end_external_xid;
       }
-
-      res= xa_trans_rolled_back(xs);
+      res= xs ? xa_trans_rolled_back(xs) : 0;
       /*
         Acquire metadata lock which will ensure that COMMIT is blocked
         by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
@@ -645,7 +908,7 @@ bool trans_xa_commit(THD *thd)
       }
       DBUG_ASSERT(!xid_state.xid_cache_element);
 
-      xid_state.xid_cache_element= xs;
+      xid_state.xid_cache_element= xs; // may be NULL on parallel slave
       ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
       if (!res && thd->is_error())
       {
@@ -654,13 +917,16 @@ bool trans_xa_commit(THD *thd)
         res= true;
         goto _end_external_xid;
       }
-      xid_cache_delete(thd, xs);
+      DBUG_ASSERT(xs || (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+                         xid_state.xid_cache_element));
+
+      xid_cache_delete(thd, xid_state.xid_cache_element);
       xid_deleted= true;
 
   _end_external_xid:
       xid_state.xid_cache_element= 0;
       res= res || thd->is_error();
-      if (!xid_deleted)
+      if (!xid_deleted && xs)
         xs->acquired_to_recovered();
       if (mdl_request.ticket)
       {
@@ -790,12 +1056,14 @@ bool trans_xa_rollback(THD *thd)
       DBUG_RETURN(TRUE);
     }
 
-    if (auto xs= xid_cache_search(thd, thd->lex->xid))
+    XID_cache_element *xs;
+    if ((xs= xid_cache_search(thd, thd->lex->xid)) ||
+        (thd->rgi_slave && thd->rgi_slave->is_async_xac))
     {
       bool res;
       bool xid_deleted= false;
       MDL_request mdl_request;
-      bool rw_trans= (xs->rm_error != ER_XA_RBROLLBACK);
+      bool rw_trans= (xs && xs->rm_error != ER_XA_RBROLLBACK);
 
       if (rw_trans && thd->is_read_only_ctx())
       {
@@ -822,7 +1090,7 @@ bool trans_xa_rollback(THD *thd)
       {
         thd->backup_commit_lock= &mdl_request;
       }
-      res= xa_trans_rolled_back(xs);
+      res= xs ? xa_trans_rolled_back(xs) : 0;
       DBUG_ASSERT(!xid_state.xid_cache_element);
 
       xid_state.xid_cache_element= xs;
@@ -831,12 +1099,15 @@ bool trans_xa_rollback(THD *thd)
       {
         goto _end_external_xid;
       }
-      xid_cache_delete(thd, xs);
+      DBUG_ASSERT(xs || (thd->rgi_slave && thd->rgi_slave->is_async_xac &&
+                         xid_state.xid_cache_element));
+
+      xid_cache_delete(thd, xid_state.xid_cache_element);
       xid_deleted= true;
 
   _end_external_xid:
       xid_state.xid_cache_element= 0;
-      if (!xid_deleted)
+      if (!xid_deleted && xs)
         xs->acquired_to_recovered();
       if (mdl_request.ticket)
       {
@@ -1146,7 +1417,7 @@ static bool slave_applier_reset_xa_trans(THD *thd)
   {
     thd->transaction->xid_state.set_error(ER_XA_RBROLLBACK);
   }
-  thd->transaction->xid_state.xid_cache_element->acquired_to_recovered();
+  auto element= thd->transaction->xid_state.xid_cache_element;
   thd->transaction->xid_state.xid_cache_element= 0;
 
   for (Ha_trx_info *ha_info= thd->transaction->all.ha_list, *ha_info_next;
@@ -1158,6 +1429,34 @@ static bool slave_applier_reset_xa_trans(THD *thd)
   thd->transaction->all.ha_list= 0;
 
   ha_close_connection(thd);
+  element->acquired_to_recovered();
+  if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec)
+  {
+    /* make the xid available to a possible (xa-"complete") waiter */
+    wait_for_commit *xac_waiter= NULL,
+      *notifier= &thd->rgi_slave->commit_orderer;
+    while (unlikely(!element->
+                    c_waiter.compare_exchange_weak(xac_waiter, notifier,
+                                                   std::memory_order_acq_rel)))
+    {
+      if (xac_waiter)
+      {
+        break;
+      }
+      else
+      {
+        (void) LF_BACKOFF();
+      }
+    }
+    if (xac_waiter)
+    {
+      // unmark and signal
+      mysql_mutex_lock(&xac_waiter->LOCK_wait_commit);
+      element->c_waiter.store(NULL, std::memory_order_relaxed);
+      mysql_cond_signal(&xac_waiter->COND_wait_commit_dep);
+      mysql_mutex_unlock(&xac_waiter->LOCK_wait_commit);
+    }
+  }
   thd->transaction->cleanup();
   thd->transaction->all.reset();
 
diff --git a/storage/innobase/trx/trx0undo.cc b/storage/innobase/trx/trx0undo.cc
index daf27822085..53ce685fa35 100644
--- a/storage/innobase/trx/trx0undo.cc
+++ b/storage/innobase/trx/trx0undo.cc
@@ -639,8 +639,7 @@ static void trx_undo_write_xid(buf_block_t *block, uint16_t 
offset,
                                  static_cast<uint32_t>(xid.bqual_length));
   const ulint xid_length= static_cast<ulint>(xid.gtrid_length
                                              + xid.bqual_length);
-  mtr->memcpy(*block, &block->page.frame[offset + TRX_UNDO_XA_XID],
-              xid.data, xid_length);
+  mtr->memcpy<mtr_t::MAYBE_NOP>(*block, &block->page.frame[offset + 
TRX_UNDO_XA_XID], xid.data, xid_length);
   if (UNIV_LIKELY(xid_length < XIDDATASIZE))
     mtr->memset(block, offset + TRX_UNDO_XA_XID + xid_length,
                 XIDDATASIZE - xid_length, 0);
-- 
2.30.2

_______________________________________________
commits mailing list -- commits@lists.mariadb.org
To unsubscribe send an email to commits-le...@lists.mariadb.org

Reply via email to