On Sat, 6 Apr 2019 at 04:45, Andres Freund <and...@anarazel.de> wrote:
>
> Hi,
>
> Thanks for the new version of the patch.  Btw, could you add Craig as a
> co-author in the commit message of the next version of the patch? Don't
> want to forget him.

I had put his name in the earlier patch. But now I have made it easier to spot.

>
> On 2019-04-05 17:08:39 +0530, Amit Khandekar wrote:
> > Regarding the test result failures, I could see that when we drop a
> > logical replication slot at standby server, then the catalog_xmin of
> > physical replication slot becomes NULL, whereas the test expects it to
> > be equal to xmin; and that's the reason a couple of test scenarios are
> > failing :
> >
> > ok 33 - slot on standby dropped manually
> > Waiting for replication conn replica's replay_lsn to pass '0/31273E0' on 
> > master
> > done
> > not ok 34 - physical catalog_xmin still non-null
> > not ok 35 - xmin and catalog_xmin equal after slot drop
> > #   Failed test 'xmin and catalog_xmin equal after slot drop'
> > #   at t/016_logical_decoding_on_replica.pl line 272.
> > #          got:
> > #     expected: 2584
> >
> > I am not sure what is expected. What actually happens is : the
> > physical xlot catalog_xmin remains NULL initially, but becomes
> > non-NULL after the logical replication slot is created on standby.
>
> That seems like the correct behaviour to me - why would we still have a
> catalog xmin if there's no slot logical slot?

Yeah ... In the earlier implementation, maybe it was different, that's
why the catalog_xmin didn't become NULL. Not sure. Anyways, I have
changed this check. Details in the following sections.

>
>
> > diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> > index 006446b..5785d2f 100644
> > --- a/src/backend/replication/slot.c
> > +++ b/src/backend/replication/slot.c
> > @@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void)
> >       }
> >  }
> >
> > +void
> > +ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
> > +{
> > +     int                     i;
> > +     bool            found_conflict = false;
> > +
> > +     if (max_replication_slots <= 0)
> > +             return;
> > +
> > +restart:
> > +     if (found_conflict)
> > +     {
> > +             CHECK_FOR_INTERRUPTS();
> > +             /*
> > +              * Wait awhile for them to die so that we avoid flooding an
> > +              * unresponsive backend when system is heavily loaded.
> > +              */
> > +             pg_usleep(100000);
> > +             found_conflict = false;
> > +     }
> > +
> > +     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> > +     for (i = 0; i < max_replication_slots; i++)
> > +     {
> > +             ReplicationSlot *s;
> > +             NameData        slotname;
> > +             TransactionId slot_xmin;
> > +             TransactionId slot_catalog_xmin;
> > +
> > +             s = &ReplicationSlotCtl->replication_slots[i];
> > +
> > +             /* cannot change while ReplicationSlotCtlLock is held */
> > +             if (!s->in_use)
> > +                     continue;
> > +
> > +             /* not our database, skip */
> > +             if (s->data.database != InvalidOid && s->data.database != 
> > dboid)
> > +                     continue;
> > +
> > +             SpinLockAcquire(&s->mutex);
> > +             slotname = s->data.name;
> > +             slot_xmin = s->data.xmin;
> > +             slot_catalog_xmin = s->data.catalog_xmin;
> > +             SpinLockRelease(&s->mutex);
> > +
> > +             if (TransactionIdIsValid(slot_xmin) && 
> > TransactionIdPrecedesOrEquals(slot_xmin, xid))
> > +             {
> > +                     found_conflict = true;
> > +
> > +                     ereport(WARNING,
> > +                                     (errmsg("slot %s w/ xmin %u conflicts 
> > with removed xid %u",
> > +                                                     NameStr(slotname), 
> > slot_xmin, xid)));
> > +             }
> > +
> > +             if (TransactionIdIsValid(slot_catalog_xmin) && 
> > TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
> > +             {
> > +                     found_conflict = true;
> > +
> > +                     ereport(WARNING,
> > +                                     (errmsg("slot %s w/ catalog xmin %u 
> > conflicts with removed xid %u",
> > +                                                     NameStr(slotname), 
> > slot_catalog_xmin, xid)));
> > +             }
> > +
> > +
> > +             if (found_conflict)
> > +             {
> > +                     elog(WARNING, "Dropping conflicting slot %s", 
> > s->data.name.data);
> > +                     LWLockRelease(ReplicationSlotControlLock);      /* 
> > avoid deadlock */
> > +                     ReplicationSlotDropPtr(s);
> > +
> > +                     /* We released the lock above; so re-scan the slots. 
> > */
> > +                     goto restart;
> > +             }
> > +     }
> >
> I think this should be refactored so that the two found_conflict cases
> set a 'reason' variable (perhaps an enum?) to the particular reason, and
> then only one warning should be emitted.  I also think that LOG might be
> more appropriate than WARNING - as confusing as that is, LOG is more
> severe than WARNING (see docs about log_min_messages).

What I have in mind is :

ereport(LOG,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Dropping conflicting slot %s", s->data.name.data),
errdetail("%s, removed xid %d.", conflict_str, xid)));
where conflict_str is a dynamically generated string containing
something like : "slot xmin : 1234, slot catalog_xmin: 5678"
So for the user, the errdetail will look like :
"slot xmin: 1234, catalog_xmin: 5678, removed xid : 9012"
I think the user can figure out whether it was xmin or catalog_xmin or
both that conflicted with removed xid.
If we don't do this way, we may not be able to show in a single
message if both xmin and catalog_xmin are conflicting at the same
time.

Does this message look good to you, or you had in mind something quite
different ?

>
>
> > @@ -0,0 +1,386 @@
> > +# Demonstrate that logical can follow timeline switches.
> > +#
> > +# Test logical decoding on a standby.
> > +#
> > +use strict;
> > +use warnings;
> > +use 5.8.0;
> > +
> > +use PostgresNode;
> > +use TestLib;
> > +use Test::More tests => 55;
> > +use RecursiveCopy;
> > +use File::Copy;
> > +
> > +my ($stdin, $stdout, $stderr, $ret, $handle, $return);
> > +my $backup_name;
> > +
> > +# Initialize master node
> > +my $node_master = get_new_node('master');
> > +$node_master->init(allows_streaming => 1, has_archiving => 1);
> > +$node_master->append_conf('postgresql.conf', q{
> > +wal_level = 'logical'
> > +max_replication_slots = 4
> > +max_wal_senders = 4
> > +log_min_messages = 'debug2'
> > +log_error_verbosity = verbose
> > +# send status rapidly so we promptly advance xmin on master
> > +wal_receiver_status_interval = 1
> > +# very promptly terminate conflicting backends
> > +max_standby_streaming_delay = '2s'
> > +});
> > +$node_master->dump_info;
> > +$node_master->start;
> > +
> > +$node_master->psql('postgres', q[CREATE DATABASE testdb]);
> > +
> > +$node_master->safe_psql('testdb', q[SELECT * FROM 
> > pg_create_physical_replication_slot('decoding_standby');]);
> > +$backup_name = 'b1';
> > +my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
> > +TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', 
> > $node_master->connstr('testdb'), '--slot=decoding_standby');
> > +
> > +sub print_phys_xmin
> > +{
> > +     my $slot = $node_master->slot('decoding_standby');
> > +     return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
> > +}
> > +
> > +my ($xmin, $catalog_xmin) = print_phys_xmin();
> > +# After slot creation, xmins must be null
> > +is($xmin, '', "xmin null");
> > +is($catalog_xmin, '', "catalog_xmin null");
> > +
> > +my $node_replica = get_new_node('replica');
> > +$node_replica->init_from_backup(
> > +     $node_master, $backup_name,
> > +     has_streaming => 1,
> > +     has_restoring => 1);
> > +$node_replica->append_conf('postgresql.conf',
> > +     q[primary_slot_name = 'decoding_standby']);
> > +
> > +$node_replica->start;
> > +$node_master->wait_for_catchup($node_replica, 'replay', 
> > $node_master->lsn('flush'));
> > +
> > +# with hot_standby_feedback off, xmin and catalog_xmin must still be null
> > +($xmin, $catalog_xmin) = print_phys_xmin();
> > +is($xmin, '', "xmin null after replica join");
> > +is($catalog_xmin, '', "catalog_xmin null after replica join");
> > +
> > +$node_replica->append_conf('postgresql.conf',q[
> > +hot_standby_feedback = on
> > +]);
> > +$node_replica->restart;
> > +sleep(2); # ensure walreceiver feedback sent
>
> Can we make this more robust? E.g. by waiting till pg_stat_replication
> shows the change on the primary? Because I can guarantee that this'll
> fail on slow buildfarm machines (say the valgrind animals).
>
>
>
>
> > +$node_master->wait_for_catchup($node_replica, 'replay', 
> > $node_master->lsn('flush'));
> > +sleep(2); # ensure walreceiver feedback sent
>
> Similar.

Ok. I have put a copy of the get_slot_xmins() function from
t/001_stream_rep.pl() into 016_logical_decoding_on_replica.pl. Renamed
it to wait_for_phys_mins(). And used this to wait for the
hot_standby_feedback change to propagate to master. This function
waits for the physical slot's xmin and catalog_xmin to get the right
values depending on whether there is a logical slot in standby and
whether hot_standby_feedback is on on standby.

I was not sure how pg_stat_replication could be used to identify about
hot_standby_feedback change reaching to master. So i did the above
way, which I think pretty much does what we want, I think.

Attached v4 patch only has the testcase change, and some minor cleanup
in the test file.




--
Thanks,
-Amit Khandekar
EnterpriseDB Corporation
The Postgres Database Company
From 1e3c68a644da4aa45ca72190cfa254ccd171f9e3 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Tue, 9 Apr 2019 22:06:25 +0530
Subject: [PATCH] Logical decoding on standby.

Author : Andres Freund.

Besides the above main changes, patch includes following :

1. Handle slot conflict recovery by dropping the conflicting slots.

-Amit Khandekar.

2. test/recovery/t/016_logical_decoding_on_replica.pl added.

Original author : Craig Ringer. few changes/additions from Amit Khandekar.
---
 src/backend/access/gist/gistxlog.c                 |   6 +-
 src/backend/access/hash/hash_xlog.c                |   3 +-
 src/backend/access/hash/hashinsert.c               |   2 +
 src/backend/access/heap/heapam.c                   |  23 +-
 src/backend/access/heap/vacuumlazy.c               |   2 +-
 src/backend/access/heap/visibilitymap.c            |   2 +-
 src/backend/access/nbtree/nbtpage.c                |   3 +
 src/backend/access/nbtree/nbtxlog.c                |   4 +-
 src/backend/access/spgist/spgvacuum.c              |   2 +
 src/backend/access/spgist/spgxlog.c                |   1 +
 src/backend/replication/logical/logical.c          |   2 +
 src/backend/replication/slot.c                     |  79 +++++
 src/backend/storage/ipc/standby.c                  |   7 +-
 src/backend/utils/cache/lsyscache.c                |  16 +
 src/include/access/gistxlog.h                      |   3 +-
 src/include/access/hash_xlog.h                     |   1 +
 src/include/access/heapam_xlog.h                   |   8 +-
 src/include/access/nbtxlog.h                       |   2 +
 src/include/access/spgxlog.h                       |   1 +
 src/include/replication/slot.h                     |   2 +
 src/include/storage/standby.h                      |   2 +-
 src/include/utils/lsyscache.h                      |   1 +
 src/include/utils/rel.h                            |   1 +
 .../recovery/t/016_logical_decoding_on_replica.pl  | 391 +++++++++++++++++++++
 24 files changed, 546 insertions(+), 18 deletions(-)
 create mode 100644 src/test/recovery/t/016_logical_decoding_on_replica.pl

diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 4fb1855..59a7910 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -342,7 +342,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -544,7 +545,7 @@ gistRedoPageReuse(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
-											xlrec->node);
+											xlrec->onCatalogTable, xlrec->node);
 	}
 }
 
@@ -736,6 +737,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index d7b7098..00c3e0f 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index e17f017..b67e4e6 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
 #include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 			xl_hash_vacuum_one_page xlrec;
 			XLogRecPtr	recptr;
 
+			xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
 			xlrec.latestRemovedXid = latestRemovedXid;
 			xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a05b6a0..bfbb9d3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7100,12 +7100,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
  * see comments for vacuum_log_cleanup_info().
  */
 XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
 {
 	xl_heap_cleanup_info xlrec;
 	XLogRecPtr	recptr;
 
-	xlrec.node = rnode;
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+	xlrec.node = rel->rd_node;
 	xlrec.latestRemovedXid = latestRemovedXid;
 
 	XLogBeginInsert();
@@ -7141,6 +7142,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
@@ -7191,6 +7193,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.ntuples = ntuples;
 
@@ -7221,7 +7224,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
  * heap_buffer, if necessary.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 				 TransactionId cutoff_xid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
@@ -7231,6 +7234,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(heap_buffer));
 	Assert(BufferIsValid(vm_buffer));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.flags = vmflags;
 	XLogBeginInsert();
@@ -7651,7 +7655,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
 	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
 
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, xlrec->node);
 
 	/*
 	 * Actual operation is a no-op. Record type exists to provide a means for
@@ -7687,7 +7692,8 @@ heap_xlog_clean(XLogReaderState *record)
 	 * latestRemovedXid is invalid, skip conflict processing.
 	 */
 	if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 
 	/*
 	 * If we have a full-page image, restore it (using a cleanup lock) and
@@ -7783,7 +7789,9 @@ heap_xlog_visible(XLogReaderState *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+											xlrec->onCatalogTable,
+											rnode);
 
 	/*
 	 * Read the heap page, if it still exists. If the heap file has dropped or
@@ -7920,7 +7928,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
 		TransactionIdRetreat(latestRemovedXid);
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index c9d8312..fad08e0 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -475,7 +475,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 	 * No need to write the record at all unless it contains a valid value
 	 */
 	if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
-		(void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+		(void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
 }
 
 /*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 64dfe06..c5fdd64 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -281,7 +281,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 			if (XLogRecPtrIsInvalid(recptr))
 			{
 				Assert(!InRecovery);
-				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+				recptr = log_heap_visible(rel, heapBuf, vmBuf,
 										  cutoff_xid, flags);
 
 				/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 8ade165..745cbc5 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -31,6 +31,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
 
 static void _bt_cachemetadata(Relation rel, BTMetaPageData *input);
@@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1140,6 +1142,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 		XLogRecPtr	recptr;
 		xl_btree_delete xlrec_delete;
 
+		xlrec_delete.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 		xlrec_delete.latestRemovedXid = latestRemovedXid;
 		xlrec_delete.nitems = nitems;
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 0a85d8b..2617d55 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -526,7 +526,8 @@ btree_xlog_delete(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	/*
@@ -810,6 +811,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable,
 											xlrec->node);
 	}
 }
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index b9311ce..ef4910f 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
 
 
 /* Entry in pending-list of TIDs we need to revisit */
@@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
 	OffsetNumber itemnos[MaxIndexTuplesPerPage];
 	spgxlogVacuumRedirect xlrec;
 
+	xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
 	xlrec.nToPlaceholder = 0;
 	xlrec.newestRedirectXid = InvalidTransactionId;
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index ebe6ae8..800609c 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+												xldata->onCatalogTable,
 												node);
 		}
 	}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e5bc12..e8b7af4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -94,6 +94,7 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
+#ifdef NOT_ANYMORE
 	/* ----
 	 * TODO: We got to change that someday soon...
 	 *
@@ -111,6 +112,7 @@ CheckLogicalDecodingRequirements(void)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("logical decoding cannot be used while in recovery")));
+#endif
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 006446b..5785d2f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,85 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+void
+ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		/*
+		 * Wait awhile for them to die so that we avoid flooding an
+		 * unresponsive backend when system is heavily loaded.
+		 */
+		pg_usleep(100000);
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		TransactionId slot_xmin;
+		TransactionId slot_catalog_xmin;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != InvalidOid && s->data.database != dboid)
+			continue;
+
+		SpinLockAcquire(&s->mutex);
+		slotname = s->data.name;
+		slot_xmin = s->data.xmin;
+		slot_catalog_xmin = s->data.catalog_xmin;
+		SpinLockRelease(&s->mutex);
+
+		if (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid))
+		{
+			found_conflict = true;
+
+			ereport(WARNING,
+					(errmsg("slot %s w/ xmin %u conflicts with removed xid %u",
+							NameStr(slotname), slot_xmin, xid)));
+		}
+
+		if (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+		{
+			found_conflict = true;
+
+			ereport(WARNING,
+					(errmsg("slot %s w/ catalog xmin %u conflicts with removed xid %u",
+							NameStr(slotname), slot_catalog_xmin, xid)));
+		}
+
+
+		if (found_conflict)
+		{
+			elog(WARNING, "Dropping conflicting slot %s", s->data.name.data);
+			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */
+			ReplicationSlotDropPtr(s);
+
+			/* We released the lock above; so re-scan the slots. */
+			goto restart;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 215f146..75dbdb9 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
 #include "access/xloginsert.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -291,7 +292,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									bool onCatalogTable, RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
@@ -312,6 +314,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 
 	ResolveRecoveryConflictWithVirtualXIDs(backends,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+
+	if (onCatalogTable)
+		ResolveRecoveryConflictWithSlots(node.dbNode, latestRemovedXid);
 }
 
 void
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index b4f2d0f..f4da4bc 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/table.h"
 #include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amop.h"
@@ -1893,6 +1895,20 @@ get_rel_persistence(Oid relid)
 	return result;
 }
 
+bool
+get_rel_logical_catalog(Oid relid)
+{
+	bool	res;
+	Relation rel;
+
+	/* assume previously locked */
+	rel = heap_open(relid, NoLock);
+	res = RelationIsAccessibleInLogicalDecoding(rel);
+	heap_close(rel, NoLock);
+
+	return res;
+}
+
 
 /*				---------- TRANSFORM CACHE ----------						 */
 
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 9990d97..887a377 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -47,10 +47,10 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
+	bool		onCatalogTable;
 	RelFileNode hnode;			/* RelFileNode of the heap the index currently
 								 * points at */
 	uint16		ntodelete;		/* number of deleted offsets */
-
 	/*
 	 * In payload of blk 0 : todelete OffsetNumbers
 	 */
@@ -95,6 +95,7 @@ typedef struct gistxlogPageDelete
  */
 typedef struct gistxlogPageReuse
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 53b682c..fd70b55 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			ntuples;
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 22cd13c..482c874 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -237,6 +237,7 @@ typedef struct xl_heap_update
  */
 typedef struct xl_heap_clean
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
@@ -252,6 +253,7 @@ typedef struct xl_heap_clean
  */
 typedef struct xl_heap_cleanup_info
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	TransactionId latestRemovedXid;
 } xl_heap_cleanup_info;
@@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple
  */
 typedef struct xl_heap_freeze_page
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint16		ntuples;
 } xl_heap_freeze_page;
@@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint8		flags;
 } xl_heap_visible;
@@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
 					  TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 			   OffsetNumber *redirected, int nredirected,
@@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 						  bool *totally_frozen);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 						  xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
 				 Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
 
 #endif							/* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 9beccc8..f64a33c 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -126,6 +126,7 @@ typedef struct xl_btree_split
  */
 typedef struct xl_btree_delete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			nitems;
 
@@ -139,6 +140,7 @@ typedef struct xl_btree_delete
  */
 typedef struct xl_btree_reuse_page
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index ee8fc6f..d535441 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
 
 typedef struct spgxlogVacuumRedirect
 {
+	bool		onCatalogTable;
 	uint16		nToPlaceholder; /* number of redirects to make placeholders */
 	OffsetNumber firstPlaceholder;	/* first placeholder tuple to remove */
 	TransactionId newestRedirectXid;	/* newest XID of removed redirects */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8f1d66..4e0776a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -205,4 +205,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern void ResolveRecoveryConflictWithSlots(Oid dboid, TransactionId xid);
+
 #endif							/* SLOT_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2361243..f276c7e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
-									RelFileNode node);
+									bool catalogTable, RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 9606d02..78bc639 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -131,6 +131,7 @@ extern char get_rel_relkind(Oid relid);
 extern bool get_rel_relispartition(Oid relid);
 extern Oid	get_rel_tablespace(Oid relid);
 extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
 extern Oid	get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
 extern Oid	get_transform_tosql(Oid typid, Oid langid, List *trftypes);
 extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 89a7fbf..c36e228 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"
diff --git a/src/test/recovery/t/016_logical_decoding_on_replica.pl b/src/test/recovery/t/016_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..9ee79b0
--- /dev/null
+++ b/src/test/recovery/t/016_logical_decoding_on_replica.pl
@@ -0,0 +1,391 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 51;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=decoding_standby');
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state
+sub wait_for_phys_mins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+
+	my $slotinfo = $node->slot($slotname);
+	return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
+}
+
+sub print_phys_xmin
+{
+	my $slot = $node_master->slot('decoding_standby');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->append_conf('postgresql.conf',
+	q[primary_slot_name = 'decoding_standby']);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+	"xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+sub print_logical_xmin
+{
+	my $slot = $node_replica->slot('standby_logical');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'DROP TABLE test_table');
+$node_master->safe_psql('testdb', 'VACUUM');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+	or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now that the standby's
+# slot doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream oldestXid retention
+#########################################################
+
+sub test_oldest_xid_retention()
+{
+	# First burn some xids on the master in another DB, so we push the master's
+	# nextXid ahead.
+	foreach my $i (1 .. 100)
+	{
+		$node_master->safe_psql('postgres', 'SELECT txid_current()');
+	}
+
+	# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+	# past our needed xmin. The only way we have visibility into that is to force
+	# a checkpoint.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+	foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+	{
+		$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+	}
+	sleep(1);
+	$node_master->safe_psql('postgres', 'CHECKPOINT');
+	IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+		or die "pg_controldata failed with $?";
+	my @checkpoint = split('\n', $stdout);
+	my ($oldestXid, $nextXid) = ('', '', '');
+	foreach my $line (@checkpoint)
+	{
+		if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+		{
+			$nextXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+		{
+			$oldestXid = $1;
+		}
+	}
+	die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+	my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+	my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+	print "upstream oldestXid $oldestXid, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+	return ($oldestXid);
+}
+
+my ($oldestXid) = test_oldest_xid_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+########################################################################
+# Recovery conflict: conflicting replication slot should get dropped
+########################################################################
+
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on slave.
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_replica->restart;
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. Both should be NULL since hs_feedback is off
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+	"xmin IS NULL AND catalog_xmin IS NULL");
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+isnt($ret, 0, 'usage of slot failed as expected');
+like($stderr, qr/does not exist/, 'slot not found as expected');
+
+# Re-create the slot now that we know it is dropped
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+# Set hot_standby_feedback back on
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. Both should be non-NULL since hs_feedback is on and
+# there is a logical slot present on standby.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+	"xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. catalog_xmin should become NULL because we dropped
+# the logical slot.
+($xmin, $catalog_xmin) = wait_for_phys_mins($node_master, 'decoding_standby',
+	"xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot');
+$node_replica->command_ok(['pg_recvlogical', '-v', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+print "Testing dropdb when downstream slot is in-use";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+print "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'],
+	'pg_recvlogical created slot test_decoding');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+print "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+  or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+	sleep(1);
+	print "waiting for walsender to exit";
+}
+
+print "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+	$handle->finish;
+};
+$return = $?;
+if ($return) {
+	is($return, 256, "pg_recvlogical terminated by server");
+	like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+	like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.1.4

Reply via email to