From 241adc15d4175d66ee96414e360c9878a5f24023 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 12 May 2017 14:28:05 +0900
Subject: [PATCH 1/2] Fix a deadlock bug between DROP SUBSCRIPTION and apply
 worker.

---
 src/backend/commands/subscriptioncmds.c    | 48 +++++++++++++++---------------
 src/backend/replication/logical/launcher.c |  8 +++++
 src/backend/storage/lmgr/lwlock.c          |  2 +-
 src/backend/storage/lmgr/lwlocknames.txt   |  1 +
 4 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b76cdc5..1cb8353 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -811,18 +811,32 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	StringInfoData		cmd;
 
 	/*
-	 * Lock pg_subscription with AccessExclusiveLock to ensure
-	 * that the launcher doesn't restart new worker during dropping
-	 * the subscription
+	 * To prevent the launcher from restarting the worker, we need to
+	 * keep holding LogicalRepLauncherLock until commit.  On the other
+	 * hand, holding an LWLock until commit can cause a deadlock, so
+	 * we disallow calling DROP SUBSCRIPTION inside a transaction
+	 * block.
+	 *
+	 * XXX The command name should really be something like "DROP SUBSCRIPTION
+	 * of a subscription that is associated with a replication slot", but we
+	 * don't have the proper facilities for that.
+	 */
+	PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
+
+	/*
+	 * Protect against the launcher restarting the worker.  This lock
+	 * will be released at commit.
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
+
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
 
 	if (!HeapTupleIsValid(tup))
 	{
-		heap_close(rel, NoLock);
+		heap_close(rel, RowExclusiveLock);
 
 		if (!stmt->missing_ok)
 			ereport(ERROR,
@@ -844,6 +858,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
+	/* First, kill the apply worker so that the slot becomes accessible. */
+	logicalrep_worker_stop(subid, InvalidOid);
+
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
@@ -873,20 +890,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	else
 		slotname = NULL;
 
-	/*
-	 * Since dropping a replication slot is not transactional, the replication
-	 * slot stays dropped even if the transaction rolls back.  So we cannot
-	 * run DROP SUBSCRIPTION inside a transaction block if dropping the
-	 * replication slot.
-	 *
-	 * XXX The command name should really be something like "DROP SUBSCRIPTION
-	 * of a subscription that is associated with a replication slot", but we
-	 * don't have the proper facilities for that.
-	 */
-	if (slotname)
-		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
-
-
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 	EventTriggerSQLDropAddObject(&myself, true, true);
 
@@ -901,9 +904,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
@@ -913,7 +913,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* If there is no slot associated with the subscription, we can finish here. */
 	if (!slotname)
 	{
-		heap_close(rel, NoLock);
+		heap_close(rel, RowExclusiveLock);
 		return;
 	}
 
@@ -964,7 +964,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	pfree(cmd.data);
 
-	heap_close(rel, NoLock);
+	heap_close(rel, RowExclusiveLock);
 }
 
 /*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 09c87d7..dfce49d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -405,6 +405,9 @@ retry:
 /*
  * Stop the logical replication worker and wait until it detaches from the
  * slot.
+ *
+ * The caller must hold LogicalRepLauncherLock to ensure that new workers are
+ * not being started during this function call.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -832,6 +835,9 @@ ApplyLauncherMain(Datum main_arg)
 										   ALLOCSET_DEFAULT_MAXSIZE);
 			oldctx = MemoryContextSwitchTo(subctx);
 
+			/* Block any concurrent DROP SUBSCRIPTION */
+			LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
+
 			/* search for subscriptions to start or stop. */
 			sublist = get_subscription_list();
 
@@ -855,6 +861,8 @@ ApplyLauncherMain(Datum main_arg)
 				}
 			}
 
+			LWLockRelease(LogicalRepLauncherLock);
+
 			/* Switch back to original memory context. */
 			MemoryContextSwitchTo(oldctx);
 			/* Clean the temporary memory. */
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 3e13394..949b333 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
 
 	if (LWLockTrancheArray == NULL)
 	{
-		LWLockTranchesAllocated = 64;
+		LWLockTranchesAllocated = 128;
 		LWLockTrancheArray = (char **)
 			MemoryContextAllocZero(TopMemoryContext,
 						  LWLockTranchesAllocated * sizeof(char *));
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ec..eda66fe 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock				42
 BackendRandomLock					43
 LogicalRepWorkerLock				44
 CLogTruncationLock					45
+LogicalRepLauncherLock					46
-- 
2.8.1

