On Thu, Jan 16, 2025 at 04:50:09PM +0900, Michael Paquier wrote:
> On Wed, Jan 15, 2025 at 05:00:51PM -0800, Noah Misch wrote:
> > I think "using the current epoch" is wrong for half of the nextFullXid values
> > having epoch > 0. For example, nextFullId==2^32 is in epoch 1, but all the
> > allowable XIDs are in epoch 0. (I mean "allowable" in the sense of
> > AssertTransactionIdInAllowableRange().) From then until we assign another
> > 2^31 XIDs, epochs 0 and 1 are both expected in XID values. After 2^31 XID
> > assignments, every allowable XID be in epoch 1. Hence, twophase.c would need
> > two-epoch logic like we have in widen_snapshot_xid() and XLogRecGetFullXid().
> > Is that right? (I wrote this in a hurry, so this email may have more than the
> > standard level of errors.) Before commit 7e125b2, twophase also had that
> > logic. I didn't work out the user-visible consequences of that logic's new
> > absence here, but I bet on twophase recovery breakage. Similar problem here
> > (up to two epochs are acceptable, not just one):
> >
> > +   /* Discard files from past epochs */
> > +   if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
>
> Oops, you're right. Your suggestion to unify all that in a single
> routine is an excellent idea. Missed the bits in xid8funcs.c.

Added.

> > I wrote the attached half-baked patch to fix those, but I tend to think it's
> > better to use FullTransactionId as many places as possible in twophase.c.
> > (We'd still need to convert XIDs that we read from xl_xact_prepare records,
> > along the lines of XLogRecGetFullXid().) How do you see it?
>
> I'm all for integrating more FullTransactionIds now that these reflect
> in the file names, and do a deeper cut.
>
> As far as I understand, the most important point of the logic is to
> detect and discard the future files first in restoreTwoPhaseData() ->
> ProcessTwoPhaseBuffer() when scanning the contents of pg_twophase at
> the beginning of recovery. Once this filtering is done, it should be
> safe to use your FullTransactionIdFromAllowableAt() when doing
> the fxid <-> xid transitions between the records and the files on disk
> flushed by a restartpoint which store an XID, and the shmem state of
> GlobalTransactionData with a fxid.

(I did not expect that a function called restoreTwoPhaseData() would run before
a function called PrescanPreparedTransactions(), but so it is.)

How is it that restoreTwoPhaseData() -> ProcessTwoPhaseBuffer() can safely call
TransactionIdDidAbort() when we've not replayed WAL to make CLOG consistent?
What can it assume about the value of TransamVariables->nextXid at that early
time?

> With the additions attached, FullTransactionIdFromAllowableAt() gets
> down from 8 to 6 calls in twophase.c. The change related to
> MarkAsPreparingGuts() seems optional, though.

Thanks. It's probably not worth doing at that level of reduction. I tried
spreading fxid further and got it down to three conversions, corresponding to
redo of each of XLOG_XACT_PREPARE, XLOG_XACT_COMMIT_PREPARED, and
XLOG_XACT_ABORT_PREPARED. I'm attaching the WIP of that. It's not as
satisfying as I expected, so FullTransactionIdFromCurrentEpoch-v0.patch may yet
be the better direction.

The ProcessTwoPhaseBuffer() code to remove "past two-phase state" seems
best-effort, independent of this change. Just because an XID is in a
potentially-acceptable epoch doesn't mean TransactionIdDidCommit() will find
clog for it. I've not studied whether that matters.

Incidentally, this comment about when a function is called:

 * PrescanPreparedTransactions
 *
 * Scan the shared memory entries of TwoPhaseState and determine the range
 * of valid XIDs present. This is run during database startup, after we
 * have completed reading WAL. TransamVariables->nextXid has been set to
 * one more than the highest XID for which evidence exists in WAL.

doesn't match the timing of the actual earliest call:

    /* REDO */
    if (InRecovery)
    {
        ...
        if (ArchiveRecoveryRequested && EnableHotStandby)
        {
            ...
            if (wasShutdown)
                oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
            else
                oldestActiveXID = checkPoint.oldestActiveXid;
        ...
        PerformWalRecovery();
        performedWalRecovery = true;

> I am trying to figure
> out how to write a regression test to trigger this error, lacking a
> bit of time today. That's going to require more trickery with
> pg_resetwal to make that cheap, or something like that.. Attached are
> some suggestions, as of a 0002 that applies on top of your 0001.

Thanks. I'd value having your regression test, but manual-ish testing could
suffice if it's too hard.

> XLogRecGetFullXid() is used nowhere. This could be removed, perhaps,
> or not?

Maybe. Looks like it was born unused in 67b9b3c (2019-07), so removing may as
well be a separate discussion.
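
To make the two-epoch argument from the top of this mail concrete, here is a
minimal standalone sketch of the widening rule. It is not the patch's
FullTransactionIdFromAllowableAt(): the names xid32, fxid64, epoch_of(),
xid_of() and widen_xid_two_epochs() are made up for illustration, and the
sketch ignores the special XIDs below FirstNormalTransactionId. It only
mirrors the rule in the pre-patch XLogRecGetFullXid() body, where an XID
numerically greater than the low 32 bits of nextXid is taken to be from the
previous epoch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for TransactionId and FullTransactionId. */
typedef uint32_t xid32;
typedef uint64_t fxid64;

/* High 32 bits: the epoch. */
static inline uint32_t
epoch_of(fxid64 f)
{
    return (uint32_t) (f >> 32);
}

/* Low 32 bits: the 32-bit XID. */
static inline xid32
xid_of(fxid64 f)
{
    return (xid32) f;
}

/*
 * Widen a 32-bit XID relative to next_fxid, accepting two epochs.
 *
 * Assumption: xid is an XID that was already assigned, so it precedes
 * next_fxid by less than 2^32.  If xid is numerically greater than the
 * low half of next_fxid, it can only come from the previous epoch.
 */
static fxid64
widen_xid_two_epochs(fxid64 next_fxid, xid32 xid)
{
    uint32_t epoch = epoch_of(next_fxid);

    if (xid > xid_of(next_fxid))
    {
        /* There is no epoch before 0; such an XID would be corrupt. */
        assert(epoch > 0);
        epoch--;
    }

    return ((fxid64) epoch << 32) | xid;
}
```

With next_fxid == 2^32 (epoch 1, low half 0), every nonzero 32-bit XID widens
into epoch 0, which is the case where "using the current epoch" picks the
wrong file name.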
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c index 8a8e36d..8273123 100644 --- a/contrib/amcheck/verify_heapam.c +++ b/contrib/amcheck/verify_heapam.c @@ -1877,7 +1877,9 @@ check_tuple(HeapCheckContext *ctx, bool *xmin_commit_status_ok, /* * Convert a TransactionId into a FullTransactionId using our cached values of * the valid transaction ID range. It is the caller's responsibility to have - * already updated the cached values, if necessary. + * already updated the cached values, if necessary. This is akin to + * FullTransactionIdFromAllowableAt(), but it tolerates corruption in the form + * of an xid before epoch 0. */ static FullTransactionId FullTransactionIdFromXidAndCtx(TransactionId xid, const HeapCheckContext *ctx) diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index 27ccdf9..4e52792 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -1847,7 +1847,7 @@ AtPrepare_MultiXact(void) * Clean up after successful PREPARE TRANSACTION */ void -PostPrepare_MultiXact(TransactionId xid) +PostPrepare_MultiXact(FullTransactionId fxid) { MultiXactId myOldestMember; @@ -1858,7 +1858,7 @@ PostPrepare_MultiXact(TransactionId xid) myOldestMember = OldestMemberMXactId[MyProcNumber]; if (MultiXactIdIsValid(myOldestMember)) { - ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(xid, false); + ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(fxid, false); /* * Even though storing MultiXactId is atomic, acquire lock to make @@ -1896,10 +1896,10 @@ PostPrepare_MultiXact(TransactionId xid) * Recover the state of a prepared transaction at startup */ void -multixact_twophase_recover(TransactionId xid, uint16 info, +multixact_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { - ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(xid, false); + ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(fxid, false); 
MultiXactId oldestMember; /* @@ -1917,10 +1917,10 @@ multixact_twophase_recover(TransactionId xid, uint16 info, * Similar to AtEOXact_MultiXact but for COMMIT PREPARED */ void -multixact_twophase_postcommit(TransactionId xid, uint16 info, +multixact_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { - ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(xid, true); + ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(fxid, true); Assert(len == sizeof(MultiXactId)); @@ -1932,10 +1932,10 @@ multixact_twophase_postcommit(TransactionId xid, uint16 info, * This is actually just the same as the COMMIT case. */ void -multixact_twophase_postabort(TransactionId xid, uint16 info, +multixact_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { - multixact_twophase_postcommit(xid, info, recdata, len); + multixact_twophase_postcommit(fxid, info, recdata, len); } /* diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a3190dc..8b2d102 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -159,7 +159,7 @@ typedef struct GlobalTransactionData */ XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */ XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */ - TransactionId xid; /* The GXACT id */ + FullTransactionId fxid; /* The GXACT full xid */ Oid owner; /* ID of user that executed the xact */ ProcNumber locking_backend; /* backend currently working on the xact */ @@ -197,6 +197,7 @@ static GlobalTransaction MyLockedGxact = NULL; static bool twophaseExitRegistered = false; +static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning); static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, @@ -216,19 +217,19 @@ static void RecordTransactionAbortPrepared(TransactionId xid, int nstats, xl_xact_stats_item *stats, const char *gid); 
-static void ProcessRecords(char *bufptr, TransactionId xid, +static void ProcessRecords(char *bufptr, FullTransactionId fxid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len); -static char *ProcessTwoPhaseBuffer(FullTransactionId xid, +static char *ProcessTwoPhaseBuffer(FullTransactionId fxid, XLogRecPtr prepare_start_lsn, bool fromdisk, bool setParent, bool setNextXid); -static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, +static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning); -static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); +static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len); /* * Initialization of shared memory @@ -356,7 +357,7 @@ PostPrepare_Twophase(void) * Reserve the GID for the given transaction. */ GlobalTransaction -MarkAsPreparing(TransactionId xid, const char *gid, +MarkAsPreparing(FullTransactionId fxid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid) { GlobalTransaction gxact; @@ -407,7 +408,7 @@ MarkAsPreparing(TransactionId xid, const char *gid, gxact = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact->next; - MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid); + MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid); gxact->ondisk = false; @@ -430,11 +431,13 @@ MarkAsPreparing(TransactionId xid, const char *gid, * Note: This function should be called with appropriate locks held. 
*/ static void -MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, - TimestampTz prepared_at, Oid owner, Oid databaseid) +MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid, + const char *gid, TimestampTz prepared_at, Oid owner, + Oid databaseid) { PGPROC *proc; int i; + TransactionId xid = XidFromFullTransactionId(fxid); Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); @@ -479,7 +482,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, proc->subxidStatus.count = 0; gxact->prepared_at = prepared_at; - gxact->xid = xid; + gxact->fxid = fxid; gxact->owner = owner; gxact->locking_backend = MyProcNumber; gxact->valid = false; @@ -797,12 +800,12 @@ pg_prepared_xact(PG_FUNCTION_ARGS) * caller had better hold it. */ static GlobalTransaction -TwoPhaseGetGXact(TransactionId xid, bool lock_held) +TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held) { GlobalTransaction result = NULL; int i; - static TransactionId cached_xid = InvalidTransactionId; + static FullTransactionId cached_fxid = {0}; static GlobalTransaction cached_gxact = NULL; Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock)); @@ -811,7 +814,7 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called * repeatedly for the same XID. We can save work with a simple cache. 
*/ - if (xid == cached_xid) + if (FullTransactionIdEquals(fxid, cached_fxid)) return cached_gxact; if (!lock_held) @@ -821,7 +824,7 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) { GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; - if (gxact->xid == xid) + if (FullTransactionIdEquals(gxact->fxid, fxid)) { result = gxact; break; @@ -832,9 +835,10 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) LWLockRelease(TwoPhaseStateLock); if (result == NULL) /* should not happen */ - elog(ERROR, "failed to find GlobalTransaction for xid %u", xid); + elog(ERROR, "failed to find GlobalTransaction for xid %u", + XidFromFullTransactionId(fxid)); - cached_xid = xid; + cached_fxid = fxid; cached_gxact = result; return result; @@ -881,7 +885,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, *have_more = true; break; } - result = gxact->xid; + result = XidFromFullTransactionId(gxact->fxid); } } @@ -892,7 +896,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, /* * TwoPhaseGetDummyProcNumber - * Get the dummy proc number for prepared transaction specified by XID + * Get the dummy proc number for prepared transaction * * Dummy proc numbers are similar to proc numbers of real backends. They * start at MaxBackends, and are unique across all currently active real @@ -900,24 +904,24 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, * TwoPhaseStateLock will not be taken, so the caller had better hold it. 
*/ ProcNumber -TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held) +TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held) { - GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held); + GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held); return gxact->pgprocno; } /* * TwoPhaseGetDummyProc - * Get the PGPROC that represents a prepared transaction specified by XID + * Get the PGPROC that represents a prepared transaction * * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the * caller had better hold it. */ PGPROC * -TwoPhaseGetDummyProc(TransactionId xid, bool lock_held) +TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held) { - GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held); + GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held); return GetPGProcByNumber(gxact->pgprocno); } @@ -926,24 +930,6 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held) /* State file support */ /************************************************************************/ -/* - * Compute FullTransactionId for the given TransactionId, using the current - * epoch. - */ -static inline FullTransactionId -FullTransactionIdFromCurrentEpoch(TransactionId xid) -{ - FullTransactionId fxid; - FullTransactionId nextFullXid; - uint32 epoch; - - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); - - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - return fxid; -} - static inline int TwoPhaseFilePath(char *path, FullTransactionId fxid) { @@ -1050,7 +1036,7 @@ void StartPrepare(GlobalTransaction gxact) { PGPROC *proc = GetPGProcByNumber(gxact->pgprocno); - TransactionId xid = gxact->xid; + TransactionId xid = XidFromFullTransactionId(gxact->fxid); TwoPhaseFileHeader hdr; TransactionId *children; RelFileLocator *commitrels; @@ -1283,10 +1269,10 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * contents of the file, issuing an error when finding corrupted data. 
If * missing_ok is true, which indicates that missing files can be safely * ignored, then return NULL. This state can be reached when doing recovery - * after discarding two-phase files from other epochs. + * after discarding two-phase files from frozen epochs. */ static char * -ReadTwoPhaseFile(TransactionId xid, bool missing_ok) +ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok) { char path[MAXPGPATH]; char *buf; @@ -1297,9 +1283,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) pg_crc32c calc_crc, file_crc; int r; - FullTransactionId fxid; - fxid = FullTransactionIdFromCurrentEpoch(xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); @@ -1465,6 +1449,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid) char *buf; TwoPhaseFileHeader *hdr; bool result; + FullTransactionId fxid; Assert(TransactionIdIsValid(xid)); @@ -1472,7 +1457,8 @@ StandbyTransactionIdIsPrepared(TransactionId xid) return false; /* nothing to do */ /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, true); + fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid); + buf = ReadTwoPhaseFile(fxid, true); if (buf == NULL) return false; @@ -1492,6 +1478,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) { GlobalTransaction gxact; PGPROC *proc; + FullTransactionId fxid; TransactionId xid; bool ondisk; char *buf; @@ -1513,7 +1500,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit) */ gxact = LockGXact(gid, GetUserId()); proc = GetPGProcByNumber(gxact->pgprocno); - xid = gxact->xid; + fxid = gxact->fxid; + xid = XidFromFullTransactionId(fxid); /* * Read and validate 2PC state data. State data will typically be stored @@ -1521,7 +1509,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * to disk if for some reason they have lived for a long time. 
*/ if (gxact->ondisk) - buf = ReadTwoPhaseFile(xid, false); + buf = ReadTwoPhaseFile(fxid, false); else XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); @@ -1640,11 +1628,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* And now do the callbacks */ if (isCommit) - ProcessRecords(bufptr, xid, twophase_postcommit_callbacks); + ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks); else - ProcessRecords(bufptr, xid, twophase_postabort_callbacks); + ProcessRecords(bufptr, fxid, twophase_postabort_callbacks); - PredicateLockTwoPhaseFinish(xid, isCommit); + PredicateLockTwoPhaseFinish(fxid, isCommit); /* * Read this value while holding the two-phase lock, as the on-disk 2PC @@ -1664,17 +1652,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit, false); - /* - * And now we can clean up any files we may have left. These should be - * from the current epoch. - */ + /* And now we can clean up any files we may have left. */ if (ondisk) - { - FullTransactionId fxid; - - fxid = FullTransactionIdFromCurrentEpoch(xid); RemoveTwoPhaseFile(fxid, true); - } MyLockedGxact = NULL; @@ -1687,7 +1667,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record. */ static void -ProcessRecords(char *bufptr, TransactionId xid, +ProcessRecords(char *bufptr, FullTransactionId fxid, const TwoPhaseCallback callbacks[]) { for (;;) @@ -1701,14 +1681,14 @@ ProcessRecords(char *bufptr, TransactionId xid, bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk)); if (callbacks[record->rmid] != NULL) - callbacks[record->rmid] (xid, record->info, bufptr, record->len); + callbacks[record->rmid] (fxid, record->info, bufptr, record->len); bufptr += MAXALIGN(record->len); } } /* - * Remove the 2PC file for the specified XID. + * Remove the 2PC file. 
* * If giveWarning is false, do not complain about file-not-present; * this is an expected case during WAL replay. @@ -1737,20 +1717,17 @@ RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning) * Note: content and len don't include CRC. */ static void -RecreateTwoPhaseFile(TransactionId xid, void *content, int len) +RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len) { char path[MAXPGPATH]; pg_crc32c statefile_crc; int fd; - FullTransactionId fxid; /* Recompute CRC */ INIT_CRC32C(statefile_crc); COMP_CRC32C(statefile_crc, content, len); FIN_CRC32C(statefile_crc); - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, @@ -1863,7 +1840,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) int len; XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len); - RecreateTwoPhaseFile(gxact->xid, buf, len); + RecreateTwoPhaseFile(gxact->fxid, buf, len); gxact->ondisk = true; gxact->prepare_start_lsn = InvalidXLogRecPtr; gxact->prepare_end_lsn = InvalidXLogRecPtr; @@ -1900,7 +1877,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * This is called once at the beginning of recovery, saving any extra * lookups in the future. Two-phase files that are newer than the * minimum XID horizon are discarded on the way. Two-phase files with - * an epoch older or newer than the current checkpoint's record epoch + * an epoch frozen relative to the current checkpoint's record epoch * are also discarded. 
*/ void @@ -1925,7 +1902,7 @@ restoreTwoPhaseData(void) if (buf == NULL) continue; - PrepareRedoAdd(buf, InvalidXLogRecPtr, + PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr, InvalidXLogRecPtr, InvalidRepOriginId); } } @@ -1971,7 +1948,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId result = origNextXid; TransactionId *xids = NULL; - uint32 epoch = EpochFromFullTransactionId(nextXid); int nxids = 0; int allocsize = 0; int i; @@ -1979,20 +1955,15 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - TransactionId xid; FullTransactionId fxid; + TransactionId xid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; Assert(gxact->inredo); - xid = gxact->xid; - - /* - * All two-phase files with past and future epoch in pg_twophase are - * gone at this point, so we're OK to rely on only the current epoch. - */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = gxact->fxid; + xid = XidFromFullTransactionId(fxid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, false, true); @@ -2055,31 +2026,16 @@ void StandbyRecoverPreparedTransactions(void) { int i; - uint32 epoch; - FullTransactionId nextFullXid; - - /* get current epoch */ - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - TransactionId xid; - FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; Assert(gxact->inredo); - xid = gxact->xid; - - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. 
- */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(gxact->fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf != NULL) @@ -2108,17 +2064,10 @@ void RecoverPreparedTransactions(void) { int i; - uint32 epoch; - FullTransactionId nextFullXid; - - /* get current epoch */ - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { - TransactionId xid; FullTransactionId fxid; char *buf; GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; @@ -2128,12 +2077,6 @@ RecoverPreparedTransactions(void) const char *gid; /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ - xid = gxact->xid; - - /* * Reconstruct subtrans state for the transaction --- needed because * pg_subtrans is not preserved over a restart. Note that we are * linking all the subtransactions directly to the top-level XID; @@ -2142,18 +2085,20 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. 
*/ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - buf = ProcessTwoPhaseBuffer(fxid, + buf = ProcessTwoPhaseBuffer(gxact->fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); if (buf == NULL) continue; ereport(LOG, - (errmsg("recovering prepared transaction %u from shared memory", xid))); + (errmsg("recovering prepared transaction %u of epoch %u from shared memory", + XidFromFullTransactionId(gxact->fxid), + EpochFromFullTransactionId(gxact->fxid)))); hdr = (TwoPhaseFileHeader *) buf; - Assert(TransactionIdEquals(hdr->xid, xid)); + Assert(TransactionIdEquals(hdr->xid, + XidFromFullTransactionId(gxact->fxid))); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); gid = (const char *) bufptr; bufptr += MAXALIGN(hdr->gidlen); @@ -2169,7 +2114,7 @@ RecoverPreparedTransactions(void) * Recreate its GXACT and dummy PGPROC. But, check whether it was * added in redo and already has a shmem entry for it. */ - MarkAsPreparingGuts(gxact, xid, gid, + MarkAsPreparingGuts(gxact, gxact->fxid, gid, hdr->prepared_at, hdr->owner, hdr->database); @@ -2184,7 +2129,7 @@ RecoverPreparedTransactions(void) /* * Recover other state (notably locks) using resource managers. */ - ProcessRecords(bufptr, xid, twophase_recover_callbacks); + ProcessRecords(bufptr, fxid, twophase_recover_callbacks); /* * Release locks held by the standby process after we process each @@ -2192,7 +2137,7 @@ RecoverPreparedTransactions(void) * additional locks at any one time. */ if (InHotStandby) - StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); + StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids); /* * We're done with recovering this transaction. 
Clear MyLockedGxact, @@ -2226,20 +2171,22 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, bool setParent, bool setNextXid) { FullTransactionId nextXid = TransamVariables->nextXid; + TransactionId xid = XidFromFullTransactionId(fxid); TransactionId *subxids; char *buf; TwoPhaseFileHeader *hdr; int i; - TransactionId xid = XidFromFullTransactionId(fxid); + Assert(InRecovery); Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); /* - * Reject full XID if too new. Note that this discards files from future - * epochs. + * Reject future XIDs and delete their files; WAL will recreate them if + * needed. This is a normal outcome if the base backup process copied + * twophase files after a long time copying other files. */ if (FullTransactionIdFollowsOrEquals(fxid, nextXid)) { @@ -2255,13 +2202,23 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, ereport(WARNING, (errmsg("removing future two-phase state from memory for transaction %u", xid))); - PrepareRedoRemove(xid, true); + PrepareRedoRemoveFull(fxid, true); } return NULL; } - /* Discard files from past epochs */ - if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid)) + /* + * Reject XIDs for which we've already trimmed clog. This should not + * happen, because redo begins at a moment in the XID assignments before + * any pg_twophase file that existed during the base backup. + * + * Since AssignTransactionId() ensures subxact XIDs follow the top XID, an + * allowable top XID implies allowable subxact XIDs. 
+ * + * FIXME reimplement not in terms of epochs + */ + if (EpochFromFullTransactionId(fxid) + 1 < + EpochFromFullTransactionId(nextXid)) { if (fromdisk) { @@ -2275,13 +2232,14 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, ereport(WARNING, (errmsg("removing past two-phase state from memory for transaction %u", xid))); - PrepareRedoRemove(xid, true); + PrepareRedoRemoveFull(fxid, true); } return NULL; } - /* Already processed? */ - if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + /* Reject already-resolved XIDs, deleting their files. FIXME can this happen? */ + if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) || + TransactionIdDidAbort(XidFromFullTransactionId(fxid))) { if (fromdisk) { @@ -2295,7 +2253,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, ereport(WARNING, (errmsg("removing stale two-phase state from memory for transaction %u", xid))); - PrepareRedoRemove(xid, true); + PrepareRedoRemoveFull(fxid, true); } return NULL; } @@ -2303,7 +2261,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, if (fromdisk) { /* Read and validate file */ - buf = ReadTwoPhaseFile(xid, false); + buf = ReadTwoPhaseFile(fxid, false); } else { @@ -2536,8 +2494,9 @@ RecordTransactionAbortPrepared(TransactionId xid, * data, the entry is marked as located on disk. 
*/ void -PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, - XLogRecPtr end_lsn, RepOriginId origin_id) +PrepareRedoAdd(FullTransactionId fxid, char *buf, + XLogRecPtr start_lsn, XLogRecPtr end_lsn, + RepOriginId origin_id) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; char *bufptr; @@ -2545,7 +2504,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, GlobalTransaction gxact; Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); - Assert(RecoveryInProgress()); + Assert(InRecovery); + + if (!FullTransactionIdIsValid(fxid)) + fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid, + hdr->xid); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); gid = (const char *) bufptr; @@ -2574,10 +2537,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, if (!XLogRecPtrIsInvalid(start_lsn)) { char path[MAXPGPATH]; - FullTransactionId fxid; - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(hdr->xid); + Assert(InRecovery); TwoPhaseFilePath(path, fxid); if (access(path, F_OK) == 0) @@ -2609,7 +2570,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, gxact->prepared_at = hdr->prepared_at; gxact->prepare_start_lsn = start_lsn; gxact->prepare_end_lsn = end_lsn; - gxact->xid = hdr->xid; + gxact->fxid = fxid; gxact->owner = hdr->owner; gxact->locking_backend = INVALID_PROC_NUMBER; gxact->valid = false; @@ -2628,7 +2589,9 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, false /* backward */ , false /* WAL */ ); } - elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); + elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u", + XidFromFullTransactionId(gxact->fxid), + EpochFromFullTransactionId(gxact->fxid)); } /* @@ -2640,8 +2603,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState * is updated. 
*/ -void -PrepareRedoRemove(TransactionId xid, bool giveWarning) +static void +PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning) { GlobalTransaction gxact = NULL; int i; @@ -2654,7 +2617,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) { gxact = TwoPhaseState->prepXacts[i]; - if (gxact->xid == xid) + if (FullTransactionIdEquals(gxact->fxid, fxid)) { Assert(gxact->inredo); found = true; @@ -2671,20 +2634,25 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) /* * And now we can clean up any files we may have left. */ - elog(DEBUG2, "removing 2PC data for transaction %u", xid); - if (gxact->ondisk) - { - FullTransactionId fxid; + elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ", + XidFromFullTransactionId(fxid), + EpochFromFullTransactionId(fxid)); - /* - * We should deal with a file at the current epoch here. - */ - fxid = FullTransactionIdFromCurrentEpoch(xid); + if (gxact->ondisk) RemoveTwoPhaseFile(fxid, giveWarning); - } + RemoveGXact(gxact); } +void +PrepareRedoRemove(TransactionId xid, bool giveWarning) +{ + FullTransactionId fxid = + FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid); + + PrepareRedoRemoveFull(fxid, giveWarning); +} + /* * LookupGXact * Check if the prepared transaction with the given GID, lsn and timestamp @@ -2729,7 +2697,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, * between publisher and subscriber. 
*/ if (gxact->ondisk) - buf = ReadTwoPhaseFile(gxact->xid, false); + buf = ReadTwoPhaseFile(gxact->fxid, false); else { Assert(gxact->prepare_start_lsn); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d331ab9..8cd0c8b 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2512,7 +2512,7 @@ static void PrepareTransaction(void) { TransactionState s = CurrentTransactionState; - TransactionId xid = GetCurrentTransactionId(); + FullTransactionId fxid = GetCurrentFullTransactionId(); GlobalTransaction gxact; TimestampTz prepared_at; @@ -2641,7 +2641,7 @@ PrepareTransaction(void) * Reserve the GID for this transaction. This could fail if the requested * GID is invalid or already in use. */ - gxact = MarkAsPreparing(xid, prepareGID, prepared_at, + gxact = MarkAsPreparing(fxid, prepareGID, prepared_at, GetUserId(), MyDatabaseId); prepareGID = NULL; @@ -2691,7 +2691,7 @@ PrepareTransaction(void) * ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would * conclude "xact already committed or aborted" for our locks. */ - PostPrepare_Locks(xid); + PostPrepare_Locks(fxid); /* * Let others know about no transaction in progress by me. This has to be @@ -2733,9 +2733,9 @@ PrepareTransaction(void) PostPrepare_smgr(); - PostPrepare_MultiXact(xid); + PostPrepare_MultiXact(fxid); - PostPrepare_PredicateLocks(xid); + PostPrepare_PredicateLocks(fxid); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, @@ -6408,7 +6408,8 @@ xact_redo(XLogReaderState *record) * gxact entry. 
*/ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); - PrepareRedoAdd(XLogRecGetData(record), + PrepareRedoAdd(InvalidFullTransactionId, + XLogRecGetData(record), record->ReadRecPtr, record->EndRecPtr, XLogRecGetOrigin(record)); diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 3596af0..91b6a91 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -2166,28 +2166,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) FullTransactionId XLogRecGetFullXid(XLogReaderState *record) { - TransactionId xid, - next_xid; - uint32 epoch; - /* * This function is only safe during replay, because it depends on the * replay state. See AdvanceNextFullTransactionIdPastXid() for more. */ Assert(AmStartupProcess() || !IsUnderPostmaster); - xid = XLogRecGetXid(record); - next_xid = XidFromFullTransactionId(TransamVariables->nextXid); - epoch = EpochFromFullTransactionId(TransamVariables->nextXid); - - /* - * If xid is numerically greater than next_xid, it has to be from the last - * epoch. - */ - if (unlikely(xid > next_xid)) - --epoch; - - return FullTransactionIdFromEpochAndXid(epoch, xid); + return FullTransactionIdFromAllowableAt(TransamVariables->nextXid, + XLogRecGetXid(record)); } #endif diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 3e2f98b..e9bac94 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -3476,9 +3476,9 @@ AtPrepare_Locks(void) * but that probably costs more cycles. */ void -PostPrepare_Locks(TransactionId xid) +PostPrepare_Locks(FullTransactionId fxid) { - PGPROC *newproc = TwoPhaseGetDummyProc(xid, false); + PGPROC *newproc = TwoPhaseGetDummyProc(fxid, false); HASH_SEQ_STATUS status; LOCALLOCK *locallock; LOCK *lock; @@ -4261,11 +4261,11 @@ DumpAllLocks(void) * and PANIC anyway. 
*/ void -lock_twophase_recover(TransactionId xid, uint16 info, +lock_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; - PGPROC *proc = TwoPhaseGetDummyProc(xid, false); + PGPROC *proc = TwoPhaseGetDummyProc(fxid, false); LOCKTAG *locktag; LOCKMODE lockmode; LOCKMETHODID lockmethodid; @@ -4442,7 +4442,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, * starting up into hot standby mode. */ void -lock_twophase_standby_recover(TransactionId xid, uint16 info, +lock_twophase_standby_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; @@ -4461,7 +4461,7 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info, if (lockmode == AccessExclusiveLock && locktag->locktag_type == LOCKTAG_RELATION) { - StandbyAcquireAccessExclusiveLock(xid, + StandbyAcquireAccessExclusiveLock(XidFromFullTransactionId(fxid), locktag->locktag_field1 /* dboid */ , locktag->locktag_field2 /* reloid */ ); } @@ -4474,11 +4474,11 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info, * Find and release the lock indicated by the 2PC record. */ void -lock_twophase_postcommit(TransactionId xid, uint16 info, +lock_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; - PGPROC *proc = TwoPhaseGetDummyProc(xid, true); + PGPROC *proc = TwoPhaseGetDummyProc(fxid, true); LOCKTAG *locktag; LOCKMETHODID lockmethodid; LockMethod lockMethodTable; @@ -4500,10 +4500,10 @@ lock_twophase_postcommit(TransactionId xid, uint16 info, * This is actually just the same as the COMMIT case. 
*/ void -lock_twophase_postabort(TransactionId xid, uint16 info, +lock_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { - lock_twophase_postcommit(xid, info, recdata, len); + lock_twophase_postcommit(fxid, info, recdata, len); } /* diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 5b21a05..928647d 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -191,7 +191,7 @@ * AtPrepare_PredicateLocks(void); * PostPrepare_PredicateLocks(TransactionId xid); * PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); - * predicatelock_twophase_recover(TransactionId xid, uint16 info, + * predicatelock_twophase_recover(FullTransactionId fxid, uint16 info, * void *recdata, uint32 len); */ @@ -4846,7 +4846,7 @@ AtPrepare_PredicateLocks(void) * anyway. We only need to clean up our local state. */ void -PostPrepare_PredicateLocks(TransactionId xid) +PostPrepare_PredicateLocks(FullTransactionId fxid) { if (MySerializableXact == InvalidSerializableXact) return; @@ -4869,12 +4869,12 @@ PostPrepare_PredicateLocks(TransactionId xid) * commits or aborts. */ void -PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) +PredicateLockTwoPhaseFinish(FullTransactionId fxid, bool isCommit) { SERIALIZABLEXID *sxid; SERIALIZABLEXIDTAG sxidtag; - sxidtag.xid = xid; + sxidtag.xid = XidFromFullTransactionId(fxid); LWLockAcquire(SerializableXactHashLock, LW_SHARED); sxid = (SERIALIZABLEXID *) @@ -4896,10 +4896,11 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit) * Re-acquire a predicate lock belonging to a transaction that was prepared. 
*/ void -predicatelock_twophase_recover(TransactionId xid, uint16 info, +predicatelock_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhasePredicateRecord *record; + TransactionId xid = XidFromFullTransactionId(fxid); Assert(len == sizeof(TwoPhasePredicateRecord)); diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c index 09247ba..8f5945a 100644 --- a/src/backend/utils/activity/pgstat_relation.c +++ b/src/backend/utils/activity/pgstat_relation.c @@ -731,7 +731,7 @@ PostPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state) * Load the saved counts into our local pgstats state. */ void -pgstat_twophase_postcommit(TransactionId xid, uint16 info, +pgstat_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata; @@ -767,7 +767,7 @@ pgstat_twophase_postcommit(TransactionId xid, uint16 info, * as aborted. */ void -pgstat_twophase_postabort(TransactionId xid, uint16 info, +pgstat_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len) { TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata; diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c index 4736755..20b28b2 100644 --- a/src/backend/utils/adt/xid8funcs.c +++ b/src/backend/utils/adt/xid8funcs.c @@ -97,15 +97,11 @@ static bool TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid) { TransactionId xid = XidFromFullTransactionId(fxid); - uint32 now_epoch; - TransactionId now_epoch_next_xid; FullTransactionId now_fullxid; - TransactionId oldest_xid; - FullTransactionId oldest_fxid; + TransactionId oldest_clog_xid; + FullTransactionId oldest_clog_fxid; now_fullxid = ReadNextFullTransactionId(); - now_epoch_next_xid = XidFromFullTransactionId(now_fullxid); - now_epoch = EpochFromFullTransactionId(now_fullxid); if (extracted_xid != NULL) *extracted_xid = 
xid; @@ -135,52 +131,19 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid) /* * If fxid is not older than TransamVariables->oldestClogXid, the relevant - * CLOG entry is guaranteed to still exist. Convert - * TransamVariables->oldestClogXid into a FullTransactionId to compare it - * with fxid. Determine the right epoch knowing that oldest_fxid - * shouldn't be more than 2^31 older than now_fullxid. - */ - oldest_xid = TransamVariables->oldestClogXid; - Assert(TransactionIdPrecedesOrEquals(oldest_xid, now_epoch_next_xid)); - if (oldest_xid <= now_epoch_next_xid) - { - oldest_fxid = FullTransactionIdFromEpochAndXid(now_epoch, oldest_xid); - } - else - { - Assert(now_epoch > 0); - oldest_fxid = FullTransactionIdFromEpochAndXid(now_epoch - 1, oldest_xid); - } - return !FullTransactionIdPrecedes(fxid, oldest_fxid); -} - -/* - * Convert a TransactionId obtained from a snapshot held by the caller to a - * FullTransactionId. Use next_fxid as a reference FullTransactionId, so that - * we can compute the high order bits. It must have been obtained by the - * caller with ReadNextFullTransactionId() after the snapshot was created. - */ -static FullTransactionId -widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid) -{ - TransactionId next_xid = XidFromFullTransactionId(next_fxid); - uint32 epoch = EpochFromFullTransactionId(next_fxid); - - /* Special transaction ID. */ - if (!TransactionIdIsNormal(xid)) - return FullTransactionIdFromEpochAndXid(0, xid); - - /* - * The 64 bit result must be <= next_fxid, since next_fxid hadn't been - * issued yet when the snapshot was created. Every TransactionId in the - * snapshot must therefore be from the same epoch as next_fxid, or the - * epoch before. We know this because next_fxid is never allow to get - * more than one epoch ahead of the TransactionIds in any snapshot. + * CLOG entry is guaranteed to still exist. + * + * TransamVariables->oldestXid governs allowable XIDs. 
Usually, + * oldestClogXid==oldestXid. It's also possible for oldestClogXid to + * follow oldestXid, in which case oldestXid might advance after our + * ReadNextFullTransactionId() call. If oldestXid has advanced, that + * advancement reinstated the usual oldestClogXid==oldestXid. Whether or + * not that happened, oldestClogXid is allowable relative to now_fullxid. */ - if (xid > next_xid) - epoch--; - - return FullTransactionIdFromEpochAndXid(epoch, xid); + oldest_clog_xid = TransamVariables->oldestClogXid; + oldest_clog_fxid = + FullTransactionIdFromAllowableAt(now_fullxid, oldest_clog_xid); + return !FullTransactionIdPrecedes(fxid, oldest_clog_fxid); } /* @@ -420,12 +383,18 @@ pg_current_snapshot(PG_FUNCTION_ARGS) nxip = cur->xcnt; snap = palloc(PG_SNAPSHOT_SIZE(nxip)); - /* fill */ - snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid); - snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid); + /* + * Fill. This is the current backend's active snapshot, so MyProc->xmin + * is <= all these XIDs. As long as that remains so, oldestXid can't + * advance past any of these XIDs. Hence, these XIDs remain allowable + * relative to next_fxid. + */ + snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin); + snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax); snap->nxip = nxip; for (i = 0; i < nxip; i++) - snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid); + snap->xip[i] = + FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]); /* * We want them guaranteed to be in ascending order. 
This also removes diff --git a/src/include/access/multixact.h b/src/include/access/multixact.h index 4e6b0ee..b876e98 100644 --- a/src/include/access/multixact.h +++ b/src/include/access/multixact.h @@ -11,6 +11,7 @@ #ifndef MULTIXACT_H #define MULTIXACT_H +#include "access/transam.h" #include "access/xlogreader.h" #include "lib/stringinfo.h" #include "storage/sync.h" @@ -119,7 +120,7 @@ extern int multixactmemberssyncfiletag(const FileTag *ftag, char *path); extern void AtEOXact_MultiXact(void); extern void AtPrepare_MultiXact(void); -extern void PostPrepare_MultiXact(TransactionId xid); +extern void PostPrepare_MultiXact(FullTransactionId fxid); extern Size MultiXactShmemSize(void); extern void MultiXactShmemInit(void); @@ -145,11 +146,11 @@ extern void MultiXactAdvanceNextMXact(MultiXactId minMulti, extern void MultiXactAdvanceOldest(MultiXactId oldestMulti, Oid oldestMultiDB); extern int MultiXactMemberFreezeThreshold(void); -extern void multixact_twophase_recover(TransactionId xid, uint16 info, +extern void multixact_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); -extern void multixact_twophase_postcommit(TransactionId xid, uint16 info, +extern void multixact_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); -extern void multixact_twophase_postabort(TransactionId xid, uint16 info, +extern void multixact_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); extern void multixact_redo(XLogReaderState *record); diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 0cab865..604c5d0 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -370,6 +370,43 @@ FullTransactionIdNewer(FullTransactionId a, FullTransactionId b) return b; } +/* + * Compute FullTransactionId for the given TransactionId, assuming xid was + * between [oldestXid, nextXid] at the time when TransamVariables->nextXid was + * nextFullXid. 
When adding calls, evaluate what prevents xid from preceding + * oldestXid if SetTransactionIdLimit() runs between the collection of xid and + * the collection of nextFullXid. + */ +static inline FullTransactionId +FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, + TransactionId xid) +{ + uint32 epoch; + + /* Special transaction ID. */ + if (!TransactionIdIsNormal(xid)) + return FullTransactionIdFromEpochAndXid(0, xid); + + Assert(TransactionIdPrecedesOrEquals(xid, + XidFromFullTransactionId(nextFullXid))); + + /* + * The 64 bit result must be <= nextFullXid, since nextFullXid hadn't been + * issued yet when xid was in the past. The xid must therefore be from + * the epoch of nextFullXid or the epoch before. We know this because we + * must remove (by freezing) an XID before assigning the XID half an epoch + * ahead of it. + */ + epoch = EpochFromFullTransactionId(nextFullXid); + if (xid > XidFromFullTransactionId(nextFullXid)) + { + Assert(epoch != 0); + epoch--; + } + + return FullTransactionIdFromEpochAndXid(epoch, xid); +} + #endif /* FRONTEND */ #endif /* TRANSAM_H */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 9fa8235..0ab8b3e 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -14,6 +14,7 @@ #ifndef TWOPHASE_H #define TWOPHASE_H +#include "access/transam.h" #include "access/xact.h" #include "access/xlogdefs.h" #include "datatype/timestamp.h" @@ -36,10 +37,10 @@ extern void PostPrepare_Twophase(void); extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid, bool *have_more); -extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held); -extern int TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held); +extern PGPROC *TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held); +extern int TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held); -extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, +extern 
GlobalTransaction MarkAsPreparing(FullTransactionId fxid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); @@ -56,8 +57,9 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); -extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, - XLogRecPtr end_lsn, RepOriginId origin_id); +extern void PrepareRedoAdd(FullTransactionId fxid, char *buf, + XLogRecPtr start_lsn, XLogRecPtr end_lsn, + RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, diff --git a/src/include/access/twophase_rmgr.h b/src/include/access/twophase_rmgr.h index 3ed154b..8f57640 100644 --- a/src/include/access/twophase_rmgr.h +++ b/src/include/access/twophase_rmgr.h @@ -14,7 +14,9 @@ #ifndef TWOPHASE_RMGR_H #define TWOPHASE_RMGR_H -typedef void (*TwoPhaseCallback) (TransactionId xid, uint16 info, +#include "access/transam.h" + +typedef void (*TwoPhaseCallback) (FullTransactionId fxid, uint16 info, void *recdata, uint32 len); typedef uint8 TwoPhaseRmgrId; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 6475889..a8c09de 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -734,9 +734,9 @@ extern void pgstat_count_heap_delete(Relation rel); extern void pgstat_count_truncate(Relation rel); extern void pgstat_update_heap_dead_tuples(Relation rel, int delta); -extern void pgstat_twophase_postcommit(TransactionId xid, uint16 info, +extern void pgstat_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); -extern void pgstat_twophase_postabort(TransactionId xid, uint16 info, +extern void pgstat_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h 
index 1076995..3feedfc 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -18,6 +18,7 @@ #error "lock.h may not be included from frontend code" #endif +#include "access/transam.h" #include "lib/ilist.h" #include "storage/lockdefs.h" #include "storage/lwlock.h" @@ -579,7 +580,7 @@ extern bool LockHasWaiters(const LOCKTAG *locktag, extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp); extern void AtPrepare_Locks(void); -extern void PostPrepare_Locks(TransactionId xid); +extern void PostPrepare_Locks(FullTransactionId fxid); extern bool LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock); @@ -594,13 +595,13 @@ extern BlockedProcsData *GetBlockerStatusData(int blocked_pid); extern xl_standby_lock *GetRunningTransactionLocks(int *nlocks); extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode); -extern void lock_twophase_recover(TransactionId xid, uint16 info, +extern void lock_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); -extern void lock_twophase_postcommit(TransactionId xid, uint16 info, +extern void lock_twophase_postcommit(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); -extern void lock_twophase_postabort(TransactionId xid, uint16 info, +extern void lock_twophase_postabort(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); -extern void lock_twophase_standby_recover(TransactionId xid, uint16 info, +extern void lock_twophase_standby_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); extern DeadLockState DeadLockCheck(PGPROC *proc); diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index 267d5d9..4d3f218 100644 --- a/src/include/storage/predicate.h +++ b/src/include/storage/predicate.h @@ -14,6 +14,7 @@ #ifndef PREDICATE_H #define PREDICATE_H +#include "access/transam.h" #include "storage/itemptr.h" #include 
"storage/lock.h" #include "utils/relcache.h" @@ -72,9 +73,9 @@ extern void PreCommit_CheckForSerializationFailure(void); /* two-phase commit support */ extern void AtPrepare_PredicateLocks(void); -extern void PostPrepare_PredicateLocks(TransactionId xid); -extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit); -extern void predicatelock_twophase_recover(TransactionId xid, uint16 info, +extern void PostPrepare_PredicateLocks(FullTransactionId fxid); +extern void PredicateLockTwoPhaseFinish(FullTransactionId fxid, bool isCommit); +extern void predicatelock_twophase_recover(FullTransactionId fxid, uint16 info, void *recdata, uint32 len); /* parallel query support */
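
For anyone who wants to poke at the epoch arithmetic in FullTransactionIdFromAllowableAt() (the transam.h hunk) outside a PostgreSQL build, here is a minimal standalone sketch. It is not the patch code: SketchXid, SketchFullXid, SKETCH_FIRST_NORMAL_XID and both sketch_* functions are hypothetical stand-ins for TransactionId, FullTransactionId, FirstNormalTransactionId and the real helpers, and it omits the TransactionIdPrecedesOrEquals() assertion. It assumes, as the patch comment does, that xid was allowable when nextXid had the given value.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL's types; not the real headers. */
typedef uint32_t SketchXid;
typedef struct
{
	uint64_t	value;			/* epoch in the high 32 bits, xid in the low */
} SketchFullXid;

/* XIDs below this value are special (invalid, bootstrap, frozen). */
#define SKETCH_FIRST_NORMAL_XID 3u

static inline SketchFullXid
sketch_fxid_from_epoch_and_xid(uint32_t epoch, SketchXid xid)
{
	SketchFullXid result;

	result.value = ((uint64_t) epoch << 32) | xid;
	return result;
}

/*
 * Widen xid to 64 bits, assuming it was allowable when the next XID to be
 * assigned was "next".
 */
static inline SketchFullXid
sketch_fxid_from_allowable_at(SketchFullXid next, SketchXid xid)
{
	uint32_t	epoch = (uint32_t) (next.value >> 32);
	SketchXid	next_xid = (SketchXid) next.value;

	/* Special XIDs are always reported in epoch 0. */
	if (xid < SKETCH_FIRST_NORMAL_XID)
		return sketch_fxid_from_epoch_and_xid(0, xid);

	/*
	 * A normal xid numerically above next_xid cannot be in next's epoch, so
	 * it must belong to the epoch before.
	 */
	if (xid > next_xid)
	{
		assert(epoch != 0);
		epoch--;
	}

	return sketch_fxid_from_epoch_and_xid(epoch, xid);
}
```

With next at epoch 1, xid 5, this maps xid 4000000000 to epoch 0 and xid 4 to epoch 1, which is the two-epoch behavior discussed upthread.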