On 5 April 2017 at 04:19, Andres Freund <and...@anarazel.de> wrote:
> On 2017-04-04 22:32:40 +0800, Craig Ringer wrote:
>> I'm much happier with this. I'm still fixing some issues in the tests
>> for 03 and tidying them up, but 03 should allow 01 and 02 to be
>> reviewed in their proper context now.
>
> To me this very clearly is too late for v10, and now should be moved to
> the next CF.

I tend to agree that it's late in the piece. It's still worth cleaning it up into a state ready for early pg11, though.

I've just fixed an issue where hot_standby_feedback on a physical slot could cause oldestCatalogXmin to go backwards. When the slot's catalog_xmin was 0 and was being set for the first time, the catalog_xmin supplied by the standby was trusted as-is, even though the master might already have removed catalog tuples that value still needs. To fix it, when PhysicalReplicationSlotNewXmin sets catalog_xmin from 0, it now clamps the value so it is no older than the master's GetOldestSafeDecodingTransactionId().

Tests are cleaned up and fixed.

This series adds full support for logical decoding on a standby.

-- 
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

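The clamp described above can be modeled in isolation. The C sketch below is a simplified, hypothetical model, not code from the patch. `Xid`, `xid_precedes()` and `new_slot_catalog_xmin()` are illustrative names. `xid_precedes()` imitates the modulo-2^32 ordering of PostgreSQL's `TransactionIdPrecedes()`, and `lower_bound` stands in for whatever `GetOldestSafeDecodingTransactionId()` would return. Locking, and the patch's surrounding check on whether the reported value advanced, are left out.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for PostgreSQL's 32-bit TransactionId. */
typedef uint32_t Xid;

#define INVALID_XID      ((Xid) 0)
#define FIRST_NORMAL_XID ((Xid) 3)

static bool xid_is_valid(Xid x)  { return x != INVALID_XID; }
static bool xid_is_normal(Xid x) { return x >= FIRST_NORMAL_XID; }

/*
 * Modeled on TransactionIdPrecedes(): normal xids are compared modulo 2^32
 * via the sign of their 32-bit difference; special xids compare as plain
 * unsigned integers.
 */
static bool xid_precedes(Xid a, Xid b)
{
    if (!xid_is_normal(a) || !xid_is_normal(b))
        return a < b;
    return (int32_t) (a - b) < 0;
}

/*
 * Hypothetical helper: the catalog_xmin a physical slot stores when a standby
 * reports feedback_catalog_xmin. If the slot has no catalog_xmin yet, a
 * reported value older than lower_bound (the oldest xid still safe to decode)
 * is raised to lower_bound, since catalog tuples needed by older xids may
 * already have been removed. Otherwise the reported value is used unchanged.
 */
static Xid new_slot_catalog_xmin(Xid slot_catalog_xmin,
                                 Xid feedback_catalog_xmin,
                                 Xid lower_bound)
{
    bool first_time = xid_is_valid(feedback_catalog_xmin) &&
                      !xid_is_valid(slot_catalog_xmin);

    if (first_time && xid_precedes(feedback_catalog_xmin, lower_bound))
        return lower_bound;
    return feedback_catalog_xmin;
}
```

Consider a lagging standby that reports catalog_xmin 100 while the safe lower bound is 500. If its slot had no catalog_xmin yet, the slot stores 500. In this model, a slot that already had a catalog_xmin simply takes the reported value.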
From 24e2baea15c4f435789c7fda5ddc9feae8a7012f Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH 1/3] Log catalog_xmin advances before removing catalog tuples

Before advancing the effective catalog_xmin we use to remove old catalog
tuple versions, make sure it is written to WAL. This allows standbys
to know the oldest xid they can safely create a historic snapshot for.
They can then refuse to start decoding from a slot or raise a recovery
conflict.

The catalog_xmin advance is logged in a new xl_xact_catalog_xmin_advance
record, emitted at checkpoints or periodically by the bgwriter. WAL is
only written if the lowest catalog_xmin needed by any replication slot
has advanced.
---
 src/backend/access/heap/rewriteheap.c       |   3 +-
 src/backend/access/rmgrdesc/xactdesc.c      |   9 ++
 src/backend/access/rmgrdesc/xlogdesc.c      |   3 +-
 src/backend/access/transam/varsup.c         |  15 ++++
 src/backend/access/transam/xact.c           |  36 ++++++++
 src/backend/access/transam/xlog.c           |  23 ++++-
 src/backend/postmaster/bgwriter.c           |   9 ++
 src/backend/replication/logical/decode.c    |  12 +++
 src/backend/replication/walreceiver.c       |   2 +-
 src/backend/replication/walsender.c         |  46 +++++++++-
 src/backend/storage/ipc/procarray.c         | 134 ++++++++++++++++++++++++++--
 src/bin/pg_controldata/pg_controldata.c     |   2 +
 src/include/access/transam.h                |   5 ++
 src/include/access/xact.h                   |  12 ++-
 src/include/catalog/pg_control.h            |   1 +
 src/include/storage/procarray.h             |   5 +-
 src/test/recovery/t/006_logical_decoding.pl |  90 +++++++++++++++++--
 17 files changed, 383 insertions(+), 24 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..d1400ec 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use oldestCatalogXmin here */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 735f8c5..96ea163 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+	{
+		xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+		appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin);
+	}
 }
 
 const char *
@@ -324,6 +330,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_CATALOG_XMIN_ADV:
+			id = "CATALOG_XMIN";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5f07eb1..a66cfc6 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -47,7 +47,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; "
 						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
 						 "oldest/newest commit timestamp xid: %u/%u; "
-						 "oldest running xid %u; %s",
+						 "oldest running xid %u; oldest catalog xmin %u; %s",
 						 (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
 						 checkpoint->ThisTimeLineID,
 						 checkpoint->PrevTimeLineID,
@@ -63,6 +63,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 checkpoint->oldestCommitTsXid,
 						 checkpoint->newestCommitTsXid,
 						 checkpoint->oldestActiveXid,
+						 checkpoint->oldestCatalogXmin,
 						 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
 	}
 	else if (info == XLOG_NEXTOID)
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..ffabf1c 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,6 +414,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	}
 }
 
+/*
+ * Set the global oldest catalog_xmin used to determine when tuples
+ * may be removed from catalogs and user-catalogs accessible from logical
+ * decoding.
+ *
+ * Only to be called from the startup process or from LogCurrentRunningXacts()
+ * which ensures the update is properly written to xlog first.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
 
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c8751c6..63453d7 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5652,6 +5652,42 @@ xact_redo(XLogReaderState *record)
 		ProcArrayApplyXidAssignment(xlrec->xtop,
 									xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+	{
+		xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+		/*
+		 * Apply the new catalog_xmin limit immediately. New decoding sessions
+		 * will refuse to start if their slot is past it, and old ones will
+		 * notice when we signal them with a recovery conflict. There's no
+		 * effect on the catalogs themselves yet, so it's safe for backends
+		 * with older catalog_xmins to still exist.
+		 *
+		 * We don't have to take ProcArrayLock since only the startup process
+		 * is allowed to change oldestCatalogXmin when we're in recovery.
+		 *
+		 * Existing sessions are not notified and must check the safe xmin.
+		 */
+		SetOldestCatalogXmin(xlrec->new_catalog_xmin);
+
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * Record when we advance the catalog_xmin used for tuple removal
+ * so standbys find out before we remove catalog tuples they might
+ * need for logical decoding.
+ */
+XLogRecPtr
+XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
+{
+	xl_xact_catalog_xmin_advance xlrec;
+
+	xlrec.new_catalog_xmin = new_catalog_xmin;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
+	return XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..8d713e9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5021,6 +5021,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6612,12 @@ StartupXLOG(void)
 			(errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 							 checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 							 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6628,6 +6635,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -8537,6 +8545,9 @@ CreateCheckPoint(int flags)
 	 */
 	InitXLogInsert();
 
+	/* Checkpoints are a handy time to update the effective catalog_xmin */
+	UpdateOldestCatalogXmin();
+
 	/*
 	 * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
 	 * (This is just pro forma, since in the present system structure there is
@@ -8726,6 +8737,10 @@ CreateCheckPoint(int flags)
 							 &checkPoint.oldestMulti,
 							 &checkPoint.oldestMultiDB);
 
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+
 	/*
 	 * Having constructed the checkpoint record, ensure all shmem disk buffers
 	 * and commit-log buffers are flushed to disk.
@@ -9632,6 +9647,8 @@ xlog_redo(XLogReaderState *record)
 		 */
 		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
+
 		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
 		 * record, the backup was canceled and the end-of-backup record will
@@ -9729,8 +9746,10 @@ xlog_redo(XLogReaderState *record)
 								  checkPoint.oldestMultiDB);
 		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
 								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+			SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
+
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf2..3bb5200 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -51,6 +51,7 @@
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
@@ -333,6 +334,14 @@ BackgroundWriterMain(void)
 				last_snapshot_lsn = LogStandbySnapshot();
 				last_snapshot_ts = now;
 			}
+
+			/*
+			 * We can also advance the threshold used for catalog tuple
+			 * cleanup, rate-limited so we don't write it too often. The delay
+			 * slightly increases catalog bloat but reduces the volume of
+			 * catalog_xmin advance records written.
+			 */
+			UpdateOldestCatalogXmin();
 		}
 
 		/*
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..b5084b9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -288,6 +288,18 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 */
 			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
 			break;
+		case XLOG_XACT_CATALOG_XMIN_ADV:
+
+			/*
+			 * The global catalog_xmin has been advanced. By the time we see
+			 * this in logical decoding it no longer matters, since it's
+			 * guaranteed that all later records will be consistent with the
+			 * advanced catalog_xmin, so we ignore it here. If we were running
+			 * on a standby and it applied a catalog xmin advance past our
+			 * needed catalog_xmin we would've already been terminated with a
+			 * conflict with standby error.
+			 */
+			break;
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index df93265..277f196 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 		xmin = GetOldestXmin(NULL,
 							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dbb10c7..e64054b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1778,15 +1778,55 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
 	{
+		/*
+		 * If the standby is setting a catalog_xmin for the first time we must
+		 * check that it's within our global xmin horizon so we don't lock in a
+		 * value we might've already removed tuples for. The standby might have
+		 * an outdated catalog_xmin locally if it's lagging and we can't blindly
+		 * trust it, since we'd then update oldestCatalogXmin with a value that's
+		 * not actually safe.
+		 */
+		if (TransactionIdIsValid(feedbackCatalogXmin) &&
+			!TransactionIdIsValid(slot->effective_catalog_xmin))
+		{
+			TransactionId lowerBound;
+
+			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+			lowerBound = GetOldestSafeDecodingTransactionId();
+			if (TransactionIdPrecedes(feedbackCatalogXmin, lowerBound))
+				feedbackCatalogXmin = lowerBound;
+
+			slot->effective_catalog_xmin = feedbackCatalogXmin;
+			slot->data.catalog_xmin = slot->effective_catalog_xmin;
+
+			SpinLockRelease(&slot->mutex);
+			ReplicationSlotsComputeRequiredXmin(true);
+
+			LWLockRelease(ProcArrayLock);
+		}
+		else
+		{
+			slot->data.catalog_xmin = feedbackCatalogXmin;
+			slot->effective_catalog_xmin = feedbackCatalogXmin;
+			SpinLockRelease(&slot->mutex);
+		}
 		changed = true;
-		slot->data.catalog_xmin = feedbackCatalogXmin;
-		slot->effective_catalog_xmin = feedbackCatalogXmin;
 	}
-	SpinLockRelease(&slot->mutex);
+	else
+		SpinLockRelease(&slot->mutex);
 
 	if (changed)
 	{
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..9e98af8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,12 @@ typedef struct ProcArrayStruct
 
 	/* oldest xmin of any replication slot */
 	TransactionId replication_slot_xmin;
-	/* oldest catalog xmin of any replication slot */
+
+	/*
+	 * Oldest catalog xmin of any replication slot
+	 *
+	 * See also ShmemVariableCache->oldestCatalogXmin
+	 */
 	TransactionId replication_slot_catalog_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -1306,6 +1311,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1444,6 +1452,89 @@ GetOldestXmin(Relation rel, int flags)
 }
 
 /*
+ * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
+ * to reflect an advance in procArray->replication_slot_catalog_xmin or
+ * it becoming newly set or unset.
+ *
+ */
+static bool
+CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
+{
+	return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
+			|| (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin)));
+}
+
+/*
+ * If necessary, copy the current catalog_xmin needed by replication slots to
+ * the effective catalog_xmin used for dead tuple removal and write a WAL
+ * record recording the change.
+ *
+ * This allows standbys to know the oldest xid for which it is safe to create
+ * a historic snapshot for logical decoding. VACUUM or other cleanup may have
+ * removed catalog tuple versions needed to correctly decode transactions older
+ * than this threshold. Standbys can use this information to cancel conflicting
+ * decoding sessions and invalidate slots that need discarded information.
+ *
+ * (We can't use the transaction IDs in WAL records emitted by VACUUM etc for
+ * this, since they don't identify the relation as a catalog or not. Nor can a
+ * standby look up the relcache to get the Relation for the affected
+ * relfilenode to check if it is a catalog. The standby would also have no way
+ * to know the oldest safe position at startup if it wasn't in the control
+ * file.)
+ */
+void
+UpdateOldestCatalogXmin(void)
+{
+	TransactionId vacuum_catalog_xmin;
+	TransactionId slots_catalog_xmin;
+
+	Assert(XLogInsertAllowed());
+
+	/*
+	 * It's most likely that replication_slot_catalog_xmin and
+	 * oldestCatalogXmin will be the same and no action is required, so do a
+	 * pre-check before doing expensive WAL writing and exclusive locking.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+	slots_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	LWLockRelease(ProcArrayLock);
+
+	if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+	{
+		/*
+		 * We must prevent a concurrent checkpoint, otherwise the catalog xmin
+		 * advance xlog record with the new value might be written before the
+		 * checkpoint but the checkpoint may still see the old
+		 * oldestCatalogXmin value.
+		 */
+		if (!LWLockConditionalAcquire(CheckpointLock, LW_SHARED))
+			/* Couldn't get checkpointer lock; will retry later */
+			return;
+
+		XactLogCatalogXminUpdate(slots_catalog_xmin);
+
+		/*
+		 * A concurrent updater could've changed the oldestCatalogXmin so we
+		 * need to re-check under ProcArrayLock before updating. The LWLock
+		 * provides a barrier.
+		 *
+		 * We must not re-read replication_slot_catalog_xmin even if it has
+		 * advanced, since we xlog'd the older value. If it advanced since, a
+		 * later run will xlog the new value and advance.
+		 */
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		vacuum_catalog_xmin = *((volatile TransactionId *) &ShmemVariableCache->oldestCatalogXmin);
+		if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+			ShmemVariableCache->oldestCatalogXmin = slots_catalog_xmin;
+		LWLockRelease(ProcArrayLock);
+
+		LWLockRelease(CheckpointLock);
+	}
+
+}
+
+/*
  * GetMaxSnapshotXidCount -- get max size for snapshot XID array
  *
  * We have to export this for use by snapmgr.c.
@@ -1700,7 +1791,7 @@ GetSnapshotData(Snapshot snapshot)
 
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1802,9 @@
 	 * Update globalxmin to include actual process xids. This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
 	 * the same result.
+	 *
+	 * If you change computation of RecentGlobalXmin here you may need to
+	 * change GetOldestXmin(...) as well.
 	 */
 	if (TransactionIdPrecedes(xmin, globalxmin))
 		globalxmin = xmin;
@@ -2041,12 +2135,16 @@ GetRunningTransactionData(void)
 	}
 
 	/*
-	 * It's important *not* to include the limits set by slots here because
+	 * It's important *not* to include the xmin set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
 	 * were to be included here the initial value could never increase because
-	 * of a circular dependency where slots only increase their limits when
-	 * running xacts increases oldestRunningXid and running xacts only
+	 * of a circular dependency where slots only increase their xmin limits
+	 * when running xacts increases oldestRunningXid and running xacts only
 	 * increases if slots do.
+	 *
+	 * We can safely report the catalog_xmin limit for replication slots here
+	 * because it's only used to advance oldestCatalogXmin. Slots'
+	 * catalog_xmin advance does not depend on it so there's no circularity.
 	 */
 
 	CurrentRunningXacts->xcnt = count - subcount;
@@ -2171,6 +2269,13 @@ GetOldestSafeDecodingTransactionId(void)
 	 * If there's already a slot pegging the xmin horizon, we can start with
 	 * that value, it's guaranteed to be safe since it's computed by this
 	 * routine initially and has been enforced since.
+	 *
+	 * We don't use ShmemVariableCache->oldestCatalogXmin here because another
+	 * backend may have already logged its intention to advance it to a higher
+	 * value (still <= replication_slot_catalog_xmin) and just be waiting on
+	 * ProcArrayLock to actually apply the change. On a standby
+	 * replication_slot_catalog_xmin is what the walreceiver will be sending
+	 * in hot_standby_feedback, not oldestCatalogXmin.
 	 */
 	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
@@ -2965,18 +3070,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmins, we return both the effective
+ * catalog_xmin being used for tuple removal (retained_catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ *
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs, outdated replicas sending
+ * feedback, etc.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..c2cb0a1 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,10 @@ typedef struct VariableCacheData
 	 */
 	TransactionId latestCompletedXid; /* newest XID that has committed or
 										 * aborted */
+	TransactionId oldestCatalogXmin; /* oldestCatalogXmin guarantees that
+										 * no valid catalog tuples >= it
+										 * are removed. That property is used
+										 * for logical decoding. */
 
 	/*
 	 * These fields are protected by CLogTruncationLock
@@ -179,6 +183,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 					  Oid oldest_datoid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
 extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid GetNewObjectId(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5b37c05..6d18d18 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,7 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED 0x30
 #define XLOG_XACT_ABORT_PREPARED 0x40
 #define XLOG_XACT_ASSIGNMENT 0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_CATALOG_XMIN_ADV 0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -187,6 +187,13 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+typedef struct xl_xact_catalog_xmin_advance
+{
+	TransactionId new_catalog_xmin;
+} xl_xact_catalog_xmin_advance;
+
+#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId))
+
 /*
  * Commit and abort records can contain a lot of information. But a large
  * portion of the records won't need all possible pieces of information. So we
So we @@ -391,6 +398,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, int xactflags, TransactionId twophase_xid); + +extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 3a25cc8..1fe89ae 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -45,6 +45,7 @@ typedef struct CheckPoint MultiXactOffset nextMultiOffset; /* next free MultiXact offset */ TransactionId oldestXid; /* cluster-wide minimum datfrozenxid */ Oid oldestXidDB; /* database with minimum datfrozenxid */ + TransactionId oldestCatalogXmin; /* catalog retained after this xid */ MultiXactId oldestMulti; /* cluster-wide minimum datminmxid */ Oid oldestMultiDB; /* database with minimum datminmxid */ pg_time_t time; /* time stamp of checkpoint */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 9b42e49..69a82d7 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -120,6 +120,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked); extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin); + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin); + +extern void UpdateOldestCatalogXmin(void); #endif /* PROCARRAY_H */ diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index bf9b50a..80b976b 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,24 +7,79 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 16; +use Test::More tests => 44; # Initialize master node my $node_master = 
get_new_node('master'); $node_master->init(allows_streaming => 1); -$node_master->append_conf( - 'postgresql.conf', qq( +$node_master->append_conf('postgresql.conf', qq( wal_level = logical +hot_standby_feedback = on +wal_receiver_status_interval = 1 +log_min_messages = debug1 )); $node_master->start; -my $backup_name = 'master_backup'; +# Set up some changes before we make base backups $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]); $node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]); $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]); +# Launch two streaming replicas, one with and one without +# physical replication slots. We'll use these for tests +# involving interaction of logical and physical standby. +# +# Both backups are created with pg_basebackup. +# +my $backup_name = 'master_backup'; +$node_master->backup($backup_name); + +$node_master->safe_psql('postgres', q[SELECT pg_create_physical_replication_slot('slot_replica');]); +my $node_slot_replica = get_new_node('slot_replica'); +$node_slot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1); +$node_slot_replica->append_conf('recovery.conf', "primary_slot_name = 'slot_replica'"); + +my $node_noslot_replica = get_new_node('noslot_replica'); +$node_noslot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1); + +$node_slot_replica->start; +$node_noslot_replica->start; + +sub restartpoint_standbys +{ + # Force restartpoints to update control files on replicas + $node_slot_replica->safe_psql('postgres', 'CHECKPOINT'); + $node_noslot_replica->safe_psql('postgres', 'CHECKPOINT'); +} + +sub wait_standbys +{ + my $lsn = $node_master->lsn('insert'); + $node_master->wait_for_catchup($node_noslot_replica, 'replay', $lsn); + $node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn); +} + +# 
pg_basebackup doesn't copy replication slots +is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef, + 'logical slot test_slot on master not copied by pg_basebackup'); + +# Make sure oldestCatalogXmin lands in the control file on master +$node_master->safe_psql('postgres', 'VACUUM;'); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica); + +wait_standbys(); +restartpoint_standbys(); +foreach my $node (@nodes) +{ + # Master had an oldestCatalogXmin, so we must've inherited it via checkpoint + command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m, + "pg_controldata's oldestCatalogXmin is nonzero after start on " . $node->name); +} + # Basic decoding works my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]); is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT'); @@ -64,6 +119,9 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo chomp($stdout_recv); is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot'); +# Create a second DB we'll use for testing dropping and accessing slots across +# databases. This matters since logical slots are globally visible objects that +# can only actually be used on one DB for most purposes. 
 $node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
 
 is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
@@ -96,9 +154,29 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero on " . $node->name);
+}
+
+# Dropping the slot must clear catalog_xmin
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+		"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name);
+}
 
-# done with the node
-$node_master->stop;
+foreach my $node (@nodes)
+{
+	$node->stop;
+}
-- 
2.5.5

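In the first patch, UpdateOldestCatalogXmin() writes the new catalog_xmin to WAL before it changes the shared horizon. It does so only when CatalogXminNeedsUpdate() reports a change. The sketch below is a minimal single-process model of that ordering. `Model`, `update_oldest_catalog_xmin()` and the in-memory `wal` array are hypothetical stand-ins for shared memory and XactLogCatalogXminUpdate(). All locking is omitted, including ProcArrayLock and CheckpointLock.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for PostgreSQL's 32-bit TransactionId. */
typedef uint32_t Xid;

#define INVALID_XID      ((Xid) 0)
#define FIRST_NORMAL_XID ((Xid) 3)
#define MAX_RECORDS      16

static bool xid_is_valid(Xid x) { return x != INVALID_XID; }

/* Modulo-2^32 ordering, modeled on TransactionIdPrecedes(). */
static bool xid_precedes(Xid a, Xid b)
{
    if (a < FIRST_NORMAL_XID || b < FIRST_NORMAL_XID)
        return a < b;
    return (int32_t) (a - b) < 0;
}

/* Same test as the patch's CatalogXminNeedsUpdate(): the slots' horizon
 * advanced, or exactly one of the two values is set. */
static bool catalog_xmin_needs_update(Xid vacuum_xmin, Xid slots_xmin)
{
    return xid_precedes(vacuum_xmin, slots_xmin) ||
           (xid_is_valid(vacuum_xmin) != xid_is_valid(slots_xmin));
}

/* Hypothetical single-process model of the shared state; no locking. */
typedef struct
{
    Xid    oldest_catalog_xmin;  /* horizon used for catalog tuple removal */
    Xid    slots_catalog_xmin;   /* horizon needed by replication slots */
    Xid    wal[MAX_RECORDS];     /* logged catalog_xmin advance records */
    size_t nwal;
} Model;

/* Returns true if a record was appended and the horizon moved. */
static bool update_oldest_catalog_xmin(Model *m)
{
    Xid target = m->slots_catalog_xmin;   /* read once, then use that value */

    if (!catalog_xmin_needs_update(m->oldest_catalog_xmin, target))
        return false;
    if (m->nwal == MAX_RECORDS)
        return false;                     /* the model's log is full */

    m->wal[m->nwal++] = target;           /* 1. log the new value first */
    m->oldest_catalog_xmin = target;      /* 2. then move the horizon */
    return true;
}
```

In this model the record is always appended before `oldest_catalog_xmin` moves. The patch relies on that ordering so that standbys learn of an advance before catalog tuples are removed. When the last slot is dropped, `slots_catalog_xmin` returns to 0 and a record is still written, mirroring the "newly set or unset" case in CatalogXminNeedsUpdate().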
From 9036702eb645acaf3ec660d511c62b09b816f73e Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Mon, 3 Apr 2017 17:31:19 +0800 Subject: [PATCH 2/3] Support conflict with standby on logical walsender Detect and resolve conflicts between walsenders or SQL-level logical decoding sessions and catalog_xmin advances. Refuse to start decoding from a logical slot whose catalog_xmin is below the cluster-wide known-safe threshold so new sessions cannot start. Slots are not persistently marked as invalid and will continue to hold down xlog and (on master) catalog retention. There is no way to restore them to working order, so the application or administrator must drop them to release resources. --- src/backend/access/heap/heapam.c | 2 +- src/backend/access/transam/xact.c | 6 +- src/backend/access/transam/xlog.c | 3 +- src/backend/replication/logical/logical.c | 135 ++++++++++++++++++++++++++++++ src/backend/replication/slot.c | 4 +- src/backend/replication/walsender.c | 14 +--- src/backend/storage/ipc/procarray.c | 51 +++++++++++ src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 7 +- src/backend/tcop/postgres.c | 43 +++++++--- src/include/storage/procarray.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 3 + 13 files changed, 241 insertions(+), 33 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 0c3e2b0..93bf143 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7273,7 +7273,7 @@ heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid, * ratchet forwards latestRemovedXid to the greatest one found. * This is used as the basis for generating Hot Standby conflicts, so * if a tuple was never visible then removing it should not conflict - * with queries. + * with queries or logical decoding output plugin callbacks. 
*/ void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 63453d7..48ca884 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5662,14 +5662,10 @@ xact_redo(XLogReaderState *record) * notice when we signal them with a recovery conflict. There's no * effect on the catalogs themselves yet, so it's safe for backends * with older catalog_xmins to still exist. - * - * We don't have to take ProcArrayLock since only the startup process - * is allowed to change oldestCatalogXmin when we're in recovery. - * - * Existing sessions are not notified and must check the safe xmin. */ SetOldestCatalogXmin(xlrec->new_catalog_xmin); + ResolveRecoveryConflictWithLogicalDecoding(xlrec->new_catalog_xmin); } else elog(PANIC, "xact_redo: unknown op code %u", info); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8d713e9..a98601a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8546,7 +8546,8 @@ CreateCheckPoint(int flags) InitXLogInsert(); /* Checkpoints are a handy time to update the effective catalog_xmin */ - UpdateOldestCatalogXmin(); + if (XLogInsertAllowed()) + UpdateOldestCatalogXmin(); /* * Acquire CheckpointLock to ensure only one checkpoint happens at a time. 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..4a15d55 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "pgstat.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -38,11 +39,14 @@ #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/memutils.h" +#include "utils/ps_status.h" /* data for errcontext callback */ typedef struct LogicalErrorCallbackState @@ -68,6 +72,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void EnsureActiveLogicalSlotValid(void); + /* * Make sure the current settings & environment are capable of doing logical * decoding. @@ -279,6 +285,16 @@ CreateInitDecodingContext(char *plugin, LWLockRelease(ProcArrayLock); /* + * If this is the first slot created on the master we won't have a + * persistent record of the oldest safe xid for historic snapshots yet. + * Force one to be recorded so that when we go to replay from this slot we + * know it's safe. + */ + if (!RecoveryInProgress() && + !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)) + UpdateOldestCatalogXmin(); + + /* * tell the snapshot builder to only assemble snapshot once reaching the * running_xact's record with the respective xmin. 
*/ @@ -376,6 +392,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } + EnsureActiveLogicalSlotValid(); + ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, read_page, prepare_write, do_write); @@ -963,3 +981,120 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Test to see if the active logical slot is usable. + */ +static void +EnsureActiveLogicalSlotValid(void) +{ + TransactionId shmem_catalog_xmin; + + Assert(MyReplicationSlot != NULL); + + /* + * A logical slot can become unusable if we're doing logical decoding on a + * standby or using a slot created before we were promoted from standby + * to master. If the master advanced its global catalog_xmin past the + * threshold we need, it could've removed catalog tuple versions that + * we'll require to start decoding at our restart_lsn. + */ + + LWLockAcquire(ProcArrayLock, LW_SHARED); + shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + LWLockRelease(ProcArrayLock); + + if (!TransactionIdIsValid(shmem_catalog_xmin) || + TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot '%s' requires catalogs removed by master", + NameStr(MyReplicationSlot->data.name)), + errdetail("need catalog_xmin %u, have oldestCatalogXmin %u", + MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin))); +} + +/* + * Scan to see if any clients are using replication slots that are below a + * newly-applied catalog_xmin threshold and signal them to terminate with a + * recovery conflict.
+ */ +void +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin) +{ + int i; + + if (!InHotStandby) + /* nobody can be actively using logical slots */ + return; + + /* Already applied new limit, can't have replayed later one yet */ + Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin); + + /* + * Find the first conflicting active slot and signal its owning backend + * to exit. We'll be called repeatedly by the recovery code until there + * are no more conflicts. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot; + pid_t active_pid; + + slot = &ReplicationSlotCtl->replication_slots[i]; + + /* + * Physical slots can have a catalog_xmin, but conflicts are the + * problem of the leaf replica with the logical slot. + */ + if (!(slot->in_use && SlotIsLogical(slot))) + continue; + + /* + * We only care about the effective_catalog_xmin of active logical + * slots. Anything else gets checked when a new decoding session tries + * to start. + */ + while (slot->in_use && slot->active_pid != 0 && + TransactionIdIsValid(slot->effective_catalog_xmin) && + (!TransactionIdIsValid(new_catalog_xmin) || + TransactionIdPrecedes(slot->effective_catalog_xmin, new_catalog_xmin))) + { + /* + * We'll be sleeping, so release the control lock. New conflicting + * backends cannot appear and if old ones go away that's what we + * want, so release and re-acquire is OK here. + */ + active_pid = slot->active_pid; + LWLockRelease(ReplicationSlotControlLock); + + if (WaitExceedsMaxStandbyDelay()) + { + ereport(INFO, + (errmsg("terminating logical decoding session due to recovery conflict"), + errdetail("Pid %u requires catalog_xmin %u for replication slot '%s' but the master has removed catalogs up to xid %u.", + active_pid, slot->effective_catalog_xmin, + NameStr(slot->data.name), new_catalog_xmin))); + + /* + * Signal the proc. 
If the slot is already released or even if + * pid is re-used we don't care, backends are required to + * tolerate spurious recovery signals. + */ + CancelLogicalDecodingSessionWithRecoveryConflict(active_pid); + + /* Don't flood the system with signals */ + pg_usleep(10000); + } + + /* + * We need to re-acquire the lock before re-checking the slot or + * continuing the scan. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + } + + } + LWLockRelease(ReplicationSlotControlLock); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 6c5ec7a..57a3994 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -48,6 +48,7 @@ #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/standby.h" #include "utils/builtins.h" /* @@ -931,7 +932,8 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. We can't do that on a standby; there we must wait for the + * bgwriter to get around to logging its periodic standby snapshot. * * That's not needed (or indeed helpful) for physical slots as they'll * start replay at the last logged checkpoint anyway. 
Instead return diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e64054b..5d60e7a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -212,7 +212,6 @@ static struct /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -2863,17 +2862,6 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGUSR1: set flag to send WAL records */ -static void -WalSndXLogSendHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - latch_sigusr1_handler(); - - errno = save_errno; -} - /* SIGUSR2: set flag to do a last cycle and shut down afterwards */ static void WalSndLastCycleHandler(SIGNAL_ARGS) @@ -2907,7 +2895,7 @@ WalSndSignals(void) pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 9e98af8..05e3058 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2762,6 +2762,57 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) } /* + * Notify a logical decoding session that it conflicts with newly set + * catalog_xmin from the master. We're about to start replaying WAL + * that will make its historic snapshot potentially unsafe by removing + * system tuples it might need. 
+ */ +void +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid) +{ + ProcArrayStruct *arrayP = procArray; + int index; + BackendId backend_id = InvalidBackendId; + + /* + * We have to scan ProcArray to find the process and set a pending recovery + * conflict even though we know the pid. At least we can get the BackendId + * and avoid a ProcSignal scan by SendProcSignal. + * + * The pid might've gone away, in which case we got the desired + * outcome anyway. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + volatile PGPROC *proc = &allProcs[pgprocno]; + + if (proc->pid == session_pid) + { + VirtualTransactionId procvxid; + + GET_VXID_FROM_PGPROC(procvxid, *proc); + + proc->recoveryConflictPending = true; + backend_id = procvxid.backendId; + break; + } + } + + LWLockRelease(ProcArrayLock); + + /* + * Kill the pid if it's still here. If not, that's what we + * wanted so ignore any errors. + */ + if (backend_id != InvalidBackendId) + (void) SendProcSignal(session_pid, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, backend_id); +} + +/* * MinimumActiveBackends --- count backends (other than myself) that are * in active transactions. Return true if the count exceeds the * minimum threshold passed. 
This is used as a heuristic to decide if diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 4a21d55..16c2e1f 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -273,6 +273,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 8e57f93..f6106ca 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -29,6 +29,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/standby.h" +#include "replication/slot.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -152,11 +153,13 @@ GetStandbyLimitTime(void) static int standbyWait_us = STANDBY_INITIAL_WAIT_US; /* - * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs. + * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs and + * ResolveRecoveryConflictWithLogicalDecoding. + * * We wait here for a while then return. If we decide we can't wait any * more then we return true, if we can wait some more return false. 
*/ -static bool +bool WaitExceedsMaxStandbyDelay(void) { TimestampTz ltime; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index a2282058..530dcbe 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2276,6 +2276,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: + errdetail("Logical replication slot requires catalog rows that will be removed."); + break; case PROCSIG_RECOVERY_CONFLICT_DATABASE: errdetail("User was connected to a database that must be dropped."); break; @@ -2698,8 +2701,12 @@ SigHupHandler(SIGNAL_ARGS) /* * RecoveryConflictInterrupt: out-of-line portion of recovery conflict * handling following receipt of SIGUSR1. Designed to be similar to die() - * and StatementCancelHandler(). Called only by a normal user backend - * that begins a transaction during recovery. + * and StatementCancelHandler(). + * + * Called by normal user backends running during recovery. Also used by the + * walsender to handle recovery conflicts with logical decoding, and by + * background workers that call CHECK_FOR_INTERRUPTS() and respect recovery + * conflicts. */ void RecoveryConflictInterrupt(ProcSignalReason reason) @@ -2781,6 +2788,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* Intentional drop through to session cancel */ + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: case PROCSIG_RECOVERY_CONFLICT_DATABASE: RecoveryConflictPending = true; ProcDiePending = true; @@ -2795,12 +2803,18 @@ RecoveryConflictInterrupt(ProcSignalReason reason) Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending)); /* - * All conflicts apart from database cause dynamic errors where the - * command or transaction can be retried at a later point with some - * potential for success. 
No need to reset this, since non-retryable - * conflict errors are currently FATAL. + * All conflicts apart from database and catalog_xmin cause dynamic + * errors where the command or transaction can be retried at a later + * point with some potential for success. No need to reset this, since + * non-retryable conflict errors are currently FATAL. + * + * catalog_xmin is non-retryable because once we advance the + * catalog_xmin threshold we might replay WAL that removes + * needed catalog tuples. The slot can't (re)start decoding + * because its catalog_xmin cannot be satisfied. */ - if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE) + if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) RecoveryConflictRetryable = false; } @@ -2855,11 +2869,20 @@ ProcessInterrupts(void) } else if (RecoveryConflictPending) { - /* Currently there is only one non-retryable recovery conflict */ - Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE); + int code; + + Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + + if (RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) + /* XXX more appropriate error code?
*/ + code = ERRCODE_PROGRAM_LIMIT_EXCEEDED; + else + code = ERRCODE_DATABASE_DROPPED; + pgstat_report_recovery_conflict(RecoveryConflictReason); ereport(FATAL, - (errcode(ERRCODE_DATABASE_DROPPED), + (errcode(code), errmsg("terminating connection due to conflict with recovery"), errdetail_recovery_conflict())); } diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 69a82d7..231297d 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -112,6 +112,8 @@ extern int CountUserBackends(Oid roleid); extern bool CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared); +extern void CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid); + extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index d068dde..3a3ba72 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -40,6 +40,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, NUM_PROCSIGNALS /* Must be last! 
*/ } ProcSignalReason; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 3ecc446..b17ba6f 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -34,10 +34,13 @@ extern void ResolveRecoveryConflictWithDatabase(Oid dbid); extern void ResolveRecoveryConflictWithLock(LOCKTAG locktag); extern void ResolveRecoveryConflictWithBufferPin(void); +extern void ResolveRecoveryConflictWithLogicalDecoding( + TransactionId new_catalog_xmin); extern void CheckRecoveryConflictDeadlock(void); extern void StandbyDeadLockHandler(void); extern void StandbyTimeoutHandler(void); extern void StandbyLockTimeoutHandler(void); +extern bool WaitExceedsMaxStandbyDelay(void); /* * Standby Rmgr (RM_STANDBY_ID) -- 2.5.5
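Patch 2 applies two different tests against the master's catalog_xmin threshold. EnsureActiveLogicalSlotValid() refuses to start decoding when oldestCatalogXmin is invalid or follows the slot's catalog_xmin. ResolveRecoveryConflictWithLogicalDecoding() signals an active slot's owner when the slot's effective_catalog_xmin is valid and either the new threshold is invalid or the slot's value precedes it. The sketch below restates just those two conditions as pure functions so the edge cases are easy to see. It is an illustration under assumptions, not the patch's code. The names `can_start_decoding` and `active_slot_conflicts` are hypothetical. The xid helpers assume the same modulo-2^32 rule as PostgreSQL's TransactionIdPrecedes()/TransactionIdFollows(). Locking, the `in_use`/`active_pid` checks and the max_standby_streaming_delay wait are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Assumed to mirror PostgreSQL: 0 is invalid, 3 is the first normal xid. */
#define INVALID_XID ((TransactionId) 0)
#define FIRST_NORMAL_XID ((TransactionId) 3)

/* Wraparound-aware "a is older than b". */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    if (a < FIRST_NORMAL_XID || b < FIRST_NORMAL_XID)
        return a < b;
    return (int32_t) (a - b) < 0;
}

/* Wraparound-aware "a is newer than b". */
static bool
xid_follows(TransactionId a, TransactionId b)
{
    if (a < FIRST_NORMAL_XID || b < FIRST_NORMAL_XID)
        return a > b;
    return (int32_t) (a - b) > 0;
}

/* Restates the check in EnsureActiveLogicalSlotValid(): may a new session start? */
static bool
can_start_decoding(TransactionId oldest_catalog_xmin, TransactionId slot_catalog_xmin)
{
    /* no known-safe threshold yet, so refuse */
    if (oldest_catalog_xmin == INVALID_XID)
        return false;
    /* catalogs the slot needs may already have been removed */
    return !xid_follows(oldest_catalog_xmin, slot_catalog_xmin);
}

/* Restates the loop condition in ResolveRecoveryConflictWithLogicalDecoding(). */
static bool
active_slot_conflicts(TransactionId slot_effective_catalog_xmin,
                      TransactionId new_catalog_xmin)
{
    /* a slot without a catalog reservation cannot conflict */
    if (slot_effective_catalog_xmin == INVALID_XID)
        return false;
    return new_catalog_xmin == INVALID_XID ||
        xid_precedes(slot_effective_catalog_xmin, new_catalog_xmin);
}
```

A slot whose catalog_xmin equals the threshold may still start decoding and is not cancelled. A slot one xid older is refused at startup and, if already active, is cancelled once the standby delay is exceeded.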
From 860beb565dba23c8c4f68d41f3c88a0e1789d12f Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Tue, 4 Apr 2017 11:50:30 +0800 Subject: [PATCH 3/3] Permit logical decoding on standby Permit the creation of logical slots on replicas and permit replay from them. Dropping logical slots on replicas was already supported. --- src/backend/replication/logical/logical.c | 49 +- src/backend/replication/walreceiver.c | 12 + src/test/recovery/t/006_logical_decoding.pl | 70 ++- .../recovery/t/012_logical_decoding_on_replica.pl | 506 +++++++++++++++++++++ 4 files changed, 605 insertions(+), 32 deletions(-) create mode 100644 src/test/recovery/t/012_logical_decoding_on_replica.pl diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 4a15d55..35d110f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,23 +93,40 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy.
- * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /*---- + * We really want to enforce that: + * - we're connected to the primary via a replication slot + * - hot_standby_feedback is enabled + * - the user cannot turn hot_standby_feedback off while we have + * logical slots on the standby (it's PGC_SIGHUP) + * - hot_standby_feedback has actually taken effect on the master + * + * ... but because the walreceiver doesn't use normal GUCs and may or + * may not actually be running we can't reliably enforce those + * conditions yet. We also have no way of knowing when hot standby + * feedback has reached the master and locked in a catalog_xmin. + * + * So on standbys, slot creation or decoding from a slot may fail with + * a recovery conflict. But we keep track of the master's true + * catalog_xmin in WAL, so we'll never attempt to decode unsafely. + * + * Make a best effort sanity check anyway. 
+ *--- + */ + if (!hot_standby_feedback) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires hot_standby_feedback = on"))); + + LWLockAcquire(ProcArrayLock, LW_SHARED); + if (!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("hot_standby_feedback has not yet taken effect"))); + LWLockRelease(ProcArrayLock); + } } /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 277f196..c0f6cec 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1239,6 +1239,18 @@ XLogWalRcvSendHSFeedback(bool immed) if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedes(slot_xmin, xmin)) xmin = slot_xmin; + + /* + * If there's no local catalog_xmin, report it as == xmin, so that + * we lock in a catalog_xmin before we need to create any logical slots + * on this standby. This won't add much catalog bloat until we create + * local slots and catalog_xmin starts lagging behind xmin, but it will + * cause the master to start logging + * xl_xact_catalog_xmin_advance records we need for logical + * decoding on standby. 
*/ + if (!TransactionIdIsValid(catalog_xmin) && XLogLogicalInfoActive()) + catalog_xmin = xmin; } else { diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 80b976b..88ddf00 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 44; +use Test::More tests => 57; # Initialize master node my $node_master = get_new_node('master'); @@ -61,18 +61,22 @@ sub wait_standbys $node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn); } +sub sync_up +{ + $node_master->safe_psql('postgres', 'CHECKPOINT;'); + wait_standbys(); + restartpoint_standbys(); + # wait for hot_standby_feedback, sent every wal_receiver_status_interval + sleep(2); +} + # pg_basebackup doesn't copy replication slots is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef, 'logical slot test_slot on master not copied by pg_basebackup'); -# Make sure oldestCatalogXmin lands in the control file on master -$node_master->safe_psql('postgres', 'VACUUM;'); -$node_master->safe_psql('postgres', 'CHECKPOINT;'); my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica); - -wait_standbys(); -restartpoint_standbys(); +sync_up(); foreach my $node (@nodes) { # Master had an oldestCatalogXmin, so we must've inherited it via checkpoint @@ -154,26 +158,60 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0', 'restored slot catalog_xmin is nonzero'); is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3, 'reading from slot with wal_level < logical fails'); -wait_standbys(); -restartpoint_standbys(); +sync_up(); foreach my $node (@nodes) { command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m, "pg_controldata's oldestCatalogXmin is nonzero on " .
$node->name); } -# Dropping the slot must clear catalog_xmin +# Drop the logical slot on the master; make sure feedback from standbys continues to peg +# catalog_xmin. is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0, 'can drop logical slot while wal_level = replica'); -is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped'); -$node_master->safe_psql('postgres', 'VACUUM;'); -$node_master->safe_psql('postgres', 'CHECKPOINT;'); -wait_standbys(); -restartpoint_standbys(); +is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped on master'); +# Do a dummy xact so nextXid advances, then check that the catalog_xmin +# reported by the standbys advances along with it. +my $xmin = $node_master->safe_psql('postgres', 'BEGIN; CREATE TABLE dummy_xact(blah integer); SELECT txid_current(); COMMIT;'); + +# Even though the logical slot on the upstream is dropped, master's +# oldestCatalogXmin is held down by hot standby feedback from the replicas. +# Since the replicas have no logical slots of their own, it should've advanced +# to be the same as the physical slot xmin for the slot replica. +sync_up(); +# There are no transactions on the replicas so their xmin and catalog_xmin +# will both be nextXid. +cmp_ok($node_master->slot('slot_replica')->{'xmin'}, "eq", $xmin + 1, + 'xmin advanced to latest master xid on slot_replica on master'); +cmp_ok($node_master->slot('slot_replica')->{'catalog_xmin'}, "<=", $xmin + 1, + 'xmin == catalog_xmin on phys slot held down by standby catalog_xmin'); +# Control files will still contain the xid, since there won't have been another +# checkpoint to advance the nextXid reported by feedback and write it to the +# control file. +foreach my $node (@nodes) +{ + command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:$xmin$/m, + "pg_controldata's oldestCatalogXmin advanced after drop and checkpoint on " .
$node->name); +} + +# If we turn hot_standby_feedback off on the replica that uses a slot, the +# master should no longer have anything holding down its catalog_xmin. Even +# though hot_standby_feedback is still enabled on the non-slot replica, it +# cannot set the master's catalog_xmin because it has no destination slot; +# it can only set xmin in its procarray entry. +$node_slot_replica->safe_psql('postgres', q[ALTER SYSTEM SET hot_standby_feedback = off;]); +# simplest way to force new hot standby feedback to be sent +$node_slot_replica->restart; +sleep(1); +# hot standby feedback should've cleared minimums +is($node_master->slot('slot_replica')->{'xmin'}, '', 'phys slot xmin null with hs_feedback off'); +is($node_master->slot('slot_replica')->{'catalog_xmin'}, '', 'phys slot catalog_xmin null with hs_feedback off'); +sync_up(); +# Everyone should now see the cleared catalog_xmin foreach my $node (@nodes) { command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m, - "pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name); + "pg_controldata's oldestCatalogXmin zero after turning off hs_feedback: " . $node->name); } foreach my $node (@nodes) diff --git a/src/test/recovery/t/012_logical_decoding_on_replica.pl b/src/test/recovery/t/012_logical_decoding_on_replica.pl new file mode 100644 index 0000000..962c801 --- /dev/null +++ b/src/test/recovery/t/012_logical_decoding_on_replica.pl @@ -0,0 +1,506 @@ +#!/usr/bin/env perl +# Demonstrate that logical decoding can follow timeline switches. +# +# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 77;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--write-recovery-conf', '--slot=decoding_standby');
+
+open(my $fh, "<", $backup_dir . "/recovery.conf")
+    or die "can't open recovery.conf";
+
+my $found = 0;
+while (my $line = <$fh>)
+{
+    chomp($line);
+    if ($line eq "primary_slot_name = 'decoding_standby'")
+    {
+        $found = 1;
+        last;
+    }
+}
+ok($found, "using physical slot for standby");
+
+sub print_phys_xmin
+{
+    my $slot = $node_master->slot('decoding_standby');
+    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+    $node_master, $backup_name,
+    has_streaming => 1,
+    has_restoring => 1);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+
+# If no slot on the standby exists to hold down catalog_xmin, it must follow
+# xmin (which is nextXid when no xacts are running on the standby).
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+is($xmin, $catalog_xmin, "xmin and catalog_xmin equal");
+
+# We need the catalog_xmin advance to take effect on the master and be
+# replayed on the standby.
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+    0, 'logical slot creation on standby succeeded')
+    or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+sub print_logical_xmin
+{
+    my $slot = $node_replica->slot('standby_logical');
+    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+    or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now that the standby's
+# slot doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream catalog retention
+#########################################################
+
+sub test_catalog_xmin_retention()
+{
+    # First burn some xids on the master in another DB, so we push the master's
+    # nextXid ahead.
+    foreach my $i (1 .. 100)
+    {
+        $node_master->safe_psql('postgres', 'SELECT txid_current()');
+    }
+
+    # Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+    # past our needed xmin. The only way we have visibility into that is to force
+    # a checkpoint.
+    $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+    foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+    {
+        $node_master->safe_psql($dbname, 'VACUUM FREEZE');
+    }
+    sleep(1);
+    $node_master->safe_psql('postgres', 'CHECKPOINT');
+    IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+        or die "pg_controldata failed with $?";
+    my @checkpoint = split('\n', $stdout);
+    my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', '');
+    foreach my $line (@checkpoint)
+    {
+        if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+        {
+            $nextXid = $1;
+        }
+        if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+        {
+            $oldestXid = $1;
+        }
+        if ($line =~ qr/^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/)
+        {
+            $oldestCatalogXmin = $1;
+        }
+    }
+    die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+    my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+    my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+    print "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+    $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+    return ($oldestXid, $oldestCatalogXmin);
+}
+
+my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+cmp_ok($oldestCatalogXmin, ">=", $oldestXid, "oldestCatalogXmin >= oldestXid");
+cmp_ok($oldestCatalogXmin, "<=", $new_logical_catalog_xmin, "oldestCatalogXmin <= downstream catalog_xmin");
+
+#########################################################
+# Conflict with recovery: xmin cancels decoding session
+#########################################################
+#
+# Start a transaction on the replica then perform work that should cause a
+# recovery conflict with it. We'll check to make sure the client gets
+# terminated with recovery conflict.
+#
+# Temporarily disable hs feedback so we can test recovery conflicts.
+# It's fine to continue using a physical slot; the xmin should be
+# cleared. We only check hot_standby_feedback when establishing
+# a new decoding session, so this approach circumvents the safeguards
+# in place and forces a conflict.
+#
+# We'll also create an unrelated table so we can drop it later, making
+# sure there are catalog changes to replay.
+$node_master->safe_psql('testdb', 'CREATE TABLE dummy_table(blah integer)');
+
+# Start pg_recvlogical before we turn off hs_feedback so its slot's
+# catalog_xmin is above the downstream's catalog_threshold when we start
+# decoding.
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off');
+$node_replica->reload;
+
+sleep(2);
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "physical xmin null after hs_feedback disabled");
+is($catalog_xmin, '', "physical catalog_xmin null after hs_feedback disabled");
+
+# Burn a bunch of XIDs and make sure upstream catalog_xmin is past what we'll
+# need here
+($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+cmp_ok($oldestXid, ">", $new_logical_catalog_xmin, 'upstream oldestXid advanced past downstream catalog_xmin with hs_feedback off');
+cmp_ok($oldestCatalogXmin, "==", 0, "oldestCatalogXmin = InvalidTransactionId with hs_feedback off");
+
+# Make some data-only changes. We have no way to delay advancing the
+# catalog_xmin threshold until catalog changes are made, so now that our slot
+# no longer holds down catalog_xmin, this will result in a recovery conflict.
+$node_master->safe_psql('testdb', 'DELETE FROM test_table');
+# Force a checkpoint to make sure catalog_xmin advances
+$node_master->safe_psql('testdb', 'CHECKPOINT;');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+$handle->pump;
+
+is($node_replica->slot('standby_logical')->{'active_pid'}, '', 'pg_recvlogical no longer connected to slot');
+
+# client died?
+eval {
+    $handle->finish;
+};
+$return = $?;
+if ($return) {
+    is($return, 256, "pg_recvlogical terminated by server on recovery conflict");
+    like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict errmsg');
+    like($stderr, qr/requires catalog rows that will be removed/, 'pg_recvlogical exited with catalog_xmin conflict');
+}
+else
+{
+    fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'");
+}
+
+# record the xmin when the conflicts arose
+my ($conflict_xmin, $conflict_catalog_xmin) = print_logical_xmin();
+
+#####################################################################
+# Conflict with recovery: oldestCatalogXmin should be zero with no feedback
+#####################################################################
+#
+# We cleared the catalog_xmin on the physical slot when hs feedback was turned
+# off. There's no logical slot on the master. So oldestCatalogXmin must be
+# zero.
+#
+$node_replica->safe_psql('postgres', 'CHECKPOINT');
+command_like(['pg_controldata', $node_replica->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+    "pg_controldata's oldestCatalogXmin is zero when hot standby feedback is off");
+
+#####################################################################
+# Conflict with recovery: refuse to run without hot_standby_feedback
+#####################################################################
+#
+# When hot_standby_feedback is off, new connections should fail.
+#
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot while hot_standby_feedback off');
+like($stderr, qr/hot_standby_feedback/, 'recvlogical recovery conflict errmsg');
+
+#####################################################################
+# Conflict with recovery: catalog_xmin advance invalidates idle slot
+#####################################################################
+#
+# The slot that pg_recvlogical was using before it was terminated
+# should not accept new connections now, since its catalog_xmin
+# is lower than the replica's threshold. Even once we re-enable
+# hot_standby_feedback, the removed tuples won't somehow come back.
+#
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on');
+$node_replica->reload;
+# Wait until hot_standby_feedback is applied
+sleep(2);
+# make sure we see the effect promptly in xlog
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2);
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, 'xmin on phys slot non-null after re-establishing hot standby feedback');
+ok($catalog_xmin, 'catalog_xmin on phys slot non-null after re-establishing hot standby feedback')
+    or BAIL_OUT('further results meaningless if catalog_xmin not set on master');
+
+# The walsender will clamp the catalog_xmin on the slot, so when the standby sends
+# feedback with a too-old catalog_xmin the result will actually be limited to
+# the safe catalog_xmin.
+cmp_ok($catalog_xmin, ">=", $conflict_catalog_xmin,
+    'phys slot catalog_xmin has not rewound to replica logical slot catalog_xmin');
+
+print "catalog_xmin is $catalog_xmin";
+
+$node_replica->safe_psql('postgres', 'CHECKPOINT');
+command_like(['pg_controldata', $node_replica->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:(?!$conflict_catalog_xmin)[^0][[:digit:]]*$/m,
+    "pg_controldata's oldestCatalogXmin has not rewound to slot catalog_xmin")
+    or BAIL_OUT('oldestCatalogXmin rewound, further tests are nonsensical');
+
+my $timer = IPC::Run::timeout(120);
+eval {
+    IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'],
+        '>', \$stdout, '2>', \$stderr, $timer);
+};
+ok(!$timer->is_expired, 'pg_recvlogical exited not timed out');
+is($?, 256, 'pg_recvlogical failed to connect to slot with past catalog_xmin');
+like($stderr, qr/replication slot '.*' requires catalogs removed by master/, 'recvlogical recovery conflict errmsg');
+
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_xmin, $new_catalog_xmin) = print_phys_xmin();
+# We're now back to the old behaviour of hot_standby_feedback
+# reporting nextXid for both thresholds
+ok($new_catalog_xmin, "physical catalog_xmin still non-null");
+cmp_ok($new_catalog_xmin, '==', $new_xmin,
+    'xmin and catalog_xmin equal after slot drop');
+
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot')
+    or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot')
+    or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+    'database dropped on standby');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot dodropslot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in use on the downstream DB when we drop it.
+print "Testing dropdb when downstream slot is in use";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+print "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'],
+    'pg_recvlogical created slot dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+print "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+    or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+    sleep(1);
+    print "waiting for walsender to exit";
+}
+
+print "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+    $handle->finish;
+};
+$return = $?;
+if ($return) {
+    is($return, 256, "pg_recvlogical terminated by server");
+    like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+    like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+    'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.5.5
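Two properties the test above relies on are easy to miss in the Perl: the walsender is expected to clamp a too-old catalog_xmin arriving in hot standby feedback up to the safe horizon, and the cmp_ok checks compare xids as plain integers, which only holds on a fresh cluster that has not wrapped around. The following is a minimal C sketch of both ideas under those assumptions. It is not the patch's PhysicalReplicationSlotNewXmin; the helper names xid_precedes and clamp_feedback_catalog_xmin are hypothetical, and the comparison is modelled on the semantics of PostgreSQL's TransactionIdPrecedes().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* 32-bit transaction id, as in PostgreSQL */
typedef uint32_t TransactionId;

#define INVALID_XID      ((TransactionId) 0)
#define FIRST_NORMAL_XID ((TransactionId) 3)

/*
 * Wraparound-aware "a is older than b", modelled on TransactionIdPrecedes():
 * special xids (below FIRST_NORMAL_XID) compare numerically, normal xids
 * compare modulo 2^32 via the sign of their 32-bit difference.
 */
bool
xid_precedes(TransactionId a, TransactionId b)
{
    if (a < FIRST_NORMAL_XID || b < FIRST_NORMAL_XID)
        return a < b;
    return (int32_t) (a - b) < 0;
}

/*
 * Hypothetical helper: limit a catalog_xmin received in hot standby feedback
 * so it never moves behind the oldest xid whose catalog rows the upstream can
 * still guarantee. An invalid request means "no catalog_xmin reservation".
 */
TransactionId
clamp_feedback_catalog_xmin(TransactionId requested, TransactionId safe_horizon)
{
    assert(safe_horizon != INVALID_XID);

    if (requested == INVALID_XID)
        return INVALID_XID;
    if (xid_precedes(requested, safe_horizon))
        return safe_horizon;    /* too old: those catalog rows may be gone */
    return requested;
}
```

With this comparison, xid_precedes(4294967295u, 5) is true because xid 5 was assigned after the counter wrapped, whereas a plain numeric ">" would say the opposite. That difference does not matter for this test, which consumes only a few thousand xids.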
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)