diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..d1400ec 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use oldestCatalogXmin here */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 278546a..4aaae59 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -21,10 +21,11 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 {
 	int			i;
 
-	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u",
+	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u oldestCatalogXmin %u",
 					 xlrec->nextXid,
 					 xlrec->latestCompletedXid,
-					 xlrec->oldestRunningXid);
+					 xlrec->oldestRunningXid,
+					 xlrec->oldestCatalogXmin);
 	if (xlrec->xcnt > 0)
 	{
 		appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..6094465 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5021,6 +5021,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6612,9 @@ StartupXLOG(void)
 	   (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 						checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 						 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6628,6 +6632,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -6913,6 +6918,7 @@ StartupXLOG(void)
 				Assert(TransactionIdIsNormal(latestCompletedXid));
 				running.latestCompletedXid = latestCompletedXid;
 				running.xids = xids;
+				running.pendingOldestCatalogXmin = InvalidTransactionId;
 
 				ProcArrayApplyRecoveryInfo(&running);
 
@@ -8704,6 +8710,10 @@ CreateCheckPoint(int flags)
 	checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
 	LWLockRelease(XidGenLock);
 
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+
 	LWLockAcquire(CommitTsLock, LW_SHARED);
 	checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
 	checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
@@ -9633,6 +9643,12 @@ xlog_redo(XLogReaderState *record)
 		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 
 		/*
+		 * There can be no concurrent writers to oldestCatalogXmin during
+		 * recovery, so no need to take ProcArrayLock.
+		 */
+		ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
+
+		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
 		 * record, the backup was canceled and the end-of-backup record will
 		 * never arrive.
@@ -9675,6 +9691,7 @@ xlog_redo(XLogReaderState *record)
 			Assert(TransactionIdIsNormal(latestCompletedXid));
 			running.latestCompletedXid = latestCompletedXid;
 			running.xids = xids;
+			running.pendingOldestCatalogXmin = InvalidTransactionId;
 
 			ProcArrayApplyRecoveryInfo(&running);
 
@@ -9731,6 +9748,15 @@ xlog_redo(XLogReaderState *record)
 								  checkPoint.oldestXid))
 			SetTransactionIdLimit(checkPoint.oldestXid,
 								  checkPoint.oldestXidDB);
+
+		/*
+		 * There can be no concurrent writers to oldestCatalogXmin during
+		 * recovery, so no need to take ProcArrayLock.
+		 */
+		if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
+									checkPoint.oldestCatalogXmin)
+			ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
+
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..76155bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -68,6 +68,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
+static void EnsureActiveLogicalSlotValid(void);
+
 /*
  * Make sure the current settings & environment are capable of doing logical
  * decoding.
@@ -218,6 +220,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlot *slot;
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
+	bool force_standby_snapshot;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -276,8 +279,21 @@ CreateInitDecodingContext(char *plugin,
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
+	/*
+	 * If this is the first slot created on the master we won't have a
+	 * persistent record of the oldest safe xid for historic snapshots yet.
+	 * Force one to be recorded so that when we go to replay from this slot we
+	 * know it's safe.
+	 */
+	force_standby_snapshot =
+		!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin);
+
 	LWLockRelease(ProcArrayLock);
 
+	/* Update ShmemVariableCache->oldestCatalogXmin */
+	if (force_standby_snapshot)
+		LogStandbySnapshot();
+
 	/*
 	 * tell the snapshot builder to only assemble snapshot once reaching the
 	 * running_xact's record with the respective xmin.
@@ -376,6 +392,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		start_lsn = slot->data.confirmed_flush;
 	}
 
+	EnsureActiveLogicalSlotValid();
+
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
 								 read_page, prepare_write, do_write);
@@ -963,3 +981,39 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }
+
+/*
+ * Test to see if the active logical slot is usable.
+ */
+static void
+EnsureActiveLogicalSlotValid(void)
+{
+	TransactionId shmem_catalog_xmin;
+
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * A logical slot can become unusable if we're doing logical decoding on a
+	 * standby or using a slot created before we were promoted from standby
+	 * to master. If the master advanced its global catalog_xmin past the
+	 * threshold we need it could've removed catalog tuple versions that
+	 * we'll require to start decoding at our restart_lsn.
+	 *
+	 * We need a barrier so that if we decode in recovery on a standby we
+	 * don't allow new decoding sessions to start after redo has advanced
+	 * the threshold.
+	 */
+	if (RecoveryInProgress())
+		pg_memory_barrier();
+
+	shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (!TransactionIdIsValid(shmem_catalog_xmin) ||
+		TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot '%s' requires catalogs removed by master",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("need catalog_xmin %u, have oldestCatalogXmin %u",
+						   MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin)));
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 771ac30..c2ad791 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1233,7 +1233,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 		xmin = GetOldestXmin(NULL,
 							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..cdc5f95 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1658,6 +1658,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there,
 	 */
 }
 
@@ -1778,6 +1783,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..48b18ec 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,11 @@ typedef struct ProcArrayStruct
 
 	/* oldest xmin of any replication slot */
 	TransactionId replication_slot_xmin;
-	/* oldest catalog xmin of any replication slot */
+	/*
+	 * Oldest catalog xmin of any replication slot
+	 *
+	 * See also ShmemVariableCache->oldestGlobalXmin
+	 */
 	TransactionId replication_slot_catalog_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -679,6 +683,20 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
 
 	/*
+	 * Update our knowledge of the oldest xid we can safely create historic
+	 * snapshots for.
+	 *
+	 * There can be no concurrent writers to oldestCatalogXmin during
+	 * recovery, so no need to take ProcArrayLock.
+	 *
+	 * If we allow logical decoding on standbys in future we must raise
+	 * recovery conflicts with catalog_xmin advances here.
+	 */
+	if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
+									  running->oldestRunningXid))
+		ShmemVariableCache->oldestCatalogXmin = running->pendingOldestCatalogXmin;
+
+	/*
 	 * Remove stale locks, if any.
 	 *
 	 * Locks are always assigned to the toplevel xid so we don't need to care
@@ -1306,6 +1324,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1493,7 +1514,8 @@ GetMaxSnapshotSubxidCount(void)
  *			older than this are known not running any more.
  *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
  *			running transactions, except those running LAZY VACUUM).  This is
- *			the same computation done by GetOldestXmin(true, true).
+ *			the same computation done by
+ *			GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_FLAGS_VACUUM)
  *		RecentGlobalDataXmin: the global xmin for non-catalog tables
  *			>= RecentGlobalXmin
  *
@@ -1700,7 +1722,7 @@ GetSnapshotData(Snapshot snapshot)
 
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1733,9 @@ GetSnapshotData(Snapshot snapshot)
 	 * Update globalxmin to include actual process xids.  This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
 	 * the same result.
+	 *
+	 * If you change computation of RecentGlobalXmin here you may need to
+	 * change GetOldestXmin(...) as well.
 	 */
 	if (TransactionIdPrecedes(xmin, globalxmin))
 		globalxmin = xmin;
@@ -2041,12 +2066,16 @@ GetRunningTransactionData(void)
 	}
 
 	/*
-	 * It's important *not* to include the limits set by slots here because
+	 * It's important *not* to include the xmin set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
 	 * were to be included here the initial value could never increase because
 	 * of a circular dependency where slots only increase their limits when
 	 * running xacts increases oldestRunningXid and running xacts only
 	 * increases if slots do.
+	 *
+	 * We can include the catalog_xmin limit here; there's no similar
+	 * circularity, and we need it to log xl_running_xacts records for
+	 * standbys.
 	 */
 
 	CurrentRunningXacts->xcnt = count - subcount;
@@ -2055,6 +2084,8 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
+	CurrentRunningXacts->pendingOldestCatalogXmin =
+		procArray->replication_slot_catalog_xmin;
 
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
@@ -2168,14 +2199,14 @@ GetOldestSafeDecodingTransactionId(void)
 	oldestSafeXid = ShmemVariableCache->nextXid;
 
 	/*
-	 * If there's already a slot pegging the xmin horizon, we can start with
-	 * that value, it's guaranteed to be safe since it's computed by this
-	 * routine initially and has been enforced since.
+	 * If there's already an effectiveCatalogXmin held down by an existing
+	 * replication slot it's definitely safe to start there, and it can't
+	 * advance while we hold ProcArrayLock.
 	 */
-	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
-		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
+	if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) &&
+		TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
 							  oldestSafeXid))
-		oldestSafeXid = procArray->replication_slot_catalog_xmin;
+		oldestSafeXid = ShmemVariableCache->oldestCatalogXmin;
 
 	/*
 	 * If we're not in recovery, we walk over the procarray and collect the
@@ -2965,18 +2996,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmin, we return both the effective
+ * catalog_xmin being used for tuple removal (retained catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ *
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs, outdated replicas sending
+ * feedback, etc.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f93..819abf7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -45,6 +45,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+static void UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin);
 
 
 /*
@@ -822,6 +823,7 @@ standby_redo(XLogReaderState *record)
 		running.latestCompletedXid = xlrec->latestCompletedXid;
 		running.oldestRunningXid = xlrec->oldestRunningXid;
 		running.xids = xlrec->xids;
+		running.pendingOldestCatalogXmin = xlrec->oldestCatalogXmin;
 
 		ProcArrayApplyRecoveryInfo(&running);
 	}
@@ -953,12 +955,24 @@ LogStandbySnapshot(void)
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
+	/*
+	 * Now that we've recorded our intention to allow cleanup of catalog tuples
+	 * no longer needed by our replication slots we can make the new threshold
+	 * effective for vacuum etc.
+	 */
+	UpdateOldestCatalogXmin(running->pendingOldestCatalogXmin);
+
 	return recptr;
 }
 
 /*
  * Record an enhanced snapshot of running transactions into WAL.
  *
+ * We also record the value of procArray->replication_slot_catalog_xmin
+ * obtained from GetRunningTransactionData here, so standbys know we're about
+ * to advance ShmemVariableCache->oldestCatalogXmin to its value and start
+ * removing dead catalog tuples below that threshold.
+ *
  * The definitions of RunningTransactionsData and xl_xact_running_xacts are
  * similar. We keep them separate because xl_xact_running_xacts is a
  * contiguous chunk of memory and never exists fully until it is assembled in
@@ -977,6 +991,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	xlrec.nextXid = CurrRunningXacts->nextXid;
 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
+	xlrec.oldestCatalogXmin = CurrRunningXacts->pendingOldestCatalogXmin;
 
 	/* Header */
 	XLogBeginInsert();
@@ -1021,6 +1036,16 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	return recptr;
 }
 
+static void
+UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, pendingOldestCatalogXmin)
+		|| (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) != TransactionIdIsValid(pendingOldestCatalogXmin)))
+		ShmemVariableCache->oldestCatalogXmin = pendingOldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
+
 /*
  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
  * logged, as described in backend/storage/lmgr/README.
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..a4ecfb7 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -136,6 +136,17 @@ typedef struct VariableCacheData
 										 * aborted */
 
 	/*
+	 * This field is protected by ProcArrayLock except
+	 * during recovery, when it's set unlocked.
+	 *
+	 * oldestCatalogXmin is the oldest xid it is
+	 * guaranteed to be safe to create a historic
+	 * snapshot for. See also
+	 * procArray->replication_slot_catalog_xmin
+	 */
+	TransactionId oldestCatalogXmin;
+
+	/*
 	 * These fields are protected by CLogTruncationLock
 	 */
 	TransactionId oldestClogXid;	/* oldest it's safe to look up in clog */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index c09c0f8..0621845 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD097	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD100	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..b9461b3 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -23,7 +23,7 @@
 #define MOCK_AUTH_NONCE_LEN		32
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION	1002
+#define PG_CONTROL_VERSION	1003
 
 /*
  * Body of CheckPoint XLOG records.  This is declared here because we keep
@@ -45,6 +45,7 @@ typedef struct CheckPoint
 	MultiXactOffset nextMultiOffset;	/* next free MultiXact offset */
 	TransactionId oldestXid;	/* cluster-wide minimum datfrozenxid */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
+	TransactionId oldestCatalogXmin;	/* catalog retained after this xid */
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..05ace64 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 							TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin);
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 3ecc446..7756a27 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -65,6 +65,10 @@ extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids);
  * is written to WAL as a separate record immediately after each
  * checkpoint. That means that wherever we start a standby from we will
  * almost immediately see the data we need to begin executing queries.
+ *
+ * Information about the oldest catalog_xmin needed by any replication slot is
+ * also included here, so we can use it to update the catalog tuple removal
+ * limit and convey the new limit to standbys.
  */
 
 typedef struct RunningTransactionsData
@@ -75,6 +79,8 @@ typedef struct RunningTransactionsData
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	/* so we can update ShmemVariableCache->oldestCatalogXmin: */
+	TransactionId pendingOldestCatalogXmin;
 
 	TransactionId *xids;		/* array of (sub)xids still running */
 } RunningTransactionsData;
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index f8444c7..6153675 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -52,6 +52,7 @@ typedef struct xl_running_xacts
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	TransactionId oldestCatalogXmin;	/* oldest safe historic snapshot */
 
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..2cfa9ac 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 25;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -17,6 +17,10 @@ $node_master->append_conf(
 wal_level = logical
 ));
 $node_master->start;
+
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+	"pg_controldata's oldestCatalogXmin is zero after start");
+
 my $backup_name = 'master_backup';
 
 $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
@@ -96,9 +100,18 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+	"pg_controldata's oldestCatalogXmin is nonzero");
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+# First checkpoint forces xl_running_xacts with the new oldestCatalogXmin
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+# Then we need a second checkpoint to write the control file with the new value
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+	"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint");
 
 # done with the node
 $node_master->stop;
