On 4 April 2017 at 22:32, Craig Ringer <cr...@2ndquadrant.com> wrote:
> Hi all
>
> Here's the final set of three patches on top of what's already committed.
>
> The first is catalog_xmin logging, which is unchanged from the prior post.
>
> The 2nd is support for conflict with recovery, with changes that
> should address Andres's concerns there.
>
> The 3rd actually enables decoding on standby. Unlike the prior
> version, no attempt is made to check the walsender configuration for
> slot use, etc. The ugly code to try to mitigate races is also removed.
> Instead, if wal_level is logical the catalog_xmin sent by
> hot_standby_feedback is now the same as the xmin if there's no local
> slot holding it down. So we're always sending a catalog_xmin in
> feedback and we should always expect to have a valid local
> oldestCatalogXmin once hot_standby_feedback kicks in. This makes the
> race in slot creation no worse than the existing race between
> hot_standby_feedback establishment and the first queries run on a
> downstream, albeit with more annoying consequences. Apps can still
> ensure a slot created on standby is guaranteed safe and conflict-free
> by having a slot on the master first.
>
> I'm much happier with this. I'm still fixing some issues in the tests
> for 03 and tidying them up, but 03 should allow 01 and 02 to be
> reviewed in their proper context now.

Dammit. Attached.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 9b8b1236eb32819430062031ff76750ed8bc1661 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH 1/3] Log catalog_xmin advances before removing catalog tuples

Before advancing the effective catalog_xmin we use to remove old catalog
tuple versions, make sure it is written to WAL. This allows standbys
to know the oldest xid they can safely create a historic snapshot for.
They can then refuse to start decoding from a slot or raise a recovery
conflict.

The catalog_xmin advance is logged in a new xl_catalog_xmin_advance record,
emitted before vacuum or periodically by the bgwriter. WAL is only written if
the lowest catalog_xmin needed by any replication slot has advanced.
---
 src/backend/access/heap/rewriteheap.c       |   3 +-
 src/backend/access/rmgrdesc/xactdesc.c      |   9 ++
 src/backend/access/rmgrdesc/xlogdesc.c      |   3 +-
 src/backend/access/transam/varsup.c         |  15 ++++
 src/backend/access/transam/xact.c           |  36 ++++++++
 src/backend/access/transam/xlog.c           |  23 ++++-
 src/backend/postmaster/bgwriter.c           |   9 ++
 src/backend/replication/logical/decode.c    |  12 +++
 src/backend/replication/walreceiver.c       |   2 +-
 src/backend/replication/walsender.c         |   8 ++
 src/backend/storage/ipc/procarray.c         | 134 ++++++++++++++++++++++++++--
 src/bin/pg_controldata/pg_controldata.c     |   2 +
 src/include/access/transam.h                |   5 ++
 src/include/access/xact.h                   |  12 ++-
 src/include/catalog/pg_control.h            |   1 +
 src/include/storage/procarray.h             |   5 +-
 src/test/recovery/t/006_logical_decoding.pl |  90 +++++++++++++++++--
 17 files changed, 348 insertions(+), 21 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..d1400ec 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use oldestCatalogXmin here */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 735f8c5..96ea163 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -297,6 +297,12 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+	{
+		xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+		appendStringInfo(buf, "catalog_xmin %u", xlrec->new_catalog_xmin);
+	}
 }
 
 const char *
@@ -324,6 +330,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_CATALOG_XMIN_ADV:
+			id = "CATALOG_XMIN";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5f07eb1..a66cfc6 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -47,7 +47,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; "
 						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
 						 "oldest/newest commit timestamp xid: %u/%u; "
-						 "oldest running xid %u; %s",
+						 "oldest running xid %u; oldest catalog xmin %u; %s",
 				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
 						 checkpoint->ThisTimeLineID,
 						 checkpoint->PrevTimeLineID,
@@ -63,6 +63,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 checkpoint->oldestCommitTsXid,
 						 checkpoint->newestCommitTsXid,
 						 checkpoint->oldestActiveXid,
+						 checkpoint->oldestCatalogXmin,
 				 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
 	}
 	else if (info == XLOG_NEXTOID)
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..ffabf1c 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,6 +414,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	}
 }
 
+/*
+ * Set the global oldest catalog_xmin used to determine when tuples
+ * may be removed from catalogs and user-catalogs accessible from logical
+ * decoding.
+ *
+ * Only to be called from the startup process or from LogCurrentRunningXacts()
+ * which ensures the update is properly written to xlog first.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
 
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c8751c6..63453d7 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5652,6 +5652,42 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_CATALOG_XMIN_ADV)
+	{
+		xl_xact_catalog_xmin_advance *xlrec = (xl_xact_catalog_xmin_advance *) XLogRecGetData(record);
+
+		/*
+		 * Apply the new catalog_xmin limit immediately. New decoding sessions
+		 * will refuse to start if their slot is past it, and old ones will
+		 * notice when we signal them with a recovery conflict. There's no
+		 * effect on the catalogs themselves yet, so it's safe for backends
+		 * with older catalog_xmins to still exist.
+		 *
+		 * We don't have to take ProcArrayLock since only the startup process
+		 * is allowed to change oldestCatalogXmin when we're in recovery.
+		 *
+		 * Existing sessions are not notified and must check the safe xmin.
+		 */
+		SetOldestCatalogXmin(xlrec->new_catalog_xmin);
+
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * Record when we advance the catalog_xmin used for tuple removal
+ * so standbys find out before we remove catalog tuples they might
+ * need for logical decoding.
+ */
+XLogRecPtr
+XactLogCatalogXminUpdate(TransactionId new_catalog_xmin)
+{
+	xl_xact_catalog_xmin_advance xlrec;
+
+	xlrec.new_catalog_xmin = new_catalog_xmin;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfXactCatalogXminAdvance);
+	return XLogInsert(RM_XACT_ID, XLOG_XACT_CATALOG_XMIN_ADV);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..8d713e9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5021,6 +5021,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6612,12 @@ StartupXLOG(void)
 	   (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 						checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 						 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6628,6 +6635,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -8537,6 +8545,9 @@ CreateCheckPoint(int flags)
 	 */
 	InitXLogInsert();
 
+	/* Checkpoints are a handy time to update the effective catalog_xmin */
+	UpdateOldestCatalogXmin();
+
 	/*
 	 * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
 	 * (This is just pro forma, since in the present system structure there is
@@ -8726,6 +8737,10 @@ CreateCheckPoint(int flags)
 							 &checkPoint.oldestMulti,
 							 &checkPoint.oldestMultiDB);
 
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+
 	/*
 	 * Having constructed the checkpoint record, ensure all shmem disk buffers
 	 * and commit-log buffers are flushed to disk.
@@ -9632,6 +9647,8 @@ xlog_redo(XLogReaderState *record)
 		 */
 		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
+
 		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
 		 * record, the backup was canceled and the end-of-backup record will
@@ -9729,8 +9746,10 @@ xlog_redo(XLogReaderState *record)
 							   checkPoint.oldestMultiDB);
 		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
 								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+			SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
+
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf2..3bb5200 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -51,6 +51,7 @@
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "storage/spin.h"
@@ -333,6 +334,14 @@ BackgroundWriterMain(void)
 				last_snapshot_lsn = LogStandbySnapshot();
 				last_snapshot_ts = now;
 			}
+
+			/*
+			 * We can also advance the threshold used for catalog tuple
+			 * cleanup, rate-limited so we don't write it too often. The delay
+			 * slightly increases catalog bloat but reduces the volume of
+			 * catalog_xmin advance records written.
+			 */
+			UpdateOldestCatalogXmin();
 		}
 
 		/*
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..b5084b9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -288,6 +288,18 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 */
 			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
 			break;
+		case XLOG_XACT_CATALOG_XMIN_ADV:
+
+			/*
+			 * The global catalog_xmin has been advanced. By the time we see
+			 * this in logical decoding it no longer matters, since it's
+			 * guaranteed that all later records will be consistent with the
+			 * advanced catalog_xmin, so we ignore it here. If we were running
+			 * on a standby and it applied a catalog xmin advance past our
+			 * needed catalog_xmin we would've already been terminated with a
+			 * conflict with standby error.
+			 */
+			break;
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index df93265..277f196 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 		xmin = GetOldestXmin(NULL,
 							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dbb10c7..9f3a86b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1778,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..9e98af8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,12 @@ typedef struct ProcArrayStruct
 
 	/* oldest xmin of any replication slot */
 	TransactionId replication_slot_xmin;
-	/* oldest catalog xmin of any replication slot */
+
+	/*
+	 * Oldest catalog xmin of any replication slot
+	 *
+	 * See also ShmemVariableCache->oldestGlobalXmin
+	 */
 	TransactionId replication_slot_catalog_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -1306,6 +1311,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1444,6 +1452,89 @@ GetOldestXmin(Relation rel, int flags)
 }
 
 /*
+ * Return true if ShmemVariableCache->oldestCatalogXmin needs to be updated
+ * to reflect an advance in procArray->replication_slot_catalog_xmin or
+ * it becoming newly set or unset.
+ *
+ */
+static bool
+CatalogXminNeedsUpdate(TransactionId vacuum_catalog_xmin, TransactionId slots_catalog_xmin)
+{
+	return (TransactionIdPrecedes(vacuum_catalog_xmin, slots_catalog_xmin)
+			|| (TransactionIdIsValid(vacuum_catalog_xmin) != TransactionIdIsValid(slots_catalog_xmin)));
+}
+
+/*
+ * If necessary, copy the current catalog_xmin needed by replication slots to
+ * the effective catalog_xmin used for dead tuple removal and write a WAL
+ * record recording the change.
+ *
+ * This allows standbys to know the oldest xid for which it is safe to create
+ * a historic snapshot for logical decoding. VACUUM or other cleanup may have
+ * removed catalog tuple versions needed to correctly decode transactions older
+ * than this threshold. Standbys can use this information to cancel conflicting
+ * decoding sessions and invalidate slots that need discarded information.
+ *
+ * (We can't use the transaction IDs in WAL records emitted by VACUUM etc for
+ * this, since they don't identify the relation as a catalog or not.  Nor can a
+ * standby look up the relcache to get the Relation for the affected
+ * relfilenode to check if it is a catalog. The standby would also have no way
+ * to know the oldest safe position at startup if it wasn't in the control
+ * file.)
+ */
+void
+UpdateOldestCatalogXmin(void)
+{
+	TransactionId vacuum_catalog_xmin;
+	TransactionId slots_catalog_xmin;
+
+	Assert(XLogInsertAllowed());
+
+	/*
+	 * It's most likely that replication_slot_catalog_xmin and
+	 * oldestCatalogXmin will be the same and no action is required, so do a
+	 * pre-check before doing expensive WAL writing and exclusive locking.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	vacuum_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+	slots_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	LWLockRelease(ProcArrayLock);
+
+	if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+	{
+		/*
+		 * We must prevent a concurrent checkpoint, otherwise the catalog xmin
+		 * advance xlog record with the new value might be written before the
+		 * checkpoint but the checkpoint may still see the old
+		 * oldestCatalogXmin value.
+		 */
+		if (!LWLockConditionalAcquire(CheckpointLock, LW_SHARED))
+			/* Couldn't get checkpointer lock; will retry later */
+			return;
+
+		XactLogCatalogXminUpdate(slots_catalog_xmin);
+
+		/*
+		 * A concurrent updater could've changed the oldestCatalogXmin so we
+		 * need to re-check under ProcArrayLock before updating. The LWLock
+		 * provides a barrier.
+		 *
+		 * We must not re-read replication_slot_catalog_xmin even if it has
+		 * advanced, since we xlog'd the older value. If it advanced since, a
+		 * later run will xlog the new value and advance.
+		 */
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		vacuum_catalog_xmin = *((volatile TransactionId *) &ShmemVariableCache->oldestCatalogXmin);
+		if (CatalogXminNeedsUpdate(vacuum_catalog_xmin, slots_catalog_xmin))
+			ShmemVariableCache->oldestCatalogXmin = slots_catalog_xmin;
+		LWLockRelease(ProcArrayLock);
+
+		LWLockRelease(CheckpointLock);
+	}
+
+}
+
+/*
  * GetMaxSnapshotXidCount -- get max size for snapshot XID array
  *
  * We have to export this for use by snapmgr.c.
@@ -1700,7 +1791,7 @@ GetSnapshotData(Snapshot snapshot)
 
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1802,9 @@ GetSnapshotData(Snapshot snapshot)
 	 * Update globalxmin to include actual process xids.  This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
 	 * the same result.
+	 *
+	 * If you change computation of RecentGlobalXmin here you may need to
+	 * change GetOldestXmin(...) as well.
 	 */
 	if (TransactionIdPrecedes(xmin, globalxmin))
 		globalxmin = xmin;
@@ -2041,12 +2135,16 @@ GetRunningTransactionData(void)
 	}
 
 	/*
-	 * It's important *not* to include the limits set by slots here because
+	 * It's important *not* to include the xmin set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
 	 * were to be included here the initial value could never increase because
-	 * of a circular dependency where slots only increase their limits when
-	 * running xacts increases oldestRunningXid and running xacts only
+	 * of a circular dependency where slots only increase their xmin limits
+	 * when running xacts increases oldestRunningXid and running xacts only
 	 * increases if slots do.
+	 *
+	 * We can safely report the catalog_xmin limit for replication slots here
+	 * because it's only used to advance oldestCatalogXmin. Slots'
+	 * catalog_xmin advance does not depend on it so there's no circularity.
 	 */
 
 	CurrentRunningXacts->xcnt = count - subcount;
@@ -2171,6 +2269,13 @@ GetOldestSafeDecodingTransactionId(void)
 	 * If there's already a slot pegging the xmin horizon, we can start with
 	 * that value, it's guaranteed to be safe since it's computed by this
 	 * routine initially and has been enforced since.
+	 *
+	 * We don't use ShmemVariableCache->oldestCatalogXmin here because another
+	 * backend may have already logged its intention to advance it to a higher
+	 * value (still <= replication_slot_catalog_xmin) and just be waiting on
+	 * ProcArrayLock to actually apply the change. On a standby
+	 * replication_slot_catalog_xmin is what the walreceiver will be sending
+	 * in hot_standby_feedback, not oldestCatalogXmin.
 	 */
 	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
@@ -2965,18 +3070,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmins, we return both the effective
+ * catalog_xmin being used for tuple removal (retained catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ *
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs, outdated replicas sending
+ * feedback, etc.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..c2cb0a1 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,10 @@ typedef struct VariableCacheData
 	 */
 	TransactionId latestCompletedXid;	/* newest XID that has committed or
 										 * aborted */
+	TransactionId oldestCatalogXmin;	/* oldestCatalogXmin guarantees that
+										 * no valid catalog tuples >= than it
+										 * are removed. That property is used
+										 * for logical decoding. */
 
 	/*
 	 * These fields are protected by CLogTruncationLock
@@ -179,6 +183,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 					  Oid oldest_datoid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
 extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5b37c05..6d18d18 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,7 +137,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_CATALOG_XMIN_ADV	0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -187,6 +187,13 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+typedef struct xl_xact_catalog_xmin_advance
+{
+	TransactionId new_catalog_xmin;
+}	xl_xact_catalog_xmin_advance;
+
+#define SizeOfXactCatalogXminAdvance (offsetof(xl_xact_catalog_xmin_advance, new_catalog_xmin) + sizeof(TransactionId))
+
 /*
  * Commit and abort records can contain a lot of information. But a large
  * portion of the records won't need all possible pieces of information. So we
@@ -391,6 +398,9 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
 				   int xactflags, TransactionId twophase_xid);
+
+extern XLogRecPtr XactLogCatalogXminUpdate(TransactionId new_catalog_xmin);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..1fe89ae 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -45,6 +45,7 @@ typedef struct CheckPoint
 	MultiXactOffset nextMultiOffset;	/* next free MultiXact offset */
 	TransactionId oldestXid;	/* cluster-wide minimum datfrozenxid */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
+	TransactionId oldestCatalogXmin;	/* catalog retained after this xid */
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..69a82d7 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 							TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin);
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin);
+
+extern void UpdateOldestCatalogXmin(void);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..80b976b 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,24 +7,79 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 44;
 
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1);
-$node_master->append_conf(
-		'postgresql.conf', qq(
+$node_master->append_conf('postgresql.conf', qq(
 wal_level = logical
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+log_min_messages = debug1
 ));
 $node_master->start;
-my $backup_name = 'master_backup';
 
+# Set up some changes before we make base backups
 $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
 
 $node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
 
 $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
 
+# Launch two streaming replicas, one with and one without
+# physical replication slots. We'll use these for tests
+# involving interaction of logical and physical standby.
+#
+# Both backups are created with pg_basebackup.
+#
+my $backup_name = 'master_backup';
+$node_master->backup($backup_name);
+
+$node_master->safe_psql('postgres', q[SELECT pg_create_physical_replication_slot('slot_replica');]);
+my $node_slot_replica = get_new_node('slot_replica');
+$node_slot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_slot_replica->append_conf('recovery.conf', "primary_slot_name = 'slot_replica'");
+
+my $node_noslot_replica = get_new_node('noslot_replica');
+$node_noslot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1);
+
+$node_slot_replica->start;
+$node_noslot_replica->start;
+
+sub restartpoint_standbys
+{
+	# Force restartpoints to update control files on replicas
+	$node_slot_replica->safe_psql('postgres', 'CHECKPOINT');
+	$node_noslot_replica->safe_psql('postgres', 'CHECKPOINT');
+}
+
+sub wait_standbys
+{
+	my $lsn = $node_master->lsn('insert');
+	$node_master->wait_for_catchup($node_noslot_replica, 'replay', $lsn);
+	$node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn);
+}
+
+# pg_basebackup doesn't copy replication slots
+is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef,
+	'logical slot test_slot on master not copied by pg_basebackup');
+
+# Make sure oldestCatalogXmin lands in the control file on master
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica);
+
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	# Master had an oldestCatalogXmin, so we must've inherited it via checkpoint
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero after start on " . $node->name);
+}
+
 # Basic decoding works
 my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
 is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
@@ -64,6 +119,9 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+# Create a second DB we'll use for testing dropping and accessing slots across
+# databases. This matters since logical slots are globally visible objects that
+# can only actually be used on one DB for most purposes.
 $node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
 
 is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
@@ -96,9 +154,29 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero on " . $node->name);
+}
+
+# Dropping the slot must clear catalog_xmin
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+		"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name);
+}
 
-# done with the node
-$node_master->stop;
+foreach my $node (@nodes)
+{
+	$node->stop;
+}
-- 
2.5.5

From c62acf7789a4d1a3db666fa1b2f67ba69af1f237 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 3 Apr 2017 17:31:19 +0800
Subject: [PATCH 2/3] Support conflict with standby on logical walsender

---
 src/backend/access/heap/heapam.c          |   2 +-
 src/backend/access/transam/xact.c         |   6 +-
 src/backend/access/transam/xlog.c         |   3 +-
 src/backend/replication/logical/logical.c | 139 ++++++++++++++++++++++++++++++
 src/backend/replication/slot.c            |   4 +-
 src/backend/replication/walsender.c       |  14 +--
 src/backend/storage/ipc/procarray.c       |  51 +++++++++++
 src/backend/storage/ipc/procsignal.c      |   3 +
 src/backend/storage/ipc/standby.c         |   7 +-
 src/backend/tcop/postgres.c               |  43 ++++++---
 src/include/storage/procarray.h           |   2 +
 src/include/storage/procsignal.h          |   1 +
 src/include/storage/standby.h             |   3 +
 13 files changed, 245 insertions(+), 33 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0c3e2b0..93bf143 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7273,7 +7273,7 @@ heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
  * ratchet forwards latestRemovedXid to the greatest one found.
  * This is used as the basis for generating Hot Standby conflicts, so
  * if a tuple was never visible then removing it should not conflict
- * with queries.
+ * with queries or logical decoding output plugin callbacks.
  */
 void
 HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 63453d7..48ca884 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5662,14 +5662,10 @@ xact_redo(XLogReaderState *record)
 		 * notice when we signal them with a recovery conflict. There's no
 		 * effect on the catalogs themselves yet, so it's safe for backends
 		 * with older catalog_xmins to still exist.
-		 *
-		 * We don't have to take ProcArrayLock since only the startup process
-		 * is allowed to change oldestCatalogXmin when we're in recovery.
-		 *
-		 * Existing sessions are not notified and must check the safe xmin.
 		 */
 		SetOldestCatalogXmin(xlrec->new_catalog_xmin);
 
+		ResolveRecoveryConflictWithLogicalDecoding(xlrec->new_catalog_xmin);
 	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8d713e9..a98601a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8546,7 +8546,8 @@ CreateCheckPoint(int flags)
 	InitXLogInsert();
 
 	/* Checkpoints are a handy time to update the effective catalog_xmin */
-	UpdateOldestCatalogXmin();
+	if (XLogInsertAllowed())
+		UpdateOldestCatalogXmin();
 
 	/*
 	 * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..282e330 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "miscadmin.h"
+#include "pgstat.h"
 
 #include "access/xact.h"
 #include "access/xlog_internal.h"
@@ -38,11 +39,14 @@
 #include "replication/reorderbuffer.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
+#include "replication/walreceiver.h"
 
+#include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 
 #include "utils/memutils.h"
+#include "utils/ps_status.h"
 
 /* data for errcontext callback */
 typedef struct LogicalErrorCallbackState
@@ -68,6 +72,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
+static void EnsureActiveLogicalSlotValid(void);
+
 /*
  * Make sure the current settings & environment are capable of doing logical
  * decoding.
@@ -218,6 +224,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlot *slot;
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
+	bool force_standby_snapshot;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -276,8 +283,21 @@ CreateInitDecodingContext(char *plugin,
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
+	/*
+	 * If this is the first slot created on the master we won't have a
+	 * persistent record of the oldest safe xid for historic snapshots yet.
+	 * Force one to be recorded so that when we go to replay from this slot we
+	 * know it's safe.
+	 */
+	force_standby_snapshot =
+		!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin);
+
 	LWLockRelease(ProcArrayLock);
 
+	/* Update ShmemVariableCache->oldestCatalogXmin */
+	if (force_standby_snapshot)
+		UpdateOldestCatalogXmin();
+
 	/*
 	 * tell the snapshot builder to only assemble snapshot once reaching the
 	 * running_xact's record with the respective xmin.
@@ -376,6 +396,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		start_lsn = slot->data.confirmed_flush;
 	}
 
+	EnsureActiveLogicalSlotValid();
+
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
 								 read_page, prepare_write, do_write);
@@ -963,3 +985,120 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }
+
+/*
+ * Test to see if the active logical slot is usable.
+ */
+static void
+EnsureActiveLogicalSlotValid(void)
+{
+	TransactionId shmem_catalog_xmin;
+
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * A logical slot can become unusable if we're doing logical decoding on a
+	 * standby or using a slot created before we were promoted from standby
+	 * to master. If the master advanced its global catalog_xmin past the
+	 * threshold we need it could've removed catalog tuple versions that
+	 * we'll require to start decoding at our restart_lsn.
+	 */
+
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+
+	if (!TransactionIdIsValid(shmem_catalog_xmin) ||
+		TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot '%s' requires catalogs removed by master",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("need catalog_xmin %u, have oldestCatalogXmin %u",
+						   MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin)));
+}
+
+/*
+ * Scan to see if any clients are using replication slots that are below a
+ * newly-applied new catalog_xmin theshold and signal them to terminate with a
+ * recovery conflict.
+ */
+void
+ResolveRecoveryConflictWithLogicalDecoding(TransactionId new_catalog_xmin)
+{
+	int i;
+
+	if (!InHotStandby)
+		/* nobody can be actively using logical slots */
+		return;
+
+	/* Already applied new limit, can't have replayed later one yet */
+	Assert(ShmemVariableCache->oldestCatalogXmin == new_catalog_xmin);
+
+	/*
+	 * Find the first conflicting active slot and signal its owning backend
+	 * to exit. We'll be called repeatedly by the recovery code until there
+	 * are no more conflicts.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *slot;
+		pid_t active_pid;
+
+		slot = &ReplicationSlotCtl->replication_slots[i];
+
+		/*
+		 * Physical slots can have a catalog_xmin, but conflicts are the
+		 * problem of the leaf replica with the logical slot.
+		 */
+		if (!(slot->in_use && SlotIsLogical(slot)))
+			continue;
+
+		/*
+		 * We only care about the effective_catalog_xmin of active logical
+		 * slots. Anything else gets checked when a new decoding session tries
+		 * to start.
+		 */
+		 while (slot->in_use && slot->active_pid != 0 &&
+				TransactionIdIsValid(slot->effective_catalog_xmin) &&
+				(!TransactionIdIsValid(new_catalog_xmin) ||
+				 TransactionIdPrecedes(slot->effective_catalog_xmin, new_catalog_xmin)))
+		{
+			/*
+			 * We'll be sleeping, so release the control lock. New conflicting
+			 * backends cannot appear and if old ones go away that's what we
+			 * want, so release and re-acquire is OK here.
+			 */
+			active_pid = slot->active_pid;
+			LWLockRelease(ReplicationSlotControlLock);
+
+			if (WaitExceedsMaxStandbyDelay())
+			{
+				ereport(INFO,
+						(errmsg("terminating logical decoding session due to recovery conflict"),
+						 errdetail("Pid %u requires catalog_xmin %u for replication slot '%s' but the master has removed catalogs up to xid %u.",
+								   active_pid, slot->effective_catalog_xmin,
+								   NameStr(slot->data.name), new_catalog_xmin)));
+
+				/*
+				 * Signal the proc. If the slot is already released or even if
+				 * pid is re-used we don't care, backends are required to
+				 * tolerate spurious recovery signals.
+				 */
+				CancelLogicalDecodingSessionWithRecoveryConflict(active_pid);
+
+				/* Don't flood the system with signals */
+				pg_usleep(10000);
+			}
+
+			/*
+			 * We need to re-acquire the lock before re-checking the slot or
+			 * continuing the scan.
+			 */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		}
+
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6c5ec7a..57a3994 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -48,6 +48,7 @@
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 
 /*
@@ -931,7 +932,8 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. We can't do that on a standby; there we must wait for the
+		 * bgwriter to get around to logging its periodic standby snapshot.
 		 *
 		 * That's not needed (or indeed helpful) for physical slots as they'll
 		 * start replay at the last logged checkpoint anyway. Instead return
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9f3a86b..ef63b63 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -212,7 +212,6 @@ static struct
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
@@ -2831,17 +2830,6 @@ WalSndSigHupHandler(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
-/* SIGUSR1: set flag to send WAL records */
-static void
-WalSndXLogSendHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	latch_sigusr1_handler();
-
-	errno = save_errno;
-}
-
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
@@ -2875,7 +2863,7 @@ WalSndSignals(void)
 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
 	InitializeTimeouts();		/* establishes SIGALRM handler */
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
 												 * shutdown */
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 9e98af8..05e3058 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2762,6 +2762,57 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
 }
 
 /*
+ * Notify a logical decoding session that it conflicts with newly set
+ * catalog_xmin from the master. We're about to start replaying WAL
+ * that will make its historic snapshot potentially unsafe by removing
+ * system tuples it might need.
+ */
+void
+CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid)
+{
+	ProcArrayStruct *arrayP = procArray;
+	int			index;
+	BackendId	backend_id = InvalidBackendId;
+
+	/*
+	 * We have to scan ProcArray to find the process and set a pending recovery
+	 * conflict even though we know the pid. At least we can get the BackendId
+	 * and avoid a ProcSignal scan by SendProcSignal.
+	 *
+	 * The pid might've gone away, in which case we got the desired
+	 * outcome anyway.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+	for (index = 0; index < arrayP->numProcs; index++)
+	{
+		int			pgprocno = arrayP->pgprocnos[index];
+		volatile PGPROC *proc = &allProcs[pgprocno];
+
+		if (proc->pid == session_pid)
+		{
+			VirtualTransactionId procvxid;
+
+			GET_VXID_FROM_PGPROC(procvxid, *proc);
+
+			proc->recoveryConflictPending = true;
+			backend_id = procvxid.backendId;
+			break;
+		}
+	}
+
+	LWLockRelease(ProcArrayLock);
+
+	/*
+	 * Kill the pid if it's still here. If not, that's what we
+	 * wanted so ignore any errors.
+	 */
+	if (backend_id != InvalidBackendId)
+		(void) SendProcSignal(session_pid,
+			PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN, backend_id);
+}
+
+/*
  * MinimumActiveBackends --- count backends (other than myself) that are
  *		in active transactions.  Return true if the count exceeds the
  *		minimum threshold passed.  This is used as a heuristic to decide if
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 4a21d55..16c2e1f 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -273,6 +273,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_TABLESPACE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f93..f6106ca 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -29,6 +29,7 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
+#include "replication/slot.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -152,11 +153,13 @@ GetStandbyLimitTime(void)
 static int	standbyWait_us = STANDBY_INITIAL_WAIT_US;
 
 /*
- * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
+ * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs and
+ * ResolveRecoveryConflictWithLogicalDecoding.
+ *
  * We wait here for a while then return. If we decide we can't wait any
  * more then we return true, if we can wait some more return false.
  */
-static bool
+bool
 WaitExceedsMaxStandbyDelay(void)
 {
 	TimestampTz ltime;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a2282058..530dcbe 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2276,6 +2276,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN:
+			errdetail("Logical replication slot requires catalog rows that will be removed.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
 			errdetail("User was connected to a database that must be dropped.");
 			break;
@@ -2698,8 +2701,12 @@ SigHupHandler(SIGNAL_ARGS)
 /*
  * RecoveryConflictInterrupt: out-of-line portion of recovery conflict
  * handling following receipt of SIGUSR1. Designed to be similar to die()
- * and StatementCancelHandler(). Called only by a normal user backend
- * that begins a transaction during recovery.
+ * and StatementCancelHandler().
+ *
+ * Called by normal user backends running during recovery. Also used by the
+ * walsender to handle recovery conflicts with logical decoding, and by
+ * background workers that call CHECK_FOR_INTERRUPTS() and respect recovery
+ * conflicts.
  */
 void
 RecoveryConflictInterrupt(ProcSignalReason reason)
@@ -2781,6 +2788,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 
 				/* Intentional drop through to session cancel */
 
+			case PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN:
 			case PROCSIG_RECOVERY_CONFLICT_DATABASE:
 				RecoveryConflictPending = true;
 				ProcDiePending = true;
@@ -2795,12 +2803,18 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 		Assert(RecoveryConflictPending && (QueryCancelPending || ProcDiePending));
 
 		/*
-		 * All conflicts apart from database cause dynamic errors where the
-		 * command or transaction can be retried at a later point with some
-		 * potential for success. No need to reset this, since non-retryable
-		 * conflict errors are currently FATAL.
+		 * All conflicts apart from database and catalog_xmin cause dynamic
+		 * errors where the command or transaction can be retried at a later
+		 * point with some potential for success. No need to reset this, since
+		 * non-retryable conflict errors are currently FATAL.
+		 *
+		 * catalog_xmin is non-retryable because once we advance the
+		 * catalog_xmin threshold we might replay wal that removes
+		 * needed catalog tuples. The slot can't (re)start decoding
+		 * because its catalog_xmin cannot be satisifed.
 		 */
-		if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE)
+		if (reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
+			reason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
 			RecoveryConflictRetryable = false;
 	}
 
@@ -2855,11 +2869,20 @@ ProcessInterrupts(void)
 		}
 		else if (RecoveryConflictPending)
 		{
-			/* Currently there is only one non-retryable recovery conflict */
-			Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE);
+			int code;
+
+			Assert(RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_DATABASE ||
+				   RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN);
+
+			if (RecoveryConflictReason == PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN)
+				/* XXX more appropriate error code? */
+				code = ERRCODE_PROGRAM_LIMIT_EXCEEDED;
+			else
+				code = ERRCODE_DATABASE_DROPPED;
+
 			pgstat_report_recovery_conflict(RecoveryConflictReason);
 			ereport(FATAL,
-					(errcode(ERRCODE_DATABASE_DROPPED),
+					(errcode(code),
 			  errmsg("terminating connection due to conflict with recovery"),
 					 errdetail_recovery_conflict()));
 		}
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 69a82d7..231297d 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -112,6 +112,8 @@ extern int	CountUserBackends(Oid roleid);
 extern bool CountOtherDBBackends(Oid databaseId,
 					 int *nbackends, int *nprepared);
 
+extern void CancelLogicalDecodingSessionWithRecoveryConflict(pid_t session_pid);
+
 extern void XidCacheRemoveRunningXids(TransactionId xid,
 						  int nxids, const TransactionId *xids,
 						  TransactionId latestXid);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index d068dde..3a3ba72 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -40,6 +40,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
+	PROCSIG_RECOVERY_CONFLICT_CATALOG_XMIN,
 
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 3ecc446..b17ba6f 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -34,10 +34,13 @@ extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
 extern void ResolveRecoveryConflictWithLock(LOCKTAG locktag);
 extern void ResolveRecoveryConflictWithBufferPin(void);
+extern void ResolveRecoveryConflictWithLogicalDecoding(
+	TransactionId new_catalog_xmin);
 extern void CheckRecoveryConflictDeadlock(void);
 extern void StandbyDeadLockHandler(void);
 extern void StandbyTimeoutHandler(void);
 extern void StandbyLockTimeoutHandler(void);
+extern bool WaitExceedsMaxStandbyDelay(void);
 
 /*
  * Standby Rmgr (RM_STANDBY_ID)
-- 
2.5.5

From 854e0f586d6c4f28d02469717122a2997b4410fd Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 4 Apr 2017 11:50:30 +0800
Subject: [PATCH 3/3] Support decoding on standby

---
 src/backend/replication/logical/logical.c          |  61 ++-
 src/backend/replication/walreceiver.c              |  12 +
 src/test/recovery/t/006_logical_decoding.pl        |  70 ++-
 .../recovery/t/012_logical_decoding_on_replica.pl  | 497 +++++++++++++++++++++
 4 files changed, 600 insertions(+), 40 deletions(-)
 create mode 100644 src/test/recovery/t/012_logical_decoding_on_replica.pl

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 282e330..35d110f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,23 +93,40 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-			   errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*----
+		 * We really want to enforce that:
+		 * - we're connected to the primary via a replication slot
+		 * - hot_standby_feedback is enabled
+		 * - the user cannot turn hot_standby_feedback off while we have
+		 *   logical slots on the standby (it's PGC_SIGHUP)
+		 * - hot_standby_feedback has actually taken effect on the master
+		 *
+		 * ... but because the walreceiver doesn't use normal GUCs and may or
+		 * may not actually be running we can't reliably enforce those
+		 * conditions yet. We also have no way of knowing when hot standby
+		 * feedback has reached the master and locked in a catalog_xmin.
+		 *
+		 * So on standbys, slot creation or decoding from a slot may fail with
+		 * a recovery conflict. But we keep track of the master's true
+		 * catalog_xmin in WAL, so we'll never attempt to decode unsafely.
+		 *
+		 * Make a best effort sanity check anyway.
+		 *---
+		 */
+		if (!hot_standby_feedback)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires hot_standby_feedback = on")));
+
+		LWLockAcquire(ProcArrayLock, LW_SHARED);
+		if (!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("hot_standby_feedback has not yet taken effect")));
+		LWLockRelease(ProcArrayLock);
+	}
 }
 
 /*
@@ -224,7 +241,6 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlot *slot;
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
-	bool force_standby_snapshot;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -283,19 +299,16 @@ CreateInitDecodingContext(char *plugin,
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
+	LWLockRelease(ProcArrayLock);
+
 	/*
 	 * If this is the first slot created on the master we won't have a
 	 * persistent record of the oldest safe xid for historic snapshots yet.
 	 * Force one to be recorded so that when we go to replay from this slot we
 	 * know it's safe.
 	 */
-	force_standby_snapshot =
-		!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin);
-
-	LWLockRelease(ProcArrayLock);
-
-	/* Update ShmemVariableCache->oldestCatalogXmin */
-	if (force_standby_snapshot)
+	if (!RecoveryInProgress() &&
+		!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin))
 		UpdateOldestCatalogXmin();
 
 	/*
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 277f196..c0f6cec 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1239,6 +1239,18 @@ XLogWalRcvSendHSFeedback(bool immed)
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
 			xmin = slot_xmin;
+
+		/*
+		 * If there's no local catalog_xmin, report it as == xmin, so that
+		 * we lock in a catalog_xmin before we need to create any logical slots
+		 * on this standby. This won't add much catalog bloat until we create
+		 * local slots and catalog_xmin starts lagging behind xmin, but it will
+		 * cause the master to start logging
+		 * xl_xact_catalog_xmin_advance records we need for logical
+		 * decoding on standby.
+		 */
+		if (!TransactionIdIsValid(catalog_xmin) && XLogLogicalInfoActive())
+			catalog_xmin = xmin;
 	}
 	else
 	{
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 80b976b..88ddf00 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 44;
+use Test::More tests => 57;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -61,18 +61,22 @@ sub wait_standbys
 	$node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn);
 }
 
+sub sync_up
+{
+	$node_master->safe_psql('postgres', 'CHECKPOINT;');
+	wait_standbys();
+	restartpoint_standbys();
+	# for hot_standby_feedback wal_sender_status_interval
+	sleep(1.5);
+}
+
 # pg_basebackup doesn't copy replication slots
 is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef,
 	'logical slot test_slot on master not copied by pg_basebackup');
 
-# Make sure oldestCatalogXmin lands in the control file on master
-$node_master->safe_psql('postgres', 'VACUUM;');
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica);
-
-wait_standbys();
-restartpoint_standbys();
+sync_up();
 foreach my $node (@nodes)
 {
 	# Master had an oldestCatalogXmin, so we must've inherited it via checkpoint
@@ -154,26 +158,60 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
-wait_standbys();
-restartpoint_standbys();
+sync_up();
 foreach my $node (@nodes)
 {
 	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
 		"pg_controldata's oldestCatalogXmin is nonzero on " . $node->name);
 }
 
-# Dropping the slot must clear catalog_xmin
+# Drop the logical slot on the master; make sure feedback from standbys continues to peg
+# catalog_xmin.
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
-is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
-$node_master->safe_psql('postgres', 'VACUUM;');
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
-wait_standbys();
-restartpoint_standbys();
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped on master');
+# Do a dummy xact so we can make sure catalog_xmin will advance, and we can see that
+# catalog_xmin will advance along with it.
+my $xmin = $node_master->safe_psql('postgres', 'BEGIN; CREATE TABLE dummy_xact(blah integer); SELECT txid_current(); COMMIT;');
+
+# even though the logical slot on the upstream is dropped, master's
+# oldestCatalogXmin is held down by hot standby feedback from the replicas.
+# Since the replicas have no logical slots of their own, it should've advanced
+# to be the same as the physical slot xmin for the slot replica.
+sync_up();
+# There are no transactions on the replicas so their xmin and catalog_xmin
+# will both be nextXid.
+cmp_ok($node_master->slot('slot_replica')->{'xmin'}, "eq", $xmin + 1,
+	'xmin advanced to latest master xid on slot_replica on master');
+cmp_ok($node_master->slot('slot_replica')->{'catalog_xmin'}, "le", $xmin + 1,
+	'xmin == catalog_xmin on phys slot held down by standby catalog_xmin');
+# Control files will still contain the xid, since there won't have been another
+# checkpoint to advance the nextXid reported by feedback and write it to the
+# control file.
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:$xmin$/m,
+		"pg_controldata's oldestCatalogXmin advanced after drop, vacuum and checkpoint on " . $node->name);
+}
+
+# if we turn hot_standby_feedback off on the replica that uses a slot, the
+# master should no longer have anything holding down its catalog_xmin. Even
+# though hot_standby_feedback is still enabled on the non-slot replica, it
+# cannot set the master's catalog_xmin because it has no destination slot,
+# it can only set xmin in its procarray entry.
+$node_slot_replica->safe_psql('postgres', q[ALTER SYSTEM SET hot_standby_feedback = off;]);
+# simplest way to force new hot standby feedback to be sent
+$node_slot_replica->restart;
+sleep(1);
+# hot standby feedback should've cleared minimums
+is($node_master->slot('slot_replica')->{'xmin'}, '', 'phys slot xmin null with hs_feedback off');
+is($node_master->slot('slot_replica')->{'catalog_xmin'}, '', 'phys slot catalog_xmin null with hs_feedback off');
+sync_up();
+# Everyone should now see the cleared catalog_xmin
 foreach my $node (@nodes)
 {
 	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
-		"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name);
+		"pg_controldata's oldestCatalogXmin zero after turning off hs_feedback: " . $node->name);
 }
 
 foreach my $node (@nodes)
diff --git a/src/test/recovery/t/012_logical_decoding_on_replica.pl b/src/test/recovery/t/012_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..6ed0abc
--- /dev/null
+++ b/src/test/recovery/t/012_logical_decoding_on_replica.pl
@@ -0,0 +1,497 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 63;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--write-recovery-conf', '--slot=decoding_standby');
+
+open(my $fh, "<", $backup_dir . "/recovery.conf")
+  or die "can't open recovery.conf";
+
+my $found = 0;
+while (my $line = <$fh>)
+{
+	chomp($line);
+	if ($line eq "primary_slot_name = 'decoding_standby'")
+	{
+		$found = 1;
+		last;
+	}
+}
+ok($found, "using physical slot for standby");
+
+sub print_phys_xmin
+{
+	my $slot = $node_master->slot('decoding_standby');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+
+# If no slot on standby exists to hold down catalog_xmin it must follow xmin,
+# (which is nextXid when no xacts are running on the standby).
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+is($xmin, $catalog_xmin, "xmin and catalog_xmin equal");
+
+# We need catalog_xmin advance to take effect on the master and be replayed
+# on standby.
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+diag "creating slot standby_logical";
+my $start_time = [Time::HiRes::gettimeofday()];
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+diag sprintf("Creation took %.2d seconds", Time::HiRes::tv_interval($start_time));
+
+sub print_logical_xmin
+{
+	my $slot = $node_replica->slot('standby_logical');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+	or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now the standby's slot
+# doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream catalog retention
+#########################################################
+
+sub test_catalog_xmin_retention()
+{
+	# First burn some xids on the master in another DB, so we push the master's
+	# nextXid ahead.
+	foreach my $i (1 .. 100)
+	{
+		$node_master->safe_psql('postgres', 'SELECT txid_current()');
+	}
+
+	# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+	# past our needed xmin. The only way we have visibility into that is to force
+	# a checkpoint.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+	foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+	{
+		$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+	}
+	sleep(1);
+	$node_master->safe_psql('postgres', 'CHECKPOINT');
+	IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+		or die "pg_controldata failed with $?";
+	my @checkpoint = split('\n', $stdout);
+	my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', '');
+	foreach my $line (@checkpoint)
+	{
+		if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+		{
+			$nextXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+		{
+			$oldestXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/)
+		{
+			$oldestCatalogXmin = $1;
+		}
+	}
+	die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+	my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+	my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+	diag "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+	return ($oldestXid, $oldestCatalogXmin);
+}
+
+diag "Testing catalog_xmin retention with hs_feedback on";
+my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+cmp_ok($oldestCatalogXmin, ">=", $oldestXid, "oldestCatalogXmin >= oldestXid");
+cmp_ok($oldestCatalogXmin, "<=", $new_logical_catalog_xmin,, "oldestCatalogXmin >= downstream catalog_xmin");
+
+#########################################################
+# Conflict with recovery: xmin cancels decoding session
+#########################################################
+#
+# Start a transaction on the replica then perform work that should cause a
+# recovery conflict with it. We'll check to make sure the client gets
+# terminated with recovery conflict.
+#
+# Temporarily disable hs feedback so we can test recovery conflicts.
+# It's fine to continue using a physical slot, the xmin should be
+# cleared. We only check hot_standby_feedback when establishing
+# a new decoding session so this approach circumvents the safeguards
+# in place and forces a conflict.
+#
+# We'll also create an unrelated table so we can drop it later, making
+# sure there are catalog changes to replay.
+$node_master->safe_psql('testdb', 'CREATE TABLE dummy_table(blah integer)');
+
+# Start pg_recvlogical before we turn off hs_feedback so its slot's
+# catalog_xmin is above the downstream's catalog_threshold when we start
+# decoding.
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off');
+$node_replica->reload;
+
+sleep(2);
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "physical xmin null after hs_feedback disabled");
+is($catalog_xmin, '', "physical catalog_xmin null after hs_feedback disabled");
+
+# Burn a bunch of XIDs and make sure upstream catalog_xmin is past what we'll
+# need here
+($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+cmp_ok($oldestXid, ">", $new_logical_catalog_xmin, 'upstream oldestXid advanced past downstream catalog_xmin with hs_feedback off');
+cmp_ok($oldestCatalogXmin, "==", 0, "oldestCatalogXmin = InvalidTransactionId with hs_feedback off");
+
+# Data-only changes, no effect on catalogs. We should replay them fine
+# without a conflict, since they advance xmin but not catalog_xmin.
+$node_master->safe_psql('testdb', 'DELETE FROM test_table');
+$node_master->safe_psql('testdb', 'VACUUM FULL test_table');
+$node_master->safe_psql('testdb', 'VACUUM;');
+
+diag "waiting for catchup";
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+diag "pumping";
+$handle->pump;
+diag "pumped";
+
+ok($node_replica->slot('standby_logical')->{'active_pid'}, 'pg_recvlogical still connected to slot');
+
+# If we change the catalogs, we'll get a conflict with recovery, but only if
+# there's an active xact when decoding.
+diag "dropping dummy_table";
+$node_master->safe_psql('testdb', 'DROP TABLE dummy_table;');
+
+diag "waiting for catchup";
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+diag "caught up, waiting for client";
+
+# client dies?
+eval {
+	$handle->finish;
+};
+$return = $?;
+if ($return) {
+	is($return, 256, "pg_recvlogical terminated by server on recovery conflict");
+	like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict errmsg');
+	like($stderr, qr/requires catalog rows that will be removed/, 'pg_recvlogical exited with catalog_xmin conflict');
+}
+else
+{
+	fail("pg_recvlogical returned ok $return with stdout '$stdout', stderr '$stderr'");
+}
+
+#####################################################################
+# Conflict with recovery: refuse to run without hot_standby_feedback
+#####################################################################
+#
+# When hot_standby_feedback is off, new connections should fail.
+#
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot while hot_standby_feedback off');
+like($stderr, qr/hot_standby_feedback/, 'recvlogical recovery conflict errmsg');
+
+#####################################################################
+# Conflict with recovery: catalog_xmin advance invalidates idle slot
+#####################################################################
+#
+# The slot that pg_recvlogical was using before it was terminated
+# should not accept new connections now, since its catalog_xmin
+# is lower than the replica's threshold. Even once we re-enable
+# hot_standby_feedback, the removed tuples won't somehow come back.
+#
+
+$node_replica->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on');
+$node_replica->reload;
+# make sure we see the effect promptly
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2);
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, 'xmin on phys slot non-null after re-establishing hot standby feedback');
+ok($catalog_xmin, 'catalog_xmin on phys slot non-null after re-establishing hot standby feedback')
+	or BAIL_OUT('further results meaningless if catalog_xmin not set on master');
+
+IPC::Run::run(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'standby_logical', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+is($?, 256, 'pg_recvlogical failed to connect to slot with past catalog_xmin');
+like($stderr, qr/replication slot '.*' requires catalogs removed by master/, 'recvlogical recovery conflict errmsg');
+
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_xmin, $new_catalog_xmin) = print_phys_xmin();
+# We're now back to the old behaviour of hot_standby_feedback
+# reporting nextXid for both thresholds
+ok($new_catalog_xmin, "physical catalog_xmin still non-null");
+cmp_ok($new_catalog_xmin, 'gt', $catalog_xmin,
+	'catalog_xmin increased after slot drop');
+cmp_ok($new_catalog_xmin, 'eq', $new_xmin,
+	'xmin and catalog_xmin equal after slot drop');
+
+
+
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+diag "Testing dropdb when downstream slot is not in-use";
+diag "creating slot dodropslot";
+$start_time = [Time::HiRes::gettimeofday()];
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot')
+	or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+diag sprintf("Creation took %.2d seconds", Time::HiRes::tv_interval($start_time));
+diag "creating slot otherslot";
+$start_time = [Time::HiRes::gettimeofday()];
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot')
+	or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+diag sprintf("Creation took %.2d seconds", Time::HiRes::tv_interval($start_time));
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+diag "Testing dropdb when downstream slot is in-use";
+$node_master->psql('postgres', q[CREATE DATABASE testdb2]);
+
+diag "creaitng slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot']);
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+diag "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+sleep(1);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+  or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+diag "pg_recvlogical backend pid is " . $node_replica->slot('dodropslot2')->{'active_pid'};
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+	sleep(1);
+	diag "waiting for walsender to exit";
+}
+
+diag "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval {
+	$handle->finish;
+};
+$return = $?;
+if ($return) {
+	is($return, 256, "pg_recvlogical terminated by server");
+	like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+	like($stderr, qr/User was connected to a database that must be dropped./, 'recvlogical recovery conflict db');
+}
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+  'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to