On 18/09/2023 07:08, David Rowley wrote:
On Fri, 15 Sept 2023 at 22:37, Heikki Linnakangas <hlinn...@iki.fi> wrote:
I've added a call to LockAssertNoneHeld(false) in there.
I don't see it in the patch?
hmm. I must've git format-patch before committing that part.
I'll try that again... see attached.
This needed a rebase after my ResourceOwner refactoring. Attached.
A few quick comments:
- It would be nice to add a test for the issue that you fixed in patch
v7, i.e. if you prepare a transaction while holding session-level locks.
- GrantLockLocal() now calls MemoryContextAlloc(), which can fail if you
are out of memory. Is that handled gracefully or is the lock leaked?
--
Heikki Linnakangas
Neon (https://neon.tech)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 74ce5f9491c..8c8585be7ab 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2415,6 +2415,7 @@ PrepareTransaction(void)
TransactionId xid = GetCurrentTransactionId();
GlobalTransaction gxact;
TimestampTz prepared_at;
+ HTAB *sessionandxactlocks;
Assert(!IsInParallelMode());
@@ -2560,7 +2561,17 @@ PrepareTransaction(void)
StartPrepare(gxact);
AtPrepare_Notify();
- AtPrepare_Locks();
+
+ /*
+ * Prepare the locks and save the returned hash table that describes if
+ * the lock is held at the session and/or transaction level. We need to
+ * know if we're dealing with session locks inside PostPrepare_Locks(),
+ * but we're unable to build the hash table there due to that function
+ * only discovering if we're dealing with a session lock while we're in a
+ * critical section, in which case we can't allocate memory for the hash
+ * table.
+ */
+ sessionandxactlocks = AtPrepare_Locks();
AtPrepare_PredicateLocks();
AtPrepare_PgStat();
AtPrepare_MultiXact();
@@ -2587,7 +2598,10 @@ PrepareTransaction(void)
* ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would
* conclude "xact already committed or aborted" for our locks.
*/
- PostPrepare_Locks(xid);
+ PostPrepare_Locks(xid, sessionandxactlocks);
+
+ /* We no longer need this hash table */
+ hash_destroy(sessionandxactlocks);
/*
* Let others know about no transaction in progress by me. This has to be
diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c
index 296dc82d2ee..5baf83ac6ce 100644
--- a/src/backend/commands/discard.c
+++ b/src/backend/commands/discard.c
@@ -71,7 +71,12 @@ DiscardAll(bool isTopLevel)
ResetAllOptions();
DropAllPreparedStatements();
Async_UnlistenAll();
- LockReleaseAll(USER_LOCKMETHOD, true);
+ LockReleaseSession(USER_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+ LockAssertNoneHeld(false);
+#endif
+
ResetPlanCache();
ResetTempTableNamespace();
ResetSequenceCaches();
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 501910b4454..835f112f751 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -841,7 +841,11 @@ logicalrep_worker_onexit(int code, Datum arg)
* The locks will be acquired once the worker is initialized.
*/
if (!InitializingApplyWorker)
- LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+ LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+ LockAssertNoneHeld(false);
+#endif
ApplyLauncherWakeup();
}
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 45de0fd2bd6..e7e0f29347a 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -182,12 +182,6 @@ holdMask -
subset of the PGPROC object's heldLocks mask (if the PGPROC is
currently waiting for another lock mode on this lock).
-releaseMask -
- A bitmask for the lock modes due to be released during LockReleaseAll.
- This must be a subset of the holdMask. Note that it is modified without
- taking the partition LWLock, and therefore it is unsafe for any
- backend except the one owning the PROCLOCK to examine/change it.
-
lockLink -
List link for shared memory queue of all the PROCLOCK objects for the
same LOCK.
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index b8c57b3e16e..ce54b589b0d 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -22,8 +22,7 @@
* Interface:
*
* InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
- * LockAcquire(), LockRelease(), LockReleaseAll(),
- * LockCheckConflicts(), GrantLock()
+ * LockAcquire(), LockRelease(), LockCheckConflicts(), GrantLock()
*
*-------------------------------------------------------------------------
*/
@@ -162,6 +161,16 @@ typedef struct TwoPhaseLockRecord
LOCKMODE lockmode;
} TwoPhaseLockRecord;
+/*
+ * Used by for a hash table entry type in AtPrepare_Locks() to communicate the
+ * session/xact lock status of each held lock for use in PostPrepare_Locks().
+ */
+typedef struct PerLockTagEntry
+{
+ LOCKTAG lock; /* identifies the lockable object */
+ bool sessLock; /* is any lockmode held at session level? */
+ bool xactLock; /* is any lockmode held at xact level? */
+} PerLockTagEntry;
/*
* Count of the number of fast path lock slots we believe to be used. This
@@ -270,6 +279,8 @@ static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;
+/* A memory context for storing LOCALLOCKOWNER structs */
+MemoryContext LocalLockOwnerContext;
/* private state for error cleanup */
static LOCALLOCK *StrongLockInProgress;
@@ -277,6 +288,9 @@ static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;
+static dlist_head session_locks[lengthof(LockMethods)];
+
+
#ifdef LOCK_DEBUG
/*------
@@ -363,8 +377,8 @@ static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
-static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
-static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
+static void ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock);
+static void LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
@@ -465,6 +479,21 @@ InitLocks(void)
16,
&info,
HASH_ELEM | HASH_BLOBS);
+
+ /* Initialize each element of the session_locks array */
+ for (int i = 0; i < lengthof(LockMethods); i++)
+ dlist_init(&session_locks[i]);
+
+ /*
+ * Create a slab context for storing LOCALLOCKOWNERs. Slab seems like a
+ * good context type for this as it will manage fragmentation better than
+ * aset.c contexts and it will free() excess memory rather than maintain
+ * excessively long freelists after a large surge in locking requirements.
+ */
+ LocalLockOwnerContext = SlabContextCreate(TopMemoryContext,
+ "LOCALLOCKOWNER context",
+ SLAB_DEFAULT_BLOCK_SIZE,
+ sizeof(LOCALLOCKOWNER));
}
@@ -827,26 +856,9 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->nLocks = 0;
locallock->holdsStrongLockCount = false;
locallock->lockCleared = false;
- locallock->numLockOwners = 0;
- locallock->maxLockOwners = 8;
- locallock->lockOwners = NULL; /* in case next line fails */
- locallock->lockOwners = (LOCALLOCKOWNER *)
- MemoryContextAlloc(TopMemoryContext,
- locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+ dlist_init(&locallock->locallockowners);
}
- else
- {
- /* Make sure there will be room to remember the lock */
- if (locallock->numLockOwners >= locallock->maxLockOwners)
- {
- int newsize = locallock->maxLockOwners * 2;
- locallock->lockOwners = (LOCALLOCKOWNER *)
- repalloc(locallock->lockOwners,
- newsize * sizeof(LOCALLOCKOWNER));
- locallock->maxLockOwners = newsize;
- }
- }
hashcode = locallock->hashcode;
if (locallockp)
@@ -1249,7 +1261,6 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
proclock->groupLeader = proc->lockGroupLeader != NULL ?
proc->lockGroupLeader : proc;
proclock->holdMask = 0;
- proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
dlist_push_tail(&lock->procLocks, &proclock->lockLink);
dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink);
@@ -1343,17 +1354,19 @@ CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired)
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
- int i;
+ dlist_mutable_iter iter;
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ dlist_foreach_modify(iter, &locallock->locallockowners)
{
- if (locallock->lockOwners[i].owner != NULL)
- ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+ Assert(locallockowner->owner != NULL);
+ dlist_delete(&locallockowner->locallock_node);
+ ResourceOwnerForgetLock(locallockowner);
+ pfree(locallockowner);
}
- locallock->numLockOwners = 0;
- if (locallock->lockOwners != NULL)
- pfree(locallock->lockOwners);
- locallock->lockOwners = NULL;
+
+ Assert(dlist_is_empty(&locallock->locallockowners));
if (locallock->holdsStrongLockCount)
{
@@ -1659,26 +1672,38 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
static void
GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
{
- LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
- int i;
+ LOCALLOCKOWNER *locallockowner;
+ dlist_iter iter;
- Assert(locallock->numLockOwners < locallock->maxLockOwners);
/* Count the total */
locallock->nLocks++;
/* Count the per-owner lock */
- for (i = 0; i < locallock->numLockOwners; i++)
+ dlist_foreach(iter, &locallock->locallockowners)
{
- if (lockOwners[i].owner == owner)
+ locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+ if (locallockowner->owner == owner)
{
- lockOwners[i].nLocks++;
+ locallockowner->nLocks++;
return;
}
}
- lockOwners[i].owner = owner;
- lockOwners[i].nLocks = 1;
- locallock->numLockOwners++;
+ locallockowner = MemoryContextAlloc(LocalLockOwnerContext, sizeof(LOCALLOCKOWNER));
+ locallockowner->owner = owner;
+ locallockowner->nLocks = 1;
+ locallockowner->locallock = locallock;
+
+ dlist_push_tail(&locallock->locallockowners, &locallockowner->locallock_node);
+
if (owner != NULL)
- ResourceOwnerRememberLock(owner, locallock);
+ ResourceOwnerRememberLock(owner, locallockowner);
+ else
+ {
+ LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallockowner->locallock);
+
+ Assert(lockmethodid > 0 && lockmethodid <= 2);
+ dlist_push_tail(&session_locks[lockmethodid - 1], &locallockowner->resowner_node);
+ }
/* Indicate that the lock is acquired for certain types of locks. */
CheckAndSetLockHeld(locallock, true);
@@ -1971,9 +1996,9 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
* Decrease the count for the resource owner.
*/
{
- LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
ResourceOwner owner;
- int i;
+ dlist_mutable_iter iter;
+ bool found = false;
/* Identify owner for lock */
if (sessionLock)
@@ -1981,24 +2006,33 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
else
owner = CurrentResourceOwner;
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ dlist_foreach_modify(iter, &locallock->locallockowners)
{
- if (lockOwners[i].owner == owner)
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+ if (locallockowner->owner != owner)
+ continue;
+
+ found = true;
+
+ if (--locallockowner->nLocks == 0)
{
- Assert(lockOwners[i].nLocks > 0);
- if (--lockOwners[i].nLocks == 0)
- {
- if (owner != NULL)
- ResourceOwnerForgetLock(owner, locallock);
- /* compact out unused slot */
- locallock->numLockOwners--;
- if (i < locallock->numLockOwners)
- lockOwners[i] = lockOwners[locallock->numLockOwners];
- }
- break;
+ dlist_delete(&locallockowner->locallock_node);
+
+ if (owner != NULL)
+ ResourceOwnerForgetLock(locallockowner);
+ else
+ dlist_delete(&locallockowner->resowner_node);
+ pfree(locallockowner);
+ }
+ else
+ {
+ /* ensure nLocks didn't go negative */
+ Assert(locallockowner->nLocks >= 0);
}
}
- if (i < 0)
+
+ if (!found)
{
/* don't release a lock belonging to another owner */
elog(WARNING, "you don't own a lock of type %s",
@@ -2016,6 +2050,8 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return true;
+ Assert(locallock->nLocks == 0);
+
/*
* At this point we can no longer suppose we are clear of invalidation
* messages related to this lock. Although we'll delete the LOCALLOCK
@@ -2118,274 +2154,44 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
return true;
}
+#ifdef USE_ASSERT_CHECKING
/*
- * LockReleaseAll -- Release all locks of the specified lock method that
- * are held by the current process.
- *
- * Well, not necessarily *all* locks. The available behaviors are:
- * allLocks == true: release all locks including session locks.
- * allLocks == false: release all non-session locks.
+ * LockAssertNoneHeld -- Assert that we no longer hold any DEFAULT_LOCKMETHOD
+ * locks during an abort.
*/
void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
+LockAssertNoneHeld(bool isCommit)
{
HASH_SEQ_STATUS status;
- LockMethod lockMethodTable;
- int i,
- numLockModes;
LOCALLOCK *locallock;
- LOCK *lock;
- int partition;
- bool have_fast_path_lwlock = false;
-
- if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
- elog(ERROR, "unrecognized lock method: %d", lockmethodid);
- lockMethodTable = LockMethods[lockmethodid];
-
-#ifdef LOCK_DEBUG
- if (*(lockMethodTable->trace_flag))
- elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
-#endif
-
- /*
- * Get rid of our fast-path VXID lock, if appropriate. Note that this is
- * the only way that the lock we hold on our own VXID can ever get
- * released: it is always and only released when a toplevel transaction
- * ends.
- */
- if (lockmethodid == DEFAULT_LOCKMETHOD)
- VirtualXactLockTableCleanup();
-
- numLockModes = lockMethodTable->numLockModes;
- /*
- * First we run through the locallock table and get rid of unwanted
- * entries, then we scan the process's proclocks and get rid of those. We
- * do this separately because we may have multiple locallock entries
- * pointing to the same proclock, and we daren't end up with any dangling
- * pointers. Fast-path locks are cleaned up during the locallock table
- * scan, though.
- */
- hash_seq_init(&status, LockMethodLocalHash);
-
- while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ if (!isCommit)
{
- /*
- * If the LOCALLOCK entry is unused, we must've run out of shared
- * memory while trying to set up this lock. Just forget the local
- * entry.
- */
- if (locallock->nLocks == 0)
- {
- RemoveLocalLock(locallock);
- continue;
- }
-
- /* Ignore items that are not of the lockmethod to be removed */
- if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
- continue;
+ hash_seq_init(&status, LockMethodLocalHash);
- /*
- * If we are asked to release all locks, we can just zap the entry.
- * Otherwise, must scan to see if there are session locks. We assume
- * there is at most one lockOwners entry for session locks.
- */
- if (!allLocks)
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
- LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+ dlist_iter local_iter;
- /* If session lock is above array position 0, move it down to 0 */
- for (i = 0; i < locallock->numLockOwners; i++)
- {
- if (lockOwners[i].owner == NULL)
- lockOwners[0] = lockOwners[i];
- else
- ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
- }
+ Assert(locallock->nLocks >= 0);
- if (locallock->numLockOwners > 0 &&
- lockOwners[0].owner == NULL &&
- lockOwners[0].nLocks > 0)
+ dlist_foreach(local_iter, &locallock->locallockowners)
{
- /* Fix the locallock to show just the session locks */
- locallock->nLocks = lockOwners[0].nLocks;
- locallock->numLockOwners = 1;
- /* We aren't deleting this locallock, so done */
- continue;
- }
- else
- locallock->numLockOwners = 0;
- }
-
- /*
- * If the lock or proclock pointers are NULL, this lock was taken via
- * the relation fast-path (and is not known to have been transferred).
- */
- if (locallock->proclock == NULL || locallock->lock == NULL)
- {
- LOCKMODE lockmode = locallock->tag.mode;
- Oid relid;
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+ locallock_node,
+ local_iter.cur);
- /* Verify that a fast-path lock is what we've got. */
- if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
- elog(PANIC, "locallock table corrupted");
+ Assert(locallockowner->owner == NULL);
- /*
- * If we don't currently hold the LWLock that protects our
- * fast-path data structures, we must acquire it before attempting
- * to release the lock via the fast-path. We will continue to
- * hold the LWLock until we're done scanning the locallock table,
- * unless we hit a transferred fast-path lock. (XXX is this
- * really such a good idea? There could be a lot of entries ...)
- */
- if (!have_fast_path_lwlock)
- {
- LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
- have_fast_path_lwlock = true;
- }
-
- /* Attempt fast-path release. */
- relid = locallock->tag.lock.locktag_field2;
- if (FastPathUnGrantRelationLock(relid, lockmode))
- {
- RemoveLocalLock(locallock);
- continue;
+ if (locallockowner->nLocks > 0 &&
+ LOCALLOCK_LOCKMETHOD(*locallock) == DEFAULT_LOCKMETHOD)
+ Assert(false);
}
-
- /*
- * Our lock, originally taken via the fast path, has been
- * transferred to the main lock table. That's going to require
- * some extra work, so release our fast-path lock before starting.
- */
- LWLockRelease(&MyProc->fpInfoLock);
- have_fast_path_lwlock = false;
-
- /*
- * Now dump the lock. We haven't got a pointer to the LOCK or
- * PROCLOCK in this case, so we have to handle this a bit
- * differently than a normal lock release. Unfortunately, this
- * requires an extra LWLock acquire-and-release cycle on the
- * partitionLock, but hopefully it shouldn't happen often.
- */
- LockRefindAndRelease(lockMethodTable, MyProc,
- &locallock->tag.lock, lockmode, false);
- RemoveLocalLock(locallock);
- continue;
}
-
- /* Mark the proclock to show we need to release this lockmode */
- if (locallock->nLocks > 0)
- locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
- /* And remove the locallock hashtable entry */
- RemoveLocalLock(locallock);
}
-
- /* Done with the fast-path data structures */
- if (have_fast_path_lwlock)
- LWLockRelease(&MyProc->fpInfoLock);
-
- /*
- * Now, scan each lock partition separately.
- */
- for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
- {
- LWLock *partitionLock;
- dlist_head *procLocks = &MyProc->myProcLocks[partition];
- dlist_mutable_iter proclock_iter;
-
- partitionLock = LockHashPartitionLockByIndex(partition);
-
- /*
- * If the proclock list for this partition is empty, we can skip
- * acquiring the partition lock. This optimization is trickier than
- * it looks, because another backend could be in process of adding
- * something to our proclock list due to promoting one of our
- * fast-path locks. However, any such lock must be one that we
- * decided not to delete above, so it's okay to skip it again now;
- * we'd just decide not to delete it again. We must, however, be
- * careful to re-fetch the list header once we've acquired the
- * partition lock, to be sure we have a valid, up-to-date pointer.
- * (There is probably no significant risk if pointer fetch/store is
- * atomic, but we don't wish to assume that.)
- *
- * XXX This argument assumes that the locallock table correctly
- * represents all of our fast-path locks. While allLocks mode
- * guarantees to clean up all of our normal locks regardless of the
- * locallock situation, we lose that guarantee for fast-path locks.
- * This is not ideal.
- */
- if (dlist_is_empty(procLocks))
- continue; /* needn't examine this partition */
-
- LWLockAcquire(partitionLock, LW_EXCLUSIVE);
-
- dlist_foreach_modify(proclock_iter, procLocks)
- {
- PROCLOCK *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
- bool wakeupNeeded = false;
-
- Assert(proclock->tag.myProc == MyProc);
-
- lock = proclock->tag.myLock;
-
- /* Ignore items that are not of the lockmethod to be removed */
- if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
- continue;
-
- /*
- * In allLocks mode, force release of all locks even if locallock
- * table had problems
- */
- if (allLocks)
- proclock->releaseMask = proclock->holdMask;
- else
- Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
-
- /*
- * Ignore items that have nothing to be released, unless they have
- * holdMask == 0 and are therefore recyclable
- */
- if (proclock->releaseMask == 0 && proclock->holdMask != 0)
- continue;
-
- PROCLOCK_PRINT("LockReleaseAll", proclock);
- LOCK_PRINT("LockReleaseAll", lock, 0);
- Assert(lock->nRequested >= 0);
- Assert(lock->nGranted >= 0);
- Assert(lock->nGranted <= lock->nRequested);
- Assert((proclock->holdMask & ~lock->grantMask) == 0);
-
- /*
- * Release the previously-marked lock modes
- */
- for (i = 1; i <= numLockModes; i++)
- {
- if (proclock->releaseMask & LOCKBIT_ON(i))
- wakeupNeeded |= UnGrantLock(lock, i, proclock,
- lockMethodTable);
- }
- Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
- Assert(lock->nGranted <= lock->nRequested);
- LOCK_PRINT("LockReleaseAll: updated", lock, 0);
-
- proclock->releaseMask = 0;
-
- /* CleanUpLock will wake up waiters if needed. */
- CleanUpLock(lock, proclock,
- lockMethodTable,
- LockTagHashCode(&lock->tag),
- wakeupNeeded);
- } /* loop over PROCLOCKs within this partition */
-
- LWLockRelease(partitionLock);
- } /* loop over partitions */
-
-#ifdef LOCK_DEBUG
- if (*(lockMethodTable->trace_flag))
- elog(LOG, "LockReleaseAll done");
-#endif
+ Assert(MyProc->fpLockBits == 0);
}
+#endif
/*
* LockReleaseSession -- Release all session locks of the specified lock method
@@ -2394,59 +2200,39 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
void
LockReleaseSession(LOCKMETHODID lockmethodid)
{
- HASH_SEQ_STATUS status;
- LOCALLOCK *locallock;
+ dlist_mutable_iter iter;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
- hash_seq_init(&status, LockMethodLocalHash);
-
- while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ dlist_foreach_modify(iter, &session_locks[lockmethodid - 1])
{
- /* Ignore items that are not of the specified lock method */
- if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
- continue;
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
- ReleaseLockIfHeld(locallock, true);
+ Assert(LOCALLOCK_LOCKMETHOD(*locallockowner->locallock) == lockmethodid);
+
+ ReleaseLockIfHeld(locallockowner, true);
}
+
+ Assert(dlist_is_empty(&session_locks[lockmethodid - 1]));
}
/*
* LockReleaseCurrentOwner
- * Release all locks belonging to CurrentResourceOwner
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held.
- * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
- * table to find them.
+ * Release all locks belonging to 'owner'
*/
void
-LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReleaseCurrentOwner(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
{
- if (locallocks == NULL)
- {
- HASH_SEQ_STATUS status;
- LOCALLOCK *locallock;
-
- hash_seq_init(&status, LockMethodLocalHash);
-
- while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
- ReleaseLockIfHeld(locallock, false);
- }
- else
- {
- int i;
+ Assert(locallockowner->owner == owner);
- for (i = nlocks - 1; i >= 0; i--)
- ReleaseLockIfHeld(locallocks[i], false);
- }
+ ReleaseLockIfHeld(locallockowner, false);
}
/*
* ReleaseLockIfHeld
- * Release any session-level locks on this lockable object if sessionLock
- * is true; else, release any locks held by CurrentResourceOwner.
+ * Release any session-level locks on this 'locallockowner' if
+ * sessionLock is true; else, release any locks held by 'locallockowner'.
*
* It is tempting to pass this a ResourceOwner pointer (or NULL for session
* locks), but without refactoring LockRelease() we cannot support releasing
@@ -2457,52 +2243,39 @@ LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
* convenience.
*/
static void
-ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
+ReleaseLockIfHeld(LOCALLOCKOWNER *locallockowner, bool sessionLock)
{
- ResourceOwner owner;
- LOCALLOCKOWNER *lockOwners;
- int i;
+ LOCALLOCK *locallock = locallockowner->locallock;
+
+ /* release all references to the lock by this resource owner */
- /* Identify owner for lock (must match LockRelease!) */
if (sessionLock)
- owner = NULL;
+ Assert(locallockowner->owner == NULL);
else
- owner = CurrentResourceOwner;
+ Assert(locallockowner->owner != NULL);
- /* Scan to see if there are any locks belonging to the target owner */
- lockOwners = locallock->lockOwners;
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ /* We will still hold this lock after forgetting this ResourceOwner. */
+ if (locallockowner->nLocks < locallock->nLocks)
{
- if (lockOwners[i].owner == owner)
- {
- Assert(lockOwners[i].nLocks > 0);
- if (lockOwners[i].nLocks < locallock->nLocks)
- {
- /*
- * We will still hold this lock after forgetting this
- * ResourceOwner.
- */
- locallock->nLocks -= lockOwners[i].nLocks;
- /* compact out unused slot */
- locallock->numLockOwners--;
- if (owner != NULL)
- ResourceOwnerForgetLock(owner, locallock);
- if (i < locallock->numLockOwners)
- lockOwners[i] = lockOwners[locallock->numLockOwners];
- }
- else
- {
- Assert(lockOwners[i].nLocks == locallock->nLocks);
- /* We want to call LockRelease just once */
- lockOwners[i].nLocks = 1;
- locallock->nLocks = 1;
- if (!LockRelease(&locallock->tag.lock,
- locallock->tag.mode,
- sessionLock))
- elog(WARNING, "ReleaseLockIfHeld: failed??");
- }
- break;
- }
+ locallock->nLocks -= locallockowner->nLocks;
+ dlist_delete(&locallockowner->locallock_node);
+
+ if (sessionLock)
+ dlist_delete(&locallockowner->resowner_node);
+ else
+ ResourceOwnerForgetLock(locallockowner);
+ }
+ else
+ {
+ Assert(locallockowner->nLocks == locallock->nLocks);
+ /* We want to call LockRelease just once */
+ locallockowner->nLocks = 1;
+ locallock->nLocks = 1;
+
+ if (!LockRelease(&locallock->tag.lock,
+ locallock->tag.mode,
+ sessionLock))
+ elog(WARNING, "ReleaseLockIfHeld: failed??");
}
}
@@ -2510,82 +2283,47 @@ ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
* LockReassignCurrentOwner
* Reassign all locks belonging to CurrentResourceOwner to belong
* to its parent resource owner.
- *
- * If the caller knows what those locks are, it can pass them as an array.
- * That speeds up the call significantly, when a lot of locks are held
- * (e.g pg_dump with a large schema). Otherwise, pass NULL for locallocks,
- * and we'll traverse through our hash table to find them.
*/
void
-LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReassignCurrentOwner(LOCALLOCKOWNER *locallockowner)
{
ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
- Assert(parent != NULL);
-
- if (locallocks == NULL)
- {
- HASH_SEQ_STATUS status;
- LOCALLOCK *locallock;
-
- hash_seq_init(&status, LockMethodLocalHash);
-
- while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
- LockReassignOwner(locallock, parent);
- }
- else
- {
- int i;
-
- for (i = nlocks - 1; i >= 0; i--)
- LockReassignOwner(locallocks[i], parent);
- }
+ LockReassignOwner(locallockowner, parent);
}
/*
- * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
- * CurrentResourceOwner to its parent.
+ * Subroutine of LockReassignCurrentOwner. Reassigns the given
+ *'locallockowner' to 'parent'.
*/
static void
-LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
+LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent)
{
- LOCALLOCKOWNER *lockOwners;
- int i;
- int ic = -1;
- int ip = -1;
+ dlist_iter iter;
+ LOCALLOCK *locallock = locallockowner->locallock;
- /*
- * Scan to see if there are any locks belonging to current owner or its
- * parent
- */
- lockOwners = locallock->lockOwners;
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ ResourceOwnerForgetLock(locallockowner);
+
+ dlist_foreach(iter, &locallock->locallockowners)
{
- if (lockOwners[i].owner == CurrentResourceOwner)
- ic = i;
- else if (lockOwners[i].owner == parent)
- ip = i;
- }
+ LOCALLOCKOWNER *parentlocalowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
- if (ic < 0)
- return; /* no current locks */
+ Assert(parentlocalowner->locallock == locallock);
- if (ip < 0)
- {
- /* Parent has no slot, so just give it the child's slot */
- lockOwners[ic].owner = parent;
- ResourceOwnerRememberLock(parent, locallock);
- }
- else
- {
- /* Merge child's count with parent's */
- lockOwners[ip].nLocks += lockOwners[ic].nLocks;
- /* compact out unused slot */
- locallock->numLockOwners--;
- if (ic < locallock->numLockOwners)
- lockOwners[ic] = lockOwners[locallock->numLockOwners];
+ if (parentlocalowner->owner != parent)
+ continue;
+
+ parentlocalowner->nLocks += locallockowner->nLocks;
+
+ locallockowner->nLocks = 0;
+ dlist_delete(&locallockowner->locallock_node);
+ pfree(locallockowner);
+ return;
}
- ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
+
+ /* reassign locallockowner to parent resowner */
+ locallockowner->owner = parent;
+ ResourceOwnerRememberLock(parent, locallockowner);
}
/*
@@ -3057,7 +2795,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
* We currently use this in two situations: first, to release locks held by
* prepared transactions on commit (see lock_twophase_postcommit); and second,
* to release locks taken via the fast-path, transferred to the main hash
- * table, and then released (see LockReleaseAll).
+ * table, and then released (see ResourceOwnerRelease).
*/
static void
LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
@@ -3163,16 +2901,9 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
* we can't implement this check by examining LOCALLOCK entries in isolation.
* We must build a transient hashtable that is indexed by locktag only.
*/
-static void
+static HTAB *
CheckForSessionAndXactLocks(void)
{
- typedef struct
- {
- LOCKTAG lock; /* identifies the lockable object */
- bool sessLock; /* is any lockmode held at session level? */
- bool xactLock; /* is any lockmode held at xact level? */
- } PerLockTagEntry;
-
HASHCTL hash_ctl;
HTAB *lockhtab;
HASH_SEQ_STATUS status;
@@ -3193,10 +2924,9 @@ CheckForSessionAndXactLocks(void)
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
- LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
PerLockTagEntry *hentry;
bool found;
- int i;
+ dlist_iter iter;
/*
* Ignore VXID locks. We don't want those to be held by prepared
@@ -3217,9 +2947,13 @@ CheckForSessionAndXactLocks(void)
hentry->sessLock = hentry->xactLock = false;
/* Scan to see if we hold lock at session or xact level or both */
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ dlist_foreach(iter, &locallock->locallockowners)
{
- if (lockOwners[i].owner == NULL)
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+ locallock_node,
+ iter.cur);
+
+ if (locallockowner->owner == NULL)
hentry->sessLock = true;
else
hentry->xactLock = true;
@@ -3235,8 +2969,7 @@ CheckForSessionAndXactLocks(void)
errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
}
- /* Success, so clean up */
- hash_destroy(lockhtab);
+ return lockhtab;
}
/*
@@ -3244,6 +2977,11 @@ CheckForSessionAndXactLocks(void)
* Do the preparatory work for a PREPARE: make 2PC state file records
* for all locks currently held.
*
+ * Returns a hash table of PerLockTagEntry structs with an entry for each
+ * lock held by this backend marking if the lock is held at the session or
+ * xact level, or both. It is up to the calling function to call
+ * hash_destroy() on this table to free the memory used by it.
+ *
* Session-level locks are ignored, as are VXID locks.
*
* For the most part, we don't need to touch shared memory for this ---
@@ -3251,14 +2989,15 @@ CheckForSessionAndXactLocks(void)
* Fast-path locks are an exception, however: we move any such locks to
* the main table before allowing PREPARE TRANSACTION to succeed.
*/
-void
+HTAB *
AtPrepare_Locks(void)
{
HASH_SEQ_STATUS status;
LOCALLOCK *locallock;
+ HTAB *sessionandxactlocks;
/* First, verify there aren't locks of both xact and session level */
- CheckForSessionAndXactLocks();
+ sessionandxactlocks = CheckForSessionAndXactLocks();
/* Now do the per-locallock cleanup work */
hash_seq_init(&status, LockMethodLocalHash);
@@ -3266,10 +3005,9 @@ AtPrepare_Locks(void)
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
TwoPhaseLockRecord record;
- LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
bool haveSessionLock;
bool haveXactLock;
- int i;
+ dlist_iter iter;
/*
* Ignore VXID locks. We don't want those to be held by prepared
@@ -3284,9 +3022,13 @@ AtPrepare_Locks(void)
/* Scan to see whether we hold it at session or transaction level */
haveSessionLock = haveXactLock = false;
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ dlist_foreach(iter, &locallock->locallockowners)
{
- if (lockOwners[i].owner == NULL)
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+ locallock_node,
+ iter.cur);
+
+ if (locallockowner->owner == NULL)
haveSessionLock = true;
else
haveXactLock = true;
@@ -3330,6 +3072,8 @@ AtPrepare_Locks(void)
RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
&record, sizeof(TwoPhaseLockRecord));
}
+
+ return sessionandxactlocks;
}
/*
@@ -3344,11 +3088,11 @@ AtPrepare_Locks(void)
* pointers in the transaction's resource owner. This is OK at the
* moment since resowner.c doesn't try to free locks retail at a toplevel
* transaction commit or abort. We could alternatively zero out nLocks
- * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
- * but that probably costs more cycles.
+ * and leave the LOCALLOCK entries to be garbage-collected by
+ * ResourceOwnerRelease, but that probably costs more cycles.
*/
void
-PostPrepare_Locks(TransactionId xid)
+PostPrepare_Locks(TransactionId xid, HTAB *sessionandxactlocks)
{
PGPROC *newproc = TwoPhaseGetDummyProc(xid, false);
HASH_SEQ_STATUS status;
@@ -3378,10 +3122,9 @@ PostPrepare_Locks(TransactionId xid)
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
- LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
bool haveSessionLock;
bool haveXactLock;
- int i;
+ dlist_iter iter;
if (locallock->proclock == NULL || locallock->lock == NULL)
{
@@ -3400,9 +3143,13 @@ PostPrepare_Locks(TransactionId xid)
/* Scan to see whether we hold it at session or transaction level */
haveSessionLock = haveXactLock = false;
- for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ dlist_foreach(iter, &locallock->locallockowners)
{
- if (lockOwners[i].owner == NULL)
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+ locallock_node,
+ iter.cur);
+
+ if (locallockowner->owner == NULL)
haveSessionLock = true;
else
haveXactLock = true;
@@ -3418,10 +3165,6 @@ PostPrepare_Locks(TransactionId xid)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
- /* Mark the proclock to show we need to release this lockmode */
- if (locallock->nLocks > 0)
- locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
/* And remove the locallock hashtable entry */
RemoveLocalLock(locallock);
}
@@ -3439,11 +3182,7 @@ PostPrepare_Locks(TransactionId xid)
/*
* If the proclock list for this partition is empty, we can skip
- * acquiring the partition lock. This optimization is safer than the
- * situation in LockReleaseAll, because we got rid of any fast-path
- * locks during AtPrepare_Locks, so there cannot be any case where
- * another backend is adding something to our lists now. For safety,
- * though, we code this the same way as in LockReleaseAll.
+ * acquiring the partition lock.
*/
if (dlist_is_empty(procLocks))
continue; /* needn't examine this partition */
@@ -3452,6 +3191,8 @@ PostPrepare_Locks(TransactionId xid)
dlist_foreach_modify(proclock_iter, procLocks)
{
+ PerLockTagEntry *locktagentry;
+
proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
Assert(proclock->tag.myProc == MyProc);
@@ -3469,13 +3210,14 @@ PostPrepare_Locks(TransactionId xid)
Assert(lock->nGranted <= lock->nRequested);
Assert((proclock->holdMask & ~lock->grantMask) == 0);
- /* Ignore it if nothing to release (must be a session lock) */
- if (proclock->releaseMask == 0)
- continue;
+ locktagentry = hash_search(sessionandxactlocks,
+ &lock->tag,
+ HASH_FIND,
+ NULL);
- /* Else we should be releasing all locks */
- if (proclock->releaseMask != proclock->holdMask)
- elog(PANIC, "we seem to have dropped a bit somewhere");
+ /* skip session locks */
+ if (locktagentry != NULL && locktagentry->sessLock)
+ continue;
/*
* We cannot simply modify proclock->tag.myProc to reassign
@@ -4245,7 +3987,6 @@ lock_twophase_recover(TransactionId xid, uint16 info,
Assert(proc->lockGroupLeader == NULL);
proclock->groupLeader = proc;
proclock->holdMask = 0;
- proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
dlist_push_tail(&lock->procLocks, &proclock->lockLink);
dlist_push_tail(&proc->myProcLocks[partition],
@@ -4382,7 +4123,7 @@ lock_twophase_postabort(TransactionId xid, uint16 info,
*
* We don't bother recording this lock in the local lock table, since it's
* only ever released at the end of a transaction. Instead,
- * LockReleaseAll() calls VirtualXactLockTableCleanup().
+ * ProcReleaseLocks() calls VirtualXactLockTableCleanup().
*/
void
VirtualXactLockTableInsert(VirtualTransactionId vxid)
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index e9e445bb216..9a7ea73cdcd 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -777,10 +777,17 @@ ProcReleaseLocks(bool isCommit)
return;
/* If waiting, get off wait queue (should only be needed after error) */
LockErrorCleanup();
- /* Release standard locks, including session-level if aborting */
- LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
- /* Release transaction-level advisory locks */
- LockReleaseAll(USER_LOCKMETHOD, false);
+
+ VirtualXactLockTableCleanup();
+
+ /* Release session-level locks if aborting */
+ if (!isCommit)
+ LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+ /* Ensure all locks were released */
+ LockAssertNoneHeld(isCommit);
+#endif
}
@@ -865,6 +872,8 @@ ProcKill(int code, Datum arg)
LWLockRelease(leader_lwlock);
}
+ Assert(MyProc->fpLockBits == 0);
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 552cf9d950a..d64998e4ee8 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -1353,10 +1353,10 @@ ShutdownPostgres(int code, Datum arg)
AbortOutOfAnyTransaction();
/*
- * User locks are not released by transaction end, so be sure to release
- * them explicitly.
+ * Session locks are not released by transaction end, so be sure to
+ * release them explicitly.
*/
- LockReleaseAll(USER_LOCKMETHOD, true);
+ LockReleaseSession(USER_LOCKMETHOD);
}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index f096f3df20a..96417e3cb94 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -91,18 +91,6 @@ typedef struct ResourceElem
StaticAssertDecl(RESOWNER_HASH_MAX_ITEMS(RESOWNER_HASH_INIT_SIZE) >= RESOWNER_ARRAY_SIZE,
"initial hash size too small compared to array size");
-/*
- * MAX_RESOWNER_LOCKS is the size of the per-resource owner locks cache. It's
- * chosen based on some testing with pg_dump with a large schema. When the
- * tests were done (on 9.2), resource owners in a pg_dump run contained up
- * to 9 locks, regardless of the schema size, except for the top resource
- * owner which contained much more (overflowing the cache). 15 seems like a
- * nice round number that's somewhat higher than what pg_dump needs. Note that
- * making this number larger is not free - the bigger the cache, the slower
- * it is to release locks (in retail), when a resource owner holds many locks.
- */
-#define MAX_RESOWNER_LOCKS 15
-
/*
* ResourceOwner objects look like this
*/
@@ -152,10 +140,10 @@ typedef struct ResourceOwnerData
uint32 capacity; /* allocated length of hash[] */
uint32 grow_at; /* grow hash when reach this */
- /* The local locks cache. */
- LOCALLOCK *locks[MAX_RESOWNER_LOCKS]; /* list of owned locks */
+ dlist_head locks; /* dlist of owned locks */
} ResourceOwnerData;
+#include "lib/ilist.h"
/*****************************************************************************
* GLOBAL MEMORY *
@@ -423,6 +411,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
owner = (ResourceOwner) MemoryContextAllocZero(TopMemoryContext,
sizeof(ResourceOwnerData));
owner->name = name;
+ dlist_init(&owner->locks);
if (parent)
{
@@ -729,8 +718,19 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
}
else if (phase == RESOURCE_RELEASE_LOCKS)
{
+ dlist_mutable_iter iter;
+
if (isTopLevel)
{
+ dlist_foreach_modify(iter, &owner->locks)
+ {
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+ LockReleaseCurrentOwner(owner, locallockowner);
+ }
+
+ Assert(dlist_is_empty(&owner->locks));
+
/*
* For a top-level xact we are going to release all locks (or at
* least all non-session locks), so just do a single lmgr call at
@@ -749,30 +749,30 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
* subtransaction, we do NOT release its locks yet, but transfer
* them to the parent.
*/
- LOCALLOCK **locks;
- int nlocks;
+ if (isCommit)
+ {
+ dlist_foreach_modify(iter, &owner->locks)
+ {
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+ resowner_node,
+ iter.cur);
- Assert(owner->parent != NULL);
+ LockReassignCurrentOwner(locallockowner);
+ }
- /*
- * Pass the list of locks owned by this resource owner to the lock
- * manager, unless it has overflowed.
- */
- if (owner->nlocks > MAX_RESOWNER_LOCKS)
- {
- locks = NULL;
- nlocks = 0;
+ Assert(dlist_is_empty(&owner->locks));
}
else
{
- locks = owner->locks;
- nlocks = owner->nlocks;
- }
+ dlist_foreach_modify(iter, &owner->locks)
+ {
+ LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
- if (isCommit)
- LockReassignCurrentOwner(locks, nlocks);
- else
- LockReleaseCurrentOwner(locks, nlocks);
+ LockReleaseCurrentOwner(owner, locallockowner);
+ }
+
+ Assert(dlist_is_empty(&owner->locks));
+ }
}
}
else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
@@ -860,7 +860,7 @@ ResourceOwnerDelete(ResourceOwner owner)
/* And it better not own any resources, either */
Assert(owner->narr == 0);
Assert(owner->nhash == 0);
- Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
+ Assert(dlist_is_empty(&owner->locks));
/*
* Delete children. The recursive call will delink the child from me, so
@@ -1034,52 +1034,59 @@ ReleaseAuxProcessResourcesCallback(int code, Datum arg)
/*
* Remember that a Local Lock is owned by a ResourceOwner
- *
- * This is different from the generic ResourceOwnerRemember in that the list of
- * locks is only a lossy cache. It can hold up to MAX_RESOWNER_LOCKS entries,
- * and when it overflows, we stop tracking locks. The point of only remembering
- * only up to MAX_RESOWNER_LOCKS entries is that if a lot of locks are held,
- * ResourceOwnerForgetLock doesn't need to scan through a large array to find
- * the entry.
*/
void
-ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
{
- Assert(locallock != NULL);
+ Assert(owner != NULL);
+ Assert(locallockowner != NULL);
- if (owner->nlocks > MAX_RESOWNER_LOCKS)
- return; /* we have already overflowed */
-
- if (owner->nlocks < MAX_RESOWNER_LOCKS)
- owner->locks[owner->nlocks] = locallock;
- else
+#ifdef USE_ASSERT_CHECKING
{
- /* overflowed */
+ dlist_iter iter;
+
+ dlist_foreach(iter, &owner->locks)
+ {
+ LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+ Assert(i->locallock != locallockowner->locallock);
+ }
}
- owner->nlocks++;
+#endif
+
+ dlist_push_tail(&owner->locks, &locallockowner->resowner_node);
}
/*
- * Forget that a Local Lock is owned by a ResourceOwner
+ * Forget that a Local Lock is owned by the given LOCALLOCKOWNER.
*/
void
-ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerForgetLock(LOCALLOCKOWNER *locallockowner)
{
- int i;
+#ifdef USE_ASSERT_CHECKING
+ ResourceOwner owner;
+
+ Assert(locallockowner != NULL);
- if (owner->nlocks > MAX_RESOWNER_LOCKS)
- return; /* we have overflowed */
+ owner = locallockowner->owner;
- Assert(owner->nlocks > 0);
- for (i = owner->nlocks - 1; i >= 0; i--)
{
- if (locallock == owner->locks[i])
+ dlist_iter iter;
+ bool found = false;
+
+ dlist_foreach(iter, &owner->locks)
{
- owner->locks[i] = owner->locks[owner->nlocks - 1];
- owner->nlocks--;
- return;
+ LOCALLOCKOWNER *owner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+ if (locallockowner == owner)
+ {
+ Assert(!found);
+ found = true;
+ }
}
+
+ Assert(found);
}
- elog(ERROR, "lock reference %p is not owned by resource owner %s",
- locallock, owner->name);
+#endif
+ dlist_delete(&locallockowner->resowner_node);
}
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 590c026b5bf..e7f5b25f338 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -24,6 +24,7 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
+#include "lib/ilist.h"
/* struct PGPROC is declared in proc.h, but must forward-reference it */
typedef struct PGPROC PGPROC;
@@ -349,10 +350,6 @@ typedef struct LOCK
* Otherwise, proclock objects whose holdMasks are zero are recycled
* as soon as convenient.
*
- * releaseMask is workspace for LockReleaseAll(): it shows the locks due
- * to be released during the current call. This must only be examined or
- * set by the backend owning the PROCLOCK.
- *
* Each PROCLOCK object is linked into lists for both the associated LOCK
* object and the owning PGPROC object. Note that the PROCLOCK is entered
* into these lists as soon as it is created, even if no lock has yet been
@@ -374,7 +371,6 @@ typedef struct PROCLOCK
/* data */
PGPROC *groupLeader; /* proc's lock group leader, or proc itself */
LOCKMASK holdMask; /* bitmask for lock types currently held */
- LOCKMASK releaseMask; /* bitmask for lock types to be released */
dlist_node lockLink; /* list link in LOCK's list of proclocks */
dlist_node procLink; /* list link in PGPROC's list of proclocks */
} PROCLOCK;
@@ -420,6 +416,13 @@ typedef struct LOCALLOCKOWNER
* Must use a forward struct reference to avoid circularity.
*/
struct ResourceOwnerData *owner;
+
+ dlist_node resowner_node; /* dlist link for ResourceOwner.locks */
+
+ dlist_node locallock_node; /* dlist link for LOCALLOCK.locallockowners */
+
+ struct LOCALLOCK *locallock; /* pointer to the corresponding LOCALLOCK */
+
int64 nLocks; /* # of times held by this owner */
} LOCALLOCKOWNER;
@@ -433,9 +436,9 @@ typedef struct LOCALLOCK
LOCK *lock; /* associated LOCK object, if any */
PROCLOCK *proclock; /* associated PROCLOCK object, if any */
int64 nLocks; /* total number of times lock is held */
- int numLockOwners; /* # of relevant ResourceOwners */
- int maxLockOwners; /* allocated size of array */
- LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
+
+ dlist_head locallockowners; /* dlist of LOCALLOCKOWNER */
+
bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */
bool lockCleared; /* we read all sinval msgs for lock */
} LOCALLOCK;
@@ -564,10 +567,16 @@ extern void AbortStrongLockAcquire(void);
extern void MarkLockClear(LOCALLOCK *locallock);
extern bool LockRelease(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
-extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
+
+#ifdef USE_ASSERT_CHECKING
+extern void LockAssertNoneHeld(bool isCommit);
+#endif
+
extern void LockReleaseSession(LOCKMETHODID lockmethodid);
-extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
-extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+struct ResourceOwnerData;
+extern void LockReleaseCurrentOwner(struct ResourceOwnerData *owner,
+ LOCALLOCKOWNER *locallockowner);
+extern void LockReassignCurrentOwner(LOCALLOCKOWNER *locallockowner);
extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
#ifdef USE_ASSERT_CHECKING
extern HTAB *GetLockMethodLocalHash(void);
@@ -576,8 +585,8 @@ extern bool LockHasWaiters(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
LOCKMODE lockmode, int *countp);
-extern void AtPrepare_Locks(void);
-extern void PostPrepare_Locks(TransactionId xid);
+extern HTAB *AtPrepare_Locks(void);
+extern void PostPrepare_Locks(TransactionId xid, HTAB *sessionandxactlocks);
extern bool LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock, PROCLOCK *proclock);
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
index 0735480214e..cf368cedca4 100644
--- a/src/include/utils/resowner.h
+++ b/src/include/utils/resowner.h
@@ -159,8 +159,8 @@ extern void CreateAuxProcessResourceOwner(void);
extern void ReleaseAuxProcessResources(bool isCommit);
/* special support for local lock management */
-struct LOCALLOCK;
-extern void ResourceOwnerRememberLock(ResourceOwner owner, struct LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, struct LOCALLOCK *locallock);
+struct LOCALLOCKOWNER;
+extern void ResourceOwnerRememberLock(ResourceOwner owner, struct LOCALLOCKOWNER *locallockowner);
+extern void ResourceOwnerForgetLock(struct LOCALLOCKOWNER *locallockowner);
#endif /* RESOWNER_H */
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index ae58438ec76..29b1cce54b7 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -36,8 +36,9 @@ extern void ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer);
extern void ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer);
/* support for local lock management */
-extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
+extern void ResourceOwnerRememberLock(ResourceOwner owner,
+ LOCALLOCKOWNER *locallock);
+extern void ResourceOwnerForgetLock(LOCALLOCKOWNER *locallock);
/* support for catcache refcount management */
extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);