From fcd5b0e39c11e270fd2e0f5b34dce33304908212 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 31 Jul 2025 06:05:13 -0400
Subject: [PATCH v8] Fix a possible deadlock during ALTER SUBSCRIPTION ... DROP
 PUBLICATION

In most situations the tablesync worker will drop the corresponding origin before it
finishes executing, but if an error causes the tablesync worker to fail just prior to
dropping the origin, or if it is delayed or it didn't get cpu time, the apply worker
could find the origin and attempt to drop the origin. During this time if the
user simultaneously issues an  ALTER SUBSCRIPTION ... DROP PUBLICATION, there is a
possibility of a deadlock between the apply worker and the user process, because of the
order in which the locks are taken.

The fix is to get locks on SubscriptionRelationId, SubscriptionRelRelationId and
ReplicationOriginRelationId in that order when dropping tracking origins.
---
 src/backend/catalog/pg_subscription.c       | 21 +++++++++++++++---
 src/backend/replication/logical/tablesync.c | 34 +++++++++++++++++++++++++----
 src/include/catalog/pg_subscription_rel.h   |  2 +-
 3 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 63c2992..eb99b89 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -320,7 +320,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  */
 void
 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-						   XLogRecPtr sublsn)
+						   XLogRecPtr sublsn, bool already_locked)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -328,9 +328,24 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+	if (already_locked)
+	{
+#ifdef USE_ASSERT_CHECKING
+		LOCKTAG	tag;
 
-	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+		Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
+										  RowExclusiveLock, true));
+		SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
+		Assert(LockHeldByMe(&tag, AccessShareLock, true));
+#endif
+
+		rel = table_open(SubscriptionRelRelationId, NoLock);
+	}
+	else
+	{
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+		rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	}
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3fea0a0..d3356bc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
-								   MyLogicalRepWorker->relstate_lsn);
+								   MyLogicalRepWorker->relstate_lsn,
+								   false);
 
 		/*
 		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 	bool		should_exit = false;
+	Relation	rel = NULL;
 
 	Assert(!IsTransactionState());
 
@@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * worker to remove the origin tracking as if there is any
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
+				 *
+				 * Lock the subscription and origin in the same order as we
+				 * are doing during DDL commands to avoid deadlocks. See
+				 * AlterSubscription_refresh.
 				 */
+				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+								 0, AccessShareLock);
+
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
 				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 												   rstate->relid,
 												   originname,
@@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
-										   rstate->lsn);
+										   rstate->lsn, true);
 			}
 		}
 		else
@@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						 * This is required to avoid any undetected deadlocks
 						 * due to any existing lock as deadlock detector won't
 						 * be able to detect the waits on the latch.
+						 *
+						 * Also close any tables prior to the commit.
 						 */
+						if (rel)
+						{
+							table_close(rel, NoLock);
+							rel = NULL;
+						}
 						CommitTransactionCommand();
 						pgstat_report_stat(false);
 					}
@@ -623,6 +642,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 	}
 
+	/* Close table if opened */
+	if (rel)
+		table_close(rel, NoLock);
+
+
 	if (started_tx)
 	{
 		/*
@@ -1414,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   false);
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1547,7 +1572,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   SUBREL_STATE_FINISHEDCOPY,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   false);
 
 	CommitTransactionCommand();
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index c91797c..f458447 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -85,7 +85,7 @@ typedef struct SubscriptionRelState
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn, bool retain_lock);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-									   XLogRecPtr sublsn);
+									   XLogRecPtr sublsn, bool already_locked);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-- 
1.8.3.1

