On Wed, Jan 15, 2025 at 05:00:51PM -0800, Noah Misch wrote:
> I agree with accepting +4 bytes in GlobalTransactionData.

Let's just bite the bullet and do that on HEAD and v17, then,
integrating deeper FullTransactionIds into the internals of
twophase.c.

> I think "using the current epoch" is wrong for half of the nextFullXid values
> having epoch > 0.  For example, nextFullId==2^32 is in epoch 1, but all the
> allowable XIDs are in epoch 0.  (I mean "allowable" in the sense of
> AssertTransactionIdInAllowableRange().)  From then until we assign another
> 2^31 XIDs, epochs 0 and 1 are both expected in XID values.  After 2^31 XID
> assignments, every allowable XID be in epoch 1.  Hence, twophase.c would need
> two-epoch logic like we have in widen_snapshot_xid() and XLogRecGetFullXid().
> Is that right?  (I wrote this in a hurry, so this email may have more than the
> standard level of errors.)  Before commit 7e125b2, twophase also had that
> logic.  I didn't work out the user-visible consequences of that logic's new
> absence here, but I bet on twophase recovery breakage.  Similar problem here
> (up to two epochs are acceptable, not just one):
> 
> +     /* Discard files from past epochs */
> +     if (EpochFromFullTransactionId(fxid) < 
> EpochFromFullTransactionId(nextXid))

Oops, you're right.  Your suggestion to unify all that in a single
routine is an excellent idea.  Missed the bits in xid8funcs.c.

> I wrote the attached half-baked patch to fix those, but I tend to think it's
> better to use FullTransactionId as many places as possible in twophase.c.
> (We'd still need to convert XIDs that we read from xl_xact_prepare records,
> along the lines of XLogRecGetFullXid().)  How do you see it?

I'm all for integrating more FullTransactionIds now that these reflect
in the file names, and do a deeper cut.

As far as I understand, the most important point of the logic is to
detect and discard the future files first in restoreTwoPhaseData() ->
ProcessTwoPhaseBuffer() when scanning the contents of pg_twophase at
the beginning of recovery.  Once this filtering is done, it should be
safe to use your FullTransactionIdFromAllowableAt() when doing
the fxid <-> xid transitions between the records and the files on disk
flushed by a restartpoint which store an XID, and the shmem state of
GlobalTransactionData with a fxid.

With the additions attached, FullTransactionIdFromAllowableAt() gets
down from 8 to 6 calls in twophase.c.  The change related to
MarkAsPreparingGuts() seems optional, though.  I am trying to figure
out how to write a regression test to trigger this error, lacking a
bit of time today.  That's going to require more trickery with
pg_resetwal to make that cheap, or something like that..  Attached are
some suggestions, as of a 0002 that applies on top of your 0001.

XLogRecGetFullXid() is used nowhere.  This could be removed, perhaps,
or not?
--
Michael
From 0e1538fa72ede3566ea3dd03958540b80e9fd15c Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 16 Jan 2025 15:14:06 +0900
Subject: [PATCH v2 1/2] Fix twophase.c XID epoch tracking

Half the time after epoch 0, allowable XIDs span two epochs.  This would
have no user-visible consequences during epoch 0, but I expect
(unconfirmed) twophase breakage during other epochs.

FIXME likely rework this in favor of broader fulltransaction use in
twophase.c
---
 src/include/access/transam.h            | 29 ++++++++++
 src/backend/access/transam/twophase.c   | 75 ++++++-------------------
 src/backend/access/transam/xlogreader.c | 18 +-----
 src/backend/utils/adt/xid8funcs.c       | 43 ++++----------
 4 files changed, 58 insertions(+), 107 deletions(-)

diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 0cab8653f1..48fce16aeb 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -77,6 +77,35 @@ FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
 	return result;
 }
 
+/*
+ * Compute FullTransactionId for the given TransactionId, assuming xid was
+ * between [oldestXid, nextXid] at the time when TransamVariables->nextXid was
+ * nextFullXid.
+ */
+static inline FullTransactionId
+FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid,
+								 TransactionId xid)
+{
+	uint32		epoch;
+
+	/* Special transaction ID. */
+	if (!TransactionIdIsNormal(xid))
+		return FullTransactionIdFromEpochAndXid(0, xid);
+
+	/*
+	 * The 64 bit result must be <= nextFullXid, since nextFullXid hadn't been
+	 * issued yet when xid was in the past.  The xid must therefore be from
+	 * the epoch of nextFullXid or the epoch before.  We know this because we
+	 * must remove (by freezing) an XID before assigning the XID half an epoch
+	 * ahead of it.
+	 */
+	epoch = EpochFromFullTransactionId(nextFullXid);
+	if (xid > XidFromFullTransactionId(nextFullXid))
+		epoch--;
+
+	return FullTransactionIdFromEpochAndXid(epoch, xid);
+}
+
 static inline FullTransactionId
 FullTransactionIdFromU64(uint64 value)
 {
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index a3190dc4f1..7a162938f4 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -926,24 +926,6 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
 /* State file support													*/
 /************************************************************************/
 
-/*
- * Compute FullTransactionId for the given TransactionId, using the current
- * epoch.
- */
-static inline FullTransactionId
-FullTransactionIdFromCurrentEpoch(TransactionId xid)
-{
-	FullTransactionId fxid;
-	FullTransactionId nextFullXid;
-	uint32		epoch;
-
-	nextFullXid = ReadNextFullTransactionId();
-	epoch = EpochFromFullTransactionId(nextFullXid);
-
-	fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
-	return fxid;
-}
-
 static inline int
 TwoPhaseFilePath(char *path, FullTransactionId fxid)
 {
@@ -1283,7 +1265,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * contents of the file, issuing an error when finding corrupted data.  If
  * missing_ok is true, which indicates that missing files can be safely
  * ignored, then return NULL.  This state can be reached when doing recovery
- * after discarding two-phase files from other epochs.
+ * after discarding two-phase files from frozen epochs.
  */
 static char *
 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
@@ -1299,7 +1281,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
 	int			r;
 	FullTransactionId fxid;
 
-	fxid = FullTransactionIdFromCurrentEpoch(xid);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
@@ -1664,15 +1646,13 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/* Count the prepared xact as committed or aborted */
 	AtEOXact_PgStat(isCommit, false);
 
-	/*
-	 * And now we can clean up any files we may have left.  These should be
-	 * from the current epoch.
-	 */
+	/* And now we can clean up any files we may have left. */
 	if (ondisk)
 	{
 		FullTransactionId fxid;
 
-		fxid = FullTransactionIdFromCurrentEpoch(xid);
+		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												xid);
 		RemoveTwoPhaseFile(fxid, true);
 	}
 
@@ -1749,8 +1729,7 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 	COMP_CRC32C(statefile_crc, content, len);
 	FIN_CRC32C(statefile_crc);
 
-	/* Use current epoch */
-	fxid = FullTransactionIdFromCurrentEpoch(xid);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path,
@@ -1900,7 +1879,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  * This is called once at the beginning of recovery, saving any extra
  * lookups in the future.  Two-phase files that are newer than the
  * minimum XID horizon are discarded on the way.  Two-phase files with
- * an epoch older or newer than the current checkpoint's record epoch
+ * an epoch frozen relative to the current checkpoint's record epoch
  * are also discarded.
  */
 void
@@ -1971,7 +1950,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
 	TransactionId result = origNextXid;
 	TransactionId *xids = NULL;
-	uint32		epoch = EpochFromFullTransactionId(nextXid);
 	int			nxids = 0;
 	int			allocsize = 0;
 	int			i;
@@ -1988,11 +1966,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
 		xid = gxact->xid;
 
-		/*
-		 * All two-phase files with past and future epoch in pg_twophase are
-		 * gone at this point, so we're OK to rely on only the current epoch.
-		 */
-		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		fxid = FullTransactionIdFromAllowableAt(nextXid, xid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
@@ -2055,12 +2029,9 @@ void
 StandbyRecoverPreparedTransactions(void)
 {
 	int			i;
-	uint32		epoch;
 	FullTransactionId nextFullXid;
 
-	/* get current epoch */
 	nextFullXid = ReadNextFullTransactionId();
-	epoch = EpochFromFullTransactionId(nextFullXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2074,11 +2045,7 @@ StandbyRecoverPreparedTransactions(void)
 
 		xid = gxact->xid;
 
-		/*
-		 * At this stage, we're OK to work with the current epoch as all past
-		 * and future files have been already discarded.
-		 */
-		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
@@ -2108,12 +2075,9 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
-	uint32		epoch;
 	FullTransactionId nextFullXid;
 
-	/* get current epoch */
 	nextFullXid = ReadNextFullTransactionId();
-	epoch = EpochFromFullTransactionId(nextFullXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2127,10 +2091,6 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
-		/*
-		 * At this stage, we're OK to work with the current epoch as all past
-		 * and future files have been already discarded.
-		 */
 		xid = gxact->xid;
 
 		/*
@@ -2142,7 +2102,7 @@ RecoverPreparedTransactions(void)
 		 * SubTransSetParent has been set before, if the prepared transaction
 		 * generated xid assignment records.
 		 */
-		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
@@ -2260,8 +2220,9 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 		return NULL;
 	}
 
-	/* Discard files from past epochs */
-	if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
+	/* Discard files from frozen epochs */
+	if (EpochFromFullTransactionId(fxid) + 1 <
+		EpochFromFullTransactionId(nextXid))
 	{
 		if (fromdisk)
 		{
@@ -2576,8 +2537,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 		char		path[MAXPGPATH];
 		FullTransactionId fxid;
 
-		/* Use current epoch */
-		fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
+		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												hdr->xid);
 		TwoPhaseFilePath(path, fxid);
 
 		if (access(path, F_OK) == 0)
@@ -2676,10 +2637,8 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 	{
 		FullTransactionId fxid;
 
-		/*
-		 * We should deal with a file at the current epoch here.
-		 */
-		fxid = FullTransactionIdFromCurrentEpoch(xid);
+		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												xid);
 		RemoveTwoPhaseFile(fxid, giveWarning);
 	}
 	RemoveGXact(gxact);
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..91b6a91767 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -2166,28 +2166,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 FullTransactionId
 XLogRecGetFullXid(XLogReaderState *record)
 {
-	TransactionId xid,
-				next_xid;
-	uint32		epoch;
-
 	/*
 	 * This function is only safe during replay, because it depends on the
 	 * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
 	 */
 	Assert(AmStartupProcess() || !IsUnderPostmaster);
 
-	xid = XLogRecGetXid(record);
-	next_xid = XidFromFullTransactionId(TransamVariables->nextXid);
-	epoch = EpochFromFullTransactionId(TransamVariables->nextXid);
-
-	/*
-	 * If xid is numerically greater than next_xid, it has to be from the last
-	 * epoch.
-	 */
-	if (unlikely(xid > next_xid))
-		--epoch;
-
-	return FullTransactionIdFromEpochAndXid(epoch, xid);
+	return FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
+											XLogRecGetXid(record));
 }
 
 #endif
diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c
index 4736755b29..b17395617f 100644
--- a/src/backend/utils/adt/xid8funcs.c
+++ b/src/backend/utils/adt/xid8funcs.c
@@ -154,35 +154,6 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
 	return !FullTransactionIdPrecedes(fxid, oldest_fxid);
 }
 
-/*
- * Convert a TransactionId obtained from a snapshot held by the caller to a
- * FullTransactionId.  Use next_fxid as a reference FullTransactionId, so that
- * we can compute the high order bits.  It must have been obtained by the
- * caller with ReadNextFullTransactionId() after the snapshot was created.
- */
-static FullTransactionId
-widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
-{
-	TransactionId next_xid = XidFromFullTransactionId(next_fxid);
-	uint32		epoch = EpochFromFullTransactionId(next_fxid);
-
-	/* Special transaction ID. */
-	if (!TransactionIdIsNormal(xid))
-		return FullTransactionIdFromEpochAndXid(0, xid);
-
-	/*
-	 * The 64 bit result must be <= next_fxid, since next_fxid hadn't been
-	 * issued yet when the snapshot was created.  Every TransactionId in the
-	 * snapshot must therefore be from the same epoch as next_fxid, or the
-	 * epoch before.  We know this because next_fxid is never allow to get
-	 * more than one epoch ahead of the TransactionIds in any snapshot.
-	 */
-	if (xid > next_xid)
-		epoch--;
-
-	return FullTransactionIdFromEpochAndXid(epoch, xid);
-}
-
 /*
  * txid comparator for qsort/bsearch
  */
@@ -420,12 +391,18 @@ pg_current_snapshot(PG_FUNCTION_ARGS)
 	nxip = cur->xcnt;
 	snap = palloc(PG_SNAPSHOT_SIZE(nxip));
 
-	/* fill */
-	snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
-	snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
+	/*
+	 * Fill.  This is the current backend's active snapshot, so MyProc->xmin
+	 * is <= all these XIDs.  As long as that remains so, oldestXid can't
+	 * advance past any of these XIDs.  Hence, these XIDs remain allowable
+	 * relative to next_fxid.
+	 */
+	snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin);
+	snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax);
 	snap->nxip = nxip;
 	for (i = 0; i < nxip; i++)
-		snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
+		snap->xip[i] =
+			FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]);
 
 	/*
 	 * We want them guaranteed to be in ascending order.  This also removes
-- 
2.47.1

From 55b6d87b3cf032a65efdce260f5422fdee6426f1 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 16 Jan 2025 16:45:33 +0900
Subject: [PATCH v2 2/2] Integrate deeper FullTransactionIds into twophase.c

---
 src/backend/access/transam/twophase.c | 115 ++++++++++++--------------
 1 file changed, 55 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 7a162938f4..75008fb949 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -159,7 +159,7 @@ typedef struct GlobalTransactionData
 	 */
 	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
 	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
-	TransactionId xid;			/* The GXACT id */
+	FullTransactionId fxid;		/* The GXACT full xid */
 
 	Oid			owner;			/* ID of user that executed the xact */
 	ProcNumber	locking_backend;	/* backend currently working on the xact */
@@ -224,11 +224,11 @@ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
 static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
 								   XLogRecPtr prepare_start_lsn,
 								   bool fromdisk, bool setParent, bool setNextXid);
-static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
+static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
 								const char *gid, TimestampTz prepared_at, Oid owner,
 								Oid databaseid);
 static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
-static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
+static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
 
 /*
  * Initialization of shared memory
@@ -360,6 +360,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 				TimestampTz prepared_at, Oid owner, Oid databaseid)
 {
 	GlobalTransaction gxact;
+	FullTransactionId fxid;
 	int			i;
 
 	if (strlen(gid) >= GIDSIZE)
@@ -407,7 +408,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	gxact = TwoPhaseState->freeGXacts;
 	TwoPhaseState->freeGXacts = gxact->next;
 
-	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+											xid);
+	MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
 
 	gxact->ondisk = false;
 
@@ -430,11 +433,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
  * Note: This function should be called with appropriate locks held.
  */
 static void
-MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
-					TimestampTz prepared_at, Oid owner, Oid databaseid)
+MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
+					const char *gid, TimestampTz prepared_at, Oid owner,
+					Oid databaseid)
 {
 	PGPROC	   *proc;
 	int			i;
+	TransactionId xid = XidFromFullTransactionId(fxid);
 
 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
@@ -479,7 +484,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
 	proc->subxidStatus.count = 0;
 
 	gxact->prepared_at = prepared_at;
-	gxact->xid = xid;
+	gxact->fxid = fxid;
 	gxact->owner = owner;
 	gxact->locking_backend = MyProcNumber;
 	gxact->valid = false;
@@ -800,6 +805,7 @@ static GlobalTransaction
 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
 {
 	GlobalTransaction result = NULL;
+	FullTransactionId fxid;
 	int			i;
 
 	static TransactionId cached_xid = InvalidTransactionId;
@@ -817,11 +823,14 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
 	if (!lock_held)
 		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+											xid);
+
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
-		if (gxact->xid == xid)
+		if (FullTransactionIdEquals(gxact->fxid, fxid))
 		{
 			result = gxact;
 			break;
@@ -881,7 +890,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
 				*have_more = true;
 				break;
 			}
-			result = gxact->xid;
+			result = XidFromFullTransactionId(gxact->fxid);
 		}
 	}
 
@@ -1032,7 +1041,7 @@ void
 StartPrepare(GlobalTransaction gxact)
 {
 	PGPROC	   *proc = GetPGProcByNumber(gxact->pgprocno);
-	TransactionId xid = gxact->xid;
+	TransactionId xid = XidFromFullTransactionId(gxact->fxid);
 	TwoPhaseFileHeader hdr;
 	TransactionId *children;
 	RelFileLocator *commitrels;
@@ -1268,7 +1277,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * after discarding two-phase files from frozen epochs.
  */
 static char *
-ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
+ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
 {
 	char		path[MAXPGPATH];
 	char	   *buf;
@@ -1279,9 +1288,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
 	pg_crc32c	calc_crc,
 				file_crc;
 	int			r;
-	FullTransactionId fxid;
 
-	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
@@ -1447,6 +1454,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 	char	   *buf;
 	TwoPhaseFileHeader *hdr;
 	bool		result;
+	FullTransactionId fxid;
 
 	Assert(TransactionIdIsValid(xid));
 
@@ -1454,7 +1462,8 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 		return false;			/* nothing to do */
 
 	/* Read and validate file */
-	buf = ReadTwoPhaseFile(xid, true);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
+	buf = ReadTwoPhaseFile(fxid, true);
 	if (buf == NULL)
 		return false;
 
@@ -1474,6 +1483,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 {
 	GlobalTransaction gxact;
 	PGPROC	   *proc;
+	FullTransactionId fxid;
 	TransactionId xid;
 	bool		ondisk;
 	char	   *buf;
@@ -1495,7 +1505,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 */
 	gxact = LockGXact(gid, GetUserId());
 	proc = GetPGProcByNumber(gxact->pgprocno);
-	xid = gxact->xid;
+	fxid = gxact->fxid;
+	xid = XidFromFullTransactionId(fxid);
 
 	/*
 	 * Read and validate 2PC state data. State data will typically be stored
@@ -1503,7 +1514,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * to disk if for some reason they have lived for a long time.
 	 */
 	if (gxact->ondisk)
-		buf = ReadTwoPhaseFile(xid, false);
+		buf = ReadTwoPhaseFile(fxid, false);
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
@@ -1648,13 +1659,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 
 	/* And now we can clean up any files we may have left. */
 	if (ondisk)
-	{
-		FullTransactionId fxid;
-
-		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
-												xid);
 		RemoveTwoPhaseFile(fxid, true);
-	}
 
 	MyLockedGxact = NULL;
 
@@ -1717,19 +1722,17 @@ RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
  * Note: content and len don't include CRC.
  */
 static void
-RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
 {
 	char		path[MAXPGPATH];
 	pg_crc32c	statefile_crc;
 	int			fd;
-	FullTransactionId fxid;
 
 	/* Recompute CRC */
 	INIT_CRC32C(statefile_crc);
 	COMP_CRC32C(statefile_crc, content, len);
 	FIN_CRC32C(statefile_crc);
 
-	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path,
@@ -1842,7 +1845,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 			int			len;
 
 			XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
-			RecreateTwoPhaseFile(gxact->xid, buf, len);
+			RecreateTwoPhaseFile(gxact->fxid, buf, len);
 			gxact->ondisk = true;
 			gxact->prepare_start_lsn = InvalidXLogRecPtr;
 			gxact->prepare_end_lsn = InvalidXLogRecPtr;
@@ -1964,9 +1967,8 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
 		Assert(gxact->inredo);
 
-		xid = gxact->xid;
-
-		fxid = FullTransactionIdFromAllowableAt(nextXid, xid);
+		fxid = gxact->fxid;
+		xid = XidFromFullTransactionId(fxid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
@@ -2029,24 +2031,16 @@ void
 StandbyRecoverPreparedTransactions(void)
 {
 	int			i;
-	FullTransactionId nextFullXid;
-
-	nextFullXid = ReadNextFullTransactionId();
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
-		TransactionId xid;
-		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
 		Assert(gxact->inredo);
 
-		xid = gxact->xid;
-
-		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
-		buf = ProcessTwoPhaseBuffer(fxid,
+		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
 		if (buf != NULL)
@@ -2075,15 +2069,11 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
-	FullTransactionId nextFullXid;
-
-	nextFullXid = ReadNextFullTransactionId();
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		TransactionId xid;
-		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 		char	   *bufptr;
@@ -2091,8 +2081,6 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
-		xid = gxact->xid;
-
 		/*
 		 * Reconstruct subtrans state for the transaction --- needed because
 		 * pg_subtrans is not preserved over a restart.  Note that we are
@@ -2102,15 +2090,17 @@ RecoverPreparedTransactions(void)
 		 * SubTransSetParent has been set before, if the prepared transaction
 		 * generated xid assignment records.
 		 */
-		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
-		buf = ProcessTwoPhaseBuffer(fxid,
+		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
 		if (buf == NULL)
 			continue;
 
+		xid = XidFromFullTransactionId(gxact->fxid);
 		ereport(LOG,
-				(errmsg("recovering prepared transaction %u from shared memory", xid)));
+				(errmsg("recovering prepared transaction %u of epoch %u from shared memory",
+						XidFromFullTransactionId(gxact->fxid),
+						EpochFromFullTransactionId(gxact->fxid))));
 
 		hdr = (TwoPhaseFileHeader *) buf;
 		Assert(TransactionIdEquals(hdr->xid, xid));
@@ -2129,7 +2119,7 @@ RecoverPreparedTransactions(void)
 		 * Recreate its GXACT and dummy PGPROC. But, check whether it was
 		 * added in redo and already has a shmem entry for it.
 		 */
-		MarkAsPreparingGuts(gxact, xid, gid,
+		MarkAsPreparingGuts(gxact, gxact->fxid, gid,
 							hdr->prepared_at,
 							hdr->owner, hdr->database);
 
@@ -2264,7 +2254,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 	if (fromdisk)
 	{
 		/* Read and validate file */
-		buf = ReadTwoPhaseFile(xid, false);
+		buf = ReadTwoPhaseFile(fxid, false);
 	}
 	else
 	{
@@ -2570,7 +2560,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 	gxact->prepared_at = hdr->prepared_at;
 	gxact->prepare_start_lsn = start_lsn;
 	gxact->prepare_end_lsn = end_lsn;
-	gxact->xid = hdr->xid;
+	gxact->fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												   hdr->xid);
 	gxact->owner = hdr->owner;
 	gxact->locking_backend = INVALID_PROC_NUMBER;
 	gxact->valid = false;
@@ -2589,7 +2580,9 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 						   false /* backward */ , false /* WAL */ );
 	}
 
-	elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
+	elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
+		 XidFromFullTransactionId(gxact->fxid),
+		 EpochFromFullTransactionId(gxact->fxid));
 }
 
 /*
@@ -2605,17 +2598,21 @@ void
 PrepareRedoRemove(TransactionId xid, bool giveWarning)
 {
 	GlobalTransaction gxact = NULL;
+	FullTransactionId fxid;
 	int			i;
 	bool		found = false;
 
 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 	Assert(RecoveryInProgress());
 
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+											xid);
+
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		gxact = TwoPhaseState->prepXacts[i];
 
-		if (gxact->xid == xid)
+		if (FullTransactionIdEquals(gxact->fxid, fxid))
 		{
 			Assert(gxact->inredo);
 			found = true;
@@ -2632,15 +2629,13 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 	/*
 	 * And now we can clean up any files we may have left.
 	 */
-	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
-	if (gxact->ondisk)
-	{
-		FullTransactionId fxid;
+	elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
+		 XidFromFullTransactionId(fxid),
+		 EpochFromFullTransactionId(fxid));
 
-		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
-												xid);
+	if (gxact->ondisk)
 		RemoveTwoPhaseFile(fxid, giveWarning);
-	}
+
 	RemoveGXact(gxact);
 }
 
@@ -2688,7 +2683,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 			 * between publisher and subscriber.
 			 */
 			if (gxact->ondisk)
-				buf = ReadTwoPhaseFile(gxact->xid, false);
+				buf = ReadTwoPhaseFile(gxact->fxid, false);
 			else
 			{
 				Assert(gxact->prepare_start_lsn);
-- 
2.47.1

Attachment: signature.asc
Description: PGP signature

Reply via email to