On 29 March 2017 at 08:11, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 29 March 2017 at 08:01, Craig Ringer <cr...@2ndquadrant.com> wrote:
>
>> I just noticed that I failed to remove the docs changes regarding
>> dropping slots becoming db-specific, so I'll post a follow-up for that
>> in a sec.
>
> Attached.
... and here's the next in the patch series. Both this and the
immediately preceding minor patch, fix-drop-slot-docs.patch, are
pending now.

Notable changes in this patch since the last review:

* Split oldestCatalogXmin tracking into a separate patch.

* Critically, fix GetSnapshotData so that its computation of
  RecentGlobalXmin and RecentGlobalDataXmin uses
  ShmemVariableCache->oldestCatalogXmin instead of
  procArray->replication_slot_catalog_xmin. The old coding could have
  allowed removal of tuples newer than oldestCatalogXmin.

* Add a memory barrier in UpdateOldestCatalogXmin and
  SetOldestCatalogXmin. UpdateOldestCatalogXmin still does an unlocked
  pre-check before deciding whether it needs to take ProcArrayLock,
  re-check, and advance, since we don't want to contend for
  ProcArrayLock unnecessarily.

* Remove unnecessary volatile usage (retained in
  UpdateOldestCatalogXmin because of the barrier).

* Remove the unnecessary test for XLogInsertAllowed() in
  XactLogCatalogXminUpdate.

* EnsureActiveLogicalSlotValid(void): add (void).

* pgindented the changes in this diff; unrelated changes are left
  alone.

Re:

> what does
>
> +    TransactionId oldestCatalogXmin; /* oldest xid where complete catalog
> +                                      * state is guaranteed to still exist */
>
> mean?  I complained about the overall justification in the commit
> already, but looking at this commit alone, the justification for this
> part of the change is quite hard to understand.

The patch now contains

    TransactionId oldestCatalogXmin;    /* oldest xid it is guaranteed to be safe
                                         * to create a historic snapshot for;
                                         * see also
                                         * procArray->replication_slot_catalog_xmin
                                         */

which I think is an improvement.

I've also sought to explain the purpose of this change better with

    /*
     * If necessary, copy the current catalog_xmin needed by replication slots to
     * the effective catalog_xmin used for dead tuple removal and write a WAL
     * record recording the change.
     *
     * This allows standbys to know the oldest xid for which it is safe to create
     * a historic snapshot for logical decoding. VACUUM or other cleanup may have
     * removed catalog tuple versions needed to correctly decode transactions older
     * than this threshold. Standbys can use this information to cancel conflicting
     * decoding sessions and invalidate slots that need discarded information.
     *
     * (We can't use the transaction IDs in WAL records emitted by VACUUM etc for
     * this, since they don't identify the relation as a catalog or not. Nor can a
     * standby look up the relcache to get the Relation for the affected
     * relfilenode to check if it is a catalog. The standby would also have no way
     * to know the oldest safe position at startup if it wasn't in the control
     * file.)
     */
    void
    UpdateOldestCatalogXmin(void)
    {
    ...

Does that help?

(Side note for later: ResolveRecoveryConflictWithLogicalDecoding will
need a read barrier too, when the next patch adds it.)

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
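[Editor's aside for readers skimming the thread: the "unlocked pre-check,
then re-check under the lock before advancing" ordering discussed above can
be looked at in isolation. The sketch below is a minimal, standalone C
illustration of that general pattern only; the names (effective_xmin,
needed_xmin, maybe_advance) are invented for the example, xid wraparound is
ignored, and the WAL logging and PostgreSQL-specific locking are deliberately
omitted, so this is not code from the attached patch.]

    /*
     * Minimal standalone sketch: readers do a cheap unlocked pre-check of a
     * monotonically advancing shared value and only take the lock to re-check
     * and advance when the pre-check says an update is probably needed.
     */
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    static _Atomic unsigned int effective_xmin = 0; /* advanced only under the lock */
    static _Atomic unsigned int needed_xmin = 0;    /* what the "slots" currently need */
    static pthread_mutex_t advance_lock = PTHREAD_MUTEX_INITIALIZER;

    static void
    maybe_advance(void)
    {
        /* Unlocked pre-check: cheap, possibly slightly stale. */
        unsigned int needed = atomic_load(&needed_xmin);

        if (atomic_load(&effective_xmin) >= needed)
            return;             /* nothing to do; skip the lock */

        /* (In the patch, the WAL record would be written around here.) */
        pthread_mutex_lock(&advance_lock);

        /* Re-check under the lock; a concurrent caller may have advanced it. */
        if (atomic_load(&effective_xmin) < needed)
            atomic_store(&effective_xmin, needed);

        pthread_mutex_unlock(&advance_lock);
    }

    int
    main(void)
    {
        atomic_store(&needed_xmin, 42);
        maybe_advance();
        printf("effective_xmin = %u\n", atomic_load(&effective_xmin));
        return 0;
    }

In the patch itself the re-check also matters because the value that was
xlog'd must match what gets installed, which is why
replication_slot_catalog_xmin is not re-read after the WAL record is
emitted.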
From 4b8e3aaa52539ef8cf3c79d1ed0319cc44800a32 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH] Log catalog_xmin advances before removing catalog tuples

Write a WAL record before advancing the oldest catalog_xmin preserved by
VACUUM and other tuple removal.

Previously GetOldestXmin would use procArray->replication_slot_catalog_xmin
as the xid limit for vacuuming catalog tuples, so it was not possible for
standbys to determine whether all catalog tuples needed for a catalog
snapshot for a given xid would still exist.

Logging catalog_xmin advances allows standbys to determine if a logical
slot on the standby has become unsafe to use. It can then refuse to start
logical decoding on that slot or, if decoding is in progress, raise a
conflict with recovery.

Note that we only emit new WAL records if catalog_xmin changes, which
happens due to changes in slot state. So this won't generate WAL whenever
oldestXmin advances.
---
 src/backend/access/heap/rewriteheap.c       |   3 +-
 src/backend/access/rmgrdesc/xactdesc.c      |   9 +++
 src/backend/access/transam/varsup.c         |  14 ++++
 src/backend/access/transam/xact.c           |  35 ++++++++
 src/backend/access/transam/xlog.c           |  12 ++-
 src/backend/commands/vacuum.c               |   9 +++
 src/backend/postmaster/bgwriter.c           |  10 +++
 src/backend/replication/logical/decode.c    |  11 +++
 src/backend/replication/logical/logical.c   |  38 +++++++++
 src/backend/replication/walreceiver.c       |   2 +-
 src/backend/replication/walsender.c         |  13 +++
 src/backend/storage/ipc/procarray.c         | 119 +++++++++++++++++++++++++---
 src/bin/pg_controldata/pg_controldata.c     |   2 +
 src/include/access/transam.h                |   6 ++
 src/include/access/xact.h                   |  12 ++-
 src/include/catalog/pg_control.h            |   1 +
 src/include/storage/procarray.h             |   5 +-
 src/test/recovery/t/006_logical_decoding.pl |  12 ++-
 18 files changed, 294 insertions(+), 19 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..36bbb98 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
     if (!state->rs_logical_rewrite)
         return;
 
-    ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+    /* Use the catalog_xmin being retained by vacuum */
+    ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
     /*
      * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 735f8c5..96ea163 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
         appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
         xact_desc_assignment(buf, xlrec);
     }
+    else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+    {
+        xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+        appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin);
+    }
 }
 
 const char *
@@ -324,6 +330,9 @@ xact_identify(uint8 info)
         case XLOG_XACT_ASSIGNMENT:
             id = "ASSIGNMENT";
             break;
+        case XLOG_XACT_CATALOG_XMIN_ADV:
+            id = "CATALOG_XMIN";
+            break;
     }
 
     return id;
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..6cf939f 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,6 +414,20 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
     }
 }
 
+/*
+ * Set the global oldest catalog_xmin used to determine when tuples
+ * may be removed from catalogs and user-catalogs accessible from logical
+ * decoding.
+ *
+ * Only to be called from the startup process or by UpdateOldestCatalogXmin(),
+ * which ensures the update is properly written to xlog first.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+    pg_write_barrier();
+    ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+}
 
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c8751c6..0e3b870 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5652,6 +5652,41 @@ xact_redo(XLogReaderState *record)
         ProcArrayApplyXidAssignment(xlrec->xtop,
                                     xlrec->nsubxacts, xlrec->xsub);
     }
+    else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+    {
+        xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+        /*
+         * Apply the new catalog_xmin limit immediately. New decoding sessions
+         * will refuse to start if their slot is past it, and old ones will
+         * notice when we signal them with a recovery conflict. There's no
+         * effect on the catalogs themselves yet, so it's safe for backends
+         * with older catalog_xmins to still exist.
+         *
+         * We don't have to take ProcArrayLock since only the startup process
+         * is allowed to change oldestCatalogXmin when we're in recovery.
+         *
+         * Existing sessions are not notified and must check the safe xmin.
+         */
+        SetOldestCatalogXmin(xlrec->new_catalog_xmin);
+
+    }
     else
         elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * Record when we advance the catalog_xmin used for tuple removal
+ * so standbys find out before we remove catalog tuples they might
+ * need for logical decoding.
+ */
+XLogRecPtr
+XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
+{
+    xl_xact_catalog_xmin_advance xlrec;
+    xlrec.new_catalog_xmin = new_catalog_xmin;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
+    return XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..a3ac2c1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5008,6 +5008,7 @@ BootStrapXLOG(void)
     checkPoint.nextMultiOffset = 0;
     checkPoint.oldestXid = FirstNormalTransactionId;
     checkPoint.oldestXidDB = TemplateDbOid;
+    checkPoint.oldestCatalogXmin = InvalidTransactionId;
     checkPoint.oldestMulti = FirstMultiXactId;
     checkPoint.oldestMultiDB = TemplateDbOid;
     checkPoint.oldestCommitTsXid = InvalidTransactionId;
@@ -5021,6 +5022,7 @@ BootStrapXLOG(void)
     MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
     AdvanceOldestClogXid(checkPoint.oldestXid);
     SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+    SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
     SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
     SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6613,9 @@ StartupXLOG(void)
             (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
                              checkPoint.oldestXid, checkPoint.oldestXidDB)));
     ereport(DEBUG1,
+            (errmsg_internal("oldest catalog-only transaction ID: %u",
+                             checkPoint.oldestCatalogXmin)));
+    ereport(DEBUG1,
             (errmsg_internal("oldest MultiXactId: %u, in database %u",
                              checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
     ereport(DEBUG1,
@@ -6628,6 +6633,7 @@ StartupXLOG(void)
     MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
     AdvanceOldestClogXid(checkPoint.oldestXid);
     SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+    SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
     SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
     SetCommitTsLimit(checkPoint.oldestCommitTsXid,
                      checkPoint.newestCommitTsXid);
@@ -8702,6 +8708,7 @@ CreateCheckPoint(int flags)
     checkPoint.nextXid = ShmemVariableCache->nextXid;
     checkPoint.oldestXid = ShmemVariableCache->oldestXid;
     checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
+    checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
     LWLockRelease(XidGenLock);
 
     LWLockAcquire(CommitTsLock, LW_SHARED);
@@ -9631,6 +9638,7 @@ xlog_redo(XLogReaderState *record)
          * redo an xl_clog_truncate if it changed since initialization.
          */
         SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+        SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 
         /*
          * If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -9729,8 +9737,8 @@ xlog_redo(XLogReaderState *record)
                               checkPoint.oldestMultiDB);
         if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
                                   checkPoint.oldestXid))
-            SetTransactionIdLimit(checkPoint.oldestXid,
-                                  checkPoint.oldestXidDB);
+            SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+        SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
         /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
         ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
         ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 9fbb0eb..ae41dc3 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -518,6 +518,15 @@ vacuum_set_xid_limits(Relation rel,
     MultiXactId safeMxactLimit;
 
     /*
+     * When logical decoding is enabled, we must write any advance of
+     * catalog_xmin to xlog before we allow VACUUM to remove those tuples.
+     * This ensures that any standbys doing logical decoding can cancel
+     * decoding sessions and invalidate slots if we remove tuples they
+     * still need.
+     */
+    UpdateOldestCatalogXmin();
+
+    /*
      * We can always ignore processes running lazy vacuum. This is because we
      * use these values only for deciding which tuples we must keep in the
      * tables. Since lazy vacuum doesn't write its XID anywhere, it's safe to
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf2..df239e0 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -51,6 +51,7 @@
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
@@ -295,6 +296,15 @@ BackgroundWriterMain(void)
         }
 
         /*
+         * Eagerly advance the catalog_xmin used by vacuum if we're not
+         * a standby. This ensures that standbys waiting for catalog_xmin
+         * confirmation receive it promptly, even if we haven't had a
+         * recent vacuum run.
+         */
+        if (!RecoveryInProgress())
+            UpdateOldestCatalogXmin();
+
+        /*
          * Log a new xl_running_xacts every now and then so replication can
          * get into a consistent state faster (think of suboverflowed
          * snapshots) and clean up resources (locks, KnownXids*) more
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..07a120d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -288,6 +288,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
              */
            ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
            break;
+        case XLOG_XACT_CATALOG_XMIN_ADV:
+            /*
+             * The global catalog_xmin has been advanced. By the time we see
+             * this in logical decoding it no longer matters, since it's
+             * guaranteed that all later records will be consistent with the
+             * advanced catalog_xmin, so we ignore it here. If we were running
+             * on a standby and it applied a catalog xmin advance past our
+             * needed catalog_xmin we would've already been terminated with a
+             * conflict with standby error.
+             */
+            break;
        default:
            elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..28d04d1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -68,6 +68,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
+static void EnsureActiveLogicalSlotValid(void);
+
 /*
  * Make sure the current settings & environment are capable of doing logical
  * decoding.
@@ -126,6 +128,8 @@ StartupDecodingContext(List *output_plugin_options,
     /* shorter lines... */
     slot = MyReplicationSlot;
 
+    EnsureActiveLogicalSlotValid();
+
     context = AllocSetContextCreate(CurrentMemoryContext,
                                     "Logical decoding context",
                                     ALLOCSET_DEFAULT_SIZES);
@@ -963,3 +967,37 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
         SpinLockRelease(&MyReplicationSlot->mutex);
     }
 }
+
+/*
+ * Test to see if the active logical slot is usable.
+ */
+static void
+EnsureActiveLogicalSlotValid(void)
+{
+    Assert(MyReplicationSlot != NULL);
+
+    /*
+     * Currently a logical slot can only become unusable if we're doing
+     * logical decoding on a standby and the master advanced its catalog_xmin
+     * past the threshold we need, removing tuples that we'll require to
+     * start decoding at our restart_lsn.
+     */
+    if (RecoveryInProgress())
+    {
+        /*
+         * Check if enough catalog is retained for this slot. No locking is
+         * needed here since oldestCatalogXmin can only advance, so if it's
+         * past what we need that's not going to change. We have marked our
+         * slot as active so redo won't replay past our catalog_xmin without
+         * first terminating our session.
+         */
+        TransactionId shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+        if (!TransactionIdIsValid(shmem_catalog_xmin) ||
+            TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("replication slot '%s' requires catalogs removed by master",
+                            NameStr(MyReplicationSlot->data.name))));
+    }
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 771ac30..c2ad791 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1233,7 +1233,7 @@ XLogWalRcvSendHSFeedback(bool immed)
         xmin = GetOldestXmin(NULL,
                              PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-        ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+        ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
         if (TransactionIdIsValid(slot_xmin) &&
             TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..cdc5f95 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1658,6 +1658,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
      * be energy wasted - the worst lost information can do here is give us
      * wrong information in a statistics view - we'll just potentially be more
      * conservative in removing files.
+     *
+     * We don't have to do any effective_xmin / effective_catalog_xmin testing
+     * here either, like for LogicalConfirmReceivedLocation. If we received
+     * the xmin and catalog_xmin from downstream replication slots we know they
+     * were already confirmed there.
      */
 }
 
@@ -1778,6 +1783,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
         slot->data.xmin = feedbackXmin;
         slot->effective_xmin = feedbackXmin;
     }
+    /*
+     * If the physical slot is relaying catalog_xmin for logical replication
+     * slots on the replica it's safe to act on catalog_xmin advances
+     * immediately too. The replica will only send a new catalog_xmin via
+     * feedback when it advances its effective_catalog_xmin, so it's done the
+     * delay-until-confirmed dance for us and knows it won't need the data
+     * we're protecting from vacuum again.
+     */
     if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
         !TransactionIdIsNormal(feedbackCatalogXmin) ||
         TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..381c230 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,11 @@ typedef struct ProcArrayStruct
     /* oldest xmin of any replication slot */
     TransactionId replication_slot_xmin;
 
-    /* oldest catalog xmin of any replication slot */
+    /*
+     * Oldest catalog xmin of any replication slot
+     *
+     * See also ShmemVariableCache->oldestCatalogXmin
+     */
     TransactionId replication_slot_catalog_xmin;
 
     /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -1306,6 +1310,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1444,6 +1451,79 @@ GetOldestXmin(Relation rel, int flags)
 }
 
 /*
+ * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
+ * to reflect an advance in procArray->replication_slot_catalog_xmin or it
+ * becoming newly set or unset.
+ */
+static bool
+CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
+{
+    return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
+            || (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin)));
+}
+
+/*
+ * If necessary, copy the current catalog_xmin needed by replication slots to
+ * the effective catalog_xmin used for dead tuple removal and write a WAL
+ * record recording the change.
+ *
+ * This allows standbys to know the oldest xid for which it is safe to create
+ * a historic snapshot for logical decoding. VACUUM or other cleanup may have
+ * removed catalog tuple versions needed to correctly decode transactions older
+ * than this threshold. Standbys can use this information to cancel conflicting
+ * decoding sessions and invalidate slots that need discarded information.
+ *
+ * (We can't use the transaction IDs in WAL records emitted by VACUUM etc for
+ * this, since they don't identify the relation as a catalog or not. Nor can a
+ * standby look up the relcache to get the Relation for the affected
+ * relfilenode to check if it is a catalog. The standby would also have no way
+ * to know the oldest safe position at startup if it wasn't in the control
+ * file.)
+ */
+void
+UpdateOldestCatalogXmin(void)
+{
+    TransactionId vacuum_catalog_xmin;
+    TransactionId slots_catalog_xmin;
+
+    Assert(XLogInsertAllowed());
+
+    /*
+     * Do an unlocked check to see if there's a new catalog_xmin in procarray,
+     * so we can avoid taking a lock and writing xlog if they're unchanged,
+     * as is most likely.
+     *
+     * The read barrier is for oldestCatalogXmin; we don't care whether we see
+     * the very latest replication_slot_catalog_xmin or not.
+     */
+    pg_read_barrier();
+    vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+    slots_catalog_xmin = procArray->replication_slot_catalog_xmin;
+
+    if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+    {
+        XactLogCatalogXminUpdate(slots_catalog_xmin);
+
+        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+        /*
+         * A concurrent updater could've changed the oldestCatalogXmin so we
+         * need to re-check under ProcArrayLock before updating. The LWLock
+         * provides a barrier.
+         *
+         * We must not re-read replication_slot_catalog_xmin even if it has
+         * advanced, since we xlog'd the older value. A later check will
+         * advance it again.
+         */
+        vacuum_catalog_xmin = *((volatile TransactionId *) &ShmemVariableCache->oldestCatalogXmin);
+        if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+            SetOldestCatalogXmin(slots_catalog_xmin);
+
+        LWLockRelease(ProcArrayLock);
+    }
+}
+
+/*
  * GetMaxSnapshotXidCount -- get max size for snapshot XID array
  *
  * We have to export this for use by snapmgr.c.
@@ -1493,7 +1573,8 @@ GetMaxSnapshotSubxidCount(void)
  *          older than this are known not running any more.
  *      RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
  *          running transactions, except those running LAZY VACUUM).  This is
- *          the same computation done by GetOldestXmin(true, true).
+ *          the same computation done by
+ *          GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_FLAGS_VACUUM)
  *      RecentGlobalDataXmin: the global xmin for non-catalog tables
  *          >= RecentGlobalXmin
  *
@@ -1700,7 +1781,7 @@ GetSnapshotData(Snapshot snapshot)
 
     /* fetch into volatile var while ProcArrayLock is held */
     replication_slot_xmin = procArray->replication_slot_xmin;
-    replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+    replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
     if (!TransactionIdIsValid(MyPgXact->xmin))
         MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1792,9 @@ GetSnapshotData(Snapshot snapshot)
      * Update globalxmin to include actual process xids.  This is a slightly
      * different way of computing it than GetOldestXmin uses, but should give
      * the same result.
+     *
+     * If you change computation of RecentGlobalXmin here you may need to
+     * change GetOldestXmin(...) as well.
      */
     if (TransactionIdPrecedes(xmin, globalxmin))
         globalxmin = xmin;
@@ -2168,14 +2252,14 @@ GetOldestSafeDecodingTransactionId(void)
     oldestSafeXid = ShmemVariableCache->nextXid;
 
     /*
-     * If there's already a slot pegging the xmin horizon, we can start with
-     * that value, it's guaranteed to be safe since it's computed by this
-     * routine initially and has been enforced since.
+     * If there's already an effective catalog_xmin held down by an existing
+     * replication slot it's definitely safe to start there, and it can't
+     * advance while we hold ProcArrayLock.
      */
-    if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
-        TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
+    if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) &&
+        TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
                               oldestSafeXid))
-        oldestSafeXid = procArray->replication_slot_catalog_xmin;
+        oldestSafeXid = ShmemVariableCache->oldestCatalogXmin;
 
     /*
      * If we're not in recovery, we walk over the procarray and collect the
@@ -2965,18 +3049,29 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmin, we return both the effective
+ * catalog_xmin being used for tuple removal (retained_catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-                                TransactionId *catalog_xmin)
+                                TransactionId *retained_catalog_xmin,
+                                TransactionId *needed_catalog_xmin)
 {
     LWLockAcquire(ProcArrayLock, LW_SHARED);
 
     if (xmin != NULL)
         *xmin = procArray->replication_slot_xmin;
 
-    if (catalog_xmin != NULL)
-        *catalog_xmin = procArray->replication_slot_catalog_xmin;
+    if (retained_catalog_xmin != NULL)
+        *retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+    if (needed_catalog_xmin != NULL)
+        *needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
     LWLockRelease(ProcArrayLock);
 }
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
            ControlFile->checkPointCopy.oldestCommitTsXid);
     printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
            ControlFile->checkPointCopy.newestCommitTsXid);
+    printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+           ControlFile->checkPointCopy.oldestCatalogXmin);
     printf(_("Time of latest checkpoint:            %s\n"),
            ckpttime_str);
     printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..fe5e67c 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,11 @@ typedef struct VariableCacheData
      */
     TransactionId latestCompletedXid;   /* newest XID that has committed or
                                          * aborted */
+    TransactionId oldestCatalogXmin;    /* oldest xid it is guaranteed to be safe
+                                         * to create a historic snapshot for;
+                                         * see also
+                                         * procArray->replication_slot_catalog_xmin
+                                         */
 
     /*
      * These fields are protected by CLogTruncationLock
@@ -180,6 +185,7 @@ extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
                       Oid oldest_datoid);
 extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
 extern bool ForceTransactionIdLimitUpdate(void);
 
 extern Oid  GetNewObjectId(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5b37c05..6d18d18 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,7 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED   0x30
 #define XLOG_XACT_ABORT_PREPARED    0x40
 #define XLOG_XACT_ASSIGNMENT        0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_CATALOG_XMIN_ADV  0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -187,6 +187,13 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+typedef struct xl_xact_catalog_xmin_advance
+{
+    TransactionId new_catalog_xmin;
+} xl_xact_catalog_xmin_advance;
+
+#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId))
+
 /*
  * Commit and abort records can contain a lot of information. But a large
  * portion of the records won't need all possible pieces of information. So we
@@ -391,6 +398,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
                     int nsubxacts, TransactionId *subxacts,
                     int nrels, RelFileNode *rels,
                     int xactflags, TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..1fe89ae 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -45,6 +45,7 @@ typedef struct CheckPoint
     MultiXactOffset nextMultiOffset;    /* next free MultiXact offset */
     TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
     Oid         oldestXidDB;    /* database with minimum datfrozenxid */
+    TransactionId oldestCatalogXmin;    /* catalog retained after this xid */
     MultiXactId oldestMulti;    /* cluster-wide minimum datminmxid */
     Oid         oldestMultiDB;  /* database with minimum datminmxid */
     pg_time_t   time;           /* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..69a82d7 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
                             TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-                                TransactionId *catalog_xmin);
+                                TransactionId *retained_catalog_xmin,
+                                TransactionId *needed_catalog_xmin);
+
+extern void UpdateOldestCatalogXmin(void);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..f38b38a 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 25;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -17,6 +17,10 @@ $node_master->append_conf(
 wal_level = logical
 ));
 $node_master->start;
+
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+    "pg_controldata's oldestCatalogXmin is zero after start");
+
 my $backup_name = 'master_backup';
 
 $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
@@ -96,9 +100,15 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
     'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
     'reading from slot with wal_level < logical fails');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+    "pg_controldata's oldestCatalogXmin is nonzero");
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
     'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
 
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+    "pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint");
+
 # done with the node
 $node_master->stop;
-- 
2.5.5