On Thu, Dec 26, 2024 at 06:11:25PM +0300, Давыдов Виталий wrote:
> I am concerned about twophase file name generation. The function
> TwoPhaseFilePath() is pretty straightforward and unambiguous in 16 and
> earlier versions. The logic of this function in 17+ seems to be more
> complex. I do not understand it clearly. But, I guess, it will work
> incorrectly after turning to a newer epoch, because the epoch is
> calculated from TransamVariables->nextXid, but not from the problem
> xid. The same problem may happen if we are in epoch 1 or greater. It
> will produce a wrong file name, because the epoch will be obtained
> from the last xid, not from the file name xid. In other words, the
> function AdjustToFullTransactionId assumes that if xid >
> TransamVariables->nextXid, then the xid is from the previous epoch. It
> may not be the case in our scenario.

Yeah.  At this stage, we can pretty much say that the whole idea of
relying on AdjustToFullTransactionId() is broken, because we would build
2PC file names based on wrong assumptions, while orphaned files could
be in the far past or far future depending on the epoch.

TBH, we'll be better off if we remove AdjustToFullTransactionId() and
spread a bit more knowledge of FullTransactionIds around to correctly
build the 2PC file path in v17~.  I've been playing with this code
for a couple of hours and finished with the attached patch.  I have
wondered if ReadTwoPhaseFile() should gain the same knowledge as
TwoPhaseFilePath(), but decided to limit the invasiveness because we
always call ReadTwoPhaseFile() once we don't have any orphaned files,
for all the phases of recovery.  So while this requires a bit more
logic that depends on FullTransactionIdFromEpochAndXid() and
ReadNextFullTransactionId() to build a FullTransactionId and get the
current epoch, that's still acceptable as we only store an XID in the
2PC file.

So please see the attached.  You will note that RemoveTwoPhaseFile(),
ProcessTwoPhaseBuffer() and TwoPhaseFilePath() now require a
FullTransactionId, hence callers need to think about the epoch to use.
That should limit future errors compared to passing the file name as
an optional argument.

What do you think?
--
Michael
From bb250c0cd303f0d8db0fee2d26353bf0e0cb1275 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 27 Dec 2024 16:34:44 +0900
Subject: [PATCH v2] Fix failure with incorrect epoch handling for 2PC files at
 recovery

At the beginning of recovery, an orphaned two-phase file in an epoch
different than the one defined in the checkpoint record could not be
removed because of the assumption that AdjustToFullTransactionId() relies
on, namely that all files would be either from the current epoch or
the epoch before that.

If the checkpoint epoch was 0 while the 2PC file was orphaned and in the
future, AdjustToFullTransactionId() would underflow the epoch used to
build the 2PC file path.  In non-assert builds, this would create a
WARNING message referring to a 2PC file with an epoch of "FFFFFFFF" (or
UINT32_MAX), as an effect of the underflow calculation.

Some tests are added with dummy 2PC files in the past and the future,
checking that these are properly removed.

Issue introduced by 5a1dfde8334b.

Reported-by: Vitaly Davydov
Discussion: https://postgr.es/m/13b5b6-676c3080-4d-531db900@47931709
Backpatch-through: 17
---
 src/backend/access/transam/twophase.c | 231 +++++++++++++++++---------
 src/test/recovery/t/009_twophase.pl   |  31 ++++
 2 files changed, 184 insertions(+), 78 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 49be1df91c..01f5d728be 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
 static void RemoveGXact(GlobalTransaction gxact);
 
 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
-static char *ProcessTwoPhaseBuffer(TransactionId xid,
+static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
 								   XLogRecPtr prepare_start_lsn,
 								   bool fromdisk, bool setParent, bool setNextXid);
 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
 								const char *gid, TimestampTz prepared_at, Oid owner,
 								Oid databaseid);
-static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
+static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
 static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
 
 /*
@@ -926,42 +926,9 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
 /* State file support													*/
 /************************************************************************/
 
-/*
- * Compute the FullTransactionId for the given TransactionId.
- *
- * The wrap logic is safe here because the span of active xids cannot exceed one
- * epoch at any given time.
- */
-static inline FullTransactionId
-AdjustToFullTransactionId(TransactionId xid)
-{
-	FullTransactionId nextFullXid;
-	TransactionId nextXid;
-	uint32		epoch;
-
-	Assert(TransactionIdIsValid(xid));
-
-	LWLockAcquire(XidGenLock, LW_SHARED);
-	nextFullXid = TransamVariables->nextXid;
-	LWLockRelease(XidGenLock);
-
-	nextXid = XidFromFullTransactionId(nextFullXid);
-	epoch = EpochFromFullTransactionId(nextFullXid);
-	if (unlikely(xid > nextXid))
-	{
-		/* Wraparound occurred, must be from a prev epoch. */
-		Assert(epoch > 0);
-		epoch--;
-	}
-
-	return FullTransactionIdFromEpochAndXid(epoch, xid);
-}
-
 static inline int
-TwoPhaseFilePath(char *path, TransactionId xid)
+TwoPhaseFilePath(char *path, FullTransactionId fxid)
 {
-	FullTransactionId fxid = AdjustToFullTransactionId(xid);
-
 	return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
 					EpochFromFullTransactionId(fxid),
 					XidFromFullTransactionId(fxid));
@@ -1297,7 +1264,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * If it looks OK (has a valid magic number and CRC), return the palloc'd
  * contents of the file, issuing an error when finding corrupted data.  If
  * missing_ok is true, which indicates that missing files can be safely
- * ignored, then return NULL.  This state can be reached when doing recovery.
+ * ignored, then return NULL.  This state can be reached when doing recovery
+ * after discarding twophase files from other epochs.
  */
 static char *
 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
@@ -1311,8 +1279,16 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
 	pg_crc32c	calc_crc,
 				file_crc;
 	int			r;
+	FullTransactionId fxid;
+	FullTransactionId nextFullXid;
+	uint32		epoch;
 
-	TwoPhaseFilePath(path, xid);
+	/* get current epoch */
+	nextFullXid = ReadNextFullTransactionId();
+	epoch = EpochFromFullTransactionId(nextFullXid);
+
+	fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 	if (fd < 0)
@@ -1537,7 +1513,6 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
-
 	/*
 	 * Disassemble the header area
 	 */
@@ -1677,10 +1652,22 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	AtEOXact_PgStat(isCommit, false);
 
 	/*
-	 * And now we can clean up any files we may have left.
+	 * And now we can clean up any files we may have left.  These should
+	 * be from the current epoch.
 	 */
 	if (ondisk)
-		RemoveTwoPhaseFile(xid, true);
+	{
+		uint32		epoch;
+		FullTransactionId nextFullXid;
+		FullTransactionId fxid;
+
+		/* get current epoch */
+		nextFullXid = ReadNextFullTransactionId();
+		epoch = EpochFromFullTransactionId(nextFullXid);
+
+		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		RemoveTwoPhaseFile(fxid, true);
+	}
 
 	MyLockedGxact = NULL;
 
@@ -1718,13 +1705,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
  *
  * If giveWarning is false, do not complain about file-not-present;
  * this is an expected case during WAL replay.
+ *
+ * This routine is used at early stages of recovery where future and
+ * past orphaned files are checked, hence the FullTransactionId to build
+ * a complete file name fit for the removal.
  */
 static void
-RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
+RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
 {
 	char		path[MAXPGPATH];
 
-	TwoPhaseFilePath(path, xid);
+	TwoPhaseFilePath(path, fxid);
 	if (unlink(path))
 		if (errno != ENOENT || giveWarning)
 			ereport(WARNING,
@@ -1744,13 +1735,21 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 	char		path[MAXPGPATH];
 	pg_crc32c	statefile_crc;
 	int			fd;
+	FullTransactionId fxid;
+	FullTransactionId nextFullXid;
+	uint32		epoch;
 
 	/* Recompute CRC */
 	INIT_CRC32C(statefile_crc);
 	COMP_CRC32C(statefile_crc, content, len);
 	FIN_CRC32C(statefile_crc);
 
-	TwoPhaseFilePath(path, xid);
+	/* Use current epoch */
+	nextFullXid = ReadNextFullTransactionId();
+	epoch = EpochFromFullTransactionId(nextFullXid);
+
+	fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path,
 						   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
@@ -1898,7 +1897,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
  * This is called once at the beginning of recovery, saving any extra
  * lookups in the future.  Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way.
+ * minimum XID horizon are discarded on the way.  Two-phase files with
+ * an epoch older or newer than the current checkpoint's record epoch
+ * are also discarded.
  */
 void
 restoreTwoPhaseData(void)
@@ -1913,14 +1914,11 @@ restoreTwoPhaseData(void)
 		if (strlen(clde->d_name) == 16 &&
 			strspn(clde->d_name, "0123456789ABCDEF") == 16)
 		{
-			TransactionId xid;
 			FullTransactionId fxid;
 			char	   *buf;
 
 			fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
-			xid = XidFromFullTransactionId(fxid);
-
-			buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
+			buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
 										true, false, false);
 			if (buf == NULL)
 				continue;
@@ -1971,6 +1969,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
 	TransactionId result = origNextXid;
 	TransactionId *xids = NULL;
+	uint32		epoch = EpochFromFullTransactionId(nextXid);
 	int			nxids = 0;
 	int			allocsize = 0;
 	int			i;
@@ -1979,6 +1978,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		TransactionId xid;
+		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
@@ -1986,7 +1986,12 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
 		xid = gxact->xid;
 
-		buf = ProcessTwoPhaseBuffer(xid,
+		/*
+		 * All two-phase files with past and future epoch in pg_twophase are
+		 * gone at this point, so we're OK to rely on only the current epoch.
+		 */
+		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
 
@@ -2048,11 +2053,18 @@ void
 StandbyRecoverPreparedTransactions(void)
 {
 	int			i;
+	uint32		epoch;
+	FullTransactionId nextFullXid;
+
+	/* get current epoch */
+	nextFullXid = ReadNextFullTransactionId();
+	epoch = EpochFromFullTransactionId(nextFullXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		TransactionId xid;
+		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
@@ -2060,7 +2072,12 @@ StandbyRecoverPreparedTransactions(void)
 
 		xid = gxact->xid;
 
-		buf = ProcessTwoPhaseBuffer(xid,
+		/*
+		 * At this stage, we're OK to work with the current epoch
+		 * as all past and future files have been already discarded.
+		 */
+		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
 		if (buf != NULL)
@@ -2089,11 +2106,18 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
+	uint32		epoch;
+	FullTransactionId nextFullXid;
+
+	/* get current epoch */
+	nextFullXid = ReadNextFullTransactionId();
+	epoch = EpochFromFullTransactionId(nextFullXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		TransactionId xid;
+		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 		char	   *bufptr;
@@ -2101,6 +2125,10 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
+		/*
+		 * At this stage, we're OK to work with the current epoch
+		 * as all past and future files have been already discarded.
+		 */
 		xid = gxact->xid;
 
 		/*
@@ -2112,7 +2140,8 @@ RecoverPreparedTransactions(void)
 		 * SubTransSetParent has been set before, if the prepared transaction
 		 * generated xid assignment records.
 		 */
-		buf = ProcessTwoPhaseBuffer(xid,
+		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
 		if (buf == NULL)
@@ -2180,7 +2209,7 @@ RecoverPreparedTransactions(void)
 /*
  * ProcessTwoPhaseBuffer
  *
- * Given a transaction id, read it either from disk or read it directly
+ * Given a FullTransactionId, read it either from disk or read it directly
  * via shmem xlog record pointer using the provided "prepare_start_lsn".
  *
  * If setParent is true, set up subtransaction parent linkages.
@@ -2189,23 +2218,66 @@ RecoverPreparedTransactions(void)
  * value scanned.
  */
 static char *
-ProcessTwoPhaseBuffer(TransactionId xid,
+ProcessTwoPhaseBuffer(FullTransactionId fxid,
 					  XLogRecPtr prepare_start_lsn,
 					  bool fromdisk,
 					  bool setParent, bool setNextXid)
 {
 	FullTransactionId nextXid = TransamVariables->nextXid;
-	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
 	TransactionId *subxids;
 	char	   *buf;
 	TwoPhaseFileHeader *hdr;
 	int			i;
+	TransactionId xid = XidFromFullTransactionId(fxid);
 
 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
 	if (!fromdisk)
 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
+	/*
+	 * Reject full XID if too new.  Note that this discards files from future
+	 * epochs.
+	 */
+	if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
+	{
+		if (fromdisk)
+		{
+			ereport(WARNING,
+					(errmsg("removing future two-phase state file of epoch %u for transaction %u",
+							EpochFromFullTransactionId(fxid), xid)));
+			RemoveTwoPhaseFile(fxid, true);
+		}
+		else
+		{
+			ereport(WARNING,
+					(errmsg("removing future two-phase state from memory for transaction %u",
+							xid)));
+			PrepareRedoRemove(xid, true);
+		}
+		return NULL;
+	}
+
+	/* Discard files from past epochs */
+	if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
+	{
+		if (fromdisk)
+		{
+			ereport(WARNING,
+					(errmsg("removing past two-phase state file of epoch %u for transaction %u",
+							EpochFromFullTransactionId(fxid), xid)));
+			RemoveTwoPhaseFile(fxid, true);
+		}
+		else
+		{
+			ereport(WARNING,
+					(errmsg("removing past two-phase state from memory for transaction %u",
+							xid)));
+			PrepareRedoRemove(xid, true);
+		}
+		return NULL;
+	}
+
 	/* Already processed? */
 	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
 	{
@@ -2214,7 +2286,8 @@ ProcessTwoPhaseBuffer(TransactionId xid,
 			ereport(WARNING,
 					(errmsg("removing stale two-phase state file for transaction %u",
 							xid)));
-			RemoveTwoPhaseFile(xid, true);
+
+			RemoveTwoPhaseFile(fxid, true);
 		}
 		else
 		{
@@ -2226,26 +2299,6 @@ ProcessTwoPhaseBuffer(TransactionId xid,
 		return NULL;
 	}
 
-	/* Reject XID if too new */
-	if (TransactionIdFollowsOrEquals(xid, origNextXid))
-	{
-		if (fromdisk)
-		{
-			ereport(WARNING,
-					(errmsg("removing future two-phase state file for transaction %u",
-							xid)));
-			RemoveTwoPhaseFile(xid, true);
-		}
-		else
-		{
-			ereport(WARNING,
-					(errmsg("removing future two-phase state from memory for transaction %u",
-							xid)));
-			PrepareRedoRemove(xid, true);
-		}
-		return NULL;
-	}
-
 	if (fromdisk)
 	{
 		/* Read and validate file */
@@ -2520,8 +2573,16 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 	if (!XLogRecPtrIsInvalid(start_lsn))
 	{
 		char		path[MAXPGPATH];
+		uint32		epoch;
+		FullTransactionId fxid;
+		FullTransactionId nextFullXid;
 
-		TwoPhaseFilePath(path, hdr->xid);
+		/* Use current epoch */
+		nextFullXid = ReadNextFullTransactionId();
+		epoch = EpochFromFullTransactionId(nextFullXid);
+
+		fxid = FullTransactionIdFromEpochAndXid(epoch, hdr->xid);
+		TwoPhaseFilePath(path, fxid);
 
 		if (access(path, F_OK) == 0)
 		{
@@ -2616,7 +2677,21 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 	 */
 	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
 	if (gxact->ondisk)
-		RemoveTwoPhaseFile(xid, giveWarning);
+	{
+		uint32		epoch;
+		FullTransactionId nextFullXid;
+		FullTransactionId fxid;
+
+		/*
+		 * We should deal with a file at the current epoch here, so
+		 * grab it.
+		 */
+		nextFullXid = ReadNextFullTransactionId();
+		epoch = EpochFromFullTransactionId(nextFullXid);
+		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+
+		RemoveTwoPhaseFile(fxid, giveWarning);
+	}
 	RemoveGXact(gxact);
 }
 
diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl
index 4b3e0f77dc..2463b7ea30 100644
--- a/src/test/recovery/t/009_twophase.pl
+++ b/src/test/recovery/t/009_twophase.pl
@@ -572,4 +572,35 @@ my $nsubtrans = $cur_primary->safe_psql('postgres',
 );
 isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed");
 
+###############################################################################
+# Check handling of orphaned 2PC files at recovery.
+###############################################################################
+
+$cur_standby->teardown_node;
+$cur_primary->teardown_node;
+
+# Grab location in logs of primary
+my $log_offset = -s $cur_primary->logfile;
+
+# Create fake files with a transaction ID large or low enough to be in the
+# future or the past, in different epochs, then check that the primary is able
+# to start and remove these files at recovery.
+
+# First bump the epoch with pg_resetwal.
+$cur_primary->command_ok([ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ],
+						 'bump epoch of primary');
+
+my $future_2pc_file = $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF';
+append_to_file $future_2pc_file, "";
+my $past_2pc_file = $cur_primary->data_dir . '/pg_twophase/000000EE00000FFF';
+append_to_file $past_2pc_file, "";
+
+$cur_primary->start;
+$cur_primary->log_check(
+	"two-phase files removed at recovery",
+	$log_offset,
+	log_like =>
+	[qr/removing past two-phase state file of epoch 238 for transaction 4095/,
+	qr/removing future two-phase state file of epoch 511 for transaction 4095/]);
+
 done_testing();
-- 
2.45.2
