At Tue, 12 Oct 2021 13:59:59 +0900 (JST), Kyotaro Horiguchi 
<horikyota....@gmail.com> wrote in 
> So I came up with the attached version.

Sorry, it was losing a piece of change.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 424f405b4c9d41471eae1edd48cdbb6a6d47217e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Tue, 12 Oct 2021 13:53:27 +0900
Subject: [PATCH v2] Allow overflowed snapshot when creating logical
 replication slot

Snapshot can hold top XIDs up to the number of server processes but
SnapBuildInitialSnapshot tries to store all top-level and sub XIDs to
there and fails for certain circumstances. Instead, create a
"takenDuringRecovery" snapshot instead, which stores all XIDs in
subxip array.  Addition to that, allow to create an overflowed
snapshot by adding to reorder buffer a function to tell whether an XID
is a top-level or not.
---
 .../replication/logical/reorderbuffer.c       | 20 +++++++
 src/backend/replication/logical/snapbuild.c   | 56 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  1 +
 3 files changed, 65 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 46e66608cf..4e452cce7c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3326,6 +3326,26 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
 }
 
 
+/*
+ * ReorderBufferXidIsKnownSubXact
+ *		Returns true if the xid is a known subtransaction.
+ */
+bool
+ReorderBufferXidIsKnownSubXact(ReorderBuffer *rb, TransactionId xid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false,
+								NULL, InvalidXLogRecPtr, false);
+
+	/* a known subtxn? */
+	if (txn && rbtxn_is_known_subxact(txn))
+		return true;
+
+	return false;
+}
+
+
 /*
  * ---------------------------------------
  * Disk serialization support
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a5333349a8..46c691df20 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -531,8 +531,9 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 {
 	Snapshot	snap;
 	TransactionId xid;
-	TransactionId *newxip;
-	int			newxcnt = 0;
+	TransactionId *newsubxip;
+	int			newsubxcnt;
+	bool		overflowed = false;
 
 	Assert(!FirstSnapshotSet);
 	Assert(XactIsoLevel == XACT_REPEATABLE_READ);
@@ -568,9 +569,12 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
 	MyProc->xmin = snap->xmin;
 
-	/* allocate in transaction context */
-	newxip = (TransactionId *)
-		palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
+	/*
+	 * Allocate in transaction context. We use only subxip to store xids. See
+	 * GetSnapshotData for details.
+	 */
+	newsubxip = (TransactionId *)
+		palloc(sizeof(TransactionId) * GetMaxSnapshotSubxidCount());
 
 	/*
 	 * snapbuild.c builds transactions in an "inverted" manner, which means it
@@ -578,6 +582,9 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 	 * classical snapshot by marking all non-committed transactions as
 	 * in-progress. This can be expensive.
 	 */
+retry:
+	newsubxcnt = 0;
+
 	for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
 	{
 		void	   *test;
@@ -591,12 +598,29 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
 		if (test == NULL)
 		{
-			if (newxcnt >= GetMaxSnapshotXidCount())
-				ereport(ERROR,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("initial slot snapshot too large")));
+			bool add = true;
 
-			newxip[newxcnt++] = xid;
+			/* exlude subxids when subxip is overflowed */
+			if (overflowed &&
+				ReorderBufferXidIsKnownSubXact(builder->reorder, xid))
+				add = false;
+
+			if (add)
+			{
+				if (newsubxcnt >= GetMaxSnapshotSubxidCount())
+				{
+					if (overflowed)
+						ereport(ERROR,
+								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+								 errmsg("initial slot snapshot too large")));
+
+					/* retry excluding subxids */
+					overflowed = true;
+					goto retry;
+				}
+
+				newsubxip[newsubxcnt++] = xid;
+			}
 		}
 
 		TransactionIdAdvance(xid);
@@ -604,8 +628,16 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
 	/* adjust remaining snapshot fields as needed */
 	snap->snapshot_type = SNAPSHOT_MVCC;
-	snap->xcnt = newxcnt;
-	snap->xip = newxip;
+	snap->xcnt = 0;
+	snap->subxcnt = newsubxcnt;
+	snap->subxip = newsubxip;
+	snap->suboverflowed = overflowed;
+
+	/*
+	 * Although this snapshot is taken actually not during recovery, all XIDs
+	 * are stored in subxip even if it is not overflowed.
+	 */
+	snap->takenDuringRecovery = true;
 
 	return snap;
 }
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b40ff75f7..e5fa1051d7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -669,6 +669,7 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool		ReorderBufferXidIsKnownSubXact(ReorderBuffer *rb, TransactionId xid);
 bool		ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 											 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
 											 TimestampTz prepare_time,
-- 
2.27.0

Reply via email to