On Wed, Jan 15, 2025 at 05:00:51PM -0800, Noah Misch wrote: > I agree with accepting +4 bytes in GlobalTransactionData.
Let's just bite the bullet and do that on HEAD and v17, then, integrating deeper FullTransactionIds into the internals of twophase.c. > I think "using the current epoch" is wrong for half of the nextFullXid values > having epoch > 0. For example, nextFullId==2^32 is in epoch 1, but all the > allowable XIDs are in epoch 0. (I mean "allowable" in the sense of > AssertTransactionIdInAllowableRange().) From then until we assign another > 2^31 XIDs, epochs 0 and 1 are both expected in XID values. After 2^31 XID > assignments, every allowable XID be in epoch 1. Hence, twophase.c would need > two-epoch logic like we have in widen_snapshot_xid() and XLogRecGetFullXid(). > Is that right? (I wrote this in a hurry, so this email may have more than the > standard level of errors.) Before commit 7e125b2, twophase also had that > logic. I didn't work out the user-visible consequences of that logic's new > absence here, but I bet on twophase recovery breakage. Similar problem here > (up to two epochs are acceptable, not just one): > > + /* Discard files from past epochs */ > + if (EpochFromFullTransactionId(fxid) < > EpochFromFullTransactionId(nextXid)) Oops, you're right. Your suggestion to unify all that in a single routine is an excellent idea. Missed the bits in xid8funcs.c. > I wrote the attached half-baked patch to fix those, but I tend to think it's > better to use FullTransactionId as many places as possible in twophase.c. > (We'd still need to convert XIDs that we read from xl_xact_prepare records, > along the lines of XLogRecGetFullXid().) How do you see it? I'm all for integrating more FullTransactionIds now that these reflect in the file names, and do a deeper cut. As far as I understand, the most important point of the logic is to detect and discard the future files first in restoreTwoPhaseData() -> ProcessTwoPhaseBuffer() when scanning the contents of pg_twophase at the beginning of recovery. Once this filtering is done, it should be safe to use your FullTransactionIdFromAllowableAt() when doing the fxid <-> xid transitions between the records and the files on disk flushed by a restartpoint which store an XID, and the shmem state of GlobalTransactionData with a fxid. With the additions attached, FullTransactionIdFromAllowableAt() gets down from 8 to 6 calls in twophase.c. The change related to MarkAsPreparingGuts() seems optional, though. I am trying to figure out how to write a regression test to trigger this error, lacking a bit of time today. That's going to require more trickery with pg_resetwal to make that cheap, or something like that.. Attached are some suggestions, as of a 0002 that applies on top of your 0001. XLogRecGetFullXid() is used nowhere. This could be removed, perhaps, or not? -- Michael
From 0e1538fa72ede3566ea3dd03958540b80e9fd15c Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Thu, 16 Jan 2025 15:14:06 +0900 Subject: [PATCH v2 1/2] Fix twophase.c XID epoch tracking Half the time after epoch 0, allowable XIDs span two epochs. This would have no user-visible consequences during epoch 0, but I expect (unconfirmed) twophase breakage during other epochs. FIXME likely rework this in favor of broader fulltransaction use in twophase.c --- src/include/access/transam.h | 29 ++++++++++ src/backend/access/transam/twophase.c | 75 ++++++------------------- src/backend/access/transam/xlogreader.c | 18 +----- src/backend/utils/adt/xid8funcs.c | 43 ++++---------- 4 files changed, 58 insertions(+), 107 deletions(-) diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 0cab8653f1..48fce16aeb 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -77,6 +77,35 @@ FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid) return result; } +/* + * Compute FullTransactionId for the given TransactionId, assuming xid was + * between [oldestXid, nextXid] at the time when TransamVariables->nextXid was + * nextFullXid. + */ +static inline FullTransactionId +FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, + TransactionId xid) +{ + uint32 epoch; + + /* Special transaction ID. */ + if (!TransactionIdIsNormal(xid)) + return FullTransactionIdFromEpochAndXid(0, xid); + + /* + * The 64 bit result must be <= nextFullXid, since nextFullXid hadn't been + * issued yet when xid was in the past. The xid must therefore be from + * the epoch of nextFullXid or the epoch before. We know this because we + * must remove (by freezing) an XID before assigning the XID half an epoch + * ahead of it. + */ + epoch = EpochFromFullTransactionId(nextFullXid); + if (xid > XidFromFullTransactionId(nextFullXid)) + epoch--; + + return FullTransactionIdFromEpochAndXid(epoch, xid); +} + static inline FullTransactionId FullTransactionIdFromU64(uint64 value) { diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a3190dc4f1..7a162938f4 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -926,24 +926,6 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held) /* State file support */ /************************************************************************/ -/* - * Compute FullTransactionId for the given TransactionId, using the current - * epoch. - */ -static inline FullTransactionId -FullTransactionIdFromCurrentEpoch(TransactionId xid) -{ - FullTransactionId fxid; - FullTransactionId nextFullXid; - uint32 epoch; - - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); - - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - return fxid; -} - static inline int TwoPhaseFilePath(char *path, FullTransactionId fxid) { @@ -1283,7 +1265,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * contents of the file, issuing an error when finding corrupted data. If * missing_ok is true, which indicates that missing files can be safely * ignored, then return NULL. This state can be reached when doing recovery - * after discarding two-phase files from other epochs. + * after discarding two-phase files from frozen epochs. */ static char * ReadTwoPhaseFile(TransactionId xid, bool missing_ok) @@ -1299,7 +1281,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) int r; FullTransactionId fxid; - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); @@ -1664,15 +1646,13 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit, false); - /* - * And now we can clean up any files we may have left. These should be - * from the current epoch. - */ + /* And now we can clean up any files we may have left. */ if (ondisk) { FullTransactionId fxid; - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); RemoveTwoPhaseFile(fxid, true); } @@ -1749,8 +1729,7 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) COMP_CRC32C(statefile_crc, content, len); FIN_CRC32C(statefile_crc); - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, @@ -1900,7 +1879,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * This is called once at the beginning of recovery, saving any extra * lookups in the future. Two-phase files that are newer than the * minimum XID horizon are discarded on the way. Two-phase files with - * an epoch older or newer than the current checkpoint's record epoch + * an epoch frozen relative to the current checkpoint's record epoch * are also discarded. */ void @@ -1971,7 +1950,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId result = origNextXid; TransactionId *xids = NULL; - uint32 epoch = EpochFromFullTransactionId(nextXid); int nxids = 0; int allocsize = 0; int i; @@ -1988,11 +1966,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) xid = gxact->xid; - /* - * All two-phase files with past and future epoch in pg_twophase are - * gone at this point, so we're OK to rely on only the current epoch. - */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromAllowableAt(nextXid, xid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, false, true); @@ -2055,12 +2029,9 @@ void StandbyRecoverPreparedTransactions(void) { int i; - uint32 epoch; FullTransactionId nextFullXid; - /* get current epoch */ nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2074,11 +2045,7 @@ StandbyRecoverPreparedTransactions(void) xid = gxact->xid; - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); @@ -2108,12 +2075,9 @@ void RecoverPreparedTransactions(void) { int i; - uint32 epoch; FullTransactionId nextFullXid; - /* get current epoch */ nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2127,10 +2091,6 @@ RecoverPreparedTransactions(void) TransactionId *subxids; const char *gid; - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ xid = gxact->xid; /* @@ -2142,7 +2102,7 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); @@ -2260,8 +2220,9 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, return NULL; } - /* Discard files from past epochs */ - if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid)) + /* Discard files from frozen epochs */ + if (EpochFromFullTransactionId(fxid) + 1 < + EpochFromFullTransactionId(nextXid)) { if (fromdisk) { @@ -2576,8 +2537,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, char path[MAXPGPATH]; FullTransactionId fxid; - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(hdr->xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + hdr->xid); TwoPhaseFilePath(path, fxid); if (access(path, F_OK) == 0) @@ -2676,10 +2637,8 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) { FullTransactionId fxid; - /* - * We should deal with a file at the current epoch here. - */ - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); RemoveTwoPhaseFile(fxid, giveWarning); } RemoveGXact(gxact); diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 3596af0617..91b6a91767 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -2166,28 +2166,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) FullTransactionId XLogRecGetFullXid(XLogReaderState *record) { - TransactionId xid, - next_xid; - uint32 epoch; - /* * This function is only safe during replay, because it depends on the * replay state. See AdvanceNextFullTransactionIdPastXid() for more. */ Assert(AmStartupProcess() || !IsUnderPostmaster); - xid = XLogRecGetXid(record); - next_xid = XidFromFullTransactionId(TransamVariables->nextXid); - epoch = EpochFromFullTransactionId(TransamVariables->nextXid); - - /* - * If xid is numerically greater than next_xid, it has to be from the last - * epoch. - */ - if (unlikely(xid > next_xid)) - --epoch; - - return FullTransactionIdFromEpochAndXid(epoch, xid); + return FullTransactionIdFromAllowableAt(TransamVariables->nextXid, + XLogRecGetXid(record)); } #endif diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c index 4736755b29..b17395617f 100644 --- a/src/backend/utils/adt/xid8funcs.c +++ b/src/backend/utils/adt/xid8funcs.c @@ -154,35 +154,6 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid) return !FullTransactionIdPrecedes(fxid, oldest_fxid); } -/* - * Convert a TransactionId obtained from a snapshot held by the caller to a - * FullTransactionId. Use next_fxid as a reference FullTransactionId, so that - * we can compute the high order bits. It must have been obtained by the - * caller with ReadNextFullTransactionId() after the snapshot was created. - */ -static FullTransactionId -widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid) -{ - TransactionId next_xid = XidFromFullTransactionId(next_fxid); - uint32 epoch = EpochFromFullTransactionId(next_fxid); - - /* Special transaction ID. */ - if (!TransactionIdIsNormal(xid)) - return FullTransactionIdFromEpochAndXid(0, xid); - - /* - * The 64 bit result must be <= next_fxid, since next_fxid hadn't been - * issued yet when the snapshot was created. Every TransactionId in the - * snapshot must therefore be from the same epoch as next_fxid, or the - * epoch before. We know this because next_fxid is never allow to get - * more than one epoch ahead of the TransactionIds in any snapshot. - */ - if (xid > next_xid) - epoch--; - - return FullTransactionIdFromEpochAndXid(epoch, xid); -} - /* * txid comparator for qsort/bsearch */ @@ -420,12 +391,18 @@ pg_current_snapshot(PG_FUNCTION_ARGS) nxip = cur->xcnt; snap = palloc(PG_SNAPSHOT_SIZE(nxip)); - /* fill */ - snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid); - snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid); + /* + * Fill. This is the current backend's active snapshot, so MyProc->xmin + * is <= all these XIDs. As long as that remains so, oldestXid can't + * advance past any of these XIDs. Hence, these XIDs remain allowable + * relative to next_fxid. + */ + snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin); + snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax); snap->nxip = nxip; for (i = 0; i < nxip; i++) - snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid); + snap->xip[i] = + FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]); /* * We want them guaranteed to be in ascending order. This also removes -- 2.47.1
From 55b6d87b3cf032a65efdce260f5422fdee6426f1 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Thu, 16 Jan 2025 16:45:33 +0900 Subject: [PATCH v2 2/2] Integrate deeper FullTransactionIds into twophase.c --- src/backend/access/transam/twophase.c | 115 ++++++++++++-------------- 1 file changed, 55 insertions(+), 60 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 7a162938f4..75008fb949 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -159,7 +159,7 @@ typedef struct GlobalTransactionData */ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ - TransactionId xid; /* The GXACT id */ + FullTransactionId fxid; /* The GXACT full xid */ Oid owner; /* ID of user that executed the xact */ ProcNumber locking_backend; /* backend currently working on the xact */ @@ -224,11 +224,11 @@ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); static char *ProcessTwoPhaseBuffer(FullTransactionId xid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid); -static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, +static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning); -static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); +static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len); /* * Initialization of shared memory @@ -360,6 +360,7 @@ MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid) { GlobalTransaction gxact; + FullTransactionId fxid; int i; if (strlen(gid) >= GIDSIZE) @@ -407,7 +408,9 @@ MarkAsPreparing(TransactionId xid, const char *gid, gxact = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact->next; - MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); + MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid); gxact->ondisk = false; @@ -430,11 +433,13 @@ MarkAsPreparing(TransactionId xid, const char *gid, * Note: This function should be called with appropriate locks held. */ static void -MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, - TimestampTz prepared_at, Oid owner, Oid databaseid) +MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid, + const char *gid, TimestampTz prepared_at, Oid owner, + Oid databaseid) { PGPROC *proc; int i; + TransactionId xid = XidFromFullTransactionId(fxid); Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); @@ -479,7 +484,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, proc->subxidStatus.count = 0; gxact->prepared_at = prepared_at; - gxact->xid = xid; + gxact->fxid = fxid; gxact->owner = owner; gxact->locking_backend = MyProcNumber; gxact->valid = false; @@ -800,6 +805,7 @@ static GlobalTransaction TwoPhaseGetGXact(TransactionId xid, bool lock_held) { GlobalTransaction result = NULL; + FullTransactionId fxid; int i; static TransactionId cached_xid = InvalidTransactionId; @@ -817,11 +823,14 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) if (!lock_held) LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - if (gxact->xid == xid) + if (FullTransactionIdEquals(gxact->fxid, fxid)) { result = gxact; break; @@ -881,7 +890,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, *have_more = true; break; } - result = gxact->xid; + result = XidFromFullTransactionId(gxact->fxid); } } @@ -1032,7 +1041,7 @@ void StartPrepare(GlobalTransaction gxact) { PGPROC *proc = GetPGProcByNumber(gxact->pgprocno); - TransactionId xid = gxact->xid; + TransactionId xid = XidFromFullTransactionId(gxact->fxid); TwoPhaseFileHeader hdr; TransactionId *children; RelFileLocator *commitrels; @@ -1268,7 +1277,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * after discarding two-phase files from frozen epochs. */ static char * -ReadTwoPhaseFile(TransactionId xid, bool missing_ok) +ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok) { char path[MAXPGPATH]; char *buf; @@ -1279,9 +1288,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) pg_crc32c calc_crc, file_crc; int r; - FullTransactionId fxid; - fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); @@ -1447,6 +1454,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid) char *buf; TwoPhaseFileHeader *hdr; bool result; + FullTransactionId fxid; Assert(TransactionIdIsValid(xid)); @@ -1454,7 +1462,8 @@ StandbyTransactionIdIsPrepared(TransactionId xid) return false; /* nothing to do */ /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); + buf = ReadTwoPhaseFile(fxid, true); if (buf == NULL) return false; @@ -1474,6 +1483,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) { GlobalTransaction gxact; PGPROC *proc; + FullTransactionId fxid; TransactionId xid; bool ondisk; char *buf; @@ -1495,7 +1505,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit) */ gxact = LockGXact(gid, GetUserId()); proc = GetPGProcByNumber(gxact->pgprocno); - xid = gxact->xid; + fxid = gxact->fxid; + xid = XidFromFullTransactionId(fxid); /* * Read and validate 2PC state data. State data will typically be stored @@ -1503,7 +1514,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * to disk if for some reason they have lived for a long time. */ if (gxact->ondisk) - buf = ReadTwoPhaseFile(xid, false); + buf = ReadTwoPhaseFile(fxid, false); else XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); @@ -1648,13 +1659,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* And now we can clean up any files we may have left. */ if (ondisk) - { - FullTransactionId fxid; - - fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), - xid); RemoveTwoPhaseFile(fxid, true); - } MyLockedGxact = NULL; @@ -1717,19 +1722,17 @@ RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning) * Note: content and len don't include CRC. */ static void -RecreateTwoPhaseFile(TransactionId xid, void *content, int len) +RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len) { char path[MAXPGPATH]; pg_crc32c statefile_crc; int fd; - FullTransactionId fxid; /* Recompute CRC */ INIT_CRC32C(statefile_crc); COMP_CRC32C(statefile_crc, content, len); FIN_CRC32C(statefile_crc); - fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, @@ -1842,7 +1845,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) int len; XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); - RecreateTwoPhaseFile(gxact->xid, buf, len); + RecreateTwoPhaseFile(gxact->fxid, buf, len); gxact->ondisk = true; gxact->prepare_start_lsn = InvalidXLogRecPtr; gxact->prepare_end_lsn = InvalidXLogRecPtr; @@ -1964,9 +1967,8 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) Assert(gxact->inredo); - xid = gxact->xid; - - fxid = FullTransactionIdFromAllowableAt(nextXid, xid); + fxid = gxact->fxid; + xid = XidFromFullTransactionId(fxid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, false, true); @@ -2029,24 +2031,16 @@ void StandbyRecoverPreparedTransactions(void) { int i; - FullTransactionId nextFullXid; - - nextFullXid = ReadNextFullTransactionId(); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - TransactionId xid; - FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; Assert(gxact->inredo); - xid = gxact->xid; - - fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(gxact->fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf != NULL) @@ -2075,15 +2069,11 @@ void RecoverPreparedTransactions(void) { int i; - FullTransactionId nextFullXid; - - nextFullXid = ReadNextFullTransactionId(); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; - FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; char *bufptr; @@ -2091,8 +2081,6 @@ RecoverPreparedTransactions(void) TransactionId *subxids; const char *gid; - xid = gxact->xid; - /* * Reconstruct subtrans state for the transaction --- needed because * pg_subtrans is not preserved over a restart. Note that we are @@ -2102,15 +2090,17 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. */ - fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(gxact->fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf == NULL) continue; + xid = XidFromFullTransactionId(gxact->fxid); ereport(LOG, - (errmsg("recovering prepared transaction %u from shared memory", xid))); + (errmsg("recovering prepared transaction %u of epoch %u from shared memory", + XidFromFullTransactionId(gxact->fxid), + EpochFromFullTransactionId(gxact->fxid)))); hdr = (TwoPhaseFileHeader *) buf; Assert(TransactionIdEquals(hdr->xid, xid)); @@ -2129,7 +2119,7 @@ RecoverPreparedTransactions(void) * Recreate its GXACT and dummy PGPROC. But, check whether it was * added in redo and already has a shmem entry for it. */ - MarkAsPreparingGuts(gxact, xid, gid, + MarkAsPreparingGuts(gxact, gxact->fxid, gid, hdr->prepared_at, hdr->owner, hdr->database); @@ -2264,7 +2254,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, if (fromdisk) { /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, false); + buf = ReadTwoPhaseFile(fxid, false); } else { @@ -2570,7 +2560,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, gxact->prepared_at = hdr->prepared_at; gxact->prepare_start_lsn = start_lsn; gxact->prepare_end_lsn = end_lsn; - gxact->xid = hdr->xid; + gxact->fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + hdr->xid); gxact->owner = hdr->owner; gxact->locking_backend = INVALID_PROC_NUMBER; gxact->valid = false; @@ -2589,7 +2580,9 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, false /* backward */ , false /* WAL */ ); } - elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); + elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u", + XidFromFullTransactionId(gxact->fxid), + EpochFromFullTransactionId(gxact->fxid)); } /* @@ -2605,17 +2598,21 @@ void PrepareRedoRemove(TransactionId xid, bool giveWarning) { GlobalTransaction gxact = NULL; + FullTransactionId fxid; int i; bool found = false; Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; - if (gxact->xid == xid) + if (FullTransactionIdEquals(gxact->fxid, fxid)) { Assert(gxact->inredo); found = true; @@ -2632,15 +2629,13 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) /* * And now we can clean up any files we may have left. */ - elog(DEBUG2, "removing 2PC data for transaction %u", xid); - if (gxact->ondisk) - { - FullTransactionId fxid; + elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ", + XidFromFullTransactionId(fxid), + EpochFromFullTransactionId(fxid)); - fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), - xid); + if (gxact->ondisk) RemoveTwoPhaseFile(fxid, giveWarning); - } + RemoveGXact(gxact); } @@ -2688,7 +2683,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, * between publisher and subscriber. */ if (gxact->ondisk) - buf = ReadTwoPhaseFile(gxact->xid, false); + buf = ReadTwoPhaseFile(gxact->fxid, false); else { Assert(gxact->prepare_start_lsn); -- 2.47.1
signature.asc
Description: PGP signature