On 4 April 2017 at 22:32, Craig Ringer <cr...@2ndquadrant.com> wrote:
> Hi all
>
> Here's the final set of three patches on top of what's already committed.
>
> The first is catalog_xmin logging, which is unchanged from the prior post.
>
> The 2nd is support for conflict with recovery, with changes that
> should address Andres's concerns there.
>
> The 3rd actually enables decoding on standby. Unlike the prior
> version, no attempt is made to check the walsender configuration for
> slot use, etc. The ugly code to try to mitigate races is also removed.
> Instead, if wal_level is logical the catalog_xmin sent by
> hot_standby_feedback is now the same as the xmin if there's no local
> slot holding it down. So we're always sending a catalog_xmin in
> feedback and we should always expect to have a valid local
> oldestCatalogXmin once hot_standby_feedback kicks in. This makes the
> race in slot creation no worse than the existing race between
> hot_standby_feedback establishment and the first queries run on a
> downstream, albeit with more annoying consequences. Apps can still
> ensure a slot created on standby is guaranteed safe and conflict-free
> by having a slot on the master first.
>
> I'm much happier with this. I'm still fixing some issues in the tests
> for 03 and tidying them up, but 03 should allow 01 and 02 to be
> reviewed in their proper context now.

Dammit. Attached.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

From 9b8b1236eb32819430062031ff76750ed8bc1661 Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Wed, 22 Mar 2017 13:36:49 +0800 Subject: [PATCH 1/3] Log catalog_xmin advances before removing catalog tuples Before advancing the effective catalog_xmin we use to remove old catalog tuple versions, make sure it is written to WAL. This allows standbys to know the oldest xid they can safely create a historic snapshot for. They can then refuse to start decoding from a slot or raise a recovery conflict. The catalog_xmin advance is logged in a new xl_catalog_xmin_advance record, emitted before vacuum or periodically by the bgwriter. WAL is only written if the lowest catalog_xmin needed by any replication slot has advanced. --- src/backend/access/heap/rewriteheap.c | 3 +- src/backend/access/rmgrdesc/xactdesc.c | 9 ++ src/backend/access/rmgrdesc/xlogdesc.c | 3 +- src/backend/access/transam/varsup.c | 15 ++++ src/backend/access/transam/xact.c | 36 ++++++++ src/backend/access/transam/xlog.c | 23 ++++- src/backend/postmaster/bgwriter.c | 9 ++ src/backend/replication/logical/decode.c | 12 +++ src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 8 ++ src/backend/storage/ipc/procarray.c | 134 ++++++++++++++++++++++++++-- src/bin/pg_controldata/pg_controldata.c | 2 + src/include/access/transam.h | 5 ++ src/include/access/xact.h | 12 ++- src/include/catalog/pg_control.h | 1 + src/include/storage/procarray.h | 5 +- src/test/recovery/t/006_logical_decoding.pl | 90 +++++++++++++++++-- 17 files changed, 348 insertions(+), 21 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index d7f65a5..d1400ec 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state) if (!state->rs_logical_rewrite) return; - ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin); + /* Use oldestCatalogXmin here 
*/ + ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL); /* * If there are no logical slots in progress we don't need to do anything, diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 735f8c5..96ea163 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_CATALOG_XMIN_ADV) + { + xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record); + + appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin); + } } const char * @@ -324,6 +330,9 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_CATALOG_XMIN_ADV: + id = "CATALOG_XMIN"; + break; } return id; diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 5f07eb1..a66cfc6 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -47,7 +47,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " "oldest/newest commit timestamp xid: %u/%u; " - "oldest running xid %u; %s", + "oldest running xid %u; oldest catalog xmin %u; %s", (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo, checkpoint->ThisTimeLineID, checkpoint->PrevTimeLineID, @@ -63,6 +63,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) checkpoint->oldestCommitTsXid, checkpoint->newestCommitTsXid, checkpoint->oldestActiveXid, + checkpoint->oldestCatalogXmin, (info == XLOG_CHECKPOINT_SHUTDOWN) ? 
"shutdown" : "online"); } else if (info == XLOG_NEXTOID) diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 5efbfbd..ffabf1c 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -414,6 +414,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) } } +/* + * Set the global oldest catalog_xmin used to determine when tuples + * may be removed from catalogs and user-catalogs accessible from logical + * decoding. + * + * Only to be called from the startup process or from LogCurrentRunningXacts() + * which ensures the update is properly written to xlog first. + */ +void +SetOldestCatalogXmin(TransactionId oldestCatalogXmin) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin; + LWLockRelease(ProcArrayLock); +} /* * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating? diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c8751c6..63453d7 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5652,6 +5652,42 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_CATALOG_XMIN_ADV) + { + xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record); + + /* + * Apply the new catalog_xmin limit immediately. New decoding sessions + * will refuse to start if their slot is past it, and old ones will + * notice when we signal them with a recovery conflict. There's no + * effect on the catalogs themselves yet, so it's safe for backends + * with older catalog_xmins to still exist. + * + * We don't have to take ProcArrayLock since only the startup process + * is allowed to change oldestCatalogXmin when we're in recovery. + * + * Existing sessions are not notified and must check the safe xmin. 
+ */ + SetOldestCatalogXmin(xlrec->new_catalog_xmin); + + } else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * Record when we advance the catalog_xmin used for tuple removal + * so standbys find out before we remove catalog tuples they might + * need for logical decoding. + */ +XLogRecPtr +XactLogCatalogXminUpdate(TransactionId new_catalog_xmin) +{ + xl_xact_catalog_xmin_advance xlrec; + + xlrec.new_catalog_xmin = new_catalog_xmin; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance); + return XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV); +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 5d58f09..8d713e9 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5021,6 +5021,7 @@ BootStrapXLOG(void) MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); AdvanceOldestClogXid(checkPoint.oldestXid); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId); @@ -6611,6 +6612,12 @@ StartupXLOG(void) (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u", checkPoint.oldestXid, checkPoint.oldestXidDB))); ereport(DEBUG1, + (errmsg_internal("oldest catalog-only transaction ID: %u", + checkPoint.oldestCatalogXmin))); + ereport(DEBUG1, + (errmsg_internal("oldest catalog-only transaction ID: %u", + checkPoint.oldestCatalogXmin))); + ereport(DEBUG1, (errmsg_internal("oldest MultiXactId: %u, in database %u", checkPoint.oldestMulti, checkPoint.oldestMultiDB))); ereport(DEBUG1, @@ -6628,6 +6635,7 @@ StartupXLOG(void) MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); AdvanceOldestClogXid(checkPoint.oldestXid); SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + 
SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true); SetCommitTsLimit(checkPoint.oldestCommitTsXid, checkPoint.newestCommitTsXid); @@ -8537,6 +8545,9 @@ CreateCheckPoint(int flags) */ InitXLogInsert(); + /* Checkpoints are a handy time to update the effective catalog_xmin */ + UpdateOldestCatalogXmin(); + /* * Acquire CheckpointLock to ensure only one checkpoint happens at a time. * (This is just pro forma, since in the present system structure there is @@ -8726,6 +8737,10 @@ CreateCheckPoint(int flags) &checkPoint.oldestMulti, &checkPoint.oldestMultiDB); + LWLockAcquire(ProcArrayLock, LW_SHARED); + checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin; + LWLockRelease(ProcArrayLock); + /* * Having constructed the checkpoint record, ensure all shmem disk buffers * and commit-log buffers are flushed to disk. @@ -9632,6 +9647,8 @@ xlog_redo(XLogReaderState *record) */ SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); + /* * If we see a shutdown checkpoint while waiting for an end-of-backup * record, the backup was canceled and the end-of-backup record will @@ -9729,8 +9746,10 @@ xlog_redo(XLogReaderState *record) checkPoint.oldestMultiDB); if (TransactionIdPrecedes(ShmemVariableCache->oldestXid, checkPoint.oldestXid)) - SetTransactionIdLimit(checkPoint.oldestXid, - checkPoint.oldestXidDB); + SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB); + + SetOldestCatalogXmin(checkPoint.oldestCatalogXmin); + /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch; ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index dcb4cf2..3bb5200 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -51,6 
+51,7 @@ #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "storage/shmem.h" #include "storage/smgr.h" #include "storage/spin.h" @@ -333,6 +334,14 @@ BackgroundWriterMain(void) last_snapshot_lsn = LogStandbySnapshot(); last_snapshot_ts = now; } + + /* + * We can also advance the threshold used for catalog tuple + * cleanup, rate-limited so we don't write it too often. The delay + * slightly increases catalog bloat but reduces the volume of + * catalog_xmin advance records written. + */ + UpdateOldestCatalogXmin(); } /* diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5c13d26..b5084b9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -288,6 +288,18 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); break; + case XLOG_XACT_CATALOG_XMIN_ADV: + + /* + * The global catalog_xmin has been advanced. By the time we see + * this in logical decoding it no longer matters, since it's + * guaranteed that all later records will be consistent with the + * advanced catalog_xmin, so we ignore it here. If we were running + * on a standby and it applied a catalog xmin advance past our + * needed catalog_xmin we would've already been terminated with a + * conflict with standby error. 
+ */ + break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index df93265..277f196 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed) xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN); - ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin); + ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin); if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedes(slot_xmin, xmin)) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index dbb10c7..9f3a86b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1778,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac slot->data.xmin = feedbackXmin; slot->effective_xmin = feedbackXmin; } + /* + * If the physical slot is relaying catalog_xmin for logical replication + * slots on the replica it's safe to act on catalog_xmin advances + * immediately too. The replica will only send a new catalog_xmin via + * feedback when it advances its effective_catalog_xmin, so it's done the + * delay-until-confirmed dance for us and knows it won't need the data + * we're protecting from vacuum again. 
+ */ if (!TransactionIdIsNormal(slot->data.catalog_xmin) || !TransactionIdIsNormal(feedbackCatalogXmin) || TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 7c2e1e1..9e98af8 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -87,7 +87,12 @@ typedef struct ProcArrayStruct /* oldest xmin of any replication slot */ TransactionId replication_slot_xmin; - /* oldest catalog xmin of any replication slot */ + + /* + * Oldest catalog xmin of any replication slot + * + * See also ShmemVariableCache->oldestGlobalXmin + */ TransactionId replication_slot_catalog_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ @@ -1306,6 +1311,9 @@ TransactionIdIsActive(TransactionId xid) * The return value is also adjusted with vacuum_defer_cleanup_age, so * increasing that setting on the fly is another easy way to make * GetOldestXmin() move backwards, with no consequences for data integrity. + * + * When changing GetOldestXmin, check to see whether RecentGlobalXmin + * computation in GetSnapshotData also needs changing. */ TransactionId GetOldestXmin(Relation rel, int flags) @@ -1444,6 +1452,89 @@ GetOldestXmin(Relation rel, int flags) } /* + * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated + * to reflect an advance in procArray->replication_slot_catalog_xmin or + * it becoming newly set or unset. + * + */ +static bool +CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin) +{ + return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin) + || (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin))); +} + +/* + * If necessary, copy the current catalog_xmin needed by replication slots to + * the effective catalog_xmin used for dead tuple removal and write a WAL + * record recording the change. 
+ * + * This allows standbys to know the oldest xid for which it is safe to create + * a historic snapshot for logical decoding. VACUUM or other cleanup may have + * removed catalog tuple versions needed to correctly decode transactions older + * than this threshold. Standbys can use this information to cancel conflicting + * decoding sessions and invalidate slots that need discarded information. + * + * (We can't use the transaction IDs in WAL records emitted by VACUUM etc for + * this, since they don't identify the relation as a catalog or not. Nor can a + * standby look up the relcache to get the Relation for the affected + * relfilenode to check if it is a catalog. The standby would also have no way + * to know the oldest safe position at startup if it wasn't in the control + * file.) + */ +void +UpdateOldestCatalogXmin(void) +{ + TransactionId vacuum_catalog_xmin; + TransactionId slots_catalog_xmin; + + Assert(XLogInsertAllowed()); + + /* + * It's most likely that replication_slot_catalog_xmin and + * oldestCatalogXmin will be the same and no action is required, so do a + * pre-check before doing expensive WAL writing and exclusive locking. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + slots_catalog_xmin = procArray->replication_slot_catalog_xmin; + LWLockRelease(ProcArrayLock); + + if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin)) + { + /* + * We must prevent a concurrent checkpoint, otherwise the catalog xmin + * advance xlog record with the new value might be written before the + * checkpoint but the checkpoint may still see the old + * oldestCatalogXmin value. + */ + if (!LWLockConditionalAcquire(CheckpointLock, LW_SHARED)) + /* Couldn't get checkpointer lock; will retry later */ + return; + + XactLogCatalogXminUpdate(slots_catalog_xmin); + + /* + * A concurrent updater could've changed the oldestCatalogXmin so we + * need to re-check under ProcArrayLock before updating. 
The LWLock + * provides a barrier. + * + * We must not re-read replication_slot_catalog_xmin even if it has + * advanced, since we xlog'd the older value. If it advanced since, a + * later run will xlog the new value and advance. + */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + vacuum_catalog_xmin = *((volatile TransactionId *) &ShmemVariableCache->oldestCatalogXmin); + if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin)) + ShmemVariableCache->oldestCatalogXmin = slots_catalog_xmin; + LWLockRelease(ProcArrayLock); + + LWLockRelease(CheckpointLock); + } + +} + +/* * GetMaxSnapshotXidCount -- get max size for snapshot XID array * * We have to export this for use by snapmgr.c. @@ -1700,7 +1791,7 @@ GetSnapshotData(Snapshot snapshot) /* fetch into volatile var while ProcArrayLock is held */ replication_slot_xmin = procArray->replication_slot_xmin; - replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; @@ -1711,6 +1802,9 @@ GetSnapshotData(Snapshot snapshot) * Update globalxmin to include actual process xids. This is a slightly * different way of computing it than GetOldestXmin uses, but should give * the same result. + * + * If you change computation of RecentGlobalXmin here you may need to + * change GetOldestXmin(...) as well. */ if (TransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; @@ -2041,12 +2135,16 @@ GetRunningTransactionData(void) } /* - * It's important *not* to include the limits set by slots here because + * It's important *not* to include the xmin set by slots here because * snapbuild.c uses oldestRunningXid to manage its xmin horizon. 
If those * were to be included here the initial value could never increase because - * of a circular dependency where slots only increase their limits when - * running xacts increases oldestRunningXid and running xacts only + * of a circular dependency where slots only increase their xmin limits + * when running xacts increases oldestRunningXid and running xacts only * increases if slots do. + * + * We can safely report the catalog_xmin limit for replication slots here + * because it's only used to advance oldestCatalogXmin. Slots' + * catalog_xmin advance does not depend on it so there's no circularity. */ CurrentRunningXacts->xcnt = count - subcount; @@ -2171,6 +2269,13 @@ GetOldestSafeDecodingTransactionId(void) * If there's already a slot pegging the xmin horizon, we can start with * that value, it's guaranteed to be safe since it's computed by this * routine initially and has been enforced since. + * + * We don't use ShmemVariableCache->oldestCatalogXmin here because another + * backend may have already logged its intention to advance it to a higher + * value (still <= replication_slot_catalog_xmin) and just be waiting on + * ProcArrayLock to actually apply the change. On a standby + * replication_slot_catalog_xmin is what the walreceiver will be sending + * in hot_standby_feedback, not oldestCatalogXmin. */ if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && TransactionIdPrecedes(procArray->replication_slot_catalog_xmin, @@ -2965,18 +3070,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, * * Return the current slot xmin limits. That's useful to be able to remove * data that's older than those limits. + * + * For logical replication slots' catalog_xmins, we return both the effective + * catalog_xmin being used for tuple removal (retained catalog_xmin) and the + * catalog_xmin actually needed by replication slots (needed_catalog_xmin). 
+ * + * retained_catalog_xmin should be older than needed_catalog_xmin but is not + * guaranteed to be if there are replication slots on a replica currently + * attempting to start up and reserve catalogs, outdated replicas sending + * feedback, etc. */ void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin) + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin) { LWLockAcquire(ProcArrayLock, LW_SHARED); if (xmin != NULL) *xmin = procArray->replication_slot_xmin; - if (catalog_xmin != NULL) - *catalog_xmin = procArray->replication_slot_catalog_xmin; + if (retained_catalog_xmin != NULL) + *retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + + if (needed_catalog_xmin != NULL) + *needed_catalog_xmin = procArray->replication_slot_catalog_xmin; LWLockRelease(ProcArrayLock); } diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 2ea8931..5c7eb77 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -248,6 +248,8 @@ main(int argc, char *argv[]) ControlFile->checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile->checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"), + ControlFile->checkPointCopy.oldestCatalogXmin); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/include/access/transam.h b/src/include/access/transam.h index d25a2dd..c2cb0a1 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -134,6 +134,10 @@ typedef struct VariableCacheData */ TransactionId latestCompletedXid; /* newest XID that has committed or * aborted */ + TransactionId oldestCatalogXmin; /* oldestCatalogXmin guarantees that + * no valid catalog tuples >= than it + * are removed. That property is used + * for logical decoding. 
*/ /* * These fields are protected by CLogTruncationLock @@ -179,6 +183,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact); extern TransactionId ReadNewTransactionId(void); extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid); +extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin); extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid); extern bool ForceTransactionIdLimitUpdate(void); extern Oid GetNewObjectId(void); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5b37c05..6d18d18 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -137,7 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_CATALOG_XMIN_ADV 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -187,6 +187,13 @@ typedef struct xl_xact_assignment #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) +typedef struct xl_xact_catalog_xmin_advance +{ + TransactionId new_catalog_xmin; +} xl_xact_catalog_xmin_advance; + +#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId)) + /* * Commit and abort records can contain a lot of information. But a large * portion of the records won't need all possible pieces of information. 
So we @@ -391,6 +398,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, int xactflags, TransactionId twophase_xid); + +extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin); + extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 3a25cc8..1fe89ae 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -45,6 +45,7 @@ typedef struct CheckPoint MultiXactOffset nextMultiOffset; /* next free MultiXact offset */ TransactionId oldestXid; /* cluster-wide minimum datfrozenxid */ Oid oldestXidDB; /* database with minimum datfrozenxid */ + TransactionId oldestCatalogXmin; /* catalog retained after this xid */ MultiXactId oldestMulti; /* cluster-wide minimum datminmxid */ Oid oldestMultiDB; /* database with minimum datminmxid */ pg_time_t time; /* time stamp of checkpoint */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 9b42e49..69a82d7 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -120,6 +120,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked); extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, - TransactionId *catalog_xmin); + TransactionId *retained_catalog_xmin, + TransactionId *needed_catalog_xmin); + +extern void UpdateOldestCatalogXmin(void); #endif /* PROCARRAY_H */ diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index bf9b50a..80b976b 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,24 +7,79 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 16; +use Test::More tests => 44; # Initialize master node my $node_master = 
get_new_node('master'); $node_master->init(allows_streaming => 1); -$node_master->append_conf( - 'postgresql.conf', qq( +$node_master->append_conf('postgresql.conf', qq( wal_level = logical +hot_standby_feedback = on +wal_receiver_status_interval = 1 +log_min_messages = debug1 )); $node_master->start; -my $backup_name = 'master_backup'; +# Set up some changes before we make base backups $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]); $node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]); $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]); +# Launch two streaming replicas, one with and one without +# physical replication slots. We'll use these for tests +# involving interaction of logical and physical standby. +# +# Both backups are created with pg_basebackup. +# +my $backup_name = 'master_backup'; +$node_master->backup($backup_name); + +$node_master->safe_psql('postgres', q[SELECT pg_create_physical_replication_slot('slot_replica');]); +my $node_slot_replica = get_new_node('slot_replica'); +$node_slot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1); +$node_slot_replica->append_conf('recovery.conf', "primary_slot_name = 'slot_replica'"); + +my $node_noslot_replica = get_new_node('noslot_replica'); +$node_noslot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1); + +$node_slot_replica->start; +$node_noslot_replica->start; + +sub restartpoint_standbys +{ + # Force restartpoints to update control files on replicas + $node_slot_replica->safe_psql('postgres', 'CHECKPOINT'); + $node_noslot_replica->safe_psql('postgres', 'CHECKPOINT'); +} + +sub wait_standbys +{ + my $lsn = $node_master->lsn('insert'); + $node_master->wait_for_catchup($node_noslot_replica, 'replay', $lsn); + $node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn); +} + +# 
pg_basebackup doesn't copy replication slots +is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef, + 'logical slot test_slot on master not copied by pg_basebackup'); + +# Make sure oldestCatalogXmin lands in the control file on master +$node_master->safe_psql('postgres', 'VACUUM;'); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); + +my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica); + +wait_standbys(); +restartpoint_standbys(); +foreach my $node (@nodes) +{ + # Master had an oldestCatalogXmin, so we must've inherited it via checkpoint + command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m, + "pg_controldata's oldestCatalogXmin is nonzero after start on " . $node->name); +} + # Basic decoding works my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]); is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT'); @@ -64,6 +119,9 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo chomp($stdout_recv); is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot'); +# Create a second DB we'll use for testing dropping and accessing slots across +# databases. This matters since logical slots are globally visible objects that +# can only actually be used on one DB for most purposes. 
$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb'); is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3, @@ -96,9 +154,29 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0', 'restored slot catalog_xmin is nonzero'); is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3, 'reading from slot with wal_level < logical fails'); +wait_standbys(); +restartpoint_standbys(); +foreach my $node (@nodes) +{ + command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m, + "pg_controldata's oldestCatalogXmin is nonzero on " . $node->name); +} + +# Dropping the slot must clear catalog_xmin is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0, 'can drop logical slot while wal_level = replica'); is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped'); +$node_master->safe_psql('postgres', 'VACUUM;'); +$node_master->safe_psql('postgres', 'CHECKPOINT;'); +wait_standbys(); +restartpoint_standbys(); +foreach my $node (@nodes) +{ + command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m, + "pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name); +} -# done with the node -$node_master->stop; +foreach my $node (@nodes) +{ + $node->stop; +} -- 2.5.5
From c62acf7789a4d1a3db666fa1b2f67ba69af1f237 Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Mon, 3 Apr 2017 17:31:19 +0800 Subject: [PATCH 2/3] Support conflict with standby on logical walsender --- src/backend/access/heap/heapam.c | 2 +- src/backend/access/transam/xact.c | 6 +- src/backend/access/transam/xlog.c | 3 +- src/backend/replication/logical/logical.c | 139 ++++++++++++++++++++++++++++++ src/backend/replication/slot.c | 4 +- src/backend/replication/walsender.c | 14 +-- src/backend/storage/ipc/procarray.c | 51 +++++++++++ src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 7 +- src/backend/tcop/postgres.c | 43 ++++++--- src/include/storage/procarray.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 3 + 13 files changed, 245 insertions(+), 33 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 0c3e2b0..93bf143 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7273,7 +7273,7 @@ heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid, * ratchet forwards latestRemovedXid to the greatest one found. * This is used as the basis for generating Hot Standby conflicts, so * if a tuple was never visible then removing it should not conflict - * with queries. + * with queries or logical decoding output plugin callbacks. */ void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 63453d7..48ca884 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5662,14 +5662,10 @@ xact_redo(XLogReaderState *record) * notice when we signal them with a recovery conflict. There's no * effect on the catalogs themselves yet, so it's safe for backends * with older catalog_xmins to still exist. 
- * - * We don't have to take ProcArrayLock since only the startup process - * is allowed to change oldestCatalogXmin when we're in recovery. - * - * Existing sessions are not notified and must check the safe xmin. */ SetOldestCatalogXmin(xlrec->new_catalog_xmin); + ResolveRecoveryConflictWithLogicalDecoding(xlrec->new_catalog_xmin); } else elog(PANIC, "xact_redo: unknown op code %u", info); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8d713e9..a98601a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8546,7 +8546,8 @@ CreateCheckPoint(int flags) InitXLogInsert(); /* Checkpoints are a handy time to update the effective catalog_xmin */ - UpdateOldestCatalogXmin(); + if (XLogInsertAllowed()) + UpdateOldestCatalogXmin(); /* * Acquire CheckpointLock to ensure only one checkpoint happens at a time. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..282e330 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "pgstat.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -38,11 +39,14 @@ #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/memutils.h" +#include "utils/ps_status.h" /* data for errcontext callback */ typedef struct LogicalErrorCallbackState @@ -68,6 +72,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void EnsureActiveLogicalSlotValid(void); + /* * Make sure the current settings & environment are capable of doing logical * decoding. 
@@ -218,6 +224,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlot *slot; LogicalDecodingContext *ctx; MemoryContext old_context; + bool force_standby_snapshot; /* shorter lines... */ slot = MyReplicationSlot; @@ -276,8 +283,21 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotsComputeRequiredXmin(true); + /* + * If this is the first slot created on the master we won't have a + * persistent record of the oldest safe xid for historic snapshots yet. + * Force one to be recorded so that when we go to replay from this slot we + * know it's safe. + */ + force_standby_snapshot = + !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin); + LWLockRelease(ProcArrayLock); + /* Update ShmemVariableCache->oldestCatalogXmin */ + if (force_standby_snapshot) + UpdateOldestCatalogXmin(); + /* * tell the snapshot builder to only assemble snapshot once reaching the * running_xact's record with the respective xmin. @@ -376,6 +396,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } + EnsureActiveLogicalSlotValid(); + ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, read_page, prepare_write, do_write); @@ -963,3 +985,120 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +/* + * Test to see if the active logical slot is usable. + */ +static void +EnsureActiveLogicalSlotValid(void) +{ + TransactionId shmem_catalog_xmin; + + Assert(MyReplicationSlot != NULL); + + /* + * A logical slot can become unusable if we're doing logical decoding on a + * standby or using a slot created before we were promoted from standby + * to master. If the master advanced its global catalog_xmin past the + * threshold we need it could've removed catalog tuple versions that + * we'll require to start decoding at our restart_lsn. 
+ */ + + LWLockAcquire(ProcArrayLock, LW_SHARED); + shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin; + LWLockRelease(ProcArrayLock); + + if (!TransactionIdIsValid(shmem_catalog_xmin) || + TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot '%s' requires catalogs removed by master", + NameStr(MyReplicationSlot->data.name)), + errdetail("need catalog_xmin %u, have oldestCatalogXmin %u", + MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin))); +} + +/* + * Scan to see if any clients are using replication slots that are below a + * newly-applied new catalog_xmin threshold and signal them to terminate with a + * recovery conflict. + */ +void +ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin) +{ + int i; + + if (!InHotStandby) + /* nobody can be actively using logical slots */ + return; + + /* Already applied new limit, can't have replayed later one yet */ + Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin); + + /* + * Find the first conflicting active slot and signal its owning backend + * to exit. We'll be called repeatedly by the recovery code until there + * are no more conflicts. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *slot; + pid_t active_pid; + + slot = &ReplicationSlotCtl->replication_slots[i]; + + /* + * Physical slots can have a catalog_xmin, but conflicts are the + * problem of the leaf replica with the logical slot. + */ + if (!(slot->in_use && SlotIsLogical(slot))) + continue; + + /* + * We only care about the effective_catalog_xmin of active logical + * slots. Anything else gets checked when a new decoding session tries + * to start. 
+ */ + while (slot->in_use && slot->active_pid != 0 && + TransactionIdIsValid(slot->effective_catalog_xmin) && + (!TransactionIdIsValid(new_catalog_xmin) || + TransactionIdPrecedes(slot->effective_catalog_xmin, new_catalog_xmin))) + { + /* + * We'll be sleeping, so release the control lock. New conflicting + * backends cannot appear and if old ones go away that's what we + * want, so release and re-acquire is OK here. + */ + active_pid = slot->active_pid; + LWLockRelease(ReplicationSlotControlLock); + + if (WaitExceedsMaxStandbyDelay()) + { + ereport(INFO, + (errmsg("terminating logical decoding session due to recovery conflict"), + errdetail("Pid %u requires catalog_xmin %u for replication slot '%s' but the master has removed catalogs up to xid %u.", + active_pid, slot->effective_catalog_xmin, + NameStr(slot->data.name), new_catalog_xmin))); + + /* + * Signal the proc. If the slot is already released or even if + * pid is re-used we don't care, backends are required to + * tolerate spurious recovery signals. + */ + CancelLogicalDecodingSessionWithRecoveryConflict(active_pid); + + /* Don't flood the system with signals */ + pg_usleep(10000); + } + + /* + * We need to re-acquire the lock before re-checking the slot or + * continuing the scan. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + } + + } + LWLockRelease(ReplicationSlotControlLock); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 6c5ec7a..57a3994 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -48,6 +48,7 @@ #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/standby.h" #include "utils/builtins.h" /* @@ -931,7 +932,8 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. 
We can't do that on a standby; there we must wait for the + * bgwriter to get around to logging its periodic standby snapshot. * * That's not needed (or indeed helpful) for physical slots as they'll * start replay at the last logged checkpoint anyway. Instead return diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9f3a86b..ef63b63 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -212,7 +212,6 @@ static struct /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -2831,17 +2830,6 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGUSR1: set flag to send WAL records */ -static void -WalSndXLogSendHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - latch_sigusr1_handler(); - - errno = save_errno; -} - /* SIGUSR2: set flag to do a last cycle and shut down afterwards */ static void WalSndLastCycleHandler(SIGNAL_ARGS) @@ -2875,7 +2863,7 @@ WalSndSignals(void) pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 9e98af8..05e3058 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2762,6 +2762,57 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) } /* + * Notify a logical decoding session that it conflicts with newly set + * catalog_xmin from the master. 
We're about to start replaying WAL + * that will make its historic snapshot potentially unsafe by removing + * system tuples it might need. + */ +void +CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid) +{ + ProcArrayStruct *arrayP = procArray; + int index; + BackendId backend_id = InvalidBackendId; + + /* + * We have to scan ProcArray to find the process and set a pending recovery + * conflict even though we know the pid. At least we can get the BackendId + * and avoid a ProcSignal scan by SendProcSignal. + * + * The pid might've gone away, in which case we got the desired + * outcome anyway. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + volatile PGPROC *proc = &allProcs[pgprocno]; + + if (proc->pid == session_pid) + { + VirtualTransactionId procvxid; + + GET_VXID_FROM_PGPROC(procvxid, *proc); + + proc->recoveryConflictPending = true; + backend_id = procvxid.backendId; + break; + } + } + + LWLockRelease(ProcArrayLock); + + /* + * Kill the pid if it's still here. If not, that's what we + * wanted so ignore any errors. + */ + if (backend_id != InvalidBackendId) + (void) SendProcSignal(session_pid, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, backend_id); +} + +/* * MinimumActiveBackends --- count backends (other than myself) that are * in active transactions. Return true if the count exceeds the * minimum threshold passed. 
This is used as a heuristic to decide if diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 4a21d55..16c2e1f 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -273,6 +273,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 8e57f93..f6106ca 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -29,6 +29,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/standby.h" +#include "replication/slot.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" @@ -152,11 +153,13 @@ GetStandbyLimitTime(void) static int standbyWait_us = STANDBY_INITIAL_WAIT_US; /* - * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs. + * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs and + * ResolveRecoveryConflictWithLogicalDecoding. + * * We wait here for a while then return. If we decide we can't wait any * more then we return true, if we can wait some more return false. 
*/ -static bool +bool WaitExceedsMaxStandbyDelay(void) { TimestampTz ltime; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index a2282058..530dcbe 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2276,6 +2276,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: + errdetail("Logical replication slot requires catalog rows that will be removed."); + break; case PROCSIG_RECOVERY_CONFLICT_DATABASE: errdetail("User was connected to a database that must be dropped."); break; @@ -2698,8 +2701,12 @@ SigHupHandler(SIGNAL_ARGS) /* * RecoveryConflictInterrupt: out-of-line portion of recovery conflict * handling following receipt of SIGUSR1. Designed to be similar to die() - * and StatementCancelHandler(). Called only by a normal user backend - * that begins a transaction during recovery. + * and StatementCancelHandler(). + * + * Called by normal user backends running during recovery. Also used by the + * walsender to handle recovery conflicts with logical decoding, and by + * background workers that call CHECK_FOR_INTERRUPTS() and respect recovery + * conflicts. */ void RecoveryConflictInterrupt(ProcSignalReason reason) @@ -2781,6 +2788,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* Intentional drop through to session cancel */ + case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN: case PROCSIG_RECOVERY_CONFLICT_DATABASE: RecoveryConflictPending = true; ProcDiePending = true; @@ -2795,12 +2803,18 @@ RecoveryConflictInterrupt(ProcSignalReason reason) Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending)); /* - * All conflicts apart from database cause dynamic errors where the - * command or transaction can be retried at a later point with some - * potential for success. 
No need to reset this, since non-retryable - * conflict errors are currently FATAL. + * All conflicts apart from database and catalog_xmin cause dynamic + * errors where the command or transaction can be retried at a later + * point with some potential for success. No need to reset this, since + * non-retryable conflict errors are currently FATAL. + * + * catalog_xmin is non-retryable because once we advance the + * catalog_xmin threshold we might replay wal that removes + * needed catalog tuples. The slot can't (re)start decoding + * because its catalog_xmin cannot be satisfied. */ - if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE) + if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) RecoveryConflictRetryable = false; } @@ -2855,11 +2869,20 @@ ProcessInterrupts(void) } else if (RecoveryConflictPending) { - /* Currently there is only one non-retryable recovery conflict */ - Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE); + int code; + + Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE || + RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN); + + if (RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN) + /* XXX more appropriate error code? 
*/ + code = ERRCODE_PROGRAM_LIMIT_EXCEEDED; + else + code = ERRCODE_DATABASE_DROPPED; + pgstat_report_recovery_conflict(RecoveryConflictReason); ereport(FATAL, - (errcode(ERRCODE_DATABASE_DROPPED), + (errcode(code), errmsg("terminating connection due to conflict with recovery"), errdetail_recovery_conflict())); } diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 69a82d7..231297d 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -112,6 +112,8 @@ extern int CountUserBackends(Oid roleid); extern bool CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared); +extern void CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid); + extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index d068dde..3a3ba72 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -40,6 +40,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, NUM_PROCSIGNALS /* Must be last! 
*/ } ProcSignalReason; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 3ecc446..b17ba6f 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -34,10 +34,13 @@ extern void ResolveRecoveryConflictWithDatabase(Oid dbid); extern void ResolveRecoveryConflictWithLock(LOCKTAG locktag); extern void ResolveRecoveryConflictWithBufferPin(void); +extern void ResolveRecoveryConflictWithLogicalDecoding( + TransactionId new_catalog_xmin); extern void CheckRecoveryConflictDeadlock(void); extern void StandbyDeadLockHandler(void); extern void StandbyTimeoutHandler(void); extern void StandbyLockTimeoutHandler(void); +extern bool WaitExceedsMaxStandbyDelay(void); /* * Standby Rmgr (RM_STANDBY_ID) -- 2.5.5
From 854e0f586d6c4f28d02469717122a2997b4410fd Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Tue, 4 Apr 2017 11:50:30 +0800 Subject: [PATCH 3/3] Support decoding on standby --- src/backend/replication/logical/logical.c | 61 ++- src/backend/replication/walreceiver.c | 12 + src/test/recovery/t/006_logical_decoding.pl | 70 ++- .../recovery/t/012_logical_decoding_on_replica.pl | 497 +++++++++++++++++++++ 4 files changed, 600 insertions(+), 40 deletions(-) create mode 100644 src/test/recovery/t/012_logical_decoding_on_replica.pl diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 282e330..35d110f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,23 +93,40 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /*---- + * We really want to enforce that: + * - we're connected to the primary via a replication slot + * - hot_standby_feedback is enabled + * - the user cannot turn hot_standby_feedback off while we have + * logical slots on the standby (it's PGC_SIGHUP) + * - hot_standby_feedback has actually taken effect on the master + * + * ... 
but because the walreceiver doesn't use normal GUCs and may or + * may not actually be running we can't reliably enforce those + * conditions yet. We also have no way of knowing when hot standby + * feedback has reached the master and locked in a catalog_xmin. + * + * So on standbys, slot creation or decoding from a slot may fail with + * a recovery conflict. But we keep track of the master's true + * catalog_xmin in WAL, so we'll never attempt to decode unsafely. + * + * Make a best effort sanity check anyway. + *--- + */ + if (!hot_standby_feedback) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires hot_standby_feedback = on"))); + + LWLockAcquire(ProcArrayLock, LW_SHARED); + if (!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("hot_standby_feedback has not yet taken effect"))); + LWLockRelease(ProcArrayLock); + } } /* @@ -224,7 +241,6 @@ CreateInitDecodingContext(char *plugin, ReplicationSlot *slot; LogicalDecodingContext *ctx; MemoryContext old_context; - bool force_standby_snapshot; /* shorter lines... */ slot = MyReplicationSlot; @@ -283,19 +299,16 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + /* * If this is the first slot created on the master we won't have a * persistent record of the oldest safe xid for historic snapshots yet. * Force one to be recorded so that when we go to replay from this slot we * know it's safe. 
*/ - force_standby_snapshot = - !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin); - - LWLockRelease(ProcArrayLock); - - /* Update ShmemVariableCache->oldestCatalogXmin */ - if (force_standby_snapshot) + if (!RecoveryInProgress() && + !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin)) UpdateOldestCatalogXmin(); /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 277f196..c0f6cec 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1239,6 +1239,18 @@ XLogWalRcvSendHSFeedback(bool immed) if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedes(slot_xmin, xmin)) xmin = slot_xmin; + + /* + * If there's no local catalog_xmin, report it as == xmin, so that + * we lock in a catalog_xmin before we need to create any logical slots + * on this standby. This won't add much catalog bloat until we create + * local slots and catalog_xmin starts lagging behind xmin, but it will + * cause the master to start logging + * xl_xact_catalog_xmin_advance records we need for logical + * decoding on standby. 
+ */ + if (!TransactionIdIsValid(catalog_xmin) && XLogLogicalInfoActive()) + catalog_xmin = xmin; } else { diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 80b976b..88ddf00 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 44; +use Test::More tests => 57; # Initialize master node my $node_master = get_new_node('master'); @@ -61,18 +61,22 @@ sub wait_standbys $node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn); } +sub sync_up +{ + $node_master->safe_psql('postgres', 'CHECKPOINT;'); + wait_standbys(); + restartpoint_standbys(); + # for hot_standby_feedback wal_sender_status_interval + sleep(1.5); +} + # pg_basebackup doesn't copy replication slots is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef, 'logical slot test_slot on master not copied by pg_basebackup'); -# Make sure oldestCatalogXmin lands in the control file on master -$node_master->safe_psql('postgres', 'VACUUM;'); -$node_master->safe_psql('postgres', 'CHECKPOINT;'); my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica); - -wait_standbys(); -restartpoint_standbys(); +sync_up(); foreach my $node (@nodes) { # Master had an oldestCatalogXmin, so we must've inherited it via checkpoint @@ -154,26 +158,60 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0', 'restored slot catalog_xmin is nonzero'); is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3, 'reading from slot with wal_level < logical fails'); -wait_standbys(); -restartpoint_standbys(); +sync_up(); foreach my $node (@nodes) { command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m, "pg_controldata's oldestCatalogXmin is nonzero on " . 
$node->name); } -# Dropping the slot must clear catalog_xmin +# Drop the logical slot on the master; make sure feedback from standbys continues to peg +# catalog_xmin. is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0, 'can drop logical slot while wal_level = replica'); -is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped'); -$node_master->safe_psql('postgres', 'VACUUM;'); -$node_master->safe_psql('postgres', 'CHECKPOINT;'); -wait_standbys(); -restartpoint_standbys(); +is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped on master'); +# Do a dummy xact so we can make sure catalog_xmin will advance, and we can see that +# catalog_xmin will advance along with it. +my $xmin = $node_master->safe_psql('postgres', 'BEGIN; CREATE TABLE dummy_xact(blah integer); SELECT txid_current(); COMMIT;'); + +# even though the logical slot on the upstream is dropped, master's +# oldestCatalogXmin is held down by hot standby feedback from the replicas. +# Since the replicas have no logical slots of their own, it should've advanced +# to be the same as the physical slot xmin for the slot replica. +sync_up(); +# There are no transactions on the replicas so their xmin and catalog_xmin +# will both be nextXid. +cmp_ok($node_master->slot('slot_replica')->{'xmin'}, "eq", $xmin + 1, + 'xmin advanced to latest master xid on slot_replica on master'); +cmp_ok($node_master->slot('slot_replica')->{'catalog_xmin'}, "le", $xmin + 1, + 'xmin == catalog_xmin on phys slot held down by standby catalog_xmin'); +# Control files will still contain the xid, since there won't have been another +# checkpoint to advance the nextXid reported by feedback and write it to the +# control file. +foreach my $node (@nodes) +{ + command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:$xmin$/m, + "pg_controldata's oldestCatalogXmin advanced after drop, vacuum and checkpoint on " . 
$node->name); +} + +# if we turn hot_standby_feedback off on the replica that uses a slot, the +# master should no longer have anything holding down its catalog_xmin. Even +# though hot_standby_feedback is still enabled on the non-slot replica, it +# cannot set the master's catalog_xmin because it has no destination slot, +# it can only set xmin in its procarray entry. +$node_slot_replica->safe_psql('postgres', q[ALTER SYSTEM SET hot_standby_feedback = off;]); +# simplest way to force new hot standby feedback to be sent +$node_slot_replica->restart; +sleep(1); +# hot standby feedback should've cleared minimums +is($node_master->slot('slot_replica')->{'xmin'}, '', 'phys slot xmin null with hs_feedback off'); +is($node_master->slot('slot_replica')->{'catalog_xmin'}, '', 'phys slot catalog_xmin null with hs_feedback off'); +sync_up(); +# Everyone should now see the cleared catalog_xmin foreach my $node (@nodes) { command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m, - "pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name); + "pg_controldata's oldestCatalogXmin zero after turning off hs_feedback: " . $node->name); } foreach my $node (@nodes) diff --git a/src/test/recovery/t/012_logical_decoding_on_replica.pl b/src/test/recovery/t/012_logical_decoding_on_replica.pl new file mode 100644 index 0000000..6ed0abc --- /dev/null +++ b/src/test/recovery/t/012_logical_decoding_on_replica.pl @@ -0,0 +1,497 @@ +# Demonstrate that logical can follow timeline switches. +# +# Test logical decoding on a standby. 
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 63;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--write-recovery-conf', '--slot=decoding_standby');
+
+# The generated recovery.conf must point the standby at the physical slot.
+open(my $fh, "<", $backup_dir . "/recovery.conf")
+  or die "can't open recovery.conf";
+
+my $found = 0;
+while (my $line = <$fh>)
+{
+    chomp($line);
+    if ($line eq "primary_slot_name = 'decoding_standby'")
+    {
+        $found = 1;
+        last;
+    }
+}
+close($fh);
+ok($found, "using physical slot for standby");
+
+# Return (xmin, catalog_xmin) of the master's 'decoding_standby' physical slot.
+sub print_phys_xmin
+{
+    my $slot = $node_master->slot('decoding_standby');
+    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+    $node_master, $backup_name,
+    has_streaming => 1,
+    has_restoring => 1);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null.
+# Re-read the slot first: the values fetched above predate the standby's
+# start, so asserting on them again would prove nothing.
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+
+# If no slot on standby exists to hold down catalog_xmin it must follow xmin,
+# (which is nextXid when no xacts are running on the standby).
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+is($xmin, $catalog_xmin, "xmin and catalog_xmin equal");
+
+# We need catalog_xmin advance to take effect on the master and be replayed
+# on standby.
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+diag "creating slot standby_logical";
+my $start_time = [Time::HiRes::gettimeofday()];
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded')
+  or BAIL_OUT('cannot continue if slot creation fails, see logs');
+# tv_interval yields fractional seconds, so format as a float
+diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time));
+
+# Return (xmin, catalog_xmin) of the replica's 'standby_logical' slot.
+sub print_logical_xmin
+{
+    my $slot = $node_replica->slot('standby_logical');
+    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+  or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+# The standby can only decode what it has replayed, so wait for the inserts
+# above to arrive before asking the slot for changes.
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now the standby's slot
+# doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream catalog retention
+#########################################################
+
+# Burn xids on the master, VACUUM FREEZE every database, checkpoint, then
+# read oldestXID / oldestCatalogXmin / NextXID from pg_controldata output.
+# Logs the values alongside the current slot catalog_xmins and returns
+# ($oldestXid, $oldestCatalogXmin). Dies if oldestXID cannot be found.
+sub test_catalog_xmin_retention()
+{
+    # First burn some xids on the master in another DB, so we push the master's
+    # nextXid ahead.
+    foreach my $i (1 .. 100)
+    {
+        $node_master->safe_psql('postgres', 'SELECT txid_current()');
+    }
+
+    # Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+    # past our needed xmin. The only way we have visibility into that is to force
+    # a checkpoint.
+    $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+    foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+    {
+        $node_master->safe_psql($dbname, 'VACUUM FREEZE');
+    }
+    sleep(1);
+    $node_master->safe_psql('postgres', 'CHECKPOINT');
+    IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+      or die "pg_controldata failed with $?";
+    my @checkpoint = split('\n', $stdout);
+    my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', '');
+    foreach my $line (@checkpoint)
+    {
+        if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+        {
+            $nextXid = $1;
+        }
+        if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+        {
+            $oldestXid = $1;
+        }
+        if ($line =~ qr/^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/)
+        {
+            $oldestCatalogXmin = $1;
+        }
+    }
+    die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+    my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+    my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+    diag "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+    $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+    return ($oldestXid, $oldestCatalogXmin);
+}
+
+diag "Testing catalog_xmin retention with hs_feedback on";
+my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+cmp_ok($oldestCatalogXmin, ">=", $oldestXid, "oldestCatalogXmin >= oldestXid");
+cmp_ok($oldestCatalogXmin, "<=", $new_logical_catalog_xmin, "oldestCatalogXmin <= downstream catalog_xmin");
+
+#########################################################
+# Conflict with recovery: xmin cancels decoding session
+#########################################################
+#
+# Start a transaction on the replica then perform work that should cause a
+# recovery conflict with it. We'll check to make sure the client gets
+# terminated with recovery conflict.
+#
+# Temporarily disable hs feedback so we can test recovery conflicts.
+# It's fine to continue using a physical slot, the xmin should be
+# cleared. We only check hot_standby_feedback when establishing
+# a new decoding session so this approach circumvents the safeguards
+# in place and forces a conflict.
+#
+# We'll also create an unrelated table so we can drop it later, making
+# sure there are catalog changes to replay.
+$node_master->safe_psql('testdb', 'CREATE TABLE dummy_table(blah integer)');
+
+# Start pg_recvlogical before we turn off hs_feedback so its slot's
+# catalog_xmin is above the downstream's catalog_threshold when we start
+# decoding.
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off');
+$node_replica->reload;
+
+sleep(2);
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "physical xmin null after hs_feedback disabled");
+is($catalog_xmin, '', "physical catalog_xmin null after hs_feedback disabled");
+
+# Burn a bunch of XIDs and make sure upstream catalog_xmin is past what we'll
+# need here
+($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+cmp_ok($oldestXid, ">", $new_logical_catalog_xmin, 'upstream oldestXid advanced past downstream catalog_xmin with hs_feedback off');
+cmp_ok($oldestCatalogXmin, "==", 0, "oldestCatalogXmin = InvalidTransactionId with hs_feedback off");
+
+# Data-only changes, no effect on catalogs. We should replay them fine
+# without a conflict, since they advance xmin but not catalog_xmin.
+$node_master->safe_psql('testdb', 'DELETE FROM test_table');
+$node_master->safe_psql('testdb', 'VACUUM FULL test_table');
+$node_master->safe_psql('testdb', 'VACUUM;');
+
+diag "waiting for catchup";
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+diag "pumping";
+$handle->pump;
+diag "pumped";
+
+ok($node_replica->slot('standby_logical')->{'active_pid'}, 'pg_recvlogical still connected to slot');
+
+# If we change the catalogs, we'll get a conflict with recovery, but only if
+# there's an active xact when decoding.
+diag "dropping dummy_table";
+$node_master->safe_psql('testdb', 'DROP TABLE dummy_table;');
+
+diag "waiting for catchup";
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+diag "caught up, waiting for client";
+
+# client dies?
+eval {
+    $handle->finish;
+};
+$return = $?;
+if ($return) {
+    is($return, 256, "pg_recvlogical terminated by server on recovery conflict");
+    like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict errmsg');
+    like($stderr, qr/requires catalog rows that will be removed/, 'pg_recvlogical exited with catalog_xmin conflict');
+}
+else
+{
+    fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'");
+}
+
+#####################################################################
+# Conflict with recovery: refuse to run without hot_standby_feedback
+#####################################################################
+#
+# When hot_standby_feedback is off, new connections should fail.
+#
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot while hot_standby_feedback off');
+like($stderr, qr/hot_standby_feedback/, 'recvlogical recovery conflict errmsg');
+
+#####################################################################
+# Conflict with recovery: catalog_xmin advance invalidates idle slot
+#####################################################################
+#
+# The slot that pg_recvlogical was using before it was terminated
+# should not accept new connections now, since its catalog_xmin
+# is lower than the replica's threshold. Even once we re-enable
+# hot_standby_feedback, the removed tuples won't somehow come back.
+#
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on');
+$node_replica->reload;
+# make sure we see the effect promptly
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2);
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, 'xmin on phys slot non-null after re-establishing hot standby feedback');
+ok($catalog_xmin, 'catalog_xmin on phys slot non-null after re-establishing hot standby feedback')
+  or BAIL_OUT('further results meaningless if catalog_xmin not set on master');
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot with past catalog_xmin');
+like($stderr, qr/replication slot '.*' requires catalogs removed by master/, 'recvlogical recovery conflict errmsg');
+
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_xmin, $new_catalog_xmin) = print_phys_xmin();
+# We're now back to the old behaviour of hot_standby_feedback
+# reporting nextXid for both thresholds
+ok($new_catalog_xmin, "physical catalog_xmin still non-null");
+# Compare xids numerically: a string comparison mis-orders values that
+# differ in digit count (e.g. "1001" vs "999").
+cmp_ok($new_catalog_xmin, '>', $catalog_xmin,
+    'catalog_xmin increased after slot drop');
+cmp_ok($new_catalog_xmin, 'eq', $new_xmin,
+    'xmin and catalog_xmin equal after slot drop');
+
+
+
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+diag "Testing dropdb when downstream slot is not in-use";
+diag "creating slot dodropslot";
+$start_time = [Time::HiRes::gettimeofday()];
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot')
+  or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time));
+diag "creating slot otherslot";
+$start_time = [Time::HiRes::gettimeofday()];
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot')
+  or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+diag sprintf("Creation took %.2f seconds", Time::HiRes::tv_interval($start_time));
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+   'database dropped on standby');
+
+# Check the slot that lived in the dropped database (dodropslot); dodropslot2
+# does not exist yet at this point, so testing it would always pass.
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+diag "Testing dropdb when downstream slot is in-use";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+# The new database must be replayed on the standby before we can connect to
+# it there to create a slot.
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+diag "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'], 'pg_recvlogical created dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+diag "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+  or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+diag "pg_recvlogical backend pid is " . $node_replica->slot('dodropslot2')->{'active_pid'};
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+# NOTE(review): this loop has no upper bound; it relies on the walsender
+# actually exiting once the DROP DATABASE is replayed.
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+    sleep(1);
+    diag "waiting for walsender to exit";
+}
+
+diag "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+    $handle->finish;
+};
+$return = $?;
+if ($return) {
+    is($return, 256, "pg_recvlogical terminated by server");
+    like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+    like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+else
+{
+    # A clean exit means the conflict never reached the client; report it
+    # explicitly instead of silently skipping the checks above.
+    fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'");
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+   'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.5.5
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers