From c7ccd6618afc6d020bc2d8b81000a435df3bfa8b Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 30 Jul 2025 05:29:24 -0400
Subject: [PATCH v7] Fix a deadlock during ALTER SUBSCRIPTION... DROP
 PUBLICATION

When a user drops a publication from a subscription, this results in a
publication refresh on the subscriber, which will try to drop any pending origins.
Meanwhile, the apply worker could also be trying to clean up origins. A deadlock
can occur if the order in which SubscriptionRelationId, SubscriptionRelRelationId and
ReplicationOriginRelationId are locked is not aligned between the functions
process_syncing_tables_for_apply() and AlterSubscription_refresh().

The fix is to acquire the locks on SubscriptionRelationId, SubscriptionRelRelationId
and ReplicationOriginRelationId, in that order, when the apply worker drops tracking
origins.
---
 src/backend/catalog/pg_subscription.c       | 37 +++++++++++++++++++++++++----
 src/backend/replication/logical/tablesync.c | 27 ++++++++++++++++++---
 src/include/catalog/pg_subscription_rel.h   |  2 ++
 3 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9efc915..f161131 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -287,8 +287,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  * Update the state of a subscription table.
  */
 void
-UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-						   XLogRecPtr sublsn)
+UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state,
+							 XLogRecPtr sublsn, bool already_locked)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -296,9 +296,26 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
-
-	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	if (already_locked)
+	{
+#ifdef USE_ASSERT_CHECKING
+		LOCKTAG		tag;
+#endif
+
+		Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
+										  RowExclusiveLock, true));
+
+		rel = table_open(SubscriptionRelRelationId, NoLock);
+#ifdef USE_ASSERT_CHECKING
+		SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
+		Assert(LockHeldByMe(&tag, AccessShareLock, true));
+#endif
+	}
+	else
+	{
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+		rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	}
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
@@ -333,6 +350,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 }
 
 /*
+ * Update a subscription table's state, acquiring the needed locks itself.
+ */
+void
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	UpdateSubscriptionRelStateEx(subid, relid, state, sublsn, false);
+}
+
+/*
  * Get state of subscription table.
  *
  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c8893ff..1da5a7e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -426,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 	bool		should_exit = false;
+	Relation	rel = NULL;
 
 	Assert(!IsTransactionState());
 
@@ -493,7 +494,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * worker to remove the origin tracking as if there is any
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
+				 *
+				 * Lock the subscription and pg_subscription_rel before the
+				 * origin, in the same order as DDL commands do, to avoid
+				 * deadlocks. See AlterSubscription_refresh.
 				 */
+				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+								 0, AccessShareLock);
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
 				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 												   rstate->relid,
 												   originname,
@@ -503,9 +513,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				/*
 				 * Update the state to READY only after the origin cleanup.
 				 */
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   rstate->relid, rstate->state,
-										   rstate->lsn);
+				UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid,
+											 rstate->relid, rstate->state,
+											 rstate->lsn, true);
 			}
 		}
 		else
@@ -556,7 +566,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						 * This is required to avoid any undetected deadlocks
 						 * due to any existing lock as deadlock detector won't
 						 * be able to detect the waits on the latch.
+						 *
+						 * Also close any tables prior to the commit.
 						 */
+						if (rel)
+						{
+							table_close(rel, NoLock);
+							rel = NULL;
+						}
 						CommitTransactionCommand();
 						pgstat_report_stat(false);
 					}
@@ -623,6 +640,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 	}
 
+	/* Close table if opened */
+	if (rel)
+		table_close(rel, NoLock);
+
 	if (started_tx)
 	{
 		/*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 8244ad5..0afda88 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -86,6 +86,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn, bool retain_lock);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 									   XLogRecPtr sublsn);
+extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state,
+										 XLogRecPtr sublsn, bool already_locked);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-- 
1.8.3.1

