On 18/09/2023 07:08, David Rowley wrote:
On Fri, 15 Sept 2023 at 22:37, Heikki Linnakangas <hlinn...@iki.fi> wrote:
I've added a call to LockAssertNoneHeld(false) in there.

I don't see it in the patch?

hmm. I must've git format-patch before committing that part.

I'll try that again... see attached.

This needed a rebase after my ResourceOwner refactoring. Attached.

A few quick comments:

- It would be nice to add a test for the issue that you fixed in patch v7, i.e. if you prepare a transaction while holding session-level locks.

- GrantLockLocal() now calls MemoryContextAlloc(), which can fail if you are out of memory. Is that handled gracefully or is the lock leaked?

--
Heikki Linnakangas
Neon (https://neon.tech)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 74ce5f9491c..8c8585be7ab 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2415,6 +2415,7 @@ PrepareTransaction(void)
 	TransactionId xid = GetCurrentTransactionId();
 	GlobalTransaction gxact;
 	TimestampTz prepared_at;
+	HTAB		   *sessionandxactlocks;
 
 	Assert(!IsInParallelMode());
 
@@ -2560,7 +2561,17 @@ PrepareTransaction(void)
 	StartPrepare(gxact);
 
 	AtPrepare_Notify();
-	AtPrepare_Locks();
+
+	/*
+	 * Prepare the locks and save the returned hash table that describes if
+	 * the lock is held at the session and/or transaction level.  We need to
+	 * know if we're dealing with session locks inside PostPrepare_Locks(),
+	 * but we're unable to build the hash table there due to that function
+	 * only discovering if we're dealing with a session lock while we're in a
+	 * critical section, in which case we can't allocate memory for the hash
+	 * table.
+	 */
+	sessionandxactlocks = AtPrepare_Locks();
 	AtPrepare_PredicateLocks();
 	AtPrepare_PgStat();
 	AtPrepare_MultiXact();
@@ -2587,7 +2598,10 @@ PrepareTransaction(void)
 	 * ProcArrayClearTransaction().  Otherwise, a GetLockConflicts() would
 	 * conclude "xact already committed or aborted" for our locks.
 	 */
-	PostPrepare_Locks(xid);
+	PostPrepare_Locks(xid, sessionandxactlocks);
+
+	/* We no longer need this hash table */
+	hash_destroy(sessionandxactlocks);
 
 	/*
 	 * Let others know about no transaction in progress by me.  This has to be
diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c
index 296dc82d2ee..5baf83ac6ce 100644
--- a/src/backend/commands/discard.c
+++ b/src/backend/commands/discard.c
@@ -71,7 +71,12 @@ DiscardAll(bool isTopLevel)
 	ResetAllOptions();
 	DropAllPreparedStatements();
 	Async_UnlistenAll();
-	LockReleaseAll(USER_LOCKMETHOD, true);
+	LockReleaseSession(USER_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+	LockAssertNoneHeld(false);
+#endif
+
 	ResetPlanCache();
 	ResetTempTableNamespace();
 	ResetSequenceCaches();
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 501910b4454..835f112f751 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -841,7 +841,11 @@ logicalrep_worker_onexit(int code, Datum arg)
 	 * The locks will be acquired once the worker is initialized.
 	 */
 	if (!InitializingApplyWorker)
-		LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+		LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+	LockAssertNoneHeld(false);
+#endif
 
 	ApplyLauncherWakeup();
 }
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 45de0fd2bd6..e7e0f29347a 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -182,12 +182,6 @@ holdMask -
     subset of the PGPROC object's heldLocks mask (if the PGPROC is
     currently waiting for another lock mode on this lock).
 
-releaseMask -
-    A bitmask for the lock modes due to be released during LockReleaseAll.
-    This must be a subset of the holdMask.  Note that it is modified without
-    taking the partition LWLock, and therefore it is unsafe for any
-    backend except the one owning the PROCLOCK to examine/change it.
-
 lockLink -
     List link for shared memory queue of all the PROCLOCK objects for the
     same LOCK.
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index b8c57b3e16e..ce54b589b0d 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -22,8 +22,7 @@
  *	Interface:
  *
  *	InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
- *	LockAcquire(), LockRelease(), LockReleaseAll(),
- *	LockCheckConflicts(), GrantLock()
+ *	LockAcquire(), LockRelease(), LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
@@ -162,6 +161,16 @@ typedef struct TwoPhaseLockRecord
 	LOCKMODE	lockmode;
 } TwoPhaseLockRecord;
 
+/*
+ * Used by for a hash table entry type in AtPrepare_Locks() to communicate the
+ * session/xact lock status of each held lock for use in PostPrepare_Locks().
+ */
+typedef struct PerLockTagEntry
+{
+	LOCKTAG lock;  /* identifies the lockable object */
+	bool sessLock; /* is any lockmode held at session level? */
+	bool xactLock; /* is any lockmode held at xact level? */
+} PerLockTagEntry;
 
 /*
  * Count of the number of fast path lock slots we believe to be used.  This
@@ -270,6 +279,8 @@ static HTAB *LockMethodLockHash;
 static HTAB *LockMethodProcLockHash;
 static HTAB *LockMethodLocalHash;
 
+/* A memory context for storing LOCALLOCKOWNER structs */
+MemoryContext LocalLockOwnerContext;
 
 /* private state for error cleanup */
 static LOCALLOCK *StrongLockInProgress;
@@ -277,6 +288,9 @@ static LOCALLOCK *awaitedLock;
 static ResourceOwner awaitedOwner;
 
 
+static dlist_head session_locks[lengthof(LockMethods)];
+
+
 #ifdef LOCK_DEBUG
 
 /*------
@@ -363,8 +377,8 @@ static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
-static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
-static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
+static void ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock);
+static void LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
 						PROCLOCK *proclock, LockMethod lockMethodTable);
 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
@@ -465,6 +479,21 @@ InitLocks(void)
 									  16,
 									  &info,
 									  HASH_ELEM | HASH_BLOBS);
+
+	/* Initialize each element of the session_locks array */
+	for (int i = 0; i < lengthof(LockMethods); i++)
+		dlist_init(&session_locks[i]);
+
+	/*
+	 * Create a slab context for storing LOCALLOCKOWNERs.  Slab seems like a
+	 * good context type for this as it will manage fragmentation better than
+	 * aset.c contexts and it will free() excess memory rather than maintain
+	 * excessively long freelists after a large surge in locking requirements.
+	 */
+	LocalLockOwnerContext = SlabContextCreate(TopMemoryContext,
+											  "LOCALLOCKOWNER context",
+											  SLAB_DEFAULT_BLOCK_SIZE,
+											  sizeof(LOCALLOCKOWNER));
 }
 
 
@@ -827,26 +856,9 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		locallock->nLocks = 0;
 		locallock->holdsStrongLockCount = false;
 		locallock->lockCleared = false;
-		locallock->numLockOwners = 0;
-		locallock->maxLockOwners = 8;
-		locallock->lockOwners = NULL;	/* in case next line fails */
-		locallock->lockOwners = (LOCALLOCKOWNER *)
-			MemoryContextAlloc(TopMemoryContext,
-							   locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+		dlist_init(&locallock->locallockowners);
 	}
-	else
-	{
-		/* Make sure there will be room to remember the lock */
-		if (locallock->numLockOwners >= locallock->maxLockOwners)
-		{
-			int			newsize = locallock->maxLockOwners * 2;
 
-			locallock->lockOwners = (LOCALLOCKOWNER *)
-				repalloc(locallock->lockOwners,
-						 newsize * sizeof(LOCALLOCKOWNER));
-			locallock->maxLockOwners = newsize;
-		}
-	}
 	hashcode = locallock->hashcode;
 
 	if (locallockp)
@@ -1249,7 +1261,6 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 		proclock->groupLeader = proc->lockGroupLeader != NULL ?
 			proc->lockGroupLeader : proc;
 		proclock->holdMask = 0;
-		proclock->releaseMask = 0;
 		/* Add proclock to appropriate lists */
 		dlist_push_tail(&lock->procLocks, &proclock->lockLink);
 		dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink);
@@ -1343,17 +1354,19 @@ CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired)
 static void
 RemoveLocalLock(LOCALLOCK *locallock)
 {
-	int			i;
+	dlist_mutable_iter iter;
 
-	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+	dlist_foreach_modify(iter, &locallock->locallockowners)
 	{
-		if (locallock->lockOwners[i].owner != NULL)
-			ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
+		LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+		Assert(locallockowner->owner != NULL);
+		dlist_delete(&locallockowner->locallock_node);
+		ResourceOwnerForgetLock(locallockowner);
+		pfree(locallockowner);
 	}
-	locallock->numLockOwners = 0;
-	if (locallock->lockOwners != NULL)
-		pfree(locallock->lockOwners);
-	locallock->lockOwners = NULL;
+
+	Assert(dlist_is_empty(&locallock->locallockowners));
 
 	if (locallock->holdsStrongLockCount)
 	{
@@ -1659,26 +1672,38 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 static void
 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
 {
-	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
-	int			i;
+	LOCALLOCKOWNER *locallockowner;
+	dlist_iter	iter;
 
-	Assert(locallock->numLockOwners < locallock->maxLockOwners);
 	/* Count the total */
 	locallock->nLocks++;
 	/* Count the per-owner lock */
-	for (i = 0; i < locallock->numLockOwners; i++)
+	dlist_foreach(iter, &locallock->locallockowners)
 	{
-		if (lockOwners[i].owner == owner)
+		locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+		if (locallockowner->owner == owner)
 		{
-			lockOwners[i].nLocks++;
+			locallockowner->nLocks++;
 			return;
 		}
 	}
-	lockOwners[i].owner = owner;
-	lockOwners[i].nLocks = 1;
-	locallock->numLockOwners++;
+	locallockowner = MemoryContextAlloc(LocalLockOwnerContext, sizeof(LOCALLOCKOWNER));
+	locallockowner->owner = owner;
+	locallockowner->nLocks = 1;
+	locallockowner->locallock = locallock;
+
+	dlist_push_tail(&locallock->locallockowners, &locallockowner->locallock_node);
+
 	if (owner != NULL)
-		ResourceOwnerRememberLock(owner, locallock);
+		ResourceOwnerRememberLock(owner, locallockowner);
+	else
+	{
+		LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallockowner->locallock);
+
+		Assert(lockmethodid > 0 && lockmethodid <= 2);
+		dlist_push_tail(&session_locks[lockmethodid - 1], &locallockowner->resowner_node);
+	}
 
 	/* Indicate that the lock is acquired for certain types of locks. */
 	CheckAndSetLockHeld(locallock, true);
@@ -1971,9 +1996,9 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	 * Decrease the count for the resource owner.
 	 */
 	{
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		ResourceOwner owner;
-		int			i;
+		dlist_mutable_iter	iter;
+		bool		found = false;
 
 		/* Identify owner for lock */
 		if (sessionLock)
@@ -1981,24 +2006,33 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 		else
 			owner = CurrentResourceOwner;
 
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach_modify(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == owner)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+			if (locallockowner->owner != owner)
+				continue;
+
+			found = true;
+
+			if (--locallockowner->nLocks == 0)
 			{
-				Assert(lockOwners[i].nLocks > 0);
-				if (--lockOwners[i].nLocks == 0)
-				{
-					if (owner != NULL)
-						ResourceOwnerForgetLock(owner, locallock);
-					/* compact out unused slot */
-					locallock->numLockOwners--;
-					if (i < locallock->numLockOwners)
-						lockOwners[i] = lockOwners[locallock->numLockOwners];
-				}
-				break;
+				dlist_delete(&locallockowner->locallock_node);
+
+				if (owner != NULL)
+					ResourceOwnerForgetLock(locallockowner);
+				else
+					dlist_delete(&locallockowner->resowner_node);
+				pfree(locallockowner);
+			}
+			else
+			{
+				/* ensure nLocks didn't go negative */
+				Assert(locallockowner->nLocks >= 0);
 			}
 		}
-		if (i < 0)
+
+		if (!found)
 		{
 			/* don't release a lock belonging to another owner */
 			elog(WARNING, "you don't own a lock of type %s",
@@ -2016,6 +2050,8 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	if (locallock->nLocks > 0)
 		return true;
 
+	Assert(locallock->nLocks == 0);
+
 	/*
 	 * At this point we can no longer suppose we are clear of invalidation
 	 * messages related to this lock.  Although we'll delete the LOCALLOCK
@@ -2118,274 +2154,44 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	return true;
 }
 
+#ifdef USE_ASSERT_CHECKING
 /*
- * LockReleaseAll -- Release all locks of the specified lock method that
- *		are held by the current process.
- *
- * Well, not necessarily *all* locks.  The available behaviors are:
- *		allLocks == true: release all locks including session locks.
- *		allLocks == false: release all non-session locks.
+ * LockAssertNoneHeld -- Assert that we no longer hold any DEFAULT_LOCKMETHOD
+ * locks during an abort.
  */
 void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
+LockAssertNoneHeld(bool isCommit)
 {
 	HASH_SEQ_STATUS status;
-	LockMethod	lockMethodTable;
-	int			i,
-				numLockModes;
 	LOCALLOCK  *locallock;
-	LOCK	   *lock;
-	int			partition;
-	bool		have_fast_path_lwlock = false;
-
-	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
-		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
-	lockMethodTable = LockMethods[lockmethodid];
-
-#ifdef LOCK_DEBUG
-	if (*(lockMethodTable->trace_flag))
-		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
-#endif
-
-	/*
-	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
-	 * the only way that the lock we hold on our own VXID can ever get
-	 * released: it is always and only released when a toplevel transaction
-	 * ends.
-	 */
-	if (lockmethodid == DEFAULT_LOCKMETHOD)
-		VirtualXactLockTableCleanup();
-
-	numLockModes = lockMethodTable->numLockModes;
 
-	/*
-	 * First we run through the locallock table and get rid of unwanted
-	 * entries, then we scan the process's proclocks and get rid of those. We
-	 * do this separately because we may have multiple locallock entries
-	 * pointing to the same proclock, and we daren't end up with any dangling
-	 * pointers.  Fast-path locks are cleaned up during the locallock table
-	 * scan, though.
-	 */
-	hash_seq_init(&status, LockMethodLocalHash);
-
-	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	if (!isCommit)
 	{
-		/*
-		 * If the LOCALLOCK entry is unused, we must've run out of shared
-		 * memory while trying to set up this lock.  Just forget the local
-		 * entry.
-		 */
-		if (locallock->nLocks == 0)
-		{
-			RemoveLocalLock(locallock);
-			continue;
-		}
-
-		/* Ignore items that are not of the lockmethod to be removed */
-		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-			continue;
+		hash_seq_init(&status, LockMethodLocalHash);
 
-		/*
-		 * If we are asked to release all locks, we can just zap the entry.
-		 * Otherwise, must scan to see if there are session locks. We assume
-		 * there is at most one lockOwners entry for session locks.
-		 */
-		if (!allLocks)
+		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 		{
-			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+			dlist_iter	local_iter;
 
-			/* If session lock is above array position 0, move it down to 0 */
-			for (i = 0; i < locallock->numLockOwners; i++)
-			{
-				if (lockOwners[i].owner == NULL)
-					lockOwners[0] = lockOwners[i];
-				else
-					ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
-			}
+			Assert(locallock->nLocks >= 0);
 
-			if (locallock->numLockOwners > 0 &&
-				lockOwners[0].owner == NULL &&
-				lockOwners[0].nLocks > 0)
+			dlist_foreach(local_iter, &locallock->locallockowners)
 			{
-				/* Fix the locallock to show just the session locks */
-				locallock->nLocks = lockOwners[0].nLocks;
-				locallock->numLockOwners = 1;
-				/* We aren't deleting this locallock, so done */
-				continue;
-			}
-			else
-				locallock->numLockOwners = 0;
-		}
-
-		/*
-		 * If the lock or proclock pointers are NULL, this lock was taken via
-		 * the relation fast-path (and is not known to have been transferred).
-		 */
-		if (locallock->proclock == NULL || locallock->lock == NULL)
-		{
-			LOCKMODE	lockmode = locallock->tag.mode;
-			Oid			relid;
+				LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+																 locallock_node,
+																 local_iter.cur);
 
-			/* Verify that a fast-path lock is what we've got. */
-			if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
-				elog(PANIC, "locallock table corrupted");
+				Assert(locallockowner->owner == NULL);
 
-			/*
-			 * If we don't currently hold the LWLock that protects our
-			 * fast-path data structures, we must acquire it before attempting
-			 * to release the lock via the fast-path.  We will continue to
-			 * hold the LWLock until we're done scanning the locallock table,
-			 * unless we hit a transferred fast-path lock.  (XXX is this
-			 * really such a good idea?  There could be a lot of entries ...)
-			 */
-			if (!have_fast_path_lwlock)
-			{
-				LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
-				have_fast_path_lwlock = true;
-			}
-
-			/* Attempt fast-path release. */
-			relid = locallock->tag.lock.locktag_field2;
-			if (FastPathUnGrantRelationLock(relid, lockmode))
-			{
-				RemoveLocalLock(locallock);
-				continue;
+				if (locallockowner->nLocks > 0 &&
+					LOCALLOCK_LOCKMETHOD(*locallock) == DEFAULT_LOCKMETHOD)
+					Assert(false);
 			}
-
-			/*
-			 * Our lock, originally taken via the fast path, has been
-			 * transferred to the main lock table.  That's going to require
-			 * some extra work, so release our fast-path lock before starting.
-			 */
-			LWLockRelease(&MyProc->fpInfoLock);
-			have_fast_path_lwlock = false;
-
-			/*
-			 * Now dump the lock.  We haven't got a pointer to the LOCK or
-			 * PROCLOCK in this case, so we have to handle this a bit
-			 * differently than a normal lock release.  Unfortunately, this
-			 * requires an extra LWLock acquire-and-release cycle on the
-			 * partitionLock, but hopefully it shouldn't happen often.
-			 */
-			LockRefindAndRelease(lockMethodTable, MyProc,
-								 &locallock->tag.lock, lockmode, false);
-			RemoveLocalLock(locallock);
-			continue;
 		}
-
-		/* Mark the proclock to show we need to release this lockmode */
-		if (locallock->nLocks > 0)
-			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
-		/* And remove the locallock hashtable entry */
-		RemoveLocalLock(locallock);
 	}
-
-	/* Done with the fast-path data structures */
-	if (have_fast_path_lwlock)
-		LWLockRelease(&MyProc->fpInfoLock);
-
-	/*
-	 * Now, scan each lock partition separately.
-	 */
-	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
-	{
-		LWLock	   *partitionLock;
-		dlist_head *procLocks = &MyProc->myProcLocks[partition];
-		dlist_mutable_iter proclock_iter;
-
-		partitionLock = LockHashPartitionLockByIndex(partition);
-
-		/*
-		 * If the proclock list for this partition is empty, we can skip
-		 * acquiring the partition lock.  This optimization is trickier than
-		 * it looks, because another backend could be in process of adding
-		 * something to our proclock list due to promoting one of our
-		 * fast-path locks.  However, any such lock must be one that we
-		 * decided not to delete above, so it's okay to skip it again now;
-		 * we'd just decide not to delete it again.  We must, however, be
-		 * careful to re-fetch the list header once we've acquired the
-		 * partition lock, to be sure we have a valid, up-to-date pointer.
-		 * (There is probably no significant risk if pointer fetch/store is
-		 * atomic, but we don't wish to assume that.)
-		 *
-		 * XXX This argument assumes that the locallock table correctly
-		 * represents all of our fast-path locks.  While allLocks mode
-		 * guarantees to clean up all of our normal locks regardless of the
-		 * locallock situation, we lose that guarantee for fast-path locks.
-		 * This is not ideal.
-		 */
-		if (dlist_is_empty(procLocks))
-			continue;			/* needn't examine this partition */
-
-		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
-
-		dlist_foreach_modify(proclock_iter, procLocks)
-		{
-			PROCLOCK   *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
-			bool		wakeupNeeded = false;
-
-			Assert(proclock->tag.myProc == MyProc);
-
-			lock = proclock->tag.myLock;
-
-			/* Ignore items that are not of the lockmethod to be removed */
-			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
-				continue;
-
-			/*
-			 * In allLocks mode, force release of all locks even if locallock
-			 * table had problems
-			 */
-			if (allLocks)
-				proclock->releaseMask = proclock->holdMask;
-			else
-				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
-
-			/*
-			 * Ignore items that have nothing to be released, unless they have
-			 * holdMask == 0 and are therefore recyclable
-			 */
-			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
-				continue;
-
-			PROCLOCK_PRINT("LockReleaseAll", proclock);
-			LOCK_PRINT("LockReleaseAll", lock, 0);
-			Assert(lock->nRequested >= 0);
-			Assert(lock->nGranted >= 0);
-			Assert(lock->nGranted <= lock->nRequested);
-			Assert((proclock->holdMask & ~lock->grantMask) == 0);
-
-			/*
-			 * Release the previously-marked lock modes
-			 */
-			for (i = 1; i <= numLockModes; i++)
-			{
-				if (proclock->releaseMask & LOCKBIT_ON(i))
-					wakeupNeeded |= UnGrantLock(lock, i, proclock,
-												lockMethodTable);
-			}
-			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-			Assert(lock->nGranted <= lock->nRequested);
-			LOCK_PRINT("LockReleaseAll: updated", lock, 0);
-
-			proclock->releaseMask = 0;
-
-			/* CleanUpLock will wake up waiters if needed. */
-			CleanUpLock(lock, proclock,
-						lockMethodTable,
-						LockTagHashCode(&lock->tag),
-						wakeupNeeded);
-		}						/* loop over PROCLOCKs within this partition */
-
-		LWLockRelease(partitionLock);
-	}							/* loop over partitions */
-
-#ifdef LOCK_DEBUG
-	if (*(lockMethodTable->trace_flag))
-		elog(LOG, "LockReleaseAll done");
-#endif
+	Assert(MyProc->fpLockBits == 0);
 }
+#endif
 
 /*
  * LockReleaseSession -- Release all session locks of the specified lock method
@@ -2394,59 +2200,39 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 void
 LockReleaseSession(LOCKMETHODID lockmethodid)
 {
-	HASH_SEQ_STATUS status;
-	LOCALLOCK  *locallock;
+	dlist_mutable_iter iter;
 
 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
-	hash_seq_init(&status, LockMethodLocalHash);
-
-	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	dlist_foreach_modify(iter, &session_locks[lockmethodid - 1])
 	{
-		/* Ignore items that are not of the specified lock method */
-		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-			continue;
+		LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
 
-		ReleaseLockIfHeld(locallock, true);
+		Assert(LOCALLOCK_LOCKMETHOD(*locallockowner->locallock) == lockmethodid);
+
+		ReleaseLockIfHeld(locallockowner, true);
 	}
+
+	Assert(dlist_is_empty(&session_locks[lockmethodid - 1]));
 }
 
 /*
  * LockReleaseCurrentOwner
- *		Release all locks belonging to CurrentResourceOwner
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held.
- * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
- * table to find them.
+ *		Release all locks belonging to 'owner'
  */
 void
-LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReleaseCurrentOwner(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-	if (locallocks == NULL)
-	{
-		HASH_SEQ_STATUS status;
-		LOCALLOCK  *locallock;
-
-		hash_seq_init(&status, LockMethodLocalHash);
-
-		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-			ReleaseLockIfHeld(locallock, false);
-	}
-	else
-	{
-		int			i;
+	Assert(locallockowner->owner == owner);
 
-		for (i = nlocks - 1; i >= 0; i--)
-			ReleaseLockIfHeld(locallocks[i], false);
-	}
+	ReleaseLockIfHeld(locallockowner, false);
 }
 
 /*
  * ReleaseLockIfHeld
- *		Release any session-level locks on this lockable object if sessionLock
- *		is true; else, release any locks held by CurrentResourceOwner.
+ *		Release any session-level locks on this 'locallockowner' if
+ *		sessionLock is true; else, release any locks held by 'locallockowner'.
  *
  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
  * locks), but without refactoring LockRelease() we cannot support releasing
@@ -2457,52 +2243,39 @@ LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
  * convenience.
  */
 static void
-ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
+ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock)
 {
-	ResourceOwner owner;
-	LOCALLOCKOWNER *lockOwners;
-	int			i;
+	LOCALLOCK  *locallock = locallockowner->locallock;
+
+	/* release all references to the lock by this resource owner */
 
-	/* Identify owner for lock (must match LockRelease!) */
 	if (sessionLock)
-		owner = NULL;
+		Assert(locallockowner->owner == NULL);
 	else
-		owner = CurrentResourceOwner;
+		Assert(locallockowner->owner != NULL);
 
-	/* Scan to see if there are any locks belonging to the target owner */
-	lockOwners = locallock->lockOwners;
-	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+	/* We will still hold this lock after forgetting this ResourceOwner. */
+	if (locallockowner->nLocks < locallock->nLocks)
 	{
-		if (lockOwners[i].owner == owner)
-		{
-			Assert(lockOwners[i].nLocks > 0);
-			if (lockOwners[i].nLocks < locallock->nLocks)
-			{
-				/*
-				 * We will still hold this lock after forgetting this
-				 * ResourceOwner.
-				 */
-				locallock->nLocks -= lockOwners[i].nLocks;
-				/* compact out unused slot */
-				locallock->numLockOwners--;
-				if (owner != NULL)
-					ResourceOwnerForgetLock(owner, locallock);
-				if (i < locallock->numLockOwners)
-					lockOwners[i] = lockOwners[locallock->numLockOwners];
-			}
-			else
-			{
-				Assert(lockOwners[i].nLocks == locallock->nLocks);
-				/* We want to call LockRelease just once */
-				lockOwners[i].nLocks = 1;
-				locallock->nLocks = 1;
-				if (!LockRelease(&locallock->tag.lock,
-								 locallock->tag.mode,
-								 sessionLock))
-					elog(WARNING, "ReleaseLockIfHeld: failed??");
-			}
-			break;
-		}
+		locallock->nLocks -= locallockowner->nLocks;
+		dlist_delete(&locallockowner->locallock_node);
+
+		if (sessionLock)
+			dlist_delete(&locallockowner->resowner_node);
+		else
+			ResourceOwnerForgetLock(locallockowner);
+	}
+	else
+	{
+		Assert(locallockowner->nLocks == locallock->nLocks);
+		/* We want to call LockRelease just once */
+		locallockowner->nLocks = 1;
+		locallock->nLocks = 1;
+
+		if (!LockRelease(&locallock->tag.lock,
+						 locallock->tag.mode,
+						 sessionLock))
+			elog(WARNING, "ReleaseLockIfHeld: failed??");
 	}
 }
 
@@ -2510,82 +2283,47 @@ ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
  * LockReassignCurrentOwner
  *		Reassign all locks belonging to CurrentResourceOwner to belong
  *		to its parent resource owner.
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held
- * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
- * and we'll traverse through our hash table to find them.
  */
 void
-LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReassignCurrentOwner(LOCALLOCKOWNER *locallockowner)
 {
 	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
 
-	Assert(parent != NULL);
-
-	if (locallocks == NULL)
-	{
-		HASH_SEQ_STATUS status;
-		LOCALLOCK  *locallock;
-
-		hash_seq_init(&status, LockMethodLocalHash);
-
-		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-			LockReassignOwner(locallock, parent);
-	}
-	else
-	{
-		int			i;
-
-		for (i = nlocks - 1; i >= 0; i--)
-			LockReassignOwner(locallocks[i], parent);
-	}
+	LockReassignOwner(locallockowner, parent);
 }
 
 /*
- * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
- * CurrentResourceOwner to its parent.
+ * Subroutine of LockReassignCurrentOwner. Reassigns the given
+ *'locallockowner' to 'parent'.
  */
 static void
-LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
+LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent)
 {
-	LOCALLOCKOWNER *lockOwners;
-	int			i;
-	int			ic = -1;
-	int			ip = -1;
+	dlist_iter	iter;
+	LOCALLOCK  *locallock = locallockowner->locallock;
 
-	/*
-	 * Scan to see if there are any locks belonging to current owner or its
-	 * parent
-	 */
-	lockOwners = locallock->lockOwners;
-	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+	ResourceOwnerForgetLock(locallockowner);
+
+	dlist_foreach(iter, &locallock->locallockowners)
 	{
-		if (lockOwners[i].owner == CurrentResourceOwner)
-			ic = i;
-		else if (lockOwners[i].owner == parent)
-			ip = i;
-	}
+		LOCALLOCKOWNER *parentlocalowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
 
-	if (ic < 0)
-		return;					/* no current locks */
+		Assert(parentlocalowner->locallock == locallock);
 
-	if (ip < 0)
-	{
-		/* Parent has no slot, so just give it the child's slot */
-		lockOwners[ic].owner = parent;
-		ResourceOwnerRememberLock(parent, locallock);
-	}
-	else
-	{
-		/* Merge child's count with parent's */
-		lockOwners[ip].nLocks += lockOwners[ic].nLocks;
-		/* compact out unused slot */
-		locallock->numLockOwners--;
-		if (ic < locallock->numLockOwners)
-			lockOwners[ic] = lockOwners[locallock->numLockOwners];
+		if (parentlocalowner->owner != parent)
+			continue;
+
+		parentlocalowner->nLocks += locallockowner->nLocks;
+
+		locallockowner->nLocks = 0;
+		dlist_delete(&locallockowner->locallock_node);
+		pfree(locallockowner);
+		return;
 	}
-	ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
+
+	/* reassign locallockowner to parent resowner */
+	locallockowner->owner = parent;
+	ResourceOwnerRememberLock(parent, locallockowner);
 }
 
 /*
@@ -3057,7 +2795,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
  * We currently use this in two situations: first, to release locks held by
  * prepared transactions on commit (see lock_twophase_postcommit); and second,
  * to release locks taken via the fast-path, transferred to the main hash
- * table, and then released (see LockReleaseAll).
+ * table, and then released (see ResourceOwnerRelease).
  */
 static void
 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
@@ -3163,16 +2901,9 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
  * we can't implement this check by examining LOCALLOCK entries in isolation.
  * We must build a transient hashtable that is indexed by locktag only.
  */
-static void
+static HTAB *
 CheckForSessionAndXactLocks(void)
 {
-	typedef struct
-	{
-		LOCKTAG		lock;		/* identifies the lockable object */
-		bool		sessLock;	/* is any lockmode held at session level? */
-		bool		xactLock;	/* is any lockmode held at xact level? */
-	} PerLockTagEntry;
-
 	HASHCTL		hash_ctl;
 	HTAB	   *lockhtab;
 	HASH_SEQ_STATUS status;
@@ -3193,10 +2924,9 @@ CheckForSessionAndXactLocks(void)
 
 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 	{
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		PerLockTagEntry *hentry;
 		bool		found;
-		int			i;
+		dlist_iter	iter;
 
 		/*
 		 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3217,9 +2947,13 @@ CheckForSessionAndXactLocks(void)
 			hentry->sessLock = hentry->xactLock = false;
 
 		/* Scan to see if we hold lock at session or xact level or both */
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == NULL)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+															 locallock_node,
+															 iter.cur);
+
+			if (locallockowner->owner == NULL)
 				hentry->sessLock = true;
 			else
 				hentry->xactLock = true;
@@ -3235,8 +2969,7 @@ CheckForSessionAndXactLocks(void)
 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
 	}
 
-	/* Success, so clean up */
-	hash_destroy(lockhtab);
+	return lockhtab;
 }
 
 /*
@@ -3244,6 +2977,11 @@ CheckForSessionAndXactLocks(void)
  *		Do the preparatory work for a PREPARE: make 2PC state file records
  *		for all locks currently held.
  *
+ * Returns a hash table of PerLockTagEntry structs with an entry for each
+ * lock held by this backend marking if the lock is held at the session or
+ * xact level, or both.  It is up to the calling function to call
+ * hash_destroy() on this table to free the memory used by it.
+ *
  * Session-level locks are ignored, as are VXID locks.
  *
  * For the most part, we don't need to touch shared memory for this ---
@@ -3251,14 +2989,15 @@ CheckForSessionAndXactLocks(void)
  * Fast-path locks are an exception, however: we move any such locks to
  * the main table before allowing PREPARE TRANSACTION to succeed.
  */
-void
+HTAB *
 AtPrepare_Locks(void)
 {
 	HASH_SEQ_STATUS status;
 	LOCALLOCK  *locallock;
+	HTAB	   *sessionandxactlocks;
 
 	/* First, verify there aren't locks of both xact and session level */
-	CheckForSessionAndXactLocks();
+	sessionandxactlocks = CheckForSessionAndXactLocks();
 
 	/* Now do the per-locallock cleanup work */
 	hash_seq_init(&status, LockMethodLocalHash);
@@ -3266,10 +3005,9 @@ AtPrepare_Locks(void)
 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 	{
 		TwoPhaseLockRecord record;
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		bool		haveSessionLock;
 		bool		haveXactLock;
-		int			i;
+		dlist_iter	iter;
 
 		/*
 		 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3284,9 +3022,13 @@ AtPrepare_Locks(void)
 
 		/* Scan to see whether we hold it at session or transaction level */
 		haveSessionLock = haveXactLock = false;
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == NULL)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+															 locallock_node,
+															 iter.cur);
+
+			if (locallockowner->owner == NULL)
 				haveSessionLock = true;
 			else
 				haveXactLock = true;
@@ -3330,6 +3072,8 @@ AtPrepare_Locks(void)
 		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
 							   &record, sizeof(TwoPhaseLockRecord));
 	}
+
+	return sessionandxactlocks;
 }
 
 /*
@@ -3344,11 +3088,11 @@ AtPrepare_Locks(void)
  * pointers in the transaction's resource owner.  This is OK at the
  * moment since resowner.c doesn't try to free locks retail at a toplevel
  * transaction commit or abort.  We could alternatively zero out nLocks
- * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
- * but that probably costs more cycles.
+ * and leave the LOCALLOCK entries to be garbage-collected by
+ * ResourceOwnerRelease, but that probably costs more cycles.
  */
 void
-PostPrepare_Locks(TransactionId xid)
+PostPrepare_Locks(TransactionId xid, HTAB *sessionandxactlocks)
 {
 	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid, false);
 	HASH_SEQ_STATUS status;
@@ -3378,10 +3122,9 @@ PostPrepare_Locks(TransactionId xid)
 
 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 	{
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		bool		haveSessionLock;
 		bool		haveXactLock;
-		int			i;
+		dlist_iter	iter;
 
 		if (locallock->proclock == NULL || locallock->lock == NULL)
 		{
@@ -3400,9 +3143,13 @@ PostPrepare_Locks(TransactionId xid)
 
 		/* Scan to see whether we hold it at session or transaction level */
 		haveSessionLock = haveXactLock = false;
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == NULL)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+															 locallock_node,
+															 iter.cur);
+
+			if (locallockowner->owner == NULL)
 				haveSessionLock = true;
 			else
 				haveXactLock = true;
@@ -3418,10 +3165,6 @@ PostPrepare_Locks(TransactionId xid)
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
 
-		/* Mark the proclock to show we need to release this lockmode */
-		if (locallock->nLocks > 0)
-			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
 		/* And remove the locallock hashtable entry */
 		RemoveLocalLock(locallock);
 	}
@@ -3439,11 +3182,7 @@ PostPrepare_Locks(TransactionId xid)
 
 		/*
 		 * If the proclock list for this partition is empty, we can skip
-		 * acquiring the partition lock.  This optimization is safer than the
-		 * situation in LockReleaseAll, because we got rid of any fast-path
-		 * locks during AtPrepare_Locks, so there cannot be any case where
-		 * another backend is adding something to our lists now.  For safety,
-		 * though, we code this the same way as in LockReleaseAll.
+		 * acquiring the partition lock.
 		 */
 		if (dlist_is_empty(procLocks))
 			continue;			/* needn't examine this partition */
@@ -3452,6 +3191,8 @@ PostPrepare_Locks(TransactionId xid)
 
 		dlist_foreach_modify(proclock_iter, procLocks)
 		{
+			PerLockTagEntry *locktagentry;
+
 			proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
 
 			Assert(proclock->tag.myProc == MyProc);
@@ -3469,13 +3210,14 @@ PostPrepare_Locks(TransactionId xid)
 			Assert(lock->nGranted <= lock->nRequested);
 			Assert((proclock->holdMask & ~lock->grantMask) == 0);
 
-			/* Ignore it if nothing to release (must be a session lock) */
-			if (proclock->releaseMask == 0)
-				continue;
+			locktagentry = hash_search(sessionandxactlocks,
+									   &lock->tag,
+									   HASH_FIND,
+									   NULL);
 
-			/* Else we should be releasing all locks */
-			if (proclock->releaseMask != proclock->holdMask)
-				elog(PANIC, "we seem to have dropped a bit somewhere");
+			/* skip session locks */
+			if (locktagentry != NULL && locktagentry->sessLock)
+				continue;
 
 			/*
 			 * We cannot simply modify proclock->tag.myProc to reassign
@@ -4245,7 +3987,6 @@ lock_twophase_recover(TransactionId xid, uint16 info,
 		Assert(proc->lockGroupLeader == NULL);
 		proclock->groupLeader = proc;
 		proclock->holdMask = 0;
-		proclock->releaseMask = 0;
 		/* Add proclock to appropriate lists */
 		dlist_push_tail(&lock->procLocks, &proclock->lockLink);
 		dlist_push_tail(&proc->myProcLocks[partition],
@@ -4382,7 +4123,7 @@ lock_twophase_postabort(TransactionId xid, uint16 info,
  *
  *		We don't bother recording this lock in the local lock table, since it's
  *		only ever released at the end of a transaction.  Instead,
- *		LockReleaseAll() calls VirtualXactLockTableCleanup().
+ *		ProcReleaseLocks() calls VirtualXactLockTableCleanup().
  */
 void
 VirtualXactLockTableInsert(VirtualTransactionId vxid)
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index e9e445bb216..9a7ea73cdcd 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -777,10 +777,17 @@ ProcReleaseLocks(bool isCommit)
 		return;
 	/* If waiting, get off wait queue (should only be needed after error) */
 	LockErrorCleanup();
-	/* Release standard locks, including session-level if aborting */
-	LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
-	/* Release transaction-level advisory locks */
-	LockReleaseAll(USER_LOCKMETHOD, false);
+
+	VirtualXactLockTableCleanup();
+
+	/* Release session-level locks if aborting */
+	if (!isCommit)
+		LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+	/* Ensure all locks were released */
+	LockAssertNoneHeld(isCommit);
+#endif
 }
 
 
@@ -865,6 +872,8 @@ ProcKill(int code, Datum arg)
 		LWLockRelease(leader_lwlock);
 	}
 
+	Assert(MyProc->fpLockBits == 0);
+
 	/*
 	 * Reset MyLatch to the process local one.  This is so that signal
 	 * handlers et al can continue using the latch after the shared latch
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 552cf9d950a..d64998e4ee8 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -1353,10 +1353,10 @@ ShutdownPostgres(int code, Datum arg)
 	AbortOutOfAnyTransaction();
 
 	/*
-	 * User locks are not released by transaction end, so be sure to release
-	 * them explicitly.
+	 * Session locks are not released by transaction end, so be sure to
+	 * release them explicitly.
 	 */
-	LockReleaseAll(USER_LOCKMETHOD, true);
+	LockReleaseSession(USER_LOCKMETHOD);
 }
 
 
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index f096f3df20a..96417e3cb94 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -91,18 +91,6 @@ typedef struct ResourceElem
 StaticAssertDecl(RESOWNER_HASH_MAX_ITEMS(RESOWNER_HASH_INIT_SIZE) >= RESOWNER_ARRAY_SIZE,
 				 "initial hash size too small compared to array size");
 
-/*
- * MAX_RESOWNER_LOCKS is the size of the per-resource owner locks cache. It's
- * chosen based on some testing with pg_dump with a large schema. When the
- * tests were done (on 9.2), resource owners in a pg_dump run contained up
- * to 9 locks, regardless of the schema size, except for the top resource
- * owner which contained much more (overflowing the cache). 15 seems like a
- * nice round number that's somewhat higher than what pg_dump needs. Note that
- * making this number larger is not free - the bigger the cache, the slower
- * it is to release locks (in retail), when a resource owner holds many locks.
- */
-#define MAX_RESOWNER_LOCKS 15
-
 /*
  * ResourceOwner objects look like this
  */
@@ -152,10 +140,10 @@ typedef struct ResourceOwnerData
 	uint32		capacity;		/* allocated length of hash[] */
 	uint32		grow_at;		/* grow hash when reach this */
 
-	/* The local locks cache. */
-	LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];	/* list of owned locks */
+	dlist_head	locks;			/* dlist of owned locks */
 } ResourceOwnerData;
 
+#include "lib/ilist.h"
 
 /*****************************************************************************
  *	  GLOBAL MEMORY															 *
@@ -423,6 +411,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 	owner = (ResourceOwner) MemoryContextAllocZero(TopMemoryContext,
 												   sizeof(ResourceOwnerData));
 	owner->name = name;
+	dlist_init(&owner->locks);
 
 	if (parent)
 	{
@@ -729,8 +718,19 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
+		dlist_mutable_iter iter;
+
 		if (isTopLevel)
 		{
+			dlist_foreach_modify(iter, &owner->locks)
+			{
+				LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+				LockReleaseCurrentOwner(owner, locallockowner);
+			}
+
+			Assert(dlist_is_empty(&owner->locks));
+
 			/*
 			 * For a top-level xact we are going to release all locks (or at
 			 * least all non-session locks), so just do a single lmgr call at
@@ -749,30 +749,30 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 			 * subtransaction, we do NOT release its locks yet, but transfer
 			 * them to the parent.
 			 */
-			LOCALLOCK **locks;
-			int			nlocks;
+			if (isCommit)
+			{
+				dlist_foreach_modify(iter, &owner->locks)
+				{
+					LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+																	 resowner_node,
+																	 iter.cur);
 
-			Assert(owner->parent != NULL);
+					LockReassignCurrentOwner(locallockowner);
+				}
 
-			/*
-			 * Pass the list of locks owned by this resource owner to the lock
-			 * manager, unless it has overflowed.
-			 */
-			if (owner->nlocks > MAX_RESOWNER_LOCKS)
-			{
-				locks = NULL;
-				nlocks = 0;
+				Assert(dlist_is_empty(&owner->locks));
 			}
 			else
 			{
-				locks = owner->locks;
-				nlocks = owner->nlocks;
-			}
+				dlist_foreach_modify(iter, &owner->locks)
+				{
+					LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
 
-			if (isCommit)
-				LockReassignCurrentOwner(locks, nlocks);
-			else
-				LockReleaseCurrentOwner(locks, nlocks);
+					LockReleaseCurrentOwner(owner, locallockowner);
+				}
+
+				Assert(dlist_is_empty(&owner->locks));
+			}
 		}
 	}
 	else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
@@ -860,7 +860,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	/* And it better not own any resources, either */
 	Assert(owner->narr == 0);
 	Assert(owner->nhash == 0);
-	Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
+	Assert(dlist_is_empty(&owner->locks));
 
 	/*
 	 * Delete children.  The recursive call will delink the child from me, so
@@ -1034,52 +1034,59 @@ ReleaseAuxProcessResourcesCallback(int code, Datum arg)
 
 /*
  * Remember that a Local Lock is owned by a ResourceOwner
- *
- * This is different from the generic ResourceOwnerRemember in that the list of
- * locks is only a lossy cache.  It can hold up to MAX_RESOWNER_LOCKS entries,
- * and when it overflows, we stop tracking locks.  The point of only remembering
- * only up to MAX_RESOWNER_LOCKS entries is that if a lot of locks are held,
- * ResourceOwnerForgetLock doesn't need to scan through a large array to find
- * the entry.
  */
 void
-ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-	Assert(locallock != NULL);
+	Assert(owner != NULL);
+	Assert(locallockowner != NULL);
 
-	if (owner->nlocks > MAX_RESOWNER_LOCKS)
-		return;					/* we have already overflowed */
-
-	if (owner->nlocks < MAX_RESOWNER_LOCKS)
-		owner->locks[owner->nlocks] = locallock;
-	else
+#ifdef USE_ASSERT_CHECKING
 	{
-		/* overflowed */
+		dlist_iter	iter;
+
+		dlist_foreach(iter, &owner->locks)
+		{
+			LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+			Assert(i->locallock != locallockowner->locallock);
+		}
 	}
-	owner->nlocks++;
+#endif
+
+	dlist_push_tail(&owner->locks, &locallockowner->resowner_node);
 }
 
 /*
- * Forget that a Local Lock is owned by a ResourceOwner
+ * Forget that a Local Lock is owned by the given LOCALLOCKOWNER.
  */
 void
-ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerForgetLock(LOCALLOCKOWNER *locallockowner)
 {
-	int			i;
+#ifdef USE_ASSERT_CHECKING
+	ResourceOwner owner;
+
+	Assert(locallockowner != NULL);
 
-	if (owner->nlocks > MAX_RESOWNER_LOCKS)
-		return;					/* we have overflowed */
+	owner = locallockowner->owner;
 
-	Assert(owner->nlocks > 0);
-	for (i = owner->nlocks - 1; i >= 0; i--)
 	{
-		if (locallock == owner->locks[i])
+		dlist_iter	iter;
+		bool		found = false;
+
+		dlist_foreach(iter, &owner->locks)
 		{
-			owner->locks[i] = owner->locks[owner->nlocks - 1];
-			owner->nlocks--;
-			return;
+			LOCALLOCKOWNER *owner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+			if (locallockowner == owner)
+			{
+				Assert(!found);
+				found = true;
+			}
 		}
+
+		Assert(found);
 	}
-	elog(ERROR, "lock reference %p is not owned by resource owner %s",
-		 locallock, owner->name);
+#endif
+	dlist_delete(&locallockowner->resowner_node);
 }
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 590c026b5bf..e7f5b25f338 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -24,6 +24,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
+#include "lib/ilist.h"
 
 /* struct PGPROC is declared in proc.h, but must forward-reference it */
 typedef struct PGPROC PGPROC;
@@ -349,10 +350,6 @@ typedef struct LOCK
  * Otherwise, proclock objects whose holdMasks are zero are recycled
  * as soon as convenient.
  *
- * releaseMask is workspace for LockReleaseAll(): it shows the locks due
- * to be released during the current call.  This must only be examined or
- * set by the backend owning the PROCLOCK.
- *
  * Each PROCLOCK object is linked into lists for both the associated LOCK
  * object and the owning PGPROC object.  Note that the PROCLOCK is entered
  * into these lists as soon as it is created, even if no lock has yet been
@@ -374,7 +371,6 @@ typedef struct PROCLOCK
 	/* data */
 	PGPROC	   *groupLeader;	/* proc's lock group leader, or proc itself */
 	LOCKMASK	holdMask;		/* bitmask for lock types currently held */
-	LOCKMASK	releaseMask;	/* bitmask for lock types to be released */
 	dlist_node	lockLink;		/* list link in LOCK's list of proclocks */
 	dlist_node	procLink;		/* list link in PGPROC's list of proclocks */
 } PROCLOCK;
@@ -420,6 +416,13 @@ typedef struct LOCALLOCKOWNER
 	 * Must use a forward struct reference to avoid circularity.
 	 */
 	struct ResourceOwnerData *owner;
+
+	dlist_node	resowner_node;	/* dlist link for ResourceOwner.locks */
+
+	dlist_node	locallock_node;	/* dlist link for LOCALLOCK.locallockowners */
+
+	struct LOCALLOCK *locallock;	/* pointer to the corresponding LOCALLOCK */
+
 	int64		nLocks;			/* # of times held by this owner */
 } LOCALLOCKOWNER;
 
@@ -433,9 +436,9 @@ typedef struct LOCALLOCK
 	LOCK	   *lock;			/* associated LOCK object, if any */
 	PROCLOCK   *proclock;		/* associated PROCLOCK object, if any */
 	int64		nLocks;			/* total number of times lock is held */
-	int			numLockOwners;	/* # of relevant ResourceOwners */
-	int			maxLockOwners;	/* allocated size of array */
-	LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
+
+	dlist_head	locallockowners;	/* dlist of LOCALLOCKOWNER */
+
 	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
 	bool		lockCleared;	/* we read all sinval msgs for lock */
 } LOCALLOCK;
@@ -564,10 +567,16 @@ extern void AbortStrongLockAcquire(void);
 extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
 						LOCKMODE lockmode, bool sessionLock);
-extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
+
+#ifdef USE_ASSERT_CHECKING
+extern void LockAssertNoneHeld(bool isCommit);
+#endif
+
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
-extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
-extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+struct ResourceOwnerData;
+extern void LockReleaseCurrentOwner(struct ResourceOwnerData *owner,
+									LOCALLOCKOWNER *locallockowner);
+extern void LockReassignCurrentOwner(LOCALLOCKOWNER *locallockowner);
 extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 #ifdef USE_ASSERT_CHECKING
 extern HTAB *GetLockMethodLocalHash(void);
@@ -576,8 +585,8 @@ extern bool LockHasWaiters(const LOCKTAG *locktag,
 						   LOCKMODE lockmode, bool sessionLock);
 extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
 											  LOCKMODE lockmode, int *countp);
-extern void AtPrepare_Locks(void);
-extern void PostPrepare_Locks(TransactionId xid);
+extern HTAB *AtPrepare_Locks(void);
+extern void PostPrepare_Locks(TransactionId xid, HTAB *sessionandxactlocks);
 extern bool LockCheckConflicts(LockMethod lockMethodTable,
 							   LOCKMODE lockmode,
 							   LOCK *lock, PROCLOCK *proclock);
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
index 0735480214e..cf368cedca4 100644
--- a/src/include/utils/resowner.h
+++ b/src/include/utils/resowner.h
@@ -159,8 +159,8 @@ extern void CreateAuxProcessResourceOwner(void);
 extern void ReleaseAuxProcessResources(bool isCommit);
 
 /* special support for local lock management */
-struct LOCALLOCK;
-extern void ResourceOwnerRememberLock(ResourceOwner owner, struct LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, struct LOCALLOCK *locallock);
+struct LOCALLOCKOWNER;
+extern void ResourceOwnerRememberLock(ResourceOwner owner, struct LOCALLOCKOWNER *locallockowner);
+extern void ResourceOwnerForgetLock(struct LOCALLOCKOWNER *locallockowner);
 
 #endif							/* RESOWNER_H */
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index ae58438ec76..29b1cce54b7 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -36,8 +36,9 @@ extern void ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer);
 
 /* support for local lock management */
-extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
+extern void ResourceOwnerRememberLock(ResourceOwner owner,
+									  LOCALLOCKOWNER *locallock);
+extern void ResourceOwnerForgetLock(LOCALLOCKOWNER *locallock);
 
 /* support for catcache refcount management */
 extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);

Reply via email to