On Sat, 6 Apr 2019 at 04:45, Andres Freund <and...@anarazel.de> wrote:
>
> Hi,
>
> Thanks for the new version of the patch. Btw, could you add Craig as a
> co-author in the commit message of the next version of the patch? Don't
> want to forget him.

I had put his name in the earlier patch, but now I have made it easier to spot.

>
> On 2019-04-05 17:08:39 +0530, Amit Khandekar wrote:
> > Regarding the test result failures, I could see that when we drop a
> > logical replication slot at standby server, then the catalog_xmin of
> > physical replication slot becomes NULL, whereas the test expects it to
> > be equal to xmin; and that's the reason a couple of test scenarios are
> > failing :
> >
> > ok 33 - slot on standby dropped manually
> > Waiting for replication conn replica's replay_lsn to pass '0/31273E0' on
> > master
> > done
> > not ok 34 - physical catalog_xmin still non-null
> > not ok 35 - xmin and catalog_xmin equal after slot drop
> > #   Failed test 'xmin and catalog_xmin equal after slot drop'
> > #   at t/016_logical_decoding_on_replica.pl line 272.
> > #          got:
> > #     expected: 2584
> >
> > I am not sure what is expected. What actually happens is : the
> > physical xlot catalog_xmin remains NULL initially, but becomes
> > non-NULL after the logical replication slot is created on standby.
>
> That seems like the correct behaviour to me - why would we still have a
> catalog xmin if there's no slot logical slot?

Yeah. Maybe it was different in the earlier implementation, which is why
catalog_xmin did not become NULL there; I am not sure. Anyway, I have
changed this check. Details below.
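
For reference, the revised check encodes the expectation as a small truth table. Below is a minimal standalone C sketch of that table. It is not code from the patch: phys_slot_xmins_as_expected() is a hypothetical helper, TransactionId is re-declared as a plain uint32_t, and an invalid xid stands in for a NULL column in pg_replication_slots. The sketch assumes that the physical slot carries an xmin only while hot_standby_feedback is on, and carries a catalog_xmin only while a logical slot also exists on the standby.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;            /* stand-in for PostgreSQL's type */
#define InvalidTransactionId ((TransactionId) 0)

/*
 * Hypothetical helper (not in the patch): do the xmin / catalog_xmin read
 * from the master's physical slot match what the test should expect for
 * this standby configuration?  InvalidTransactionId plays the role of NULL.
 */
static bool
phys_slot_xmins_as_expected(bool hot_standby_feedback,
                            bool standby_has_logical_slot,
                            TransactionId xmin, TransactionId catalog_xmin)
{
    /* Feedback off: the standby reports no horizons, so both stay NULL. */
    bool expect_xmin = hot_standby_feedback;

    /* catalog_xmin is held back only while a logical slot on the standby needs it. */
    bool expect_catalog_xmin = hot_standby_feedback && standby_has_logical_slot;

    return (xmin != InvalidTransactionId) == expect_xmin &&
           (catalog_xmin != InvalidTransactionId) == expect_catalog_xmin;
}
```

Under this model, once the logical slot on the standby is dropped, a non-NULL physical catalog_xmin no longer matches. That is why the old "xmin and catalog_xmin equal after slot drop" expectation was the wrong one.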

> >
> > diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> > index 006446b..5785d2f 100644
> > --- a/src/backend/replication/slot.c
> > +++ b/src/backend/replication/slot.c
> > @@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void)
> >      }
> >  }
> >
> > +void
> > +ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
> > +{
> > +    int         i;
> > +    bool        found_conflict = false;
> > +
> > +    if (max_replication_slots <= 0)
> > +        return;
> > +
> > +restart:
> > +    if (found_conflict)
> > +    {
> > +        CHECK_FOR_INTERRUPTS();
> > +        /*
> > +         * Wait awhile for them to die so that we avoid flooding an
> > +         * unresponsive backend when system is heavily loaded.
> > +         */
> > +        pg_usleep(100000);
> > +        found_conflict = false;
> > +    }
> > +
> > +    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> > +    for (i = 0; i < max_replication_slots; i++)
> > +    {
> > +        ReplicationSlot *s;
> > +        NameData    slotname;
> > +        TransactionId slot_xmin;
> > +        TransactionId slot_catalog_xmin;
> > +
> > +        s = &ReplicationSlotCtl->replication_slots[i];
> > +
> > +        /* cannot change while ReplicationSlotCtlLock is held */
> > +        if (!s->in_use)
> > +            continue;
> > +
> > +        /* not our database, skip */
> > +        if (s->data.database != InvalidOid && s->data.database != dboid)
> > +            continue;
> > +
> > +        SpinLockAcquire(&s->mutex);
> > +        slotname = s->data.name;
> > +        slot_xmin = s->data.xmin;
> > +        slot_catalog_xmin = s->data.catalog_xmin;
> > +        SpinLockRelease(&s->mutex);
> > +
> > +        if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid))
> > +        {
> > +            found_conflict = true;
> > +
> > +            ereport(WARNING,
> > +                    (errmsg("slot %s w/ xmin %u conflicts with removed xid %u",
> > +                            NameStr(slotname), slot_xmin, xid)));
> > +        }
> > +
> > +        if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
> > +        {
> > +            found_conflict = true;
> > +
> > +            ereport(WARNING,
> > +                    (errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u",
> > +                            NameStr(slotname), slot_catalog_xmin, xid)));
> > +        }
> > +
> > +
> > +        if (found_conflict)
> > +        {
> > +            elog(WARNING, "Dropping conflicting slot %s", s->data.name.data);
> > +            LWLockRelease(ReplicationSlotControlLock);    /* avoid deadlock */
> > +            ReplicationSlotDropPtr(s);
> > +
> > +            /* We released the lock above; so re-scan the slots. */
> > +            goto restart;
> > +        }
> > +    }
>
> I think this should be refactored so that the two found_conflict cases
> set a 'reason' variable (perhaps an enum?) to the particular reason, and
> then only one warning should be emitted.  I also think that LOG might be
> more appropriate than WARNING - as confusing as that is, LOG is more
> severe than WARNING (see docs about log_min_messages).

What I have in mind is:

ereport(LOG,
        (errcode(ERRCODE_INTERNAL_ERROR),
         errmsg("Dropping conflicting slot %s", s->data.name.data),
         errdetail("%s, removed xid: %u.", conflict_str, xid)));

where conflict_str is a dynamically generated string containing
something like "slot xmin: 1234, catalog_xmin: 5678". So for the user,
the errdetail will look like:

"slot xmin: 1234, catalog_xmin: 5678, removed xid: 9012."

I think the user can figure out whether it was xmin, catalog_xmin or
both that conflicted with the removed xid. If we don't do it this way,
we may not be able to report in a single message that both xmin and
catalog_xmin are conflicting at the same time.

Does this message look good to you, or did you have something quite
different in mind?

> >
> > @@ -0,0 +1,386 @@
> > +# Demonstrate that logical can follow timeline switches.
> > +#
> > +# Test logical decoding on a standby.
> > +#
> > +use strict;
> > +use warnings;
> > +use 5.8.0;
> > +
> > +use PostgresNode;
> > +use TestLib;
> > +use Test::More tests => 55;
> > +use RecursiveCopy;
> > +use File::Copy;
> > +
> > +my ($stdin, $stdout, $stderr, $ret, $handle, $return);
> > +my $backup_name;
> > +
> > +# Initialize master node
> > +my $node_master = get_new_node('master');
> > +$node_master->init(allows_streaming => 1, has_archiving => 1);
> > +$node_master->append_conf('postgresql.conf', q{
> > +wal_level = 'logical'
> > +max_replication_slots = 4
> > +max_wal_senders = 4
> > +log_min_messages = 'debug2'
> > +log_error_verbosity = verbose
> > +# send status rapidly so we promptly advance xmin on master
> > +wal_receiver_status_interval = 1
> > +# very promptly terminate conflicting backends
> > +max_standby_streaming_delay = '2s'
> > +});
> > +$node_master->dump_info;
> > +$node_master->start;
> > +
> > +$node_master->psql('postgres', q[CREATE DATABASE testdb]);
> > +
> > +$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
> > +$backup_name = 'b1';
> > +my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
> > +TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=decoding_standby');
> > +
> > +sub print_phys_xmin
> > +{
> > +    my $slot = $node_master->slot('decoding_standby');
> > +    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
> > +}
> > +
> > +my ($xmin, $catalog_xmin) = print_phys_xmin();
> > +# After slot creation, xmins must be null
> > +is($xmin, '', "xmin null");
> > +is($catalog_xmin, '', "catalog_xmin null");
> > +
> > +my $node_replica = get_new_node('replica');
> > +$node_replica->init_from_backup(
> > +    $node_master, $backup_name,
> > +    has_streaming => 1,
> > +    has_restoring => 1);
> > +$node_replica->append_conf('postgresql.conf',
> > +    q[primary_slot_name = 'decoding_standby']);
> > +
> > +$node_replica->start;
> > +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
> > +
> > +# with hot_standby_feedback off, xmin and catalog_xmin must still be null
> > +($xmin, $catalog_xmin) = print_phys_xmin();
> > +is($xmin, '', "xmin null after replica join");
> > +is($catalog_xmin, '', "catalog_xmin null after replica join");
> > +
> > +$node_replica->append_conf('postgresql.conf',q[
> > +hot_standby_feedback = on
> > +]);
> > +$node_replica->restart;
> > +sleep(2); # ensure walreceiver feedback sent
>
> Can we make this more robust? E.g. by waiting till pg_stat_replication
> shows the change on the primary? Because I can guarantee that this'll
> fail on slow buildfarm machines (say the valgrind animals).
>
> > +$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
> > +sleep(2); # ensure walreceiver feedback sent
>
> Similar.

OK. I have copied the get_slot_xmins() function from t/001_stream_rep.pl
into 016_logical_decoding_on_replica.pl, renamed it to
wait_for_phys_mins(), and used it to wait for the hot_standby_feedback
change to propagate to the master.
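
The point of this change is to replace the fixed sleep(2) with a bounded retry loop. The pattern can be sketched in standalone C as below. This only illustrates the poll-until idea behind PostgresNode's poll_query_until(), which the Perl helper relies on. poll_until() and countdown_reached_zero() are hypothetical names, and the attempt count and interval are arbitrary, not the values the TAP framework uses.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Condition re-evaluated on every attempt, e.g. "did the SELECT return t?" */
typedef bool (*condition_fn)(void *arg);

/*
 * Hypothetical poll-until loop: check `cond` up to max_attempts times,
 * sleeping interval_ms between attempts.  Returns true as soon as the
 * condition holds, false if it never did (i.e. we timed out).
 */
static bool
poll_until(condition_fn cond, void *arg, int max_attempts, long interval_ms)
{
    struct timespec delay;

    assert(max_attempts > 0 && interval_ms >= 0);
    delay.tv_sec = interval_ms / 1000;
    delay.tv_nsec = (interval_ms % 1000) * 1000000L;

    for (int attempt = 0; attempt < max_attempts; attempt++)
    {
        if (cond(arg))
            return true;
        /* No point sleeping after the final failed attempt. */
        if (attempt + 1 < max_attempts)
            nanosleep(&delay, NULL);
    }
    return false;
}

/*
 * Toy condition for illustration only: becomes true once *remaining
 * reaches 0, standing in for the master finally seeing the feedback.
 */
static bool
countdown_reached_zero(void *arg)
{
    int *remaining = arg;

    if (*remaining > 0)
        (*remaining)--;
    return *remaining == 0;
}
```

With this design, a slow machine only makes the loop run longer, up to the timeout. It never makes the test read stale xmin values, which is the failure mode a fixed sleep has on slow buildfarm animals.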
wait_for_phys_mins() waits for the physical slot's xmin and catalog_xmin
to reach the expected values, which depend on whether there is a logical
slot on the standby and whether hot_standby_feedback is enabled on the
standby. I was not sure how pg_stat_replication could be used to detect
that the hot_standby_feedback change had reached the master, so I did it
this way. I think it does pretty much what we want.

The attached v4 patch only has the test case change, plus some minor
cleanup in the test file.

--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company
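
Here is a minimal standalone C sketch of the single-message errdetail proposal above. It combines Andres's enum-style 'reason' with one detail string. It is not PostgreSQL code: slot_conflict_reason(), format_conflict_detail() and the SLOT_CONFLICT_* flags are hypothetical, and TransactionId is re-declared as uint32_t. xid_precedes_or_equals() re-implements the wraparound-aware comparison that TransactionIdPrecedesOrEquals() performs. In the backend, the string would go into errdetail() rather than a caller-supplied buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;                 /* stand-in for PostgreSQL's type */
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

/* Wraparound-aware "id1 <= id2", following TransactionIdPrecedesOrEquals(). */
static bool
xid_precedes_or_equals(TransactionId id1, TransactionId id2)
{
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

/* Hypothetical bit flags recording which horizon(s) of a slot conflict. */
typedef enum SlotConflictReason
{
    SLOT_CONFLICT_NONE = 0,
    SLOT_CONFLICT_XMIN = 1 << 0,
    SLOT_CONFLICT_CATALOG_XMIN = 1 << 1
} SlotConflictReason;

/* Compute all reasons at once, so the caller can emit a single message. */
static unsigned
slot_conflict_reason(TransactionId slot_xmin, TransactionId slot_catalog_xmin,
                     TransactionId removed_xid)
{
    unsigned    reason = SLOT_CONFLICT_NONE;

    if (slot_xmin != InvalidTransactionId &&
        xid_precedes_or_equals(slot_xmin, removed_xid))
        reason |= SLOT_CONFLICT_XMIN;
    if (slot_catalog_xmin != InvalidTransactionId &&
        xid_precedes_or_equals(slot_catalog_xmin, removed_xid))
        reason |= SLOT_CONFLICT_CATALOG_XMIN;
    return reason;
}

/*
 * Build the detail text, e.g.
 * "slot xmin: 1234, catalog_xmin: 5678, removed xid: 9012."
 * Returns false if buf was too small and the text got truncated.
 */
static bool
format_conflict_detail(char *buf, size_t len, TransactionId slot_xmin,
                       TransactionId slot_catalog_xmin, TransactionId removed_xid)
{
    int         n;

    assert(buf != NULL && len > 0);
    n = snprintf(buf, len, "slot xmin: %u, catalog_xmin: %u, removed xid: %u.",
                 (unsigned) slot_xmin, (unsigned) slot_catalog_xmin,
                 (unsigned) removed_xid);
    return n >= 0 && (size_t) n < len;
}
```

Always printing both horizons has one caveat. An unset horizon shows up as 0, and a logical slot's xmin is normally NULL. A real patch may therefore prefer to print only the horizons that are set, using the reason flags to pick them.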
From 1e3c68a644da4aa45ca72190cfa254ccd171f9e3 Mon Sep 17 00:00:00 2001 From: Amit Khandekar <amit.khandekar@enterprisedb.com> Date: Tue, 9 Apr 2019 22:06:25 +0530 Subject: [PATCH] Logical decoding on standby. Author : Andres Freund. Besides the above main changes, patch includes following : 1. Handle slot conflict recovery by dropping the conflicting slots. -Amit Khandekar. 2. test/recovery/t/016_logical_decoding_on_replica.pl added. Original author : Craig Ringer. few changes/additions from Amit Khandekar. --- src/backend/access/gist/gistxlog.c | 6 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/hash/hashinsert.c | 2 + src/backend/access/heap/heapam.c | 23 +- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/access/heap/visibilitymap.c | 2 +- src/backend/access/nbtree/nbtpage.c | 3 + src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgvacuum.c | 2 + src/backend/access/spgist/spgxlog.c | 1 + src/backend/replication/logical/logical.c | 2 + src/backend/replication/slot.c | 79 +++++ src/backend/storage/ipc/standby.c | 7 +- src/backend/utils/cache/lsyscache.c | 16 + src/include/access/gistxlog.h | 3 +- src/include/access/hash_xlog.h | 1 + src/include/access/heapam_xlog.h | 8 +- src/include/access/nbtxlog.h | 2 + src/include/access/spgxlog.h | 1 + src/include/replication/slot.h | 2 + src/include/storage/standby.h | 2 +- src/include/utils/lsyscache.h | 1 + src/include/utils/rel.h | 1 + .../recovery/t/016_logical_decoding_on_replica.pl | 391 +++++++++++++++++++++ 24 files changed, 546 insertions(+), 18 deletions(-) create mode 100644 src/test/recovery/t/016_logical_decoding_on_replica.pl diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 4fb1855..59a7910 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -342,7 +342,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - 
ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -544,7 +545,7 @@ gistRedoPageReuse(XLogReaderState *record) if (InHotStandby) { ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, - xlrec->node); + xlrec->onCatalogTable, xlrec->node); } } @@ -736,6 +737,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedXid = latestRemovedXid; diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index d7b7098..00c3e0f 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c index e17f017..b67e4e6 100644 --- a/src/backend/access/hash/hashinsert.c +++ b/src/backend/access/hash/hashinsert.c @@ -17,6 +17,7 @@ #include "access/hash.h" #include "access/hash_xlog.h" +#include "catalog/catalog.h" #include "miscadmin.h" #include "utils/rel.h" #include "storage/lwlock.h" @@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf) xl_hash_vacuum_one_page xlrec; XLogRecPtr recptr; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel); xlrec.latestRemovedXid = latestRemovedXid; xlrec.ntuples = ndeletable; diff 
--git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index a05b6a0..bfbb9d3 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7100,12 +7100,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel, * see comments for vacuum_log_cleanup_info(). */ XLogRecPtr -log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid) +log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid) { xl_heap_cleanup_info xlrec; XLogRecPtr recptr; - xlrec.node = rnode; + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); + xlrec.node = rel->rd_node; xlrec.latestRemovedXid = latestRemovedXid; XLogBeginInsert(); @@ -7141,6 +7142,7 @@ log_heap_clean(Relation reln, Buffer buffer, /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.latestRemovedXid = latestRemovedXid; xlrec.nredirected = nredirected; xlrec.ndead = ndead; @@ -7191,6 +7193,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, /* nor when there are no tuples to freeze */ Assert(ntuples > 0); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln); xlrec.cutoff_xid = cutoff_xid; xlrec.ntuples = ntuples; @@ -7221,7 +7224,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, * heap_buffer, if necessary. 
*/ XLogRecPtr -log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, +log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 vmflags) { xl_heap_visible xlrec; @@ -7231,6 +7234,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, Assert(BufferIsValid(heap_buffer)); Assert(BufferIsValid(vm_buffer)); + xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel); xlrec.cutoff_xid = cutoff_xid; xlrec.flags = vmflags; XLogBeginInsert(); @@ -7651,7 +7655,8 @@ heap_xlog_cleanup_info(XLogReaderState *record) xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); /* * Actual operation is a no-op. Record type exists to provide a means for @@ -7687,7 +7692,8 @@ heap_xlog_clean(XLogReaderState *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -7783,7 +7789,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. 
If the heap file has dropped or @@ -7920,7 +7928,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index c9d8312..fad08e0 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -475,7 +475,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats) * No need to write the record at all unless it contains a valid value */ if (TransactionIdIsValid(vacrelstats->latestRemovedXid)) - (void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid); + (void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid); } /* diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index 64dfe06..c5fdd64 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -281,7 +281,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, if (XLogRecPtrIsInvalid(recptr)) { Assert(!InRecovery); - recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf, + recptr = log_heap_visible(rel, heapBuf, vmBuf, cutoff_xid, flags); /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 8ade165..745cbc5 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -31,6 +31,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "utils/lsyscache.h" #include "utils/snapmgr.h" static void _bt_cachemetadata(Relation rel, BTMetaPageData *input); @@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId 
latestRemovedX */ /* XLOG stuff */ + xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid); xlrec_reuse.node = rel->rd_node; xlrec_reuse.block = blkno; xlrec_reuse.latestRemovedXid = latestRemovedXid; @@ -1140,6 +1142,7 @@ _bt_delitems_delete(Relation rel, Buffer buf, XLogRecPtr recptr; xl_btree_delete xlrec_delete; + xlrec_delete.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid); xlrec_delete.latestRemovedXid = latestRemovedXid; xlrec_delete.nitems = nitems; diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 0a85d8b..2617d55 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -526,7 +526,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -810,6 +811,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) { ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index b9311ce..ef4910f 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -27,6 +27,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "utils/snapmgr.h" +#include "utils/lsyscache.h" /* Entry in pending-list of TIDs we need to revisit */ @@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) OffsetNumber itemnos[MaxIndexTuplesPerPage]; spgxlogVacuumRedirect xlrec; + xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid); xlrec.nToPlaceholder = 0; xlrec.newestRedirectXid = InvalidTransactionId; diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index ebe6ae8..800609c 100644 
--- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6e5bc12..e8b7af4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -94,6 +94,7 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); +#ifdef NOT_ANYMORE /* ---- * TODO: We got to change that someday soon... * @@ -111,6 +112,7 @@ CheckLogicalDecodingRequirements(void) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("logical decoding cannot be used while in recovery"))); +#endif } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 006446b..5785d2f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void) } } +void +ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid) +{ + int i; + bool found_conflict = false; + + if (max_replication_slots <= 0) + return; + +restart: + if (found_conflict) + { + CHECK_FOR_INTERRUPTS(); + /* + * Wait awhile for them to die so that we avoid flooding an + * unresponsive backend when system is heavily loaded. 
+ */ + pg_usleep(100000); + found_conflict = false; + } + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + NameData slotname; + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* not our database, skip */ + if (s->data.database != InvalidOid && s->data.database != dboid) + continue; + + SpinLockAcquire(&s->mutex); + slotname = s->data.name; + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + SpinLockRelease(&s->mutex); + + if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) + { + found_conflict = true; + + ereport(WARNING, + (errmsg("slot %s w/ xmin %u conflicts with removed xid %u", + NameStr(slotname), slot_xmin, xid))); + } + + if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)) + { + found_conflict = true; + + ereport(WARNING, + (errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u", + NameStr(slotname), slot_catalog_xmin, xid))); + } + + + if (found_conflict) + { + elog(WARNING, "Dropping conflicting slot %s", s->data.name.data); + LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */ + ReplicationSlotDropPtr(s); + + /* We released the lock above; so re-scan the slots. */ + goto restart; + } + } + + LWLockRelease(ReplicationSlotControlLock); +} + + /* * Flush all replication slots to disk. 
* diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 215f146..75dbdb9 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -23,6 +23,7 @@ #include "access/xloginsert.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -291,7 +292,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, } void -ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) +ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; @@ -312,6 +314,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode ResolveRecoveryConflictWithVirtualXIDs(backends, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + + if (onCatalogTable) + ResolveRecoveryConflictWithSlots(node.dbNode, latestRemovedXid); } void diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index b4f2d0f..f4da4bc 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -18,7 +18,9 @@ #include "access/hash.h" #include "access/htup_details.h" #include "access/nbtree.h" +#include "access/table.h" #include "bootstrap/bootstrap.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" #include "catalog/pg_amop.h" @@ -1893,6 +1895,20 @@ get_rel_persistence(Oid relid) return result; } +bool +get_rel_logical_catalog(Oid relid) +{ + bool res; + Relation rel; + + /* assume previously locked */ + rel = heap_open(relid, NoLock); + res = RelationIsAccessibleInLogicalDecoding(rel); + heap_close(rel, NoLock); + + return res; +} + /* ---------- TRANSFORM CACHE ---------- */ diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h index 9990d97..887a377 100644 --- a/src/include/access/gistxlog.h +++ 
b/src/include/access/gistxlog.h @@ -47,10 +47,10 @@ typedef struct gistxlogPageUpdate */ typedef struct gistxlogDelete { + bool onCatalogTable; RelFileNode hnode; /* RelFileNode of the heap the index currently * points at */ uint16 ntodelete; /* number of deleted offsets */ - /* * In payload of blk 0 : todelete OffsetNumbers */ @@ -95,6 +95,7 @@ typedef struct gistxlogPageDelete */ typedef struct gistxlogPageReuse { + bool onCatalogTable; RelFileNode node; BlockNumber block; TransactionId latestRemovedXid; diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h index 53b682c..fd70b55 100644 --- a/src/include/access/hash_xlog.h +++ b/src/include/access/hash_xlog.h @@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page */ typedef struct xl_hash_vacuum_one_page { + bool onCatalogTable; TransactionId latestRemovedXid; int ntuples; diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 22cd13c..482c874 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -237,6 +237,7 @@ typedef struct xl_heap_update */ typedef struct xl_heap_clean { + bool onCatalogTable; TransactionId latestRemovedXid; uint16 nredirected; uint16 ndead; @@ -252,6 +253,7 @@ typedef struct xl_heap_clean */ typedef struct xl_heap_cleanup_info { + bool onCatalogTable; RelFileNode node; TransactionId latestRemovedXid; } xl_heap_cleanup_info; @@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple */ typedef struct xl_heap_freeze_page { + bool onCatalogTable; TransactionId cutoff_xid; uint16 ntuples; } xl_heap_freeze_page; @@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page */ typedef struct xl_heap_visible { + bool onCatalogTable; TransactionId cutoff_xid; uint8 flags; } xl_heap_visible; @@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record); extern const char *heap2_identify(uint8 info); extern void heap_xlog_logical_rewrite(XLogReaderState *r); -extern XLogRecPtr 
log_heap_cleanup_info(RelFileNode rnode, +extern XLogRecPtr log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid); extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *redirected, int nredirected, @@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple, bool *totally_frozen); extern void heap_execute_freeze_tuple(HeapTupleHeader tuple, xl_heap_freeze_tuple *xlrec_tp); -extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer, +extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags); #endif /* HEAPAM_XLOG_H */ diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h index 9beccc8..f64a33c 100644 --- a/src/include/access/nbtxlog.h +++ b/src/include/access/nbtxlog.h @@ -126,6 +126,7 @@ typedef struct xl_btree_split */ typedef struct xl_btree_delete { + bool onCatalogTable; TransactionId latestRemovedXid; int nitems; @@ -139,6 +140,7 @@ typedef struct xl_btree_delete */ typedef struct xl_btree_reuse_page { + bool onCatalogTable; RelFileNode node; BlockNumber block; TransactionId latestRemovedXid; diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h index ee8fc6f..d535441 100644 --- a/src/include/access/spgxlog.h +++ b/src/include/access/spgxlog.h @@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot typedef struct spgxlogVacuumRedirect { + bool onCatalogTable; uint16 nToPlaceholder; /* number of redirects to make placeholders */ OffsetNumber firstPlaceholder; /* first placeholder tuple to remove */ TransactionId newestRedirectXid; /* newest XID of removed redirects */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8f1d66..4e0776a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -205,4 +205,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void 
ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid); + #endif /* SLOT_H */ diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 2361243..f276c7e 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, - RelFileNode node); + bool catalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 9606d02..78bc639 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -131,6 +131,7 @@ extern char get_rel_relkind(Oid relid); extern bool get_rel_relispartition(Oid relid); extern Oid get_rel_tablespace(Oid relid); extern char get_rel_persistence(Oid relid); +extern bool get_rel_logical_catalog(Oid relid); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes); extern bool get_typisdefined(Oid typid); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 89a7fbf..c36e228 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -16,6 +16,7 @@ #include "access/tupdesc.h" #include "access/xlog.h" +#include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_publication.h" diff --git a/src/test/recovery/t/016_logical_decoding_on_replica.pl b/src/test/recovery/t/016_logical_decoding_on_replica.pl new file mode 100644 index 0000000..9ee79b0 --- /dev/null +++ b/src/test/recovery/t/016_logical_decoding_on_replica.pl @@ -0,0 +1,391 @@ +# Demonstrate that logical can follow timeline switches. +# +# Test logical decoding on a standby. 
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 51;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=decoding_standby');
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state
+sub wait_for_phys_mins
+{
+    my ($node, $slotname, $check_expr) = @_;
+
+    $node->poll_query_until(
+        'postgres', qq[
+        SELECT $check_expr
+        FROM pg_catalog.pg_replication_slots
+        WHERE slot_name = '$slotname';
+    ]) or die "Timed out waiting for slot xmins to advance";
+
+    my $slotinfo = $node->slot($slotname);
+    return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
+}
+
+sub print_phys_xmin
+{
+    my $slot = $node_master->slot('decoding_standby');
+    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+    $node_master, $backup_name,
+    has_streaming => 1,
+    has_restoring => 1);
+$node_replica->append_conf('postgresql.conf',
+    q[primary_slot_name = 'decoding_standby']);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+    "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+    0, 'logical slot creation on standby succeeded')
+    or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+sub print_logical_xmin
+{
+    my $slot = $node_replica->slot('standby_logical');
+    return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'DROP TABLE test_table');
+$node_master->safe_psql('testdb', 'VACUUM');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+    or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now that the standby's
+# slot doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream oldestXid retention
+#########################################################
+
+sub test_oldest_xid_retention()
+{
+    # First burn some xids on the master in another DB, so we push the master's
+    # nextXid ahead.
+    foreach my $i (1 .. 100)
+    {
+        $node_master->safe_psql('postgres', 'SELECT txid_current()');
+    }
+
+    # Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+    # past our needed xmin. The only way we have visibility into that is to force
+    # a checkpoint.
+    $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+    foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+    {
+        $node_master->safe_psql($dbname, 'VACUUM FREEZE');
+    }
+    sleep(1);
+    $node_master->safe_psql('postgres', 'CHECKPOINT');
+    IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+        or die "pg_controldata failed with $?";
+    my @checkpoint = split('\n', $stdout);
+    my ($oldestXid, $nextXid) = ('', '');
+    foreach my $line (@checkpoint)
+    {
+        if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+        {
+            $nextXid = $1;
+        }
+        if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+        {
+            $oldestXid = $1;
+        }
+    }
+    die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+    my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+    my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+    print "upstream oldestXid $oldestXid, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin\n";
+
+    $node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+    return ($oldestXid);
+}
+
+my ($oldestXid) = test_oldest_xid_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+########################################################################
+# Recovery conflict: conflicting replication slot should get dropped
+########################################################################
+
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on slave.
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_replica->restart;
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. Both should be NULL since hs_feedback is off.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+    "xmin IS NULL AND catalog_xmin IS NULL");
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+isnt($ret, 0, 'usage of slot failed as expected');
+like($stderr, qr/does not exist/, 'slot not found as expected');
+
+# Re-create the slot now that we know it is dropped
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+    0, 'logical slot creation on standby succeeded')
+    or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+# Set hot_standby_feedback back on
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. Both should be non-NULL since hs_feedback is on and
+# there is a logical slot present on standby.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+    "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's catalog_xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. catalog_xmin should become NULL because we dropped
+# the logical slot.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+    "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot');
+$node_replica->command_ok(['pg_recvlogical', '-v', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+    'database dropped on standby');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+print "Testing dropdb when downstream slot is in-use\n";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+print "creating slot dodropslot2\n";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'],
+    'pg_recvlogical created dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+print "starting pg_recvlogical\n";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+    or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+    sleep(1);
+    print "waiting for walsender to exit\n";
+}
+
+print "walsender exited, waiting for pg_recvlogical to exit\n";
+
+# our client should've terminated in response to the walsender error
+eval {
+    $handle->finish;
+};
+$return = $?;
+if ($return) {
+    is($return, 256, "pg_recvlogical terminated by server");
+    like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+    like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+    'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.1.4