There were conflicts again, so I rebased once more. Didn't do anything else.
--
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From e2f5cedfc35652f198f9ca3bca9a45572f8845fb Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:15 +0530 Subject: [PATCH v6 1/5] Allow logical decoding on standby. Allow a logical slot to be created on standby. Restrict its usage or its creation if wal_level on primary is less than logical. During slot creation, it's restart_lsn is set to the last replayed LSN. Effectively, a logical slot creation on standby waits for an xl_running_xact record to arrive from primary. Conflicting slots would be handled in next commits. Andres Freund and Amit Khandekar. --- src/backend/access/transam/xlog.c | 11 +++++ src/backend/replication/logical/decode.c | 22 ++++++++- src/backend/replication/logical/logical.c | 37 ++++++++------- src/backend/replication/slot.c | 57 +++++++++++++++-------- src/backend/replication/walsender.c | 10 ++-- src/include/access/xlog.h | 1 + 6 files changed, 98 insertions(+), 40 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index de2d4ee582..a666b4b935 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4967,6 +4967,17 @@ LocalProcessControlFile(bool reset) ReadControlFile(); } +/* + * Get the wal_level from the control file. For a standby, this value should be + * considered as its active wal_level, because it may be different from what + * was originally configured on standby. + */ +WalLevel +GetActiveWalLevel(void) +{ + return ControlFile->wal_level; +} + /* * Initialization of shared memory for XLOG */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3abf8..3a072af75b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -187,11 +187,31 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. */ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* + * If wal_level on primary is reduced to less than logical, then we + * want to prevent existing logical slots from being used. + * Existing logical slots on standby get dropped when this WAL + * record is replayed; and further, slot creation fails when the + * wal level is not sufficient; but all these operations are not + * synchronized, so a logical slot may creep in while the wal_level + * is being reduced. Hence this extra check. + */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5adf253583..03463719f8 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -91,23 +91,22 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... 
- * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires " + "wal_level >= logical on master"))); + } } /* @@ -240,6 +239,12 @@ CreateInitDecodingContext(char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On standby, this check is also required while creating the slot. Check + * the comments in this function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1cec53d748..00aa95ba15 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1016,37 +1016,56 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be built + * using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead + * return the location of the last redo LSN. While that slightly + * increases the chance that we have to retry, it's where a base backup + * has to start replay at. */ + if (SlotIsPhysical(slot)) + restart_lsn = GetRedoRecPtr(); + else if (RecoveryInProgress()) + { + restart_lsn = GetXLogReplayRecPtr(NULL); + /* + * Replay pointer may point one past the end of the record. If that + * is a XLOG page boundary, it will not be a valid LSN for the + * start of a record, so bump it up past the page header. 
+ */ + if (!XRecOffIsValid(restart_lsn)) + { + if (restart_lsn % XLOG_BLCKSZ != 0) + elog(ERROR, "invalid replay pointer"); + /* For the first page of a segment file, it's a long header */ + if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0) + restart_lsn += SizeOfXLogLongPHD; + else + restart_lsn += SizeOfXLogShortPHD; + } + } + else + restart_lsn = GetXLogInsertRecPtr(); + + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + if (!RecoveryInProgress() && SlotIsLogical(slot)) { XLogRecPtr flushptr; - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); /* and make sure it's fsynced to disk */ XLogFlush(flushptr); } - else - { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 76ec3c7dd0..9274fec10b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2819,10 +2819,12 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + flushPtr = (am_cascading_walsender ? + GetStandbyFlushRecPtr() : GetFlushRecPtr()); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 98b033fc20..6228779df6 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -298,6 +298,7 @@ extern Size XLOGShmemSize(void); extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevel(void); extern void StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void InitXLOGAccess(void); -- 2.20.1
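For anyone trying 0001 by hand, the sequence below mirrors what the TAP test in 0004 does. It is only a sketch: the slot name, the test_decoding plugin, and the use of CHECKPOINT on the primary to get an xl_running_xacts record logged promptly are illustrative choices, not part of the patch itself. It assumes wal_level = logical on the primary and hot_standby_feedback = on on the standby.

-- On the standby: with the patch, slot creation waits for a running-xacts
-- record from the primary instead of failing with "logical decoding cannot
-- be used while in recovery".
SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding');

-- On the primary, in another session: a checkpoint logs a standby snapshot,
-- which unblocks the slot creation above.
CHECKPOINT;

-- Back on the standby: decode as usual.
SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL);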
>From a89d06fbca4b350192ad7895796783bf50a17757 Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:16 +0530 Subject: [PATCH v6 2/5] Add info in WAL records in preparation for logical slot conflict handling. When a WAL replay on standby indicates that a catalog table tuple is to be deleted by an xid that is greater than a logical slot's catalog_xmin, then that means the slot's catalog_xmin conflicts with the xid, and we need to handle the conflict. While subsequent commits will do the actual conflict handling, this commit adds a new field onCatalogTable in such WAL records, that is true for catalog tables, so as to arrange for conflict handling. Andres Freund. --- src/backend/access/gist/gist.c | 2 +- src/backend/access/gist/gistbuild.c | 2 +- src/backend/access/gist/gistutil.c | 4 ++-- src/backend/access/gist/gistxlog.c | 4 +++- src/backend/access/hash/hashinsert.c | 2 ++ src/backend/access/heap/heapam.c | 10 +++++++--- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/access/heap/visibilitymap.c | 2 +- src/backend/access/nbtree/nbtpage.c | 4 ++++ src/backend/access/spgist/spgvacuum.c | 8 ++++++++ src/backend/utils/cache/lsyscache.c | 16 ++++++++++++++++ src/include/access/gist_private.h | 6 +++--- src/include/access/gistxlog.h | 3 ++- src/include/access/hash_xlog.h | 1 + src/include/access/heapam_xlog.h | 8 ++++++-- src/include/access/nbtxlog.h | 2 ++ src/include/access/spgxlog.h | 1 + src/include/utils/lsyscache.h | 1 + src/include/utils/rel.h | 9 +++++++++ 19 files changed, 71 insertions(+), 16 deletions(-) diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 90c46e86a1..43ce813a50 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -342,7 +342,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, for (; ptr; ptr = ptr->next) { /* Allocate new page */ - ptr->buffer = gistNewBuffer(rel); + ptr->buffer = gistNewBuffer(heapRel, rel); GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); ptr->page = BufferGetPage(ptr->buffer); ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c index 671b5e9186..c4c5f41a2a 100644 --- a/src/backend/access/gist/gistbuild.c +++ b/src/backend/access/gist/gistbuild.c @@ -171,7 +171,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo) buildstate.giststate->tempCxt = createTempGistContext(); /* initialize the root page */ - buffer = gistNewBuffer(index); + buffer = gistNewBuffer(heap, index); Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); page = BufferGetPage(buffer); diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index dd975b164c..62b4e9e8db 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -806,7 +806,7 @@ gistcheckpage(Relation rel, Buffer buf) * Caller is responsible for initializing the page by calling GISTInitBuffer */ Buffer -gistNewBuffer(Relation r) +gistNewBuffer(Relation heapRel, Relation r) { Buffer buffer; bool needLock; @@ -850,7 +850,7 @@ gistNewBuffer(Relation r) * page's deleteXid. 
*/ if (XLogStandbyInfoActive() && RelationNeedsWAL(r)) - gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page)); + gistXLogPageReuse(heapRel, r, blkno, GistPageGetDeleteXid(page)); return buffer; } diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index d3f3a7b803..90bc4895b2 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -596,7 +596,8 @@ gistXLogPageDelete(Buffer buffer, FullTransactionId xid, * Write XLOG record about reuse of a deleted page. */ void -gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid) +gistXLogPageReuse(Relation heapRel, Relation rel, + BlockNumber blkno, FullTransactionId latestRemovedXid) { gistxlogPageReuse xlrec_reuse; @@ -607,6 +608,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemov */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(heapRel); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedFullXid = latestRemovedXid; diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c index 2ebe671967..e6cef08330 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -17,6 +17,7 @@ #include "access/hash.h" #include "access/hash_xlog.h" +#include "catalog/catalog.h" #include "miscadmin.h" #include "storage/buf_internals.h" #include "storage/lwlock.h" @@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf) xl_hash_vacuum_one_page xlrec; XLogRecPtr recptr; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel); xlrec.latestRemovedXid = latestRemovedXid; xlrec.ntuples = ndeletable; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 29694b8aa4..bf6bfe69dc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7145,12 +7145,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel, * see comments for vacuum_log_cleanup_info(). */ XLogRecPtr -log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid) +log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid) { xl_heap_cleanup_info xlrec; XLogRecPtr recptr; - xlrec.node = rnode; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); + xlrec.node = rel->rd_node; xlrec.latestRemovedXid = latestRemovedXid; XLogBeginInsert(); @@ -7186,6 +7187,7 @@ log_heap_clean(Relation reln, Buffer buffer, /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.latestRemovedXid = latestRemovedXid; xlrec.nredirected = nredirected; xlrec.ndead = ndead; @@ -7236,6 +7238,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, /* nor when there are no tuples to freeze */ Assert(ntuples > 0); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.cutoff_xid = cutoff_xid; xlrec.ntuples = ntuples; @@ -7266,7 +7269,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, * heap_buffer, if necessary. 
*/ XLogRecPtr -log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, +log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 vmflags) { xl_heap_visible xlrec; @@ -7276,6 +7279,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, Assert(BufferIsValid(heap_buffer)); Assert(BufferIsValid(vm_buffer)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); xlrec.cutoff_xid = cutoff_xid; xlrec.flags = vmflags; XLogBeginInsert(); diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 03c43efc32..b1014ef7b9 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -659,7 +659,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) * No need to write the record at all unless it contains a valid value */ if (TransactionIdIsValid(vacrelstats->latestRemovedXid)) - (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid); + (void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid); } /* diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index 0a51678c40..89f8955f36 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -282,7 +282,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, if (XLogRecPtrIsInvalid(recptr)) { Assert(!InRecovery); - recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf, + recptr = log_heap_visible(rel, heapBuf, vmBuf, cutoff_xid, flags); /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 39b8f17f4b..22ea714382 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -32,6 +32,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "utils/lsyscache.h" #include "utils/snapmgr.h" static BTMetaPageData *_bt_getmeta(Relation rel, Buffer metabuf); @@ -760,6 +761,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedXid = latestRemovedXid; @@ -1210,6 +1212,8 @@ _bt_delitems_delete(Relation rel, Buffer buf, XLogRecPtr recptr; xl_btree_delete xlrec_delete; + xlrec_delete.onCatalogTable = + RelationIsAccessibleInLogicalDecoding(heapRel); xlrec_delete.latestRemovedXid = latestRemovedXid; xlrec_delete.ndeleted = ndeletable; diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index bd98707f3c..f97597dba1 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -27,6 +27,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "utils/snapmgr.h" +#include "utils/lsyscache.h" /* Entry in pending-list of TIDs we need to revisit */ @@ -502,6 +503,13 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) OffsetNumber itemnos[MaxIndexTuplesPerPage]; spgxlogVacuumRedirect xlrec; + /* + * There is no chance of endless recursion even when we are doing catalog + * acceses here; because, spgist is never used for catalogs. Check + * comments in RelationIsAccessibleInLogicalDecoding(). 
+ */ + xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid); + xlrec.nToPlaceholder = 0; xlrec.newestRedirectXid = InvalidTransactionId; diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 27bbb58f56..e663523af1 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -18,7 +18,9 @@ #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" +#include "access/table.h" #include "bootstrap/bootstrap.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" #include "catalog/pg_amop.h" @@ -1919,6 +1921,20 @@ get_rel_persistence(Oid relid) return result; } +bool +get_rel_logical_catalog(Oid relid) +{ + bool res; + Relation rel; + + /* assume previously locked */ + rel = table_open(relid, NoLock); + res = RelationIsAccessibleInLogicalDecoding(rel); + table_close(rel, NoLock); + + return res; +} + /* ---------- TRANSFORM CACHE ---------- */ diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 18f2b0d98e..eff9696b18 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -439,8 +439,8 @@ extern XLogRecPtr gistXLogPageDelete(Buffer buffer, FullTransactionId xid, Buffer parentBuffer, OffsetNumber downlinkOffset); -extern void gistXLogPageReuse(Relation rel, BlockNumber blkno, - FullTransactionId latestRemovedXid); +extern void gistXLogPageReuse(Relation heapRel, Relation rel, + BlockNumber blkno, FullTransactionId latestRemovedXid); extern XLogRecPtr gistXLogUpdate(Buffer buffer, OffsetNumber *todelete, int ntodelete, @@ -478,7 +478,7 @@ extern bool gistproperty(Oid index_oid, int attno, extern bool gistfitpage(IndexTuple *itvec, int len); extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace); extern void gistcheckpage(Relation rel, Buffer buf); -extern Buffer gistNewBuffer(Relation r); +extern Buffer gistNewBuffer(Relation heapRel, Relation r); extern bool gistPageRecyclable(Page page); extern void gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off); diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h index 55fc843d3a..390b8b65f6 100644 --- a/src/include/access/gistxlog.h +++ b/src/include/access/gistxlog.h @@ -48,9 +48,9 @@ typedef struct gistxlogPageUpdate */ typedef struct gistxlogDelete { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 ntodelete; /* number of deleted offsets */ - /* * In payload of blk 0 : todelete OffsetNumbers */ @@ -96,6 +96,7 @@ typedef struct gistxlogPageDelete */ typedef struct gistxlogPageReuse { + bool onCatalogTable; RelFileNode node; BlockNumber block; FullTransactionId latestRemovedFullXid; diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h index d1aa6daa40..b44892cc48 100644 --- a/src/include/access/hash_xlog.h +++ b/src/include/access/hash_xlog.h @@ -250,6 +250,7 @@ typedef struct xl_hash_init_bitmap_page */ typedef struct xl_hash_vacuum_one_page { + bool onCatalogTable; TransactionId latestRemovedXid; int ntuples; diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 95d18cdb12..5edeb99808 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -237,6 +237,7 @@ typedef struct xl_heap_update */ typedef struct xl_heap_clean { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 nredirected; uint16 ndead; @@ -252,6 +253,7 @@ typedef struct 
xl_heap_clean */ typedef struct xl_heap_cleanup_info { + bool onCatalogTable; RelFileNode node; TransactionId latestRemovedXid; } xl_heap_cleanup_info; @@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple */ typedef struct xl_heap_freeze_page { + bool onCatalogTable; TransactionId cutoff_xid; uint16 ntuples; } xl_heap_freeze_page; @@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page */ typedef struct xl_heap_visible { + bool onCatalogTable; TransactionId cutoff_xid; uint8 flags; } xl_heap_visible; @@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record); extern const char *heap2_identify(uint8 info); extern void heap_xlog_logical_rewrite(XLogReaderState *r); -extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode, +extern XLogRecPtr log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid); extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *redirected, int nredirected, @@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple, bool *totally_frozen); extern void heap_execute_freeze_tuple(HeapTupleHeader tuple, xl_heap_freeze_tuple *xlrec_tp); -extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer, +extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags); #endif /* HEAPAM_XLOG_H */ diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h index 347976c532..fe69dabca0 100644 --- a/src/include/access/nbtxlog.h +++ b/src/include/access/nbtxlog.h @@ -186,6 +186,7 @@ typedef struct xl_btree_dedup */ typedef struct xl_btree_delete { + bool onCatalogTable; TransactionId latestRemovedXid; uint32 ndeleted; @@ -203,6 +204,7 @@ typedef struct xl_btree_delete */ typedef struct xl_btree_reuse_page { + bool onCatalogTable; RelFileNode node; BlockNumber block; TransactionId latestRemovedXid; diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h index 63d3c63db2..83121c7266 100644 --- a/src/include/access/spgxlog.h +++ b/src/include/access/spgxlog.h @@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot typedef struct spgxlogVacuumRedirect { + bool onCatalogTable; uint16 nToPlaceholder; /* number of redirects to make placeholders */ OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */ TransactionId newestRedirectXid; /* newest XID of removed redirects */ diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 4e646c55e9..ab0d16c1f9 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -132,6 +132,7 @@ extern char get_rel_relkind(Oid relid); extern bool get_rel_relispartition(Oid relid); extern Oid get_rel_tablespace(Oid relid); extern char get_rel_persistence(Oid relid); +extern bool get_rel_logical_catalog(Oid relid); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes); extern bool get_typisdefined(Oid typid); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 44ed04dd3f..b6dc5dfc7d 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -16,6 +16,7 @@ #include "access/tupdesc.h" #include "access/xlog.h" +#include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_publication.h" @@ -316,6 +317,9 @@ typedef struct StdRdOptions * RelationIsUsedAsCatalogTable * Returns whether the relation should be treated as a catalog table * from the pov of logical 
decoding. Note multiple eval of argument! + * This definition should not invoke anything that performs catalog + * access; otherwise it may cause infinite recursion. Check the comments + * in RelationIsAccessibleInLogicalDecoding() for details. */ #define RelationIsUsedAsCatalogTable(relation) \ ((relation)->rd_options && \ @@ -580,6 +584,11 @@ typedef struct ViewOptions * RelationIsAccessibleInLogicalDecoding * True if we need to log enough information to have access via * decoding snapshot. + * This definition should not invoke anything that performs catalog + * access. Otherwise, e.g. logging a WAL entry for catalog relation may + * invoke this function, which will in turn do catalog access, which may + * in turn cause another similar WAL entry to be logged, leading to + * infinite recursion. */ #define RelationIsAccessibleInLogicalDecoding(relation) \ (XLogLogicalInfoActive() && \ -- 2.20.1
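A note on when onCatalogTable ends up true: RelationIsAccessibleInLogicalDecoding() covers the system catalogs and any user table that opts in through the user_catalog_table reloption (with wal_level = logical), so the flag really is a per-relation property and has to be captured at WAL-record construction time. A hedged SQL illustration, with a made-up table name:

-- Rows removed from this table are treated as catalog rows for logical
-- decoding, so their removal on the primary can later conflict with a
-- standby slot's catalog_xmin.
CREATE TABLE my_lookup (id int PRIMARY KEY, val text)
  WITH (user_catalog_table = true);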
>From ee5c0944effd0634d82d90828c2f749e9abf5fba Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:16 +0530 Subject: [PATCH v6 3/5] Handle logical slot conflicts on standby. During WAL replay on standby, when slot conflict is identified, drop such slots. Also do the same thing if wal_level on master is reduced to below logical and there are existing logical slots on standby. Introduce a new ProcSignalReason value for slot conflict recovery. Arrange for a new pg_stat_get_activity field: confl_logicalslot. Amit Khandekar, reviewed by Andres Freund. --- doc/src/sgml/monitoring.sgml | 6 + src/backend/access/gist/gistxlog.c | 4 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/heap/heapam.c | 13 +- src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 14 +++ src/backend/catalog/system_views.sql | 1 + src/backend/postmaster/pgstat.c | 4 + src/backend/replication/slot.c | 176 +++++++++++++++++++++++++++ src/backend/storage/ipc/procarray.c | 4 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 4 +- src/backend/tcop/postgres.c | 22 ++++ src/backend/utils/adt/pgstatfuncs.c | 16 +++ src/include/catalog/pg_proc.dat | 5 + src/include/pgstat.h | 1 + src/include/replication/slot.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 +- src/test/regress/expected/rules.out | 1 + 21 files changed, 278 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 7626987808..aa65478b31 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2742,6 +2742,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i <entry>Number of queries in this database that have been canceled due to old snapshots</entry> </row> + <row> + <entry><structfield>confl_logicalslot</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of queries in this database that have been canceled due to + logical slots</entry> + </row> <row> <entry><structfield>confl_bufferpin</structfield></entry> <entry><type>bigint</type></entry> diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 90bc4895b2..289176305a 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -414,6 +415,7 @@ gistRedoPageReuse(XLogReaderState *record) latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid); ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index 3c60677662..5767890fa4 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = 
XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index bf6bfe69dc..440f444dc1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7703,7 +7703,8 @@ heap_xlog_cleanup_info(XLogReaderState *record) xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); /* * Actual operation is a no-op. Record type exists to provide a means for @@ -7739,7 +7740,8 @@ heap_xlog_clean(XLogReaderState *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -7835,7 +7837,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. If the heap file has dropped or @@ -7972,7 +7976,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 99d0914e72..09734b3e94 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -659,7 +659,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -935,6 +936,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) { ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 7be2291d07..0672f994ec 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a666b4b935..c679f2d767 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9942,6 +9942,20 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Drop logical slots if we are in hot standby and the primary does not + * have a WAL level sufficient for logical decoding. 
No need to search + * for potentially conflicting logically slots if standby is running + * with wal_level lower than logical, because in that case, we would + * have either disallowed creation of logical slots or dropped existing + * ones. + */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId, + gettext_noop("Logical decoding on standby requires wal_level >= logical on master.")); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b8a3f46912..471a3e78f9 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -906,6 +906,7 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_tablespace(D.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock FROM pg_database D; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index f9287b7942..0dc26dcb19 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4599,6 +4599,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) dbentry->n_conflict_tablespace = 0; dbentry->n_conflict_lock = 0; dbentry->n_conflict_snapshot = 0; + dbentry->n_conflict_logicalslot = 0; dbentry->n_conflict_bufferpin = 0; dbentry->n_conflict_startup_deadlock = 0; dbentry->n_temp_files = 0; @@ -6223,6 +6224,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: dbentry->n_conflict_snapshot++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->n_conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->n_conflict_bufferpin++; break; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 00aa95ba15..3a18ef052d 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" +#include "storage/lock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -101,6 +102,7 @@ int max_replication_slots = 0; /* the maximum number of replication static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static void ReplicationSlotDropConflicting(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -637,6 +639,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) LWLockRelease(ReplicationSlotAllocationLock); } +/* + * Permanently drop a conflicting replication slot. If it's already active by + * another backend, send it a recovery conflict signal, and then try again. 
+ */ +static void +ReplicationSlotDropConflicting(ReplicationSlot *slot) +{ + pid_t active_pid; + PGPROC *proc; + VirtualTransactionId vxid; + + ConditionVariablePrepareToSleep(&slot->active_cv); + while (1) + { + SpinLockAcquire(&slot->mutex); + active_pid = slot->active_pid; + if (active_pid == 0) + active_pid = slot->active_pid = MyProcPid; + SpinLockRelease(&slot->mutex); + + /* Drop the acquired slot, unless it is acquired by another backend */ + if (active_pid == MyProcPid) + { + elog(DEBUG1, "acquired conflicting slot, now dropping it"); + ReplicationSlotDropPtr(slot); + break; + } + + /* Send the other backend, a conflict recovery signal */ + + SetInvalidVirtualTransactionId(vxid); + LWLockAcquire(ProcArrayLock, LW_SHARED); + proc = BackendPidGetProcWithLock(active_pid); + if (proc) + GET_VXID_FROM_PGPROC(vxid, *proc); + LWLockRelease(ProcArrayLock); + + /* + * If coincidently that process finished, some other backend may + * acquire the slot again. So start over again. + * Note: Even if vxid.localTransactionId is invalid, we need to cancel + * that backend, because there is no other way to make it release the + * slot. So don't bother to validate vxid.localTransactionId. + */ + if (vxid.backendId == InvalidBackendId) + continue; + + elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot", + active_pid, vxid.backendId); + + CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + ConditionVariableSleep(&slot->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + } + + ConditionVariableCancelSleep(); +} + /* * Serialize the currently acquired slot's state from memory to disk, thereby * guaranteeing the current state will survive a crash. @@ -1083,6 +1143,122 @@ ReplicationSlotReserveWal(void) } } +/* + * Resolve recovery conflicts with logical slots. + * + * When xid is valid, it means that rows older than xid might have been + * removed. Therefore we need to drop slots that depend on seeing those rows. + * When xid is invalid, drop all logical slots. This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be dropped. Also, when xid is invalid, a common 'conflict_reason' is + * provided for the error detail; otherwise it is NULL, in which case it is + * constructed out of the xid value. + */ +void +ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, + char *conflict_reason) +{ + int i; + bool found_conflict = false; + + if (max_replication_slots <= 0) + return; + +restart: + if (found_conflict) + { + CHECK_FOR_INTERRUPTS(); + found_conflict = false; + } + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* We are only dealing with *logical* slot conflicts. 
*/ + if (!SlotIsLogical(s)) + continue; + + /* Invalid xid means caller is asking to drop all logical slots */ + if (!TransactionIdIsValid(xid)) + found_conflict = true; + else + { + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + StringInfoData conflict_str, conflict_xmins; + char *conflict_sentence = + gettext_noop("Slot conflicted with xid horizon which was being increased to"); + + /* not our database, skip */ + if (s->data.database != InvalidOid && s->data.database != dboid) + continue; + + SpinLockAcquire(&s->mutex); + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + SpinLockRelease(&s->mutex); + + /* + * Build the conflict_str which will look like : + * "Slot conflicted with xid horizon which was being increased + * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)." + */ + initStringInfo(&conflict_xmins); + if (TransactionIdIsValid(slot_xmin) && + TransactionIdPrecedesOrEquals(slot_xmin, xid)) + { + appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin); + } + if (TransactionIdIsValid(slot_catalog_xmin) && + TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)) + appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d", + conflict_xmins.len > 0 ? ", " : "", + slot_catalog_xmin); + + if (conflict_xmins.len > 0) + { + initStringInfo(&conflict_str); + appendStringInfo(&conflict_str, "%s %d (%s).", + conflict_sentence, xid, conflict_xmins.data); + found_conflict = true; + conflict_reason = conflict_str.data; + } + } + + if (found_conflict) + { + NameData slotname; + + SpinLockAcquire(&s->mutex); + slotname = s->data.name; + SpinLockRelease(&s->mutex); + + /* ReplicationSlotDropConflicting() will acquire the lock below */ + LWLockRelease(ReplicationSlotControlLock); + + ReplicationSlotDropConflicting(s); + + ereport(LOG, + (errmsg("dropped conflicting slot %s", NameStr(slotname)), + errdetail("%s", conflict_reason))); + + /* We released the lock above; so re-scan the slots. */ + goto restart; + } + } + + LWLockRelease(ReplicationSlotControlLock); +} + + /* * Flush all replication slots to disk. * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index f45a619deb..db232306b8 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2670,6 +2670,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) GET_VXID_FROM_PGPROC(procvxid, *proc); + /* + * Note: vxid.localTransactionId can be invalid, which means the + * request is to signal the pid that is not running a transaction. 
+ */ if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 65d3946386..68c438d0c5 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -558,6 +558,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 08f695a980..de64eee0e7 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -23,6 +23,7 @@ #include "access/xloginsert.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -296,7 +297,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, } void -ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) +ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 00c77b66c7..c3ad1d5988 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2445,6 +2445,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -2913,6 +2916,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* + * For conflicts that require a logical slot to be dropped, the + * requirement is for the signal receiver to release the slot, + * so that it could be dropped by the signal sender. So for + * normal backends, the transaction should be aborted, just + * like for other recovery conflicts. But if it's walsender on + * standby, then it has to be killed so as to release an + * acquired logical slot. + */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore. 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index cea01534a5..b4b1dbc177 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1471,6 +1471,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS) PG_RETURN_INT64(result); } +Datum +pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + int64 result; + PgStat_StatDBEntry *dbentry; + + if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL) + result = 0; + else + result = (int64) (dbentry->n_conflict_logicalslot); + + PG_RETURN_INT64(result); +} + Datum pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS) { @@ -1514,6 +1529,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) result = (int64) (dbentry->n_conflict_tablespace + dbentry->n_conflict_lock + dbentry->n_conflict_snapshot + + dbentry->n_conflict_logicalslot + dbentry->n_conflict_bufferpin + dbentry->n_conflict_startup_deadlock); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7fb574f9dc..12e7be1132 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5323,6 +5323,11 @@ proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', prosrc => 'pg_stat_get_db_conflict_snapshot' }, +{ oid => '3434', + descr => 'statistics: recovery conflicts in database caused by logical replication slot', + proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's', + proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', + prosrc => 'pg_stat_get_db_conflict_logicalslot' }, { oid => '3068', descr => 'statistics: recovery conflicts in database caused by shared buffer pin', proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 1a19921f80..1f4a000c48 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -604,6 +604,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter n_conflict_tablespace; PgStat_Counter n_conflict_lock; PgStat_Counter n_conflict_snapshot; + PgStat_Counter n_conflict_logicalslot; PgStat_Counter n_conflict_bufferpin; PgStat_Counter n_conflict_startup_deadlock; PgStat_Counter n_temp_files; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 3e95b019b3..200720c589 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -204,4 +204,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason); + #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 90607df106..f400069c78 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -39,6 +39,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index cfbe426e5a..7e0bc43ac4 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, - 
RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index c7304611c3..5ed064627f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1845,6 +1845,7 @@ pg_stat_database_conflicts| SELECT d.oid AS datid, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, + pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; -- 2.20.1
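Once 0003 is applied, the effect of a slot conflict is visible from SQL on the standby. A quick check, shown as a sketch (the confl_logicalslot column is the one added by this patch; the rest is the standard catalog views):

-- After the startup process drops a conflicting slot, it no longer appears here.
SELECT slot_name, slot_type FROM pg_replication_slots;

-- ...and the new per-database conflict counter has been incremented.
SELECT datname, confl_logicalslot
  FROM pg_stat_database_conflicts
 WHERE datname = current_database();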
>From e5a1e2d6efb1a0a24291f7d0478b7d726c0da3c5 Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:16 +0530 Subject: [PATCH v6 4/5] New TAP test for logical decoding on standby. This test was originally written by Craig Ringer, then extended/modified by me, to test various slot conflict scenarios. Authors: Craig Ringer, Amit Khandekar. --- src/test/perl/PostgresNode.pm | 37 +++ .../t/018_standby_logical_decoding_xmins.pl | 272 ++++++++++++++++++ .../019_standby_logical_decoding_conflicts.pl | 216 ++++++++++++++ 3 files changed, 525 insertions(+) create mode 100644 src/test/recovery/t/018_standby_logical_decoding_xmins.pl create mode 100644 src/test/recovery/t/019_standby_logical_decoding_conflicts.pl diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 9575268bd7..3dbfcf56c8 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -2117,6 +2117,43 @@ sub pg_recvlogical_upto =pod +=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname) + +Create logical replication slot on given standby + +=cut + +sub create_logical_slot_on_standby +{ + my ($self, $master, $slot_name, $dbname) = @_; + my ($stdout, $stderr); + + my $handle; + + $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr); + + # Once slot restart_lsn is created, the standby looks for xl_running_xacts + # WAL record from the restart_lsn onwards. So firstly, wait until the slot + # restart_lsn is evaluated. + + $self->poll_query_until( + 'postgres', qq[ + SELECT restart_lsn IS NOT NULL + FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name' + ]) or die "timed out waiting for logical slot to calculate its restart_lsn"; + + # Now arrange for the xl_running_xacts record for which pg_recvlogical + # is waiting. + $master->safe_psql('postgres', 'CHECKPOINT'); + + $handle->finish(); + + is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created') + or die "could not create slot" . 
$slot_name; +} + +=pod + =back =cut diff --git a/src/test/recovery/t/018_standby_logical_decoding_xmins.pl b/src/test/recovery/t/018_standby_logical_decoding_xmins.pl new file mode 100644 index 0000000000..d654d79526 --- /dev/null +++ b/src/test/recovery/t/018_standby_logical_decoding_xmins.pl @@ -0,0 +1,272 @@ +# logical decoding on a standby : ensure xmins are appropriately updated + +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 23; +use RecursiveCopy; +use File::Copy; +use Time::HiRes qw(usleep); + +my ($stdin, $stdout, $stderr, $ret, $handle, $slot); + +my $node_master = get_new_node('master'); +my $node_standby = get_new_node('standby'); + +# Name for the physical slot on master +my $master_slotname = 'master_physical'; +# Name for the logical slot on standby +my $standby_slotname = 'standby_logical'; + +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for +# given boolean condition to be true to ensure we've reached a quiescent state +sub wait_for_xmins +{ + my ($node, $slotname, $check_expr) = @_; + + $node->poll_query_until( + 'postgres', qq[ + SELECT $check_expr + FROM pg_catalog.pg_replication_slots + WHERE slot_name = '$slotname'; + ]) or die "Timed out waiting for slot xmins to advance"; +} + + +######################## +# Initialize master node +######################## + +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +# very promptly terminate conflicting backends +max_standby_streaming_delay = '2s' +}); +$node_master->dump_info; +$node_master->start; + +$node_master->safe_psql('postgres', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]); +my $backup_name = 'b1'; +$node_master->backup($backup_name); + +# After slot creation, xmins must be null +$slot = $node_master->slot($master_slotname); +is($slot->{'xmin'}, '', "xmin null"); +is($slot->{'catalog_xmin'}, '', "catalog_xmin null"); + +####################### +# Initialize slave node +####################### + +$node_standby->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$master_slotname']); +$node_standby->start; +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + + +################################ +# xmin/catalog_xmin verification before and after standby-logical-slot creation. +################################ + +# With hot_standby_feedback off, xmin and catalog_xmin must still be null +$slot = $node_master->slot($master_slotname); +is($slot->{'xmin'}, '', "xmin null after standby join"); +is($slot->{'catalog_xmin'}, '', "catalog_xmin null after standby join"); + +$node_standby->append_conf('postgresql.conf',q[ +hot_standby_feedback = on +# send status rapidly so we promptly advance xmin on master +wal_receiver_status_interval = 1 +]); +$node_standby->restart; + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. With hot_standby_feedback on, xmin should advance, +# but catalog_xmin should still remain NULL since there is no logical slot. +wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +# Create new slots on the standby, ignoring the ones on the master completely. 
+# +# This must succeed since we know we have a catalog_xmin reservation. We +# might've already sent hot standby feedback to advance our physical slot's +# catalog_xmin but not received the corresponding xlog for the catalog xmin +# advance, in which case we'll create a slot that isn't usable. The calling +# application can prevent this by creating a temporary slot on the master to +# lock in its catalog_xmin. For a truly race-free solution we'd need +# master-to-standby hot_standby_feedback replies. +# +# In this case it won't race because there's no concurrent activity on the +# master. +# +$node_standby->create_logical_slot_on_standby($node_master, $standby_slotname, 'postgres'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +# Now that slot is created on standby, xmin and catalog_xmin should be non NULL +# on both master and standby. But on master, the xmins will be updated only +# after hot standby feedback is received. +wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NOT NULL"); + +$slot = $node_standby->slot($standby_slotname); +is($slot->{'xmin'}, '', "logical xmin null"); +isnt($slot->{'catalog_xmin'}, '', "logical catalog_xmin not null"); + + +################################ +# Standby logical slot should be able to fetch the table changes even when the +# table is dropped. +################################ + +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)'); +$node_master->safe_psql('postgres', q[INSERT INTO test_table(blah) values ('itworks')]); +$node_master->safe_psql('postgres', 'DROP TABLE test_table'); +$node_master->safe_psql('postgres', 'VACUUM'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +# Should show the inserts even when the table is dropped on master +($ret, $stdout, $stderr) = $node_standby->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($stderr, '', 'stderr is empty'); +is($ret, 0, 'replay from slot succeeded') + or die 'cannot continue if slot replay fails'; +is($stdout, q{BEGIN +table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks' +COMMIT}, 'replay results match'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +$slot = $node_master->slot($master_slotname); +isnt($slot->{'xmin'}, '', "physical xmin not null"); +my $saved_physical_catalog_xmin = $slot->{'catalog_xmin'}; +isnt($saved_physical_catalog_xmin, '', "physical catalog_xmin not null"); + +$slot = $node_standby->slot($standby_slotname); +is($slot->{'xmin'}, '', "logical xmin null"); +my $saved_logical_catalog_xmin = $slot->{'catalog_xmin'}; +isnt($saved_logical_catalog_xmin, '', "logical catalog_xmin not null"); + + +################################ +# Catalog xmins should advance after standby logical slot fetches the changes. +################################ + +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot, +# we hold down xmin. +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]); +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)'); +for my $i (0 .. 
2000) +{ + $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]); +} +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_2();]); +$node_master->safe_psql('postgres', 'VACUUM'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +cmp_ok($node_standby->slot($standby_slotname)->{'catalog_xmin'}, "==", + $saved_logical_catalog_xmin, + "logical slot catalog_xmin hasn't advanced before get_changes"); + +($ret, $stdout, $stderr) = $node_standby->psql('postgres', + qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]); +is($ret, 0, 'replay of big series succeeded'); +isnt($stdout, '', 'replayed some rows'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +# logical slot catalog_xmin on slave should advance after pg_logical_slot_get_changes +wait_for_xmins($node_standby, $standby_slotname, + "catalog_xmin::varchar::int > ${saved_logical_catalog_xmin}"); +$slot = $node_standby->slot($standby_slotname); +my $new_logical_catalog_xmin = $slot->{'catalog_xmin'}; +is($slot->{'xmin'}, '', "logical xmin null"); + +# hot standby feedback should advance master's phys catalog_xmin now that the +# standby's slot doesn't hold it down as far. +# But master's phys catalog_xmin should not go past the slave's logical slot's +# catalog_xmin, even while master's phys xmin advances. +# +# First, make sure master's xmin is advanced. This happens on hot standby +# feedback. So this check for master's xmin advance also makes sure hot standby +# feedback has reached the master, which is required for the subsequent +# catalog_xmin test. +my $temp_phys_xmin = $node_master->slot($master_slotname)->{'xmin'}; +$node_master->safe_psql('postgres', 'SELECT txid_current()'); +wait_for_xmins($node_master, $master_slotname, + "xmin::varchar::int > ${temp_phys_xmin}"); +$slot = $node_master->slot($master_slotname); +# Now check that the master's phys catalog_xmin has advanced but not beyond +# standby's logical catalog_xmin +cmp_ok($slot->{'catalog_xmin'}, ">", $saved_physical_catalog_xmin, + 'upstream physical slot catalog_xmin has advanced with hs_feedback on'); +cmp_ok($slot->{'catalog_xmin'}, "==", $new_logical_catalog_xmin, + 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on'); + + +###################### +# Upstream oldestXid should not go past downstream catalog_xmin +###################### + +# First burn some xids on the master in another DB, so we push the master's +# nextXid ahead. +foreach my $i (1 .. 100) +{ + $node_master->safe_psql('postgres', 'SELECT txid_current()'); +} + +# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance +# past our needed xmin. The only way we have visibility into that is to force +# a checkpoint. 
+$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'"); +foreach my $dbname ('template1', 'postgres', 'postgres', 'template0') +{ + $node_master->safe_psql($dbname, 'VACUUM FREEZE'); +} +$node_master->safe_psql('postgres', 'CHECKPOINT'); +IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout) + or die "pg_controldata failed with $?"; +my @checkpoint = split('\n', $stdout); +my $oldestXid = ''; +foreach my $line (@checkpoint) +{ + if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/) + { + $oldestXid = $1; + } +} +die 'no oldestXID found in checkpoint' unless $oldestXid; + +cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'}, + 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on'); + +$node_master->safe_psql('postgres', + "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'"); + + +################################################## +# Drop slot +# Make sure standby slots are droppable, and properly clear the upstream's xmin +################################################## + +is($node_standby->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on'); + +$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$standby_slotname')]); + +is($node_standby->slot($standby_slotname)->{'slot_type'}, '', 'slot on standby dropped manually'); + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. catalog_xmin should become NULL because we dropped +# the logical slot. +wait_for_xmins($node_master, $master_slotname, + "xmin IS NOT NULL AND catalog_xmin IS NULL"); diff --git a/src/test/recovery/t/019_standby_logical_decoding_conflicts.pl b/src/test/recovery/t/019_standby_logical_decoding_conflicts.pl new file mode 100644 index 0000000000..d0c449338f --- /dev/null +++ b/src/test/recovery/t/019_standby_logical_decoding_conflicts.pl @@ -0,0 +1,216 @@ +# logical decoding on a standby : test conflict recovery; and other tests that +# verify slots get dropped as expected. + +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 24; +use RecursiveCopy; +use File::Copy; +use Time::HiRes qw(usleep); + +my ($stdin, $stdout, $stderr, $ret, $handle, $slot); + +my $node_master = get_new_node('master'); +my $node_standby = get_new_node('standby'); + +# Name for the physical slot on master +my $master_slotname = 'master_physical'; + +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for +# given boolean condition to be true to ensure we've reached a quiescent state +sub wait_for_xmins +{ + my ($node, $slotname, $check_expr) = @_; + + $node->poll_query_until( + 'postgres', qq[ + SELECT $check_expr + FROM pg_catalog.pg_replication_slots + WHERE slot_name = '$slotname'; + ]) or die "Timed out waiting for slot xmins to advance"; +} + +# Create the required logical slots on standby. 
+sub create_logical_slots +{ + $node_standby->create_logical_slot_on_standby($node_master, 'dropslot', 'testdb'); + $node_standby->create_logical_slot_on_standby($node_master, 'activeslot', 'testdb'); +} + +# Acquire one of the standby logical slots created by create_logical_slots() +sub make_slot_active +{ + my $slot_user_handle; + + # make sure activeslot is in use + print "starting pg_recvlogical\n"; + $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr); + + while (!$node_standby->slot('activeslot')->{'active_pid'}) + { + usleep(100_000); + print "waiting for slot to become active\n"; + } + return $slot_user_handle; +} + +# Check if all the slots on standby are dropped. These include the 'activeslot' +# that was acquired by make_slot_active(), and the non-active 'dropslot'. +sub check_slots_dropped +{ + my ($slot_user_handle) = @_; + my $return; + + is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped'); + is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped'); + + # our client should've terminated in response to the walsender error + eval { + $slot_user_handle->finish; + }; + $return = $?; + cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n"); + if ($return) { + like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict'); + like($stderr, qr/must be dropped/, 'recvlogical error detail'); + } + + return 0; +} + + +######################## +# Initialize master node +######################## + +$node_master->init(allows_streaming => 1, has_archiving => 1); +$node_master->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +# send status rapidly so we promptly advance xmin on master +wal_receiver_status_interval = 1 +# very promptly terminate conflicting backends +max_standby_streaming_delay = '2s' +}); +$node_master->dump_info; +$node_master->start; + +$node_master->psql('postgres', q[CREATE DATABASE testdb]); + +$node_master->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]); +my $backup_name = 'b1'; +$node_master->backup($backup_name); + +####################### +# Initialize slave node +####################### + +$node_standby->init_from_backup( + $node_master, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$master_slotname']); +$node_standby->start; +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + + +################################################## +# Recovery conflict: Drop conflicting slots, including in-use slots +# Scenario 1 : hot_standby_feedback off +################################################## + +create_logical_slots(); + +# One way to reproduce recovery conflict is to run VACUUM FULL with +# hot_standby_feedback turned off on slave. +$node_standby->append_conf('postgresql.conf',q[ +hot_standby_feedback = off +]); +$node_standby->restart; +# ensure walreceiver feedback off by waiting for expected xmin and +# catalog_xmin on master. 
Both should be NULL since hs_feedback is off +wait_for_xmins($node_master, $master_slotname, + "xmin IS NULL AND catalog_xmin IS NULL"); + +$handle = make_slot_active(); + +# This should trigger the conflict +$node_master->safe_psql('testdb', 'VACUUM FULL'); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +check_slots_dropped($handle); + +# Turn hot_standby_feedback back on +$node_standby->append_conf('postgresql.conf',q[ +hot_standby_feedback = on +]); +$node_standby->restart; + +# ensure walreceiver feedback sent by waiting for expected xmin and +# catalog_xmin on master. With hot_standby_feedback on, xmin should advance, +# but catalog_xmin should still remain NULL since there is no logical slot. +wait_for_xmins($node_master, $master_slotname, + "xmin IS NOT NULL AND catalog_xmin IS NULL"); + +################################################## +# Recovery conflict: Drop conflicting slots, including in-use slots +# Scenario 2 : incorrect wal_level at master +################################################## + +create_logical_slots(); + +$handle = make_slot_active(); + +# Change the master's wal_level to replica. This will trigger a slot conflict. +$node_master->append_conf('postgresql.conf',q[ +wal_level = 'replica' +]); +$node_master->restart; + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +check_slots_dropped($handle); + +# Restore master wal_level +$node_master->append_conf('postgresql.conf',q[ +wal_level = 'logical' +]); +$node_master->restart; +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + + +################################################## +# DROP DATABASE should drop its slots, including active slots. +################################################## + +create_logical_slots(); +$handle = make_slot_active(); + +# Create a slot on a database that would not be dropped. This slot should not +# get dropped. +$node_standby->create_logical_slot_on_standby($node_master, 'otherslot', 'postgres'); + +# dropdb on the master to verify slots are dropped on standby +$node_master->safe_psql('postgres', q[DROP DATABASE testdb]); + +$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush')); + +is($node_standby->safe_psql('postgres', + q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f', + 'database dropped on standby'); + +check_slots_dropped($handle); + +is($node_standby->slot('otherslot')->{'slot_type'}, 'logical', + 'otherslot on standby not dropped'); + +# Cleanup: manually drop the slot that was not dropped. +$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]); -- 2.20.1
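The comment near the top of 018_standby_logical_decoding_xmins.pl points out that slot creation on the standby can race against hot standby feedback and produce an unusable slot, and that the calling application can avoid this by locking in catalog_xmin on the master first. The following is only a sketch of that idea in the style of these TAP tests; it assumes $node_master and $node_standby are set up as in the tests above, and the subroutine name and the 'catalog_xmin_pin' slot name are made up for illustration. It also uses an ordinary logical slot rather than a temporary one, so that the pin survives across the separate psql sessions:

    # Sketch: hold down catalog_xmin on the master with a short-lived logical
    # slot, create the logical slot on the standby, then release the pin once
    # hot standby feedback has propagated the standby slot's catalog_xmin back
    # to the master's physical slot.
    sub create_pinned_slot_on_standby
    {
        my ($node_master, $node_standby, $phys_slotname, $slotname, $dbname) = @_;

        # Ordinary (non-temporary) slot so it persists across psql sessions.
        $node_master->safe_psql($dbname,
            qq[SELECT pg_create_logical_replication_slot('catalog_xmin_pin', 'test_decoding');]);

        $node_standby->create_logical_slot_on_standby($node_master, $slotname, $dbname);

        # Wait for the standby slot's catalog_xmin to be reported back to the
        # master's physical slot via hot_standby_feedback before unpinning.
        $node_master->poll_query_until($dbname, qq[
            SELECT catalog_xmin IS NOT NULL
            FROM pg_catalog.pg_replication_slots
            WHERE slot_name = '$phys_slotname';])
          or die "timed out waiting for catalog_xmin on the physical slot";

        $node_master->safe_psql($dbname,
            qq[SELECT pg_drop_replication_slot('catalog_xmin_pin');]);
    }

    # For example:
    # create_pinned_slot_on_standby($node_master, $node_standby,
    #     'master_physical', 'standby_logical', 'postgres');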
>From 20b254f9c2379833688a46581a8bebf07280e818 Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khande...@enterprisedb.com> Date: Thu, 16 Jan 2020 10:05:16 +0530 Subject: [PATCH v6 5/5] Doc changes describing details about logical decoding on standby. --- doc/src/sgml/logicaldecoding.sgml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index bce6d379bf..745c65ff02 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -248,6 +248,24 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot may consume changes from a slot at any given time. </para> + <para> + A logical replication slot can also be created on a hot standby. To prevent + <command>VACUUM</command> from removing required rows from the system + catalogs, <varname>hot_standby_feedback</varname> should be set on the + standby. If required rows are removed nonetheless, the slot is dropped. + Existing logical slots on the standby are also dropped if <varname>wal_level</varname> + on the primary is reduced to less than <literal>logical</literal>. + </para> + + <para> + Creating a logical slot requires building a historic snapshot, for which + information about all currently running transactions is needed. On the + primary, this information is directly available, but on a standby it has + to be obtained from WAL sent by the primary. Slot creation may therefore + have to wait for some activity on the primary; if the primary is idle, + creating a logical slot on the standby may take a noticeable amount of time. + </para> + <caution> <para> Replication slots persist across crashes and know nothing about the state -- 2.20.1
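As a usage illustration of the behaviour described in the new documentation paragraphs, here is a minimal sketch in the style of the TAP tests above. It assumes $node_master and $node_standby are PostgresNode instances configured as in 018_standby_logical_decoding_xmins.pl (wal_level = logical on the master, a physical slot plus hot_standby_feedback = on for the standby); the slot name 'standby_logical_example' and the example table are made up:

    # Sketch: create a logical slot on the standby and decode changes that
    # were generated on the master.  On an idle master the creation may take
    # a while, since it has to wait for a running-transactions WAL record to
    # be replayed; the create_logical_slot_on_standby() helper added earlier
    # in this series is used here and is handed the master node as well.
    $node_standby->create_logical_slot_on_standby($node_master,
        'standby_logical_example', 'postgres');

    # Generate changes on the master and wait for the standby to replay them.
    $node_master->safe_psql('postgres',
        'CREATE TABLE example_tbl(id int); INSERT INTO example_tbl VALUES (1);');
    $node_master->wait_for_catchup($node_standby, 'replay',
        $node_master->lsn('flush'));

    # Decode the changes on the standby.
    my $changes = $node_standby->safe_psql('postgres',
        qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical_example',
            NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')]);
    print "$changes\n";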