From aa3004a70e1ab2ee304367b29dde1549326354f1 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Mon, 1 Jul 2019 10:49:50 +0530
Subject: [PATCH] Logical decoding on standby - v11

Author : Andres Freund.

Besides the above main changes, patch includes following :

1. Handle slot conflict recovery by dropping the conflicting slots.

-Amit Khandekar.

2. test/recovery/t/016_logical_decoding_on_replica.pl added.

Original author : Craig Ringer. few changes/additions from Amit Khandekar.

3. Handle slot conflicts when master wal_level becomes less than logical.

Changes in v6 patch :

While creating the slot, lastReplayedEndRecPtr is used to set the
restart_lsn, but its position is later adjusted in
DecodingContextFindStartpoint() in case it does not point to a
valid record location. This can happen because replay pointer
points to 1 + end of last record replayed, which means it can
coincide with first byte of a new WAL block, i.e. inside block
header.

Also, modified the test to handle the requirement that the
logical slot creation on standby requires a checkpoint
(or any other transaction commit) to be given from master. For
that, in src/test/perl/PostgresNode.pm, added a new function
create_logical_slot_on_standby() which does the reqiured steps.

Changes in v7 patch :

Merge the two conflict messages for xmin and catalog_xmin into
a single one.

Changes in v8 :

Fix incorrect flush ptr on standby (reported by Tushar Ahuja).
In XLogSendLogical(), GetFlushRecPtr() was used to get the flushed
point. On standby, GetFlushRecPtr() does not give a valid value, so it
was wrongly determined that the sent record is beyond flush point, as
a result of which, WalSndCaughtUp was set to true, causing
WalSndLoop() to sleep for some duration after every record.
This was reported by Tushar Ahuja, where pg_recvlogical seems like it
is hanging when there are loads of insert.
Fix: Use GetStandbyFlushRecPtr() if am_cascading_walsender

Changes in v9 :
While dropping a conflicting logical slot, if a backend has acquired it, send
it a conflict recovery signal. Check new function ReplicationSlotDropConflicting().
Also, miscellaneous review comments addressed, but not all of them yet.

Changes in v10 :
Adjust restart_lsn if it's a Replay Pointer.
This was earlier done in DecodingContextFindStartpoint() but now it
is done in in ReplicationSlotReserveWal(), when restart_lsn is initialized.

Changes in v11 :
Added some test scenarios to test drop-slot conflicts. Organized the
test file a bit.
Also improved the conflict error message.
---
 src/backend/access/gist/gistxlog.c                 |   6 +-
 src/backend/access/hash/hash_xlog.c                |   3 +-
 src/backend/access/hash/hashinsert.c               |   2 +
 src/backend/access/heap/heapam.c                   |  23 +-
 src/backend/access/heap/vacuumlazy.c               |   2 +-
 src/backend/access/heap/visibilitymap.c            |   2 +-
 src/backend/access/nbtree/nbtpage.c                |   4 +
 src/backend/access/nbtree/nbtxlog.c                |   4 +-
 src/backend/access/spgist/spgvacuum.c              |   2 +
 src/backend/access/spgist/spgxlog.c                |   1 +
 src/backend/access/transam/xlog.c                  |  22 ++
 src/backend/postmaster/pgstat.c                    |   4 +
 src/backend/replication/logical/decode.c           |  14 +-
 src/backend/replication/logical/logical.c          |  33 +-
 src/backend/replication/slot.c                     | 233 +++++++++++-
 src/backend/replication/walsender.c                |   8 +-
 src/backend/storage/ipc/procarray.c                |   4 +
 src/backend/storage/ipc/procsignal.c               |   3 +
 src/backend/storage/ipc/standby.c                  |   7 +-
 src/backend/tcop/postgres.c                        |  23 +-
 src/backend/utils/adt/pgstatfuncs.c                |   1 +
 src/backend/utils/cache/lsyscache.c                |  16 +
 src/include/access/gistxlog.h                      |   3 +-
 src/include/access/hash_xlog.h                     |   1 +
 src/include/access/heapam_xlog.h                   |   8 +-
 src/include/access/nbtxlog.h                       |   2 +
 src/include/access/spgxlog.h                       |   1 +
 src/include/access/xlog.h                          |   1 +
 src/include/pgstat.h                               |   1 +
 src/include/replication/slot.h                     |   2 +
 src/include/storage/procsignal.h                   |   1 +
 src/include/storage/standby.h                      |   2 +-
 src/include/utils/lsyscache.h                      |   1 +
 src/include/utils/rel.h                            |   1 +
 src/test/perl/PostgresNode.pm                      |  27 ++
 .../recovery/t/018_logical_decoding_on_replica.pl  | 420 +++++++++++++++++++++
 36 files changed, 830 insertions(+), 58 deletions(-)
 create mode 100644 src/test/recovery/t/018_logical_decoding_on_replica.pl

diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 503db34..385ea1f 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -397,7 +398,7 @@ gistRedoPageReuse(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
-											xlrec->node);
+											xlrec->onCatalogTable, xlrec->node);
 	}
 }
 
@@ -589,6 +590,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, TransactionId latestRemovedXi
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index d7b7098..00c3e0f 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 5321762..e28465a 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
 #include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 			xl_hash_vacuum_one_page xlrec;
 			XLogRecPtr	recptr;
 
+			xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
 			xlrec.latestRemovedXid = latestRemovedXid;
 			xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d768b9b..10b7857 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7149,12 +7149,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
  * see comments for vacuum_log_cleanup_info().
  */
 XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
 {
 	xl_heap_cleanup_info xlrec;
 	XLogRecPtr	recptr;
 
-	xlrec.node = rnode;
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+	xlrec.node = rel->rd_node;
 	xlrec.latestRemovedXid = latestRemovedXid;
 
 	XLogBeginInsert();
@@ -7190,6 +7191,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
@@ -7240,6 +7242,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.ntuples = ntuples;
 
@@ -7270,7 +7273,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
  * heap_buffer, if necessary.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 				 TransactionId cutoff_xid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
@@ -7280,6 +7283,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(heap_buffer));
 	Assert(BufferIsValid(vm_buffer));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.flags = vmflags;
 	XLogBeginInsert();
@@ -7700,7 +7704,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
 	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
 
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, xlrec->node);
 
 	/*
 	 * Actual operation is a no-op. Record type exists to provide a means for
@@ -7736,7 +7741,8 @@ heap_xlog_clean(XLogReaderState *record)
 	 * latestRemovedXid is invalid, skip conflict processing.
 	 */
 	if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 
 	/*
 	 * If we have a full-page image, restore it (using a cleanup lock) and
@@ -7832,7 +7838,9 @@ heap_xlog_visible(XLogReaderState *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+											xlrec->onCatalogTable,
+											rnode);
 
 	/*
 	 * Read the heap page, if it still exists. If the heap file has dropped or
@@ -7969,7 +7977,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
 		TransactionIdRetreat(latestRemovedXid);
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index a3c4a1d..bf34d3a 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -473,7 +473,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 	 * No need to write the record at all unless it contains a valid value
 	 */
 	if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
-		(void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+		(void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
 }
 
 /*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 64dfe06..c5fdd64 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -281,7 +281,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 			if (XLogRecPtrIsInvalid(recptr))
 			{
 				Assert(!InRecovery);
-				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+				recptr = log_heap_visible(rel, heapBuf, vmBuf,
 										  cutoff_xid, flags);
 
 				/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 0357030..6b641c9 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -31,6 +31,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
 
 static void _bt_cachemetadata(Relation rel, BTMetaPageData *input);
@@ -773,6 +774,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1140,6 +1142,8 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 		XLogRecPtr	recptr;
 		xl_btree_delete xlrec_delete;
 
+		xlrec_delete.onCatalogTable =
+			RelationIsAccessibleInLogicalDecoding(heapRel);
 		xlrec_delete.latestRemovedXid = latestRemovedXid;
 		xlrec_delete.nitems = nitems;
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 6532a25..b874bda 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -526,7 +526,8 @@ btree_xlog_delete(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	/*
@@ -810,6 +811,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable,
 											xlrec->node);
 	}
 }
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 2b1662a..eaaf631 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
 
 
 /* Entry in pending-list of TIDs we need to revisit */
@@ -502,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
 	OffsetNumber itemnos[MaxIndexTuplesPerPage];
 	spgxlogVacuumRedirect xlrec;
 
+	xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
 	xlrec.nToPlaceholder = 0;
 	xlrec.newestRedirectXid = InvalidTransactionId;
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index ebe6ae8..800609c 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+												xldata->onCatalogTable,
 												node);
 		}
 	}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e08320e..7417bcf 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4926,6 +4926,15 @@ LocalProcessControlFile(bool reset)
 }
 
 /*
+ * Get the wal_level from the control file.
+ */
+WalLevel
+GetActiveWalLevel(void)
+{
+	return ControlFile->wal_level;
+}
+
+/*
  * Initialization of shared memory for XLOG
  */
 Size
@@ -9843,6 +9852,19 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+		/*
+		 * Drop logical slots if we are in hot standby and master does not have
+		 * logical data. Don't bother to search for the slots if standby is
+		 * running with wal_level lower than logical, because in that case,
+		 * we would have either disallowed creation of logical slots or dropped
+		 * existing ones.
+		 */
+		if (InRecovery && InHotStandby &&
+			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+			wal_level >= WAL_LEVEL_LOGICAL)
+			ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId,
+				gettext_noop("Logical decoding on standby requires wal_level >= logical on master."));
+
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b4f2b28..797ea0c 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4728,6 +4728,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
 	dbentry->n_conflict_tablespace = 0;
 	dbentry->n_conflict_lock = 0;
 	dbentry->n_conflict_snapshot = 0;
+	dbentry->n_conflict_logicalslot = 0;
 	dbentry->n_conflict_bufferpin = 0;
 	dbentry->n_conflict_startup_deadlock = 0;
 	dbentry->n_temp_files = 0;
@@ -6352,6 +6353,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			dbentry->n_conflict_snapshot++;
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			dbentry->n_conflict_logicalslot++;
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
 			dbentry->n_conflict_bufferpin++;
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 151c3ef..c1bd028 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -190,11 +190,23 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+		{
+			xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+			/* Cannot proceed if master itself does not have logical data */
+			if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical decoding on standby requires "
+								"wal_level >= logical on master")));
+			break;
+		}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bbd38c0..4169828 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -94,23 +94,22 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check may have race conditions, but whenever
+		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+		 * verify that there are no existing logical replication slots. And to
+		 * avoid races around creating a new slot,
+		 * CheckLogicalDecodingRequirements() is called once before creating
+		 * the slot, and once when logical decoding is initially starting up.
+		 */
+		if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires "
+							"wal_level >= logical on master")));
+	}
 }
 
 /*
@@ -241,6 +240,8 @@ CreateInitDecodingContext(char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 55c306e..47c7dd8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,6 +46,7 @@
 #include "pgstat.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
+#include "storage/lock.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
@@ -101,6 +102,7 @@ int			max_replication_slots = 0;	/* the maximum number of replication
 
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static void ReplicationSlotDropConflicting(ReplicationSlot *slot);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -638,6 +640,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 }
 
 /*
+ * Permanently drop a conflicting replication slot. If it's already active by
+ * another backend, send it a recovery conflict signal, and then try again.
+ */
+static void
+ReplicationSlotDropConflicting(ReplicationSlot *slot)
+{
+	pid_t		active_pid;
+	PGPROC	   *proc;
+	VirtualTransactionId	vxid;
+
+	ConditionVariablePrepareToSleep(&slot->active_cv);
+	while (1)
+	{
+		SpinLockAcquire(&slot->mutex);
+		active_pid = slot->active_pid;
+		if (active_pid == 0)
+			active_pid = slot->active_pid = MyProcPid;
+		SpinLockRelease(&slot->mutex);
+
+		/* Drop the acquired slot, unless it is acquired by another backend */
+		if (active_pid == MyProcPid)
+		{
+			elog(DEBUG1, "acquired conflicting slot, now dropping it");
+			ReplicationSlotDropPtr(slot);
+			break;
+		}
+
+		/* Send the other backend, a conflict recovery signal */
+
+		SetInvalidVirtualTransactionId(vxid);
+		LWLockAcquire(ProcArrayLock, LW_SHARED);
+		proc = BackendPidGetProcWithLock(active_pid);
+		if (proc)
+			GET_VXID_FROM_PGPROC(vxid, *proc);
+		LWLockRelease(ProcArrayLock);
+
+		/*
+		 * If coincidently that process finished, some other backend may
+		 * acquire the slot again. So start over again.
+		 * Note: Even if vxid.localTransactionId is invalid, we need to cancel
+		 * that backend, because there is no other way to make it release the
+		 * slot. So don't bother to validate vxid.localTransactionId.
+		 */
+		if (vxid.backendId == InvalidBackendId)
+			continue;
+
+		elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot",
+					 active_pid, vxid.backendId);
+
+		CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+		ConditionVariableSleep(&slot->active_cv,
+							   WAIT_EVENT_REPLICATION_SLOT_DROP);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
  * Serialize the currently acquired slot's state from memory to disk, thereby
  * guaranteeing the current state will survive a crash.
  */
@@ -1016,37 +1076,56 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xact record to be
+		 * logged independently on the primary, so that a snapshot can be built
+		 * using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base backup
+		 * has to start replay at.
 		 */
+		if (SlotIsPhysical(slot))
+			restart_lsn = GetRedoRecPtr();
+		else if (RecoveryInProgress())
+		{
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+			/*
+			 * Replay pointer may point one past the end of the record. If that
+			 * is a XLOG page boundary, it will not be a valid LSN for the
+			 * start of a record, so bump it up past the page header.
+			 */
+			if (!XRecOffIsValid(restart_lsn))
+			{
+				if (restart_lsn % XLOG_BLCKSZ != 0)
+					elog(ERROR, "invalid replay pointer");
+				/* For the first page of a segment file, it's a long header */
+				if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
+					restart_lsn += SizeOfXLogLongPHD;
+				else
+					restart_lsn += SizeOfXLogShortPHD;
+			}
+		}
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
+
 		if (!RecoveryInProgress() && SlotIsLogical(slot))
 		{
 			XLogRecPtr	flushptr;
 
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
 
 			/* and make sure it's fsynced to disk */
 			XLogFlush(flushptr);
 		}
-		else
-		{
-			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
@@ -1065,6 +1144,122 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, it means that rows older than xid might have been
+ * removed. Therefore we need to drop slots that depend on seeing those rows.
+ * When xid is invalid, drop all logical slots. This is required when the
+ * master wal_level is set back to replica, so existing logical slots need to
+ * be dropped. Also, when xid is invalid, a common 'conflict_reason' is
+ * provided for the error detail; otherwise it is NULL, in which case it is
+ * constructed out of the xid value.
+ */
+void
+ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
+										char *conflict_reason)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* We are only dealing with *logical* slot conflicts. */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* Invalid xid means caller is asking to drop all logical slots */
+		if (!TransactionIdIsValid(xid))
+			found_conflict = true;
+		else
+		{
+			TransactionId slot_xmin;
+			TransactionId slot_catalog_xmin;
+			StringInfoData	conflict_str, conflict_xmins;
+			char	   *conflict_sentence =
+				gettext_noop("Slot conflicted with xid horizon which was being increased to");
+
+			/* not our database, skip */
+			if (s->data.database != InvalidOid && s->data.database != dboid)
+				continue;
+
+			SpinLockAcquire(&s->mutex);
+			slot_xmin = s->data.xmin;
+			slot_catalog_xmin = s->data.catalog_xmin;
+			SpinLockRelease(&s->mutex);
+
+			/*
+			 * Build the conflict_str which will look like :
+			 * "Slot conflicted with xid horizon which was being increased
+			 * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)."
+			 */
+			initStringInfo(&conflict_xmins);
+			if (TransactionIdIsValid(slot_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_xmin, xid))
+			{
+				appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin);
+			}
+			if (TransactionIdIsValid(slot_catalog_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+				appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d",
+								 conflict_xmins.len > 0 ? ", " : "",
+								 slot_catalog_xmin);
+
+			if (conflict_xmins.len > 0)
+			{
+				initStringInfo(&conflict_str);
+				appendStringInfo(&conflict_str, "%s %d (%s).",
+								 conflict_sentence, xid, conflict_xmins.data);
+				found_conflict = true;
+				conflict_reason = conflict_str.data;
+			}
+		}
+
+		if (found_conflict)
+		{
+			NameData	slotname;
+
+			SpinLockAcquire(&s->mutex);
+			slotname = s->data.name;
+			SpinLockRelease(&s->mutex);
+
+			/* ReplicationSlotDropPtr() would acquire the lock below */
+			LWLockRelease(ReplicationSlotControlLock);
+
+			ReplicationSlotDropConflicting(s);
+
+			ereport(LOG,
+					(errmsg("dropped conflicting slot %s", NameStr(slotname)),
+					 errdetail("%s", conflict_reason)));
+
+			/* We released the lock above; so re-scan the slots. */
+			goto restart;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
+
+/*
  * Flush all replication slots to disk.
  *
  * This needn't actually be part of a checkpoint, but it's a convenient
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 92fa86f..4ce7096 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2814,6 +2814,7 @@ XLogSendLogical(void)
 {
 	XLogRecord *record;
 	char	   *errm;
+	XLogRecPtr	flushPtr;
 
 	/*
 	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
@@ -2830,10 +2831,11 @@ XLogSendLogical(void)
 	if (errm != NULL)
 		elog(ERROR, "%s", errm);
 
+	flushPtr = (am_cascading_walsender ?
+				GetStandbyFlushRecPtr() : GetFlushRecPtr());
+
 	if (record != NULL)
 	{
-		/* XXX: Note that logical decoding cannot be used while in recovery */
-		XLogRecPtr	flushPtr = GetFlushRecPtr();
 
 		/*
 		 * Note the lack of any call to LagTrackerWrite() which is handled by
@@ -2857,7 +2859,7 @@ XLogSendLogical(void)
 		 * If the record we just wanted read is at or beyond the flushed
 		 * point, then we're caught up.
 		 */
-		if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
+		if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
 		{
 			WalSndCaughtUp = true;
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 18a0f62..ec696f4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2669,6 +2669,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
 
 		GET_VXID_FROM_PGPROC(procvxid, *proc);
 
+		/*
+		 * Note: vxid.localTransactionId can be invalid, which means the
+		 * request is to signal the pid that is not running a transaction.
+		 */
 		if (procvxid.backendId == vxid.backendId &&
 			procvxid.localTransactionId == vxid.localTransactionId)
 		{
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 7605b2c..645f320 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -286,6 +286,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 25b7e31..7cfb6d5 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
 #include "access/xloginsert.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -291,7 +292,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									bool onCatalogTable, RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
@@ -312,6 +314,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 
 	ResolveRecoveryConflictWithVirtualXIDs(backends,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
+
+	if (onCatalogTable)
+		ResolveRecoveryConflictWithLogicalSlots(node.dbNode, latestRemovedXid, NULL);
 }
 
 void
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 44a59e1..c23d361 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2393,6 +2393,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using the logical slot that must be dropped.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -2879,6 +2882,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 			case PROCSIG_RECOVERY_CONFLICT_LOCK:
 			case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
 			case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				/*
+				 * For conflicts that require a logical slot to be dropped, the
+				 * requirement is for the signal receiver to release the slot,
+				 * so that it could be dropped by the signal sender. So for
+				 * normal backends, the transaction should be aborted, just
+				 * like for other recovery conflicts. But if it's walsender on
+				 * standby, then it has to be killed so as to release an
+				 * acquired logical slot.
+				 */
+				if (am_cascading_walsender &&
+					reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+					MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
+				{
+					RecoveryConflictPending = true;
+					QueryCancelPending = true;
+					InterruptPending = true;
+					break;
+				}
 
 				/*
 				 * If we aren't in a transaction any longer then ignore.
@@ -2920,7 +2942,6 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 
 				/* Intentional fall through to session cancel */
 				/* FALLTHROUGH */
-
 			case PROCSIG_RECOVERY_CONFLICT_DATABASE:
 				RecoveryConflictPending = true;
 				ProcDiePending = true;
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 05240bf..7dfbef7 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1499,6 +1499,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 						  dbentry->n_conflict_tablespace +
 						  dbentry->n_conflict_lock +
 						  dbentry->n_conflict_snapshot +
+						  dbentry->n_conflict_logicalslot +
 						  dbentry->n_conflict_bufferpin +
 						  dbentry->n_conflict_startup_deadlock);
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index c13c08a..bd35bc1 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/table.h"
 #include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amop.h"
@@ -1893,6 +1895,20 @@ get_rel_persistence(Oid relid)
 	return result;
 }
 
+bool
+get_rel_logical_catalog(Oid relid)
+{
+	bool	res;
+	Relation rel;
+
+	/* assume previously locked */
+	rel = heap_open(relid, NoLock);
+	res = RelationIsAccessibleInLogicalDecoding(rel);
+	heap_close(rel, NoLock);
+
+	return res;
+}
+
 
 /*				---------- TRANSFORM CACHE ----------						 */
 
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 969a537..59246c3 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -48,9 +48,9 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		ntodelete;		/* number of deleted offsets */
-
 	/*
 	 * In payload of blk 0 : todelete OffsetNumbers
 	 */
@@ -96,6 +96,7 @@ typedef struct gistxlogPageDelete
  */
 typedef struct gistxlogPageReuse
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 53b682c..fd70b55 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			ntuples;
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index f6cdca8..a1d1f11 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -237,6 +237,7 @@ typedef struct xl_heap_update
  */
 typedef struct xl_heap_clean
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
@@ -252,6 +253,7 @@ typedef struct xl_heap_clean
  */
 typedef struct xl_heap_cleanup_info
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	TransactionId latestRemovedXid;
 } xl_heap_cleanup_info;
@@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple
  */
 typedef struct xl_heap_freeze_page
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint16		ntuples;
 } xl_heap_freeze_page;
@@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint8		flags;
 } xl_heap_visible;
@@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
 										TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 								 OffsetNumber *redirected, int nredirected,
@@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 									  bool *totally_frozen);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 									  xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
 								   Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
 
 #endif							/* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 9beccc8..f64a33c 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -126,6 +126,7 @@ typedef struct xl_btree_split
  */
 typedef struct xl_btree_delete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			nitems;
 
@@ -139,6 +140,7 @@ typedef struct xl_btree_delete
  */
 typedef struct xl_btree_reuse_page
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 073f740..d3dad69 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
 
 typedef struct spgxlogVacuumRedirect
 {
+	bool		onCatalogTable;
 	uint16		nToPlaceholder; /* number of redirects to make placeholders */
 	OffsetNumber firstPlaceholder;	/* first placeholder tuple to remove */
 	TransactionId newestRedirectXid;	/* newest XID of removed redirects */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 237f4e0..e7439c1 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -299,6 +299,7 @@ extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevel(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0a3ad3a..4fe8684 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -604,6 +604,7 @@ typedef struct PgStat_StatDBEntry
 	PgStat_Counter n_conflict_tablespace;
 	PgStat_Counter n_conflict_lock;
 	PgStat_Counter n_conflict_snapshot;
+	PgStat_Counter n_conflict_logicalslot;
 	PgStat_Counter n_conflict_bufferpin;
 	PgStat_Counter n_conflict_startup_deadlock;
 	PgStat_Counter n_temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8fbddea..73b954e 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -205,4 +205,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason);
+
 #endif							/* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 05b186a..956d3c2 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -39,6 +39,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index a3f8f82..6dedebc 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
-												RelFileNode node);
+									bool onCatalogTable, RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index c8df5bf..579d9ff 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -131,6 +131,7 @@ extern char get_rel_relkind(Oid relid);
 extern bool get_rel_relispartition(Oid relid);
 extern Oid	get_rel_tablespace(Oid relid);
 extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
 extern Oid	get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
 extern Oid	get_transform_tosql(Oid typid, Oid langid, List *trftypes);
 extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index d7f33ab..8c90fd7 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 6019f37..719837d 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2000,6 +2000,33 @@ sub pg_recvlogical_upto
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+	my ($self, $master, $slot_name, $dbname) = @_;
+	my ($stdout, $stderr);
+
+	my $handle;
+
+	$handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+	sleep(1);
+
+	# Slot creation on standby waits for an xl_running_xacts record. So arrange
+	# for it.
+	$master->safe_psql('postgres', 'CHECKPOINT');
+
+	$handle->finish();
+
+	return 0;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/t/018_logical_decoding_on_replica.pl b/src/test/recovery/t/018_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..fd77e19
--- /dev/null
+++ b/src/test/recovery/t/018_logical_decoding_on_replica.pl
@@ -0,0 +1,420 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 58;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes qw(usleep);
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $return);
+my $backup_name;
+
+my $node_master = get_new_node('master');
+my $node_replica = get_new_node('replica');
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+
+	my $slotinfo = $node->slot($slotname);
+	return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
+}
+
+sub print_phys_xmin
+{
+	my $slot = $node_master->slot('master_physical');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+sub print_logical_xmin
+{
+	my $slot = $node_replica->slot('standby_logical');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+sub create_logical_slots
+{
+	is($node_replica->create_logical_slot_on_standby($node_master, 'dropslot', 'testdb'),
+	   0, 'created dropslot on testdb')
+		or BAIL_OUT('cannot continue if slot creation fails, see logs');
+	is($node_replica->slot('dropslot')->{'slot_type'}, 'logical', 'dropslot on standby created');
+	is($node_replica->create_logical_slot_on_standby($node_master, 'activeslot', 'testdb'),
+	   0, 'created activeslot on testdb')
+		or BAIL_OUT('cannot continue if slot creation fails, see logs');
+	is($node_replica->slot('activeslot')->{'slot_type'}, 'logical', 'activeslot on standby created');
+
+	return 0;
+}
+
+sub make_slot_active
+{
+	# make sure activeslot is in use
+	print "starting pg_recvlogical";
+	$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+	while (!$node_replica->slot('activeslot')->{'active_pid'})
+	{
+		usleep(100_000);
+		print "waiting for slot to become active\n";
+	}
+	return 0;
+}
+
+sub check_slots_dropped
+{
+	is($node_replica->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
+	is($node_replica->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	# our client should've terminated in response to the walsender error
+	eval {
+		$handle->finish;
+	};
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero ");
+	if ($return) {
+		like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
+		like($stderr, qr/must be dropped/, 'recvlogical error detail');
+	}
+
+	return 0;
+}
+
+# Initialize master node
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('master_physical');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=master_physical');
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+# Initialize slave node
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_replica->append_conf('postgresql.conf',
+	q[primary_slot_name = 'master_physical']);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical',
+	"xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->create_logical_slot_on_standby($node_master, 'standby_logical', 'testdb'),
+   0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'DROP TABLE test_table');
+$node_master->safe_psql('testdb', 'VACUUM');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+	or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+    $node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin,
+		"logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb',
+	qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# logical slot catalog_xmin on slave should advance after
+# pg_logical_slot_get_changes
+($new_logical_xmin, $new_logical_catalog_xmin) =
+	wait_for_xmins($node_replica, 'standby_logical',
+				   "catalog_xmin::varchar::int > ${logical_catalog_xmin}");
+is($new_logical_xmin, '', "logical xmin null");
+
+# hot standby feedback should advance master's phys catalog_xmin now that the
+# standby's slot doesn't hold it down as far.
+my ($new_physical_xmin, $new_physical_catalog_xmin) =
+	wait_for_xmins($node_master, 'master_physical',
+				   "catalog_xmin::varchar::int > ${physical_catalog_xmin}");
+isnt($new_physical_xmin, '', "physical xmin not null");
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin,
+	'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream oldestXid retention
+#########################################################
+
+sub test_oldest_xid_retention()
+{
+	# First burn some xids on the master in another DB, so we push the master's
+	# nextXid ahead.
+	foreach my $i (1 .. 100)
+	{
+		$node_master->safe_psql('postgres', 'SELECT txid_current()');
+	}
+
+	# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+	# past our needed xmin. The only way we have visibility into that is to force
+	# a checkpoint.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+	foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+	{
+		$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+	}
+	sleep(1);
+	$node_master->safe_psql('postgres', 'CHECKPOINT');
+	IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+		or die "pg_controldata failed with $?";
+	my @checkpoint = split('\n', $stdout);
+	my ($oldestXid, $nextXid) = ('', '', '');
+	foreach my $line (@checkpoint)
+	{
+		if ($line =~ qr/^Latest checkpoint's NextXID:\s+\d+:(\d+)/)
+		{
+			$nextXid = $1;
+		}
+		if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+		{
+			$oldestXid = $1;
+		}
+	}
+	die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+	my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+	my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+
+	print "upstream oldestXid $oldestXid, nextXid $nextXid, phys slot catalog_xmin $new_physical_catalog_xmin, downstream catalog_xmin $new_logical_catalog_xmin";
+
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+	return ($oldestXid);
+}
+
+my ($oldestXid) = test_oldest_xid_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin,
+	'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. catalog_xmin should become NULL because we dropped
+# the logical slot.
+($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical',
+	"xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Drop conflicting slots, including in-use slots
+# Scenario 1 : hot_standby_feedback off
+##################################################
+
+create_logical_slots();
+
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on slave.
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_replica->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on master. Both should be NULL since hs_feedback is off
+($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical',
+	"xmin IS NULL AND catalog_xmin IS NULL");
+
+make_slot_active();
+
+# This should trigger the conflict
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+check_slots_dropped();
+
+# Turn hot_standby_feedback back on
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+($xmin, $catalog_xmin) = wait_for_xmins($node_master, 'master_physical',
+	"xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Drop conflicting slots, including in-use slots
+# Scenario 2 : incorrect wal_level at master
+##################################################
+
+create_logical_slots();
+
+make_slot_active();
+
+# Make master wal_level replica. This will trigger slot conflict.
+$node_master->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_master->restart;
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+check_slots_dropped();
+
+# Restore master wal_level
+$node_master->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_master->restart;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+##################################################
+# Recovery: drop database drops slots, including active slots.
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB.
+create_logical_slots();
+
+make_slot_active();
+
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+is($node_replica->create_logical_slot_on_standby($node_master, 'otherslot', 'postgres'),
+   0, 'created otherslot on postgres')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby created');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped();
+
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_replica->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
-- 
2.1.4

