From 24be9c26f50a02338ea08afb9978364919a6336d Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Tue, 6 Sep 2022 09:11:25 +0000
Subject: [PATCH] (PoC) Fix assertion failure during logical decoding.

---
 .../expected/catalog_change_snapshot.out      | 45 +++++++++++++++++++
 .../specs/catalog_change_snapshot.spec        |  2 +
 .../replication/logical/reorderbuffer.c       | 41 +++++++++++++++++
 3 files changed, 88 insertions(+)

diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index dc4f9b7018..246cda8099 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -42,3 +42,48 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_truncate s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_truncate: TRUNCATE tbl1;
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+table public.tbl1: TRUNCATE: (no-flags)                      
+COMMIT                                                       
+(4 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 2971ddc69c..fed823c836 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -37,3 +37,5 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_
 # record written by bgwriter.  One might think we can either stop the bgwriter or
 # increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
 permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 3194e41851..cbe9cc74bb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -220,6 +220,8 @@ static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 											   XLogRecPtr lsn, bool create_as_top);
 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 											  ReorderBufferTXN *subtxn);
+static void ReorderBufferTransferInvalToParent(ReorderBufferTXN *txn,
+											  ReorderBufferTXN *subtxn);
 
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
 
@@ -1114,6 +1116,9 @@ ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 			subtxn->base_snapshot = NULL;
 			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
 			dlist_delete(&subtxn->base_snapshot_node);
+
+			/* Invalidaitons must be also transported */
+			ReorderBufferTransferInvalToParent(txn, subtxn);
 		}
 		else
 		{
@@ -1126,6 +1131,42 @@ ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 	}
 }
 
+/*
+ * ReorderBufferTransferInvalToParent
+ *		Transfer invalidation messages from subtxn to top-level txn
+ *
+ * XXX: Do we have to transport ReorderBufferChange at the same time?
+ */
+static void
+ReorderBufferTransferInvalToParent(ReorderBufferTXN *txn,
+								   ReorderBufferTXN *subtxn)
+{
+	Assert(subtxn->toplevel_xid == txn->xid);
+
+	if (subtxn->ninvalidations > 0)
+	{
+		if (txn->ninvalidations == 0)
+		{
+			txn->invalidations = (SharedInvalidationMessage *)
+				palloc(sizeof(SharedInvalidationMessage) * subtxn->ninvalidations);
+			memcpy(txn->invalidations, subtxn->invalidations,
+				sizeof(SharedInvalidationMessage) * subtxn->ninvalidations);
+			txn->ninvalidations = subtxn->ninvalidations;
+		}
+		else
+		{
+			txn->invalidations = (SharedInvalidationMessage *)
+				repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+						(txn->ninvalidations + subtxn->ninvalidations));
+			memcpy(txn->invalidations + txn->ninvalidations, subtxn->invalidations,
+				subtxn->ninvalidations * sizeof(SharedInvalidationMessage));
+			txn->ninvalidations += subtxn->ninvalidations;
+		}
+		subtxn->invalidations = NULL;
+		subtxn->ninvalidations = 0;
+	}
+}
+
 /*
  * Associate a subtransaction with its toplevel transaction at commit
  * time. There may be no further changes added after this.
-- 
2.27.0

