There were conflicts again, so I rebased once more.  Didn't do anything
else.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From e2f5cedfc35652f198f9ca3bca9a45572f8845fb Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khande...@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:15 +0530
Subject: [PATCH v6 1/5] Allow logical decoding on standby.

Allow a logical slot to be created on a standby. Restrict its creation
and use if wal_level on the primary is less than logical. During slot
creation, its restart_lsn is set to the last replayed LSN; effectively,
creating a logical slot on a standby waits for an xl_running_xacts
record to arrive from the primary. Conflicting slots will be handled in
subsequent commits.

Andres Freund and Amit Khandekar.
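
For illustration only (not part of the patch): a minimal sketch of the
behavior this enables, using the test_decoding plugin and a hypothetical
slot name. The creation blocks until a snapshot can be built, which on an
otherwise idle primary can be forced by a checkpoint there:

    -- on the standby: create a logical slot (hypothetical name)
    SELECT * FROM pg_create_logical_replication_slot('standby_slot',
                                                     'test_decoding');

    -- on the primary: force an xl_running_xacts record to be logged, so
    -- that the standby-side slot creation can finish building its snapshot
    CHECKPOINT;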
---
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 22 ++++++++-
 src/backend/replication/logical/logical.c | 37 ++++++++-------
 src/backend/replication/slot.c            | 57 +++++++++++++++--------
 src/backend/replication/walsender.c       | 10 ++--
 src/include/access/xlog.h                 |  1 +
 6 files changed, 98 insertions(+), 40 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index de2d4ee582..a666b4b935 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4967,6 +4967,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should
+ * be considered its active wal_level, because it may differ from what was
+ * originally configured on the standby itself.
+ */
+WalLevel
+GetActiveWalLevel(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3abf8..3a072af75b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -187,11 +187,31 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+		{
+			xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+			/*
+			 * If wal_level on primary is reduced to less than logical, then we
+			 * want to prevent existing logical slots from being used.
+			 * Existing logical slots on standby get dropped when this WAL
+			 * record is replayed; and further, slot creation fails when the
+			 * wal level is not sufficient; but all these operations are not
+			 * synchronized, so a logical slot may creep in while the wal_level
+			 * is being reduced.  Hence this extra check.
+			 */
+			if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical decoding on standby requires "
+								"wal_level >= logical on master")));
+			break;
+		}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..03463719f8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -91,23 +91,22 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check is racy, but whenever XLOG_PARAMETER_CHANGE indicates
+		 * that wal_level has been reduced, we verify that there are no
+		 * existing logical replication slots. And to avoid races around
+		 * creating a new slot, CheckLogicalDecodingRequirements() is called
+		 * once before creating the slot, and once when logical decoding is
+		 * initially starting up.
+		 */
+		if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires "
+							"wal_level >= logical on master")));
+	}
 }
 
 /*
@@ -240,6 +239,12 @@ CreateInitDecodingContext(char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	/*
+	 * On a standby, this check is also required when creating the slot; see
+	 * the comments in CheckLogicalDecodingRequirements().
+	 */
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1cec53d748..00aa95ba15 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,37 +1016,56 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xacts record to be
+		 * logged independently on the primary, so that a snapshot can be built
+		 * using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base backup
+		 * has to start replay at.
 		 */
+		if (SlotIsPhysical(slot))
+			restart_lsn = GetRedoRecPtr();
+		else if (RecoveryInProgress())
+		{
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+			/*
+			 * Replay pointer may point one past the end of the record. If that
+			 * is a XLOG page boundary, it will not be a valid LSN for the
+			 * is an XLOG page boundary, it will not be a valid LSN for the
+			 */
+			if (!XRecOffIsValid(restart_lsn))
+			{
+				if (restart_lsn % XLOG_BLCKSZ != 0)
+					elog(ERROR, "invalid replay pointer");
+				/* For the first page of a segment file, it's a long header */
+				if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
+					restart_lsn += SizeOfXLogLongPHD;
+				else
+					restart_lsn += SizeOfXLogShortPHD;
+			}
+		}
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
+
 		if (!RecoveryInProgress() && SlotIsLogical(slot))
 		{
 			XLogRecPtr	flushptr;
 
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
 
 			/* and make sure it's fsynced to disk */
 			XLogFlush(flushptr);
 		}
-		else
-		{
-			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..9274fec10b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2819,10 +2819,12 @@ XLogSendLogical(void)
 	 * If first time through in this session, initialize flushPtr.  Otherwise,
 	 * we only need to update flushPtr if EndRecPtr is past it.
 	 */
-	if (flushPtr == InvalidXLogRecPtr)
-		flushPtr = GetFlushRecPtr();
-	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-		flushPtr = GetFlushRecPtr();
+	if (flushPtr == InvalidXLogRecPtr ||
+		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+	{
+		flushPtr = (am_cascading_walsender ?
+					GetStandbyFlushRecPtr() : GetFlushRecPtr());
+	}
 
 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..6228779df6 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -298,6 +298,7 @@ extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevel(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
-- 
2.20.1

From a89d06fbca4b350192ad7895796783bf50a17757 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khande...@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v6 2/5] Add info in WAL records in preparation for logical
 slot conflict handling.

When WAL replay on a standby indicates that a catalog table tuple is to
be removed by an xid that is greater than a logical slot's catalog_xmin,
the slot's catalog_xmin conflicts with that xid and the conflict needs
to be handled. Subsequent commits will do the actual conflict handling;
this commit only adds a new field, onCatalogTable, to the relevant WAL
records. It is set to true for catalog tables, so that conflict handling
can be arranged.

Andres Freund.
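
A side note on scope, for illustration only:
RelationIsAccessibleInLogicalDecoding() also covers user tables marked
with the user_catalog_table storage parameter, so (with wal_level =
logical) their cleanup records would carry onCatalogTable = true as
well. A minimal sketch with a hypothetical table name:

    -- a user table declared as an additional catalog for logical decoding
    CREATE TABLE my_lookup (id int PRIMARY KEY, val text)
        WITH (user_catalog_table = true);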
---
 src/backend/access/gist/gist.c          |  2 +-
 src/backend/access/gist/gistbuild.c     |  2 +-
 src/backend/access/gist/gistutil.c      |  4 ++--
 src/backend/access/gist/gistxlog.c      |  4 +++-
 src/backend/access/hash/hashinsert.c    |  2 ++
 src/backend/access/heap/heapam.c        | 10 +++++++---
 src/backend/access/heap/vacuumlazy.c    |  2 +-
 src/backend/access/heap/visibilitymap.c |  2 +-
 src/backend/access/nbtree/nbtpage.c     |  4 ++++
 src/backend/access/spgist/spgvacuum.c   |  8 ++++++++
 src/backend/utils/cache/lsyscache.c     | 16 ++++++++++++++++
 src/include/access/gist_private.h       |  6 +++---
 src/include/access/gistxlog.h           |  3 ++-
 src/include/access/hash_xlog.h          |  1 +
 src/include/access/heapam_xlog.h        |  8 ++++++--
 src/include/access/nbtxlog.h            |  2 ++
 src/include/access/spgxlog.h            |  1 +
 src/include/utils/lsyscache.h           |  1 +
 src/include/utils/rel.h                 |  9 +++++++++
 19 files changed, 71 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 90c46e86a1..43ce813a50 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -342,7 +342,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 		for (; ptr; ptr = ptr->next)
 		{
 			/* Allocate new page */
-			ptr->buffer = gistNewBuffer(rel);
+			ptr->buffer = gistNewBuffer(heapRel, rel);
 			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
 			ptr->page = BufferGetPage(ptr->buffer);
 			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 671b5e9186..c4c5f41a2a 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -171,7 +171,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.giststate->tempCxt = createTempGistContext();
 
 	/* initialize the root page */
-	buffer = gistNewBuffer(index);
+	buffer = gistNewBuffer(heap, index);
 	Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
 	page = BufferGetPage(buffer);
 
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index dd975b164c..62b4e9e8db 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -806,7 +806,7 @@ gistcheckpage(Relation rel, Buffer buf)
  * Caller is responsible for initializing the page by calling GISTInitBuffer
  */
 Buffer
-gistNewBuffer(Relation r)
+gistNewBuffer(Relation heapRel, Relation r)
 {
 	Buffer		buffer;
 	bool		needLock;
@@ -850,7 +850,7 @@ gistNewBuffer(Relation r)
 				 * page's deleteXid.
 				 */
 				if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
-					gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));
+					gistXLogPageReuse(heapRel, r, blkno, GistPageGetDeleteXid(page));
 
 				return buffer;
 			}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index d3f3a7b803..90bc4895b2 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -596,7 +596,8 @@ gistXLogPageDelete(Buffer buffer, FullTransactionId xid,
  * Write XLOG record about reuse of a deleted page.
  */
 void
-gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid)
+gistXLogPageReuse(Relation heapRel, Relation rel,
+				  BlockNumber blkno, FullTransactionId latestRemovedXid)
 {
 	gistxlogPageReuse xlrec_reuse;
 
@@ -607,6 +608,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemov
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(heapRel);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedFullXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 2ebe671967..e6cef08330 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "storage/buf_internals.h"
 #include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 			xl_hash_vacuum_one_page xlrec;
 			XLogRecPtr	recptr;
 
+			xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
 			xlrec.latestRemovedXid = latestRemovedXid;
 			xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 29694b8aa4..bf6bfe69dc 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7145,12 +7145,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
  * see comments for vacuum_log_cleanup_info().
  */
 XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
 {
 	xl_heap_cleanup_info xlrec;
 	XLogRecPtr	recptr;
 
-	xlrec.node = rnode;
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+	xlrec.node = rel->rd_node;
 	xlrec.latestRemovedXid = latestRemovedXid;
 
 	XLogBeginInsert();
@@ -7186,6 +7187,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
@@ -7236,6 +7238,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.ntuples = ntuples;
 
@@ -7266,7 +7269,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
  * heap_buffer, if necessary.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 				 TransactionId cutoff_xid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
@@ -7276,6 +7279,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(heap_buffer));
 	Assert(BufferIsValid(vm_buffer));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.flags = vmflags;
 	XLogBeginInsert();
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 03c43efc32..b1014ef7b9 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -659,7 +659,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 	 * No need to write the record at all unless it contains a valid value
 	 */
 	if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
-		(void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+		(void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
 }
 
 /*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 0a51678c40..89f8955f36 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -282,7 +282,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 			if (XLogRecPtrIsInvalid(recptr))
 			{
 				Assert(!InRecovery);
-				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+				recptr = log_heap_visible(rel, heapBuf, vmBuf,
 										  cutoff_xid, flags);
 
 				/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 39b8f17f4b..22ea714382 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -32,6 +32,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
 
 static BTMetaPageData *_bt_getmeta(Relation rel, Buffer metabuf);
@@ -760,6 +761,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1210,6 +1212,8 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 		XLogRecPtr	recptr;
 		xl_btree_delete xlrec_delete;
 
+		xlrec_delete.onCatalogTable =
+			RelationIsAccessibleInLogicalDecoding(heapRel);
 		xlrec_delete.latestRemovedXid = latestRemovedXid;
 		xlrec_delete.ndeleted = ndeletable;
 
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index bd98707f3c..f97597dba1 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
 
 
 /* Entry in pending-list of TIDs we need to revisit */
@@ -502,6 +503,13 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
 	OffsetNumber itemnos[MaxIndexTuplesPerPage];
 	spgxlogVacuumRedirect xlrec;
 
+	/*
+	 * There is no danger of infinite recursion even though we are doing
+	 * catalog accesses here, because SP-GiST is never used for catalogs. See
+	 * the comments in RelationIsAccessibleInLogicalDecoding().
+	 */
+	xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
+
 	xlrec.nToPlaceholder = 0;
 	xlrec.newestRedirectXid = InvalidTransactionId;
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 27bbb58f56..e663523af1 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/table.h"
 #include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amop.h"
@@ -1919,6 +1921,20 @@ get_rel_persistence(Oid relid)
 	return result;
 }
 
+bool
+get_rel_logical_catalog(Oid relid)
+{
+	bool	res;
+	Relation rel;
+
+	/* assume previously locked */
+	rel = table_open(relid, NoLock);
+	res = RelationIsAccessibleInLogicalDecoding(rel);
+	table_close(rel, NoLock);
+
+	return res;
+}
+
 
 /*				---------- TRANSFORM CACHE ----------						 */
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 18f2b0d98e..eff9696b18 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -439,8 +439,8 @@ extern XLogRecPtr gistXLogPageDelete(Buffer buffer,
 									 FullTransactionId xid, Buffer parentBuffer,
 									 OffsetNumber downlinkOffset);
 
-extern void gistXLogPageReuse(Relation rel, BlockNumber blkno,
-							  FullTransactionId latestRemovedXid);
+extern void gistXLogPageReuse(Relation heapRel, Relation rel,
+				  BlockNumber blkno, FullTransactionId latestRemovedXid);
 
 extern XLogRecPtr gistXLogUpdate(Buffer buffer,
 								 OffsetNumber *todelete, int ntodelete,
@@ -478,7 +478,7 @@ extern bool gistproperty(Oid index_oid, int attno,
 extern bool gistfitpage(IndexTuple *itvec, int len);
 extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
 extern void gistcheckpage(Relation rel, Buffer buf);
-extern Buffer gistNewBuffer(Relation r);
+extern Buffer gistNewBuffer(Relation heapRel, Relation r);
 extern bool gistPageRecyclable(Page page);
 extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
 						   OffsetNumber off);
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 55fc843d3a..390b8b65f6 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -48,9 +48,9 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		ntodelete;		/* number of deleted offsets */
-
 	/*
 	 * In payload of blk 0 : todelete OffsetNumbers
 	 */
@@ -96,6 +96,7 @@ typedef struct gistxlogPageDelete
  */
 typedef struct gistxlogPageReuse
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	FullTransactionId latestRemovedFullXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index d1aa6daa40..b44892cc48 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -250,6 +250,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			ntuples;
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 95d18cdb12..5edeb99808 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -237,6 +237,7 @@ typedef struct xl_heap_update
  */
 typedef struct xl_heap_clean
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
@@ -252,6 +253,7 @@ typedef struct xl_heap_clean
  */
 typedef struct xl_heap_cleanup_info
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	TransactionId latestRemovedXid;
 } xl_heap_cleanup_info;
@@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple
  */
 typedef struct xl_heap_freeze_page
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint16		ntuples;
 } xl_heap_freeze_page;
@@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint8		flags;
 } xl_heap_visible;
@@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
 										TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 								 OffsetNumber *redirected, int nredirected,
@@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 									  bool *totally_frozen);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 									  xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
 								   Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
 
 #endif							/* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 347976c532..fe69dabca0 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -186,6 +186,7 @@ typedef struct xl_btree_dedup
  */
 typedef struct xl_btree_delete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint32		ndeleted;
 
@@ -203,6 +204,7 @@ typedef struct xl_btree_delete
  */
 typedef struct xl_btree_reuse_page
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 63d3c63db2..83121c7266 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
 
 typedef struct spgxlogVacuumRedirect
 {
+	bool		onCatalogTable;
 	uint16		nToPlaceholder; /* number of redirects to make placeholders */
 	OffsetNumber firstPlaceholder;	/* first placeholder tuple to remove */
 	TransactionId newestRedirectXid;	/* newest XID of removed redirects */
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 4e646c55e9..ab0d16c1f9 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -132,6 +132,7 @@ extern char get_rel_relkind(Oid relid);
 extern bool get_rel_relispartition(Oid relid);
 extern Oid	get_rel_tablespace(Oid relid);
 extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
 extern Oid	get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
 extern Oid	get_transform_tosql(Oid typid, Oid langid, List *trftypes);
 extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 44ed04dd3f..b6dc5dfc7d 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"
@@ -316,6 +317,9 @@ typedef struct StdRdOptions
  * RelationIsUsedAsCatalogTable
  *		Returns whether the relation should be treated as a catalog table
  *		from the pov of logical decoding.  Note multiple eval of argument!
+ *		This definition should not invoke anything that performs catalog
+ *		access; otherwise it may cause infinite recursion. Check the comments
+ *		in RelationIsAccessibleInLogicalDecoding() for details.
  */
 #define RelationIsUsedAsCatalogTable(relation)	\
 	((relation)->rd_options && \
@@ -580,6 +584,11 @@ typedef struct ViewOptions
  * RelationIsAccessibleInLogicalDecoding
  *		True if we need to log enough information to have access via
  *		decoding snapshot.
+ *		This definition should not invoke anything that performs catalog
+ *		access. Otherwise, e.g. logging a WAL entry for a catalog relation may
+ *		invoke this function, which will in turn do catalog access, which may
+ *		in turn cause another similar WAL entry to be logged, leading to
+ *		infinite recursion.
  */
 #define RelationIsAccessibleInLogicalDecoding(relation) \
 	(XLogLogicalInfoActive() && \
-- 
2.20.1

From ee5c0944effd0634d82d90828c2f749e9abf5fba Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khande...@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v6 3/5] Handle logical slot conflicts on standby.

During WAL replay on the standby, drop logical slots for which a
conflict is identified. Do the same if wal_level on the master is
reduced below logical while logical slots exist on the standby.
Introduce a new ProcSignalReason value for slot conflict recovery, and
add a new pg_stat_database_conflicts column: confl_logicalslot.

Amit Khandekar, reviewed by Andres Freund.
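
For illustration only, a sketch of how the new counter can be inspected
on the standby once conflicts have forced logical slots to be dropped
(view and column as added by this patch):

    SELECT datname, confl_logicalslot
    FROM pg_stat_database_conflicts
    ORDER BY confl_logicalslot DESC;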
---
 doc/src/sgml/monitoring.sgml         |   6 +
 src/backend/access/gist/gistxlog.c   |   4 +-
 src/backend/access/hash/hash_xlog.c  |   3 +-
 src/backend/access/heap/heapam.c     |  13 +-
 src/backend/access/nbtree/nbtxlog.c  |   4 +-
 src/backend/access/spgist/spgxlog.c  |   1 +
 src/backend/access/transam/xlog.c    |  14 +++
 src/backend/catalog/system_views.sql |   1 +
 src/backend/postmaster/pgstat.c      |   4 +
 src/backend/replication/slot.c       | 176 +++++++++++++++++++++++++++
 src/backend/storage/ipc/procarray.c  |   4 +
 src/backend/storage/ipc/procsignal.c |   3 +
 src/backend/storage/ipc/standby.c    |   4 +-
 src/backend/tcop/postgres.c          |  22 ++++
 src/backend/utils/adt/pgstatfuncs.c  |  16 +++
 src/include/catalog/pg_proc.dat      |   5 +
 src/include/pgstat.h                 |   1 +
 src/include/replication/slot.h       |   2 +
 src/include/storage/procsignal.h     |   1 +
 src/include/storage/standby.h        |   2 +-
 src/test/regress/expected/rules.out  |   1 +
 21 files changed, 278 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 7626987808..aa65478b31 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2742,6 +2742,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry>Number of queries in this database that have been canceled due to
       old snapshots</entry>
     </row>
+    <row>
+     <entry><structfield>confl_logicalslot</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of queries in this database that have been canceled due to
+      conflicting logical replication slots</entry>
+    </row>
     <row>
      <entry><structfield>confl_bufferpin</structfield></entry>
      <entry><type>bigint</type></entry>
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 90bc4895b2..289176305a 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -414,6 +415,7 @@ gistRedoPageReuse(XLogReaderState *record)
 
 			latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid);
 			ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+												xlrec->onCatalogTable,
 												xlrec->node);
 		}
 	}
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index 3c60677662..5767890fa4 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index bf6bfe69dc..440f444dc1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7703,7 +7703,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
 	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
 
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, xlrec->node);
 
 	/*
 	 * Actual operation is a no-op. Record type exists to provide a means for
@@ -7739,7 +7740,8 @@ heap_xlog_clean(XLogReaderState *record)
 	 * latestRemovedXid is invalid, skip conflict processing.
 	 */
 	if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 
 	/*
 	 * If we have a full-page image, restore it (using a cleanup lock) and
@@ -7835,7 +7837,9 @@ heap_xlog_visible(XLogReaderState *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+											xlrec->onCatalogTable,
+											rnode);
 
 	/*
 	 * Read the heap page, if it still exists. If the heap file has dropped or
@@ -7972,7 +7976,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
 		TransactionIdRetreat(latestRemovedXid);
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 99d0914e72..09734b3e94 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -659,7 +659,8 @@ btree_xlog_delete(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	/*
@@ -935,6 +936,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable,
 											xlrec->node);
 	}
 }
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index 7be2291d07..0672f994ec 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+												xldata->onCatalogTable,
 												node);
 		}
 	}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a666b4b935..c679f2d767 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9942,6 +9942,20 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+		/*
+		 * Drop logical slots if we are in hot standby and the primary does not
+		 * have a WAL level sufficient for logical decoding. No need to search
+		 * for potentially conflicting logical slots if the standby is running
+		 * with wal_level lower than logical, because in that case, we would
+		 * have either disallowed creation of logical slots or dropped existing
+		 * ones.
+		 */
+		if (InRecovery && InHotStandby &&
+			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+			wal_level >= WAL_LEVEL_LOGICAL)
+			ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId,
+				gettext_noop("Logical decoding on standby requires wal_level >= logical on master."));
+
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b8a3f46912..471a3e78f9 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -906,6 +906,7 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_tablespace(D.oid) AS confl_tablespace,
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
             pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
     FROM pg_database D;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f9287b7942..0dc26dcb19 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4599,6 +4599,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
 	dbentry->n_conflict_tablespace = 0;
 	dbentry->n_conflict_lock = 0;
 	dbentry->n_conflict_snapshot = 0;
+	dbentry->n_conflict_logicalslot = 0;
 	dbentry->n_conflict_bufferpin = 0;
 	dbentry->n_conflict_startup_deadlock = 0;
 	dbentry->n_temp_files = 0;
@@ -6223,6 +6224,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			dbentry->n_conflict_snapshot++;
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			dbentry->n_conflict_logicalslot++;
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
 			dbentry->n_conflict_bufferpin++;
 			break;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 00aa95ba15..3a18ef052d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,6 +46,7 @@
 #include "pgstat.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
+#include "storage/lock.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
@@ -101,6 +102,7 @@ int			max_replication_slots = 0;	/* the maximum number of replication
 
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static void ReplicationSlotDropConflicting(ReplicationSlot *slot);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -637,6 +639,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
 
+/*
+ * Permanently drop a conflicting replication slot. If it is already acquired
+ * by another backend, send it a recovery conflict signal, and then try again.
+ */
+static void
+ReplicationSlotDropConflicting(ReplicationSlot *slot)
+{
+	pid_t		active_pid;
+	PGPROC	   *proc;
+	VirtualTransactionId	vxid;
+
+	ConditionVariablePrepareToSleep(&slot->active_cv);
+	while (1)
+	{
+		SpinLockAcquire(&slot->mutex);
+		active_pid = slot->active_pid;
+		if (active_pid == 0)
+			active_pid = slot->active_pid = MyProcPid;
+		SpinLockRelease(&slot->mutex);
+
+		/* Drop the acquired slot, unless it is acquired by another backend */
+		if (active_pid == MyProcPid)
+		{
+			elog(DEBUG1, "acquired conflicting slot, now dropping it");
+			ReplicationSlotDropPtr(slot);
+			break;
+		}
+
+		/* Send the other backend a recovery conflict signal */
+
+		SetInvalidVirtualTransactionId(vxid);
+		LWLockAcquire(ProcArrayLock, LW_SHARED);
+		proc = BackendPidGetProcWithLock(active_pid);
+		if (proc)
+			GET_VXID_FROM_PGPROC(vxid, *proc);
+		LWLockRelease(ProcArrayLock);
+
+		/*
+		 * If that process has coincidentally finished, some other backend may
+		 * acquire the slot again, so start over.
+		 * Note: Even if vxid.localTransactionId is invalid, we need to cancel
+		 * that backend, because there is no other way to make it release the
+		 * slot. So don't bother to validate vxid.localTransactionId.
+		 */
+		if (vxid.backendId == InvalidBackendId)
+			continue;
+
+		elog(DEBUG1, "canceling pid %d (backendId: %d) to release the slot",
+					 active_pid, vxid.backendId);
+
+		CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+		ConditionVariableSleep(&slot->active_cv,
+							   WAIT_EVENT_REPLICATION_SLOT_DROP);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
 /*
  * Serialize the currently acquired slot's state from memory to disk, thereby
  * guaranteeing the current state will survive a crash.
@@ -1083,6 +1143,122 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, it means that rows older than xid might have been
+ * removed. Therefore we need to drop slots that depend on seeing those rows.
+ * When xid is invalid, drop all logical slots. This is required when the
+ * master's wal_level is set back to replica, in which case existing logical
+ * slots must be dropped. Also, when xid is invalid, the caller provides a
+ * common 'conflict_reason' for the error detail; when xid is valid,
+ * conflict_reason is NULL and the detail is constructed from the xid value.
+ */
+void
+ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
+										char *conflict_reason)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* We are only dealing with *logical* slot conflicts. */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* Invalid xid means caller is asking to drop all logical slots */
+		if (!TransactionIdIsValid(xid))
+			found_conflict = true;
+		else
+		{
+			TransactionId slot_xmin;
+			TransactionId slot_catalog_xmin;
+			StringInfoData	conflict_str, conflict_xmins;
+			char	   *conflict_sentence =
+				gettext_noop("Slot conflicted with xid horizon which was being increased to");
+
+			/* not our database, skip */
+			if (s->data.database != InvalidOid && s->data.database != dboid)
+				continue;
+
+			SpinLockAcquire(&s->mutex);
+			slot_xmin = s->data.xmin;
+			slot_catalog_xmin = s->data.catalog_xmin;
+			SpinLockRelease(&s->mutex);
+
+			/*
+			 * Build the conflict_str which will look like :
+			 * Build the conflict_str which will look like:
+			 * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)."
+			 */
+			initStringInfo(&conflict_xmins);
+			if (TransactionIdIsValid(slot_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_xmin, xid))
+			{
+				appendStringInfo(&conflict_xmins, "slot xmin: %u", slot_xmin);
+			}
+			if (TransactionIdIsValid(slot_catalog_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+				appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %u",
+								 conflict_xmins.len > 0 ? ", " : "",
+								 slot_catalog_xmin);
+
+			if (conflict_xmins.len > 0)
+			{
+				initStringInfo(&conflict_str);
+				appendStringInfo(&conflict_str, "%s %u (%s).",
+								 conflict_sentence, xid, conflict_xmins.data);
+				found_conflict = true;
+				conflict_reason = conflict_str.data;
+			}
+		}
+
+		if (found_conflict)
+		{
+			NameData	slotname;
+
+			SpinLockAcquire(&s->mutex);
+			slotname = s->data.name;
+			SpinLockRelease(&s->mutex);
+
+			/* ReplicationSlotDropConflicting() will acquire the lock below */
+			LWLockRelease(ReplicationSlotControlLock);
+
+			ReplicationSlotDropConflicting(s);
+
+			ereport(LOG,
+					(errmsg("dropped conflicting slot %s", NameStr(slotname)),
+					 errdetail("%s", conflict_reason)));
+
+			/* We released the lock above; so re-scan the slots. */
+			goto restart;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index f45a619deb..db232306b8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2670,6 +2670,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
 
 		GET_VXID_FROM_PGPROC(procvxid, *proc);
 
+		/*
+		 * Note: vxid.localTransactionId can be invalid, which means the
+		 * request is to signal a pid that is not running a transaction.
+		 */
 		if (procvxid.backendId == vxid.backendId &&
 			procvxid.localTransactionId == vxid.localTransactionId)
 		{
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 65d3946386..68c438d0c5 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -558,6 +558,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 08f695a980..de64eee0e7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
 #include "access/xloginsert.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -296,7 +297,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									bool onCatalogTable, RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 00c77b66c7..c3ad1d5988 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2445,6 +2445,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using a logical slot that must be dropped.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -2913,6 +2916,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 			case PROCSIG_RECOVERY_CONFLICT_LOCK:
 			case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
 			case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				/*
+				 * For conflicts that require a logical slot to be dropped, the
+				 * requirement is for the signal receiver to release the slot,
+				 * so that it can be dropped by the signal sender. So for
+				 * normal backends, the transaction should be aborted, just
+				 * like for other recovery conflicts. But if it is a walsender
+				 * on a standby, it has to be killed so that it releases the
+				 * acquired logical slot.
+				 */
+				if (am_cascading_walsender &&
+					reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+					MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
+				{
+					RecoveryConflictPending = true;
+					QueryCancelPending = true;
+					InterruptPending = true;
+					break;
+				}
 
 				/*
 				 * If we aren't in a transaction any longer then ignore.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index cea01534a5..b4b1dbc177 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1471,6 +1471,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS)
 	PG_RETURN_INT64(result);
 }
 
+Datum
+pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS)
+{
+	Oid			dbid = PG_GETARG_OID(0);
+	int64		result;
+	PgStat_StatDBEntry *dbentry;
+
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (dbentry->n_conflict_logicalslot);
+
+	PG_RETURN_INT64(result);
+}
+
 Datum
 pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS)
 {
@@ -1514,6 +1529,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 		result = (int64) (dbentry->n_conflict_tablespace +
 						  dbentry->n_conflict_lock +
 						  dbentry->n_conflict_snapshot +
+						  dbentry->n_conflict_logicalslot +
 						  dbentry->n_conflict_bufferpin +
 						  dbentry->n_conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7fb574f9dc..12e7be1132 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5323,6 +5323,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '3434',
+  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 1a19921f80..1f4a000c48 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -604,6 +604,7 @@ typedef struct PgStat_StatDBEntry
 	PgStat_Counter n_conflict_tablespace;
 	PgStat_Counter n_conflict_lock;
 	PgStat_Counter n_conflict_snapshot;
+	PgStat_Counter n_conflict_logicalslot;
 	PgStat_Counter n_conflict_bufferpin;
 	PgStat_Counter n_conflict_startup_deadlock;
 	PgStat_Counter n_temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 3e95b019b3..200720c589 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -204,4 +204,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason);
+
 #endif							/* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 90607df106..f400069c78 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -39,6 +39,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index cfbe426e5a..7e0bc43ac4 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
-												RelFileNode node);
+									bool onCatalogTable, RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index c7304611c3..5ed064627f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1845,6 +1845,7 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
     pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace,
     pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
+    pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot,
     pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
     pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
    FROM pg_database d;
-- 
2.20.1

From e5a1e2d6efb1a0a24291f7d0478b7d726c0da3c5 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khande...@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v6 4/5] New TAP test for logical decoding on standby.

This test was originally written by Craig Ringer, then extended and
modified by me to cover xmin tracking and various slot conflict
scenarios.

Authors: Craig Ringer, Amit Khandekar.
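
For reference, the xmin checks in the first test file boil down to
polling a query of roughly this shape against the primary's physical
slot (slot name as used in the test script), until hot standby feedback
has propagated the standby's horizons:

    SELECT xmin, catalog_xmin
    FROM pg_catalog.pg_replication_slots
    WHERE slot_name = 'master_physical';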
---
 src/test/perl/PostgresNode.pm                 |  37 +++
 .../t/018_standby_logical_decoding_xmins.pl   | 272 ++++++++++++++++++
 .../019_standby_logical_decoding_conflicts.pl | 216 ++++++++++++++
 3 files changed, 525 insertions(+)
 create mode 100644 src/test/recovery/t/018_standby_logical_decoding_xmins.pl
 create mode 100644 src/test/recovery/t/019_standby_logical_decoding_conflicts.pl

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 9575268bd7..3dbfcf56c8 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2117,6 +2117,43 @@ sub pg_recvlogical_upto
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+	my ($self, $master, $slot_name, $dbname) = @_;
+	my ($stdout, $stderr);
+
+	my $handle;
+
+	$handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+	# Once the slot's restart_lsn has been determined, the standby looks for an
+	# xl_running_xacts WAL record from that restart_lsn onwards. So first, wait
+	# until the slot's restart_lsn has been set.
+
+	$self->poll_query_until(
+		'postgres', qq[
+		SELECT restart_lsn IS NOT NULL
+		FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+	]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+	# Now arrange for the xl_running_xacts record for which pg_recvlogical
+	# is waiting.
+	$master->safe_psql('postgres', 'CHECKPOINT');
+
+	$handle->finish();
+
+	is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+		or die "could not create slot " . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/t/018_standby_logical_decoding_xmins.pl b/src/test/recovery/t/018_standby_logical_decoding_xmins.pl
new file mode 100644
index 0000000000..d654d79526
--- /dev/null
+++ b/src/test/recovery/t/018_standby_logical_decoding_xmins.pl
@@ -0,0 +1,272 @@
+# logical decoding on a standby: ensure xmins are appropriately updated
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 23;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes qw(usleep);
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $slot);
+
+my $node_master = get_new_node('master');
+my $node_standby = get_new_node('standby');
+
+# Name for the physical slot on master
+my $master_slotname = 'master_physical';
+# Name for the logical slot on standby
+my $standby_slotname = 'standby_logical';
+
+# Wait for the given boolean condition on the slot's pg_replication_slots row
+# to become true, to ensure we've reached a quiescent state
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+}
+
+
+########################
+# Initialize master node
+########################
+
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->safe_psql('postgres', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]);
+my $backup_name = 'b1';
+$node_master->backup($backup_name);
+
+# After slot creation, xmins must be null
+$slot = $node_master->slot($master_slotname);
+is($slot->{'xmin'}, '', "xmin null");
+is($slot->{'catalog_xmin'}, '', "catalog_xmin null");
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$master_slotname']);
+$node_standby->start;
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+
+################################
+# xmin/catalog_xmin verification before and after standby-logical-slot creation.
+################################
+
+# With hot_standby_feedback off, xmin and catalog_xmin must still be null
+$slot = $node_master->slot($master_slotname);
+is($slot->{'xmin'}, '', "xmin null after standby join");
+is($slot->{'catalog_xmin'}, '', "catalog_xmin null after standby join");
+
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+# Create new slots on the standby, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
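+#
+# (Illustrative sketch only, not executed by this test: such an application
+# could lock in the catalog_xmin by first running something like
+#     SELECT pg_create_logical_replication_slot('guard_slot', 'test_decoding', true);
+# on the master, then creating the standby slot, and finally dropping the
+# temporary guard slot again.)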
+$node_standby->create_logical_slot_on_standby($node_master, $standby_slotname, 'postgres');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+# Now that the slot is created on the standby, xmin and catalog_xmin should be
+# non-NULL on both master and standby. But on the master, the xmins will be
+# updated only after hot standby feedback is received.
+wait_for_xmins($node_master, $master_slotname, "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+
+$slot = $node_standby->slot($standby_slotname);
+is($slot->{'xmin'}, '', "logical xmin null");
+isnt($slot->{'catalog_xmin'}, '', "logical catalog_xmin not null");
+
+
+################################
+# Standby logical slot should be able to fetch the table changes even when the
+# table is dropped.
+################################
+
+$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('postgres', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('postgres', 'DROP TABLE test_table');
+$node_master->safe_psql('postgres', 'VACUUM');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+# Should show the inserts even though the table has been dropped on the master
+($ret, $stdout, $stderr) = $node_standby->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+	or die 'cannot continue if slot replay fails';
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+$slot = $node_master->slot($master_slotname);
+isnt($slot->{'xmin'}, '', "physical xmin not null");
+my $saved_physical_catalog_xmin = $slot->{'catalog_xmin'};
+isnt($saved_physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+$slot = $node_standby->slot($standby_slotname);
+is($slot->{'xmin'}, '', "logical xmin null");
+my $saved_logical_catalog_xmin = $slot->{'catalog_xmin'};
+isnt($saved_logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+
+################################
+# Catalog xmins should advance after standby logical slot fetches the changes.
+################################
+
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+	$node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('postgres', 'VACUUM');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+cmp_ok($node_standby->slot($standby_slotname)->{'catalog_xmin'}, "==",
+	   $saved_logical_catalog_xmin,
+	   "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_standby->psql('postgres',
+	qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+# The standby's logical slot catalog_xmin should advance after pg_logical_slot_get_changes
+wait_for_xmins($node_standby, $standby_slotname,
+			   "catalog_xmin::varchar::int > ${saved_logical_catalog_xmin}");
+$slot = $node_standby->slot($standby_slotname);
+my $new_logical_catalog_xmin = $slot->{'catalog_xmin'};
+is($slot->{'xmin'}, '', "logical xmin null");
+
+# Hot standby feedback should advance the master's physical catalog_xmin now
+# that the standby's slot doesn't hold it down as far.
+# But the master's physical catalog_xmin should not go past the standby's
+# logical slot's catalog_xmin, even while the master's physical xmin advances.
+#
+# First, make sure master's xmin is advanced. This happens on hot standby
+# feedback. So this check for master's xmin advance also makes sure hot standby
+# feedback has reached the master, which is required for the subsequent
+# catalog_xmin test.
+my $temp_phys_xmin = $node_master->slot($master_slotname)->{'xmin'};
+$node_master->safe_psql('postgres', 'SELECT txid_current()');
+wait_for_xmins($node_master, $master_slotname,
+			   "xmin::varchar::int > ${temp_phys_xmin}");
+$slot = $node_master->slot($master_slotname);
+# Now check that the master's phys catalog_xmin has advanced but not beyond
+# standby's logical catalog_xmin
+cmp_ok($slot->{'catalog_xmin'}, ">", $saved_physical_catalog_xmin,
+	'upstream physical slot catalog_xmin has advanced with hs_feedback on');
+cmp_ok($slot->{'catalog_xmin'}, "==", $new_logical_catalog_xmin,
+	'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+
+######################
+# Upstream oldestXid should not go past downstream catalog_xmin
+######################
+
+# First burn some xids on the master, so we push the master's nextXid ahead.
+foreach my $i (1 .. 100)
+{
+	$node_master->safe_psql('postgres', 'SELECT txid_current()');
+}
+
+# Force vacuum freeze on the master and ensure its oldestXid doesn't advance
+# past the catalog_xmin we still need. The only way we have visibility into
+# that is to force a checkpoint and read pg_controldata.
+$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+foreach my $dbname ('template1', 'postgres', 'postgres', 'template0')
+{
+	$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+}
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+	or die "pg_controldata failed with $?";
+my @checkpoint = split('\n', $stdout);
+my $oldestXid = '';
+foreach my $line (@checkpoint)
+{
+	if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
+	{
+		$oldestXid = $1;
+	}
+}
+die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'},
+	   'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+$node_master->safe_psql('postgres',
+	"UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+
+##################################################
+# Drop slot
+# Make sure standby slots are droppable, and that dropping the logical slot
+# clears the upstream's catalog_xmin
+##################################################
+
+is($node_standby->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+$node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$standby_slotname')]);
+
+is($node_standby->slot($standby_slotname)->{'slot_type'}, '', 'slot on standby dropped manually');
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. catalog_xmin should become NULL because we dropped
+# the logical slot.
+wait_for_xmins($node_master, $master_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NULL");
diff --git a/src/test/recovery/t/019_standby_logical_decoding_conflicts.pl b/src/test/recovery/t/019_standby_logical_decoding_conflicts.pl
new file mode 100644
index 0000000000..d0c449338f
--- /dev/null
+++ b/src/test/recovery/t/019_standby_logical_decoding_conflicts.pl
@@ -0,0 +1,216 @@
+# Logical decoding on a standby: test conflict recovery, and other tests that
+# verify slots get dropped as expected.
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 24;
+use RecursiveCopy;
+use File::Copy;
+use Time::HiRes qw(usleep);
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $slot);
+
+my $node_master = get_new_node('master');
+my $node_standby = get_new_node('standby');
+
+# Name for the physical slot on master
+my $master_slotname = 'master_physical';
+
+# Wait for the given boolean condition on the slot's pg_replication_slots row
+# to become true, to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+	my ($node, $slotname, $check_expr) = @_;
+
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT $check_expr
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = '$slotname';
+	]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+	$node_standby->create_logical_slot_on_standby($node_master, 'dropslot', 'testdb');
+	$node_standby->create_logical_slot_on_standby($node_master, 'activeslot', 'testdb');
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots()
+sub make_slot_active
+{
+	my $slot_user_handle;
+
+	# make sure activeslot is in use
+	print "starting pg_recvlogical\n";
+	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+
+	while (!$node_standby->slot('activeslot')->{'active_pid'})
+	{
+		usleep(100_000);
+		print "waiting for slot to become active\n";
+	}
+	return $slot_user_handle;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'dropslot'.
+sub check_slots_dropped
+{
+	my ($slot_user_handle) = @_;
+	my $return;
+
+	is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
+	is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	# our client should've terminated in response to the walsender error
+	eval {
+		$slot_user_handle->finish;
+	};
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n");
+	if ($return) {
+		like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
+		like($stderr, qr/must be dropped/, 'recvlogical error detail');
+	}
+
+	return 0;
+}
+
+
+########################
+# Initialize master node
+########################
+
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$master_slotname');]);
+my $backup_name = 'b1';
+$node_master->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+	qq[primary_slot_name = '$master_slotname']);
+$node_standby->start;
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+
+##################################################
+# Recovery conflict: Drop conflicting slots, including in-use slots
+# Scenario 1 : hot_standby_feedback off
+##################################################
+
+create_logical_slots();
+
+# One way to produce a recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on master. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_master, $master_slotname,
+			   "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active();
+
+# This should trigger the conflict
+$node_master->safe_psql('testdb', 'VACUUM FULL');
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+check_slots_dropped($handle);
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on master. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_master, $master_slotname,
+			   "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Drop conflicting slots, including in-use slots
+# Scenario 2 : incorrect wal_level at master
+##################################################
+
+create_logical_slots();
+
+$handle = make_slot_active();
+
+# Reduce the master's wal_level to replica. This will trigger a slot conflict.
+$node_master->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_master->restart;
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+check_slots_dropped($handle);
+
+# Restore master wal_level
+$node_master->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_master->restart;
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+create_logical_slots();
+$handle = make_slot_active();
+
+# Create a slot on a database that will not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_master, 'otherslot', 'postgres');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_standby, 'replay', $node_master->lsn('flush'));
+
+is($node_standby->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup: manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
-- 
2.20.1

>From 20b254f9c2379833688a46581a8bebf07280e818 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khande...@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v6 5/5] Doc changes describing details about logical decoding
 on standby.

---
 doc/src/sgml/logicaldecoding.sgml | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index bce6d379bf..745c65ff02 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -248,6 +248,24 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To
+     prevent <command>VACUUM</command> from removing required rows from the
+     system catalogs, <varname>hot_standby_feedback</varname> should be set on
+     the standby. Even so, if any required rows are removed, the slot is
+     dropped.  Existing logical slots on the standby are also dropped if
+     <varname>wal_level</varname> on the primary is reduced below
+     <literal>logical</literal>.
+    </para>
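+
+    <para>
+     For illustration (an assumed configuration sketch, not required syntax),
+     feedback could be enabled on the standby like this:
+<programlisting>
+ALTER SYSTEM SET hot_standby_feedback = on;
+SELECT pg_reload_conf();
+</programlisting>
+    </para>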
+
+    <para>
+     Creating a logical slot requires building a historic snapshot, for which
+     information about all currently running transactions is needed. On the
+     primary, that information is directly available, but on a standby it has
+     to arrive from the primary via the WAL stream. Slot creation may therefore
+     have to wait for some activity to happen on the primary; if the primary is
+     idle, creating a logical slot on a standby may take a noticeable amount of
+     time.
+    </para>
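+
+    <para>
+     As an illustrative sketch (the slot and plugin names are arbitrary), a
+     slot created on the standby with
+<programlisting>
+SELECT * FROM pg_create_logical_replication_slot('standby_slot', 'test_decoding');
+</programlisting>
+     may appear to hang while the primary is idle; running, for example,
+     <command>CHECKPOINT</command> on the primary writes a running-transactions
+     record that allows the slot creation to finish.
+    </para>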
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.20.1
