Hi, In [1] Robert justifiably complained about the use of PROC_QUEUE. I've previously been bothered by this in [2], but didn't get around to finishing the patches.
One thing that made me hesitate was the naming of the function for a) checking whether a dlist_node is in a list, b) initializing and deleting nodes in a way to allow for a). My patch adds dlist_node_init(), dlist_delete_thoroughly() / dlist_delete_from_thoroughly() / dclist_delete_from_thoroughly() and dlist_node_is_detached(). Thomas proposed dlist_delete_and_reinit() and dlist_node_is_linked() instead. Attached is a revised version of the patches from [2]. I left the naming of the detached / thoroughly as it was before, for now. Another alternative could be to try to just get rid of needing the detached state at all, although that likely would make the code changes bigger. I've switched the PROC_QUEUE uses to dclist, which we only got recently. The prior version of the patchset contained a patch to remove the use of the size field of PROC_QUEUE, as it's only needed in a few places. But it seems easier to just replace it with dclist for now. Robert had previously complained about the ilist.h patch constifying some functions. I don't really understand the complaint in this case - none of the cases should require constifying outside code. It just allows replacing things like SHMQueueEmpty() which were const, because there's a few places that get passed a const PGPROC. There's more that could be constified (removing the need for one unconstify() in predicate.c) - but that seems like a lot more work with less clear benefit. Either way, I split the constification into a separate patch. Comments? Greetings, Andres Freund [1] https://www.postgresql.org/message-id/20221117201304.vemjfsxaizmt3zbb%40awork3.anarazel.de [2] https://www.postgresql.org/message-id/20200211042229.msv23badgqljrdg2%40alap3.anarazel.de
>From e5efdebf82e8e857a21e675bcdccd3c129d16ab2 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sat, 19 Nov 2022 14:38:02 -0800 Subject: [PATCH v2 1/9] Add detached node functions to ilist These allow to test whether an element is in a list by checking whether prev/next are NULL. This is needed to replace SHM_QUEUE uses with ilist.h --- src/include/lib/ilist.h | 63 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/include/lib/ilist.h b/src/include/lib/ilist.h index 3c543e7c365..a1a4abf0609 100644 --- a/src/include/lib/ilist.h +++ b/src/include/lib/ilist.h @@ -316,6 +316,17 @@ dlist_init(dlist_head *head) head->head.next = head->head.prev = &head->head; } +/* + * Initialize a doubly linked list element. + * + * This is only needed when dlist_node_is_detached() may be needed. + */ +static inline void +dlist_node_init(dlist_node *node) +{ + node->next = node->prev = NULL; +} + /* * Is the list empty? * @@ -397,6 +408,19 @@ dlist_delete(dlist_node *node) node->next->prev = node->prev; } +/* + * Like dlist_delete(), but also sets next/prev to NULL to signal not being in + * a list. + */ +static inline void +dlist_delete_thoroughly(dlist_node *node) +{ + node->prev->next = node->next; + node->next->prev = node->prev; + node->next = NULL; + node->prev = NULL; +} + /* * Same as dlist_delete, but performs checks in ILIST_DEBUG builds to ensure * that 'node' belongs to 'head'. @@ -408,6 +432,17 @@ dlist_delete_from(dlist_head *head, dlist_node *node) dlist_delete(node); } +/* + * Like dlist_delete_from, but also sets next/prev to NULL to signal not + * being in a list. + */ +static inline void +dlist_delete_from_thoroughly(dlist_head *head, dlist_node *node) +{ + dlist_member_check(head, node); + dlist_delete_thoroughly(node); +} + /* * Remove and return the first node from a list (there must be one). 
*/ @@ -480,6 +515,21 @@ dlist_has_prev(dlist_head *head, dlist_node *node) return node->prev != &head->head; } +/* + * Check if node is detached. A node is only detached if it either has been + * initialized with dlist_node_init(), or deleted with + * dlist_delete_thoroughly() / dlist_delete_from_thoroughly() / + * dclist_delete_from_thoroughly(). + */ +static inline bool +dlist_node_is_detached(const dlist_node *node) +{ + Assert((node->next == NULL && node->prev == NULL) || + (node->next != NULL && node->prev != NULL)); + + return node->next == NULL; +} + /* * Return the next node in the list (there must be one). */ @@ -718,6 +768,19 @@ dclist_delete_from(dclist_head *head, dlist_node *node) head->count--; } +/* + * Like dclist_delete_from(), but also sets next/prev to NULL to signal not + * being in a list. + */ +static inline void +dclist_delete_from_thoroughly(dclist_head *head, dlist_node *node) +{ + Assert(head->count > 0); + + dlist_delete_from_thoroughly(&head->dlist, node); + head->count--; +} + /* * dclist_pop_head_node * Remove and return the first node from a list (there must be one). -- 2.38.0
>From 5266ce81a47c75e5c8d42c54ef60bf0afcaed755 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sat, 19 Nov 2022 21:32:43 -0800 Subject: [PATCH v2 2/9] Constify some ilist.h functions This is required for some of the replacements of SHM_QUEUE, because of code dealing with const PGPROC's. --- src/include/lib/ilist.h | 22 +++++++++++----------- src/backend/lib/ilist.c | 8 ++++---- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/include/lib/ilist.h b/src/include/lib/ilist.h index a1a4abf0609..64596ad0f1e 100644 --- a/src/include/lib/ilist.h +++ b/src/include/lib/ilist.h @@ -290,8 +290,8 @@ extern void slist_delete(slist_head *head, slist_node *node); #ifdef ILIST_DEBUG extern void dlist_member_check(dlist_head *head, dlist_node *node); -extern void dlist_check(dlist_head *head); -extern void slist_check(slist_head *head); +extern void dlist_check(const dlist_head *head); +extern void slist_check(const slist_head *head); #else /* * These seemingly useless casts to void are here to keep the compiler quiet @@ -333,7 +333,7 @@ dlist_node_init(dlist_node *node) * An empty list has either its first 'next' pointer set to NULL, or to itself. */ static inline bool -dlist_is_empty(dlist_head *head) +dlist_is_empty(const dlist_head *head) { dlist_check(head); @@ -500,7 +500,7 @@ dlist_move_tail(dlist_head *head, dlist_node *node) * Caution: unreliable if 'node' is not in the list. */ static inline bool -dlist_has_next(dlist_head *head, dlist_node *node) +dlist_has_next(const dlist_head *head, const dlist_node *node) { return node->next != &head->head; } @@ -510,7 +510,7 @@ dlist_has_next(dlist_head *head, dlist_node *node) * Caution: unreliable if 'node' is not in the list. 
*/ static inline bool -dlist_has_prev(dlist_head *head, dlist_node *node) +dlist_has_prev(const dlist_head *head, const dlist_node *node) { return node->prev != &head->head; } @@ -679,7 +679,7 @@ dclist_init(dclist_head *head) * Returns true if the list is empty, otherwise false. */ static inline bool -dclist_is_empty(dclist_head *head) +dclist_is_empty(const dclist_head *head) { Assert(dlist_is_empty(&head->dlist) == (head->count == 0)); return (head->count == 0); @@ -836,7 +836,7 @@ dclist_move_tail(dclist_head *head, dlist_node *node) * Caution: 'node' must be a member of 'head'. */ static inline bool -dclist_has_next(dclist_head *head, dlist_node *node) +dclist_has_next(const dclist_head *head, const dlist_node *node) { dlist_member_check(&head->dlist, node); Assert(head->count > 0); @@ -851,7 +851,7 @@ dclist_has_next(dclist_head *head, dlist_node *node) * Caution: 'node' must be a member of 'head'. */ static inline bool -dclist_has_prev(dclist_head *head, dlist_node *node) +dclist_has_prev(const dclist_head *head, const dlist_node *node) { dlist_member_check(&head->dlist, node); Assert(head->count > 0); @@ -929,7 +929,7 @@ dclist_tail_node(dclist_head *head) * Returns the stored number of entries in 'head' */ static inline uint32 -dclist_count(dclist_head *head) +dclist_count(const dclist_head *head) { Assert(dlist_is_empty(&head->dlist) == (head->count == 0)); @@ -992,7 +992,7 @@ slist_init(slist_head *head) * Is the list empty? */ static inline bool -slist_is_empty(slist_head *head) +slist_is_empty(const slist_head *head) { slist_check(head); @@ -1040,7 +1040,7 @@ slist_pop_head_node(slist_head *head) * Check whether 'node' has a following node. 
*/ static inline bool -slist_has_next(slist_head *head, slist_node *node) +slist_has_next(const slist_head *head, const slist_node *node) { slist_check(head); diff --git a/src/backend/lib/ilist.c b/src/backend/lib/ilist.c index e8ea9811764..fc78e06b1f9 100644 --- a/src/backend/lib/ilist.c +++ b/src/backend/lib/ilist.c @@ -73,9 +73,9 @@ dlist_member_check(dlist_head *head, dlist_node *node) * Verify integrity of a doubly linked list */ void -dlist_check(dlist_head *head) +dlist_check(const dlist_head *head) { - dlist_node *cur; + const dlist_node *cur; if (head == NULL) elog(ERROR, "doubly linked list head address is NULL"); @@ -110,9 +110,9 @@ dlist_check(dlist_head *head) * Verify integrity of a singly linked list */ void -slist_check(slist_head *head) +slist_check(const slist_head *head) { - slist_node *cur; + const slist_node *cur; if (head == NULL) elog(ERROR, "singly linked list head address is NULL"); -- 2.38.0
>From 2b6042f461e3c265e4c6a6089f8126bc9aae7f58 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sat, 19 Nov 2022 15:15:10 -0800 Subject: [PATCH v2 3/9] Use dclist instead of PROC_QUEUE / SHM_QUEUE for heavyweight locks --- src/include/storage/lock.h | 15 +- src/include/storage/proc.h | 17 +- src/backend/access/transam/twophase.c | 4 +- src/backend/storage/lmgr/deadlock.c | 89 ++++------ src/backend/storage/lmgr/lock.c | 155 ++++++----------- src/backend/storage/lmgr/proc.c | 237 +++++++++----------------- src/tools/pgindent/typedefs.list | 1 - 7 files changed, 183 insertions(+), 335 deletions(-) diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index e4e1495b245..52afb5e957d 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -18,6 +18,7 @@ #error "lock.h may not be included from frontend code" #endif +#include "lib/ilist.h" #include "storage/backendid.h" #include "storage/lockdefs.h" #include "storage/lwlock.h" @@ -27,12 +28,6 @@ /* struct PGPROC is declared in proc.h, but must forward-reference it */ typedef struct PGPROC PGPROC; -typedef struct PROC_QUEUE -{ - SHM_QUEUE links; /* head of list of PGPROC objects */ - int size; /* number of entries in list */ -} PROC_QUEUE; - /* GUC variables */ extern PGDLLIMPORT int max_locks_per_xact; @@ -305,8 +300,8 @@ typedef struct LOCK /* data */ LOCKMASK grantMask; /* bitmask for lock types already granted */ LOCKMASK waitMask; /* bitmask for lock types awaited */ - SHM_QUEUE procLocks; /* list of PROCLOCK objects assoc. with lock */ - PROC_QUEUE waitProcs; /* list of PGPROC objects waiting on lock */ + dlist_head procLocks; /* list of PROCLOCK objects assoc. 
with lock */ + dclist_head waitProcs; /* list of PGPROC objects waiting on lock */ int requested[MAX_LOCKMODES]; /* counts of requested locks */ int nRequested; /* total of requested[] array */ int granted[MAX_LOCKMODES]; /* counts of granted locks */ @@ -367,8 +362,8 @@ typedef struct PROCLOCK PGPROC *groupLeader; /* proc's lock group leader, or proc itself */ LOCKMASK holdMask; /* bitmask for lock types currently held */ LOCKMASK releaseMask; /* bitmask for lock types to be released */ - SHM_QUEUE lockLink; /* list link in LOCK's list of proclocks */ - SHM_QUEUE procLink; /* list link in PGPROC's list of proclocks */ + dlist_node lockLink; /* list link in LOCK's list of proclocks */ + dlist_node procLink; /* list link in PGPROC's list of proclocks */ } PROCLOCK; #define PROCLOCK_LOCKMETHOD(proclock) \ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 8d096fdeeb1..7005770da79 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -167,8 +167,8 @@ typedef enum struct PGPROC { /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ - SHM_QUEUE links; /* list link if process is in a list */ - PGPROC **procgloballist; /* procglobal list that owns this PGPROC */ + dlist_node links; /* list link if process is in a list */ + dlist_head *procgloballist; /* procglobal list that owns this PGPROC */ PGSemaphore sem; /* ONE semaphore to sleep on */ ProcWaitStatus waitStatus; @@ -255,7 +255,7 @@ struct PGPROC * linked into one of these lists, according to the partition number of * their lock. 
*/ - SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS]; + dlist_head myProcLocks[NUM_LOCK_PARTITIONS]; XidCacheStatus subxidStatus; /* mirrored with * ProcGlobal->subxidStates[i] */ @@ -385,13 +385,13 @@ typedef struct PROC_HDR /* Length of allProcs array */ uint32 allProcCount; /* Head of list of free PGPROC structures */ - PGPROC *freeProcs; + dlist_head freeProcs; /* Head of list of autovacuum's free PGPROC structures */ - PGPROC *autovacFreeProcs; + dlist_head autovacFreeProcs; /* Head of list of bgworker free PGPROC structures */ - PGPROC *bgworkerFreeProcs; + dlist_head bgworkerFreeProcs; /* Head of list of walsender free PGPROC structures */ - PGPROC *walsenderFreeProcs; + dlist_head walsenderFreeProcs; /* First pgproc waiting for group XID clear */ pg_atomic_uint32 procArrayGroupFirst; /* First pgproc waiting for group transaction status update */ @@ -448,9 +448,8 @@ extern int GetStartupBufferPinWaitBufId(void); extern bool HaveNFreeProcs(int n); extern void ProcReleaseLocks(bool isCommit); -extern void ProcQueueInit(PROC_QUEUE *queue); extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable); -extern PGPROC *ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus); +extern void ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus); extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void CheckDeadLockAlert(void); extern bool IsWaitingForLock(void); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 803d169f578..60ee7d00a61 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -461,7 +461,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, /* Initialize the PGPROC entry */ MemSet(proc, 0, sizeof(PGPROC)); proc->pgprocno = gxact->pgprocno; - SHMQueueElemInit(&(proc->links)); + dlist_node_init(&proc->links); proc->waitStatus = PROC_WAIT_STATUS_OK; if (LocalTransactionIdIsValid(MyProc->lxid)) { @@ -491,7 
+491,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, proc->waitProcLock = NULL; pg_atomic_init_u64(&proc->waitStart, 0); for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - SHMQueueInit(&(proc->myProcLocks[i])); + dlist_init(&proc->myProcLocks[i]); /* subxid data must be filled later by GXactLoadSubxactData */ proc->subxidStatus.overflowed = false; proc->subxidStatus.count = 0; diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index cd9c0418eca..17afddd0a9f 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -216,9 +216,6 @@ InitDeadLockChecking(void) DeadLockState DeadLockCheck(PGPROC *proc) { - int i, - j; - /* Initialize to "no constraints" */ nCurConstraints = 0; nPossibleConstraints = 0; @@ -246,26 +243,23 @@ DeadLockCheck(PGPROC *proc) } /* Apply any needed rearrangements of wait queues */ - for (i = 0; i < nWaitOrders; i++) + for (int i = 0; i < nWaitOrders; i++) { LOCK *lock = waitOrders[i].lock; PGPROC **procs = waitOrders[i].procs; int nProcs = waitOrders[i].nProcs; - PROC_QUEUE *waitQueue = &(lock->waitProcs); + dclist_head *waitQueue = &lock->waitProcs; - Assert(nProcs == waitQueue->size); + Assert(nProcs == dclist_count(waitQueue)); #ifdef DEBUG_DEADLOCK PrintLockQueue(lock, "DeadLockCheck:"); #endif /* Reset the queue and re-add procs in the desired order */ - ProcQueueInit(waitQueue); - for (j = 0; j < nProcs; j++) - { - SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links)); - waitQueue->size++; - } + dclist_init(waitQueue); + for (int j = 0; j < nProcs; j++) + dclist_push_tail(waitQueue, &procs[j]->links); #ifdef DEBUG_DEADLOCK PrintLockQueue(lock, "rearranged to:"); @@ -544,11 +538,8 @@ FindLockCycleRecurseMember(PGPROC *checkProc, { PGPROC *proc; LOCK *lock = checkProc->waitLock; - PROCLOCK *proclock; - SHM_QUEUE *procLocks; + dlist_iter proclock_iter; LockMethod lockMethodTable; - PROC_QUEUE *waitQueue; - int queue_size; int 
conflictMask; int i; int numLockModes, @@ -571,13 +562,9 @@ FindLockCycleRecurseMember(PGPROC *checkProc, * Scan for procs that already hold conflicting locks. These are "hard" * edges in the waits-for graph. */ - procLocks = &(lock->procLocks); - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (proclock) + dlist_foreach(proclock_iter, &lock->procLocks) { + PROCLOCK *proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur); PGPROC *leader; proc = proclock->tag.myProc; @@ -636,9 +623,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc, } } } - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } /* @@ -660,8 +644,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc, { /* Use the given hypothetical wait queue order */ PGPROC **procs = waitOrders[i].procs; - - queue_size = waitOrders[i].nProcs; + int queue_size = waitOrders[i].nProcs; for (i = 0; i < queue_size; i++) { @@ -711,9 +694,11 @@ FindLockCycleRecurseMember(PGPROC *checkProc, else { PGPROC *lastGroupMember = NULL; + dlist_iter proc_iter; + dclist_head *waitQueue; /* Use the true lock wait queue order */ - waitQueue = &(lock->waitProcs); + waitQueue = &lock->waitProcs; /* * Find the last member of the lock group that is present in the wait @@ -726,13 +711,14 @@ FindLockCycleRecurseMember(PGPROC *checkProc, lastGroupMember = checkProc; else { - proc = (PGPROC *) waitQueue->links.next; - queue_size = waitQueue->size; - while (queue_size-- > 0) + dlist_iter iter; + + dclist_foreach(iter, waitQueue) { + proc = dlist_container(PGPROC, links, iter.cur); + if (proc->lockGroupLeader == checkProcLeader) lastGroupMember = proc; - proc = (PGPROC *) proc->links.next; } Assert(lastGroupMember != NULL); } @@ -740,12 +726,12 @@ FindLockCycleRecurseMember(PGPROC *checkProc, /* * OK, now rescan (or scan) the queue to identify the soft conflicts. 
*/ - queue_size = waitQueue->size; - proc = (PGPROC *) waitQueue->links.next; - while (queue_size-- > 0) + dclist_foreach(proc_iter, waitQueue) { PGPROC *leader; + proc = dlist_container(PGPROC, links, proc_iter.cur); + leader = proc->lockGroupLeader == NULL ? proc : proc->lockGroupLeader; @@ -779,8 +765,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc, return true; } } - - proc = (PGPROC *) proc->links.next; } } @@ -832,8 +816,8 @@ ExpandConstraints(EDGE *constraints, /* No, so allocate a new list */ waitOrders[nWaitOrders].lock = lock; waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs; - waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; - nWaitOrderProcs += lock->waitProcs.size; + waitOrders[nWaitOrders].nProcs = dclist_count(&lock->waitProcs); + nWaitOrderProcs += dclist_count(&lock->waitProcs); Assert(nWaitOrderProcs <= MaxBackends); /* @@ -880,8 +864,8 @@ TopoSort(LOCK *lock, int nConstraints, PGPROC **ordering) /* output argument */ { - PROC_QUEUE *waitQueue = &(lock->waitProcs); - int queue_size = waitQueue->size; + dclist_head *waitQueue = &lock->waitProcs; + int queue_size = dclist_count(waitQueue); PGPROC *proc; int i, j, @@ -889,14 +873,16 @@ TopoSort(LOCK *lock, k, kk, last; + dlist_iter proc_iter; /* First, fill topoProcs[] array with the procs in their current order */ - proc = (PGPROC *) waitQueue->links.next; - for (i = 0; i < queue_size; i++) + i = 0; + dclist_foreach(proc_iter, waitQueue) { - topoProcs[i] = proc; - proc = (PGPROC *) proc->links.next; + proc = dlist_container(PGPROC, links, proc_iter.cur); + topoProcs[i++] = proc; } + Assert(i == queue_size); /* * Scan the constraints, and for each proc in the array, generate a count @@ -1066,17 +1052,16 @@ TopoSort(LOCK *lock, static void PrintLockQueue(LOCK *lock, const char *info) { - PROC_QUEUE *waitQueue = &(lock->waitProcs); - int queue_size = waitQueue->size; - PGPROC *proc; - int i; + dclist_head *waitQueue = &lock->waitProcs; + dlist_iter proc_iter; printf("%s lock %p 
queue ", info, lock); - proc = (PGPROC *) waitQueue->links.next; - for (i = 0; i < queue_size; i++) + + dclist_foreach(proc_iter, waitQueue) { + PGPROC *proc = dlist_container(PGPROC, links, proc_iter.cur); + printf(" %d", proc->pid); - proc = (PGPROC *) proc->links.next; } printf("\n"); fflush(stdout); diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 3d1049cf756..338e6df409d 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -345,7 +345,7 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) lock->granted[1], lock->granted[2], lock->granted[3], lock->granted[4], lock->granted[5], lock->granted[6], lock->granted[7], lock->nGranted, - lock->waitProcs.size, + dclist_count(&lock->waitProcs), LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]); } @@ -1058,8 +1058,8 @@ LockAcquireExtended(const LOCKTAG *locktag, uint32 proclock_hashcode; proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); - SHMQueueDelete(&proclock->lockLink); - SHMQueueDelete(&proclock->procLink); + dlist_delete(&proclock->lockLink); + dlist_delete(&proclock->procLink); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), proclock_hashcode, @@ -1194,8 +1194,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->procLocks)); - ProcQueueInit(&(lock->waitProcs)); + dlist_init(&lock->procLocks); + dclist_init(&lock->waitProcs); lock->nRequested = 0; lock->nGranted = 0; MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); @@ -1237,7 +1237,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, * of shared memory, because there won't be anything to cause * anyone to release the lock object later. 
*/ - Assert(SHMQueueEmpty(&(lock->procLocks))); + Assert(dlist_is_empty(&(lock->procLocks))); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, @@ -1270,9 +1270,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ - SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); - SHMQueueInsertBefore(&(proc->myProcLocks[partition]), - &proclock->procLink); + dlist_push_tail(&lock->procLocks, &proclock->lockLink); + dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else @@ -1427,9 +1426,8 @@ LockCheckConflicts(LockMethod lockMethodTable, int conflictMask = lockMethodTable->conflictTab[lockmode]; int conflictsRemaining[MAX_LOCKMODES]; int totalConflictsRemaining = 0; + dlist_iter proclock_iter; int i; - SHM_QUEUE *procLocks; - PROCLOCK *otherproclock; /* * first check for global conflicts: If no locks conflict with my request, @@ -1501,11 +1499,11 @@ LockCheckConflicts(LockMethod lockMethodTable, * shared memory state more complex (and larger) but it doesn't seem worth * it. */ - procLocks = &(lock->procLocks); - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); - while (otherproclock != NULL) + dlist_foreach(proclock_iter, &lock->procLocks) { + PROCLOCK *otherproclock = + dlist_container(PROCLOCK, lockLink, proclock_iter.cur); + if (proclock != otherproclock && proclock->groupLeader == otherproclock->groupLeader && (otherproclock->holdMask & conflictMask) != 0) @@ -1530,9 +1528,6 @@ LockCheckConflicts(LockMethod lockMethodTable, return false; } } - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, &otherproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } /* Nope, it's a real conflict. 
*/ @@ -1645,8 +1640,8 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock, uint32 proclock_hashcode; PROCLOCK_PRINT("CleanUpLock: deleting", proclock); - SHMQueueDelete(&proclock->lockLink); - SHMQueueDelete(&proclock->procLink); + dlist_delete(&proclock->lockLink); + dlist_delete(&proclock->procLink); proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), @@ -1663,7 +1658,7 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock, * object. */ LOCK_PRINT("CleanUpLock: deleting", lock, 0); - Assert(SHMQueueEmpty(&(lock->procLocks))); + Assert(dlist_is_empty(&lock->procLocks)); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, @@ -1926,12 +1921,11 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING); Assert(proc->links.next != NULL); Assert(waitLock); - Assert(waitLock->waitProcs.size > 0); + Assert(!dclist_is_empty(&waitLock->waitProcs)); Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); /* Remove proc from lock's wait queue */ - SHMQueueDelete(&(proc->links)); - waitLock->waitProcs.size--; + dclist_delete_from(&waitLock->waitProcs, &proc->links); /* Undo increments of request counts by waiting process */ Assert(waitLock->nRequested > 0); @@ -2185,7 +2179,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) numLockModes; LOCALLOCK *locallock; LOCK *lock; - PROCLOCK *proclock; int partition; bool have_fast_path_lwlock = false; @@ -2342,8 +2335,8 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { LWLock *partitionLock; - SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); - PROCLOCK *nextplock; + dlist_head *procLocks = &MyProc->myProcLocks[partition]; + dlist_mutable_iter proclock_iter; partitionLock = LockHashPartitionLockByIndex(partition); @@ -2366,24 +2359,16 @@ LockReleaseAll(LOCKMETHODID 
lockmethodid, bool allLocks) * locallock situation, we lose that guarantee for fast-path locks. * This is not ideal. */ - if (SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)) == NULL) + if (dlist_is_empty(procLocks)) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - proclock; - proclock = nextplock) + dlist_foreach_modify(proclock_iter, procLocks) { + PROCLOCK *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur); bool wakeupNeeded = false; - /* Get link first, since we may unlink/delete this proclock */ - nextplock = (PROCLOCK *) - SHMQueueNext(procLocks, &proclock->procLink, - offsetof(PROCLOCK, procLink)); - Assert(proclock->tag.myProc == MyProc); lock = proclock->tag.myLock; @@ -2918,7 +2903,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) LockMethod lockMethodTable; LOCK *lock; LOCKMASK conflictMask; - SHM_QUEUE *procLocks; + dlist_iter proclock_iter; PROCLOCK *proclock; uint32 hashcode; LWLock *partitionLock; @@ -3064,14 +3049,10 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) /* * Examine each existing holder (or awaiter) of the lock. 
*/ - - procLocks = &(lock->procLocks); - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (proclock) + dlist_foreach(proclock_iter, &lock->procLocks) { + proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur); + if (conflictMask & proclock->holdMask) { PGPROC *proc = proclock->tag.myProc; @@ -3097,9 +3078,6 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) /* else, xact already committed or aborted */ } } - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } LWLockRelease(partitionLock); @@ -3498,8 +3476,8 @@ PostPrepare_Locks(TransactionId xid) for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { LWLock *partitionLock; - SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); - PROCLOCK *nextplock; + dlist_head *procLocks = &(MyProc->myProcLocks[partition]); + dlist_mutable_iter proclock_iter; partitionLock = LockHashPartitionLockByIndex(partition); @@ -3511,21 +3489,14 @@ PostPrepare_Locks(TransactionId xid) * another backend is adding something to our lists now. For safety, * though, we code this the same way as in LockReleaseAll. */ - if (SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)) == NULL) + if (dlist_is_empty(procLocks)) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - proclock; - proclock = nextplock) + dlist_foreach_modify(proclock_iter, procLocks) { - /* Get link first, since we may unlink/relink this proclock */ - nextplock = (PROCLOCK *) - SHMQueueNext(procLocks, &proclock->procLink, - offsetof(PROCLOCK, procLink)); + proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur); Assert(proclock->tag.myProc == MyProc); @@ -3563,7 +3534,7 @@ PostPrepare_Locks(TransactionId xid) * same hash partition, cf proclock_hash(). 
So the partition lock * we already hold is sufficient for this. */ - SHMQueueDelete(&proclock->procLink); + dlist_delete(&proclock->procLink); /* * Create the new hash key for the proclock. @@ -3589,8 +3560,7 @@ PostPrepare_Locks(TransactionId xid) elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks"); /* Re-link into the new proc's proclock list */ - SHMQueueInsertBefore(&(newproc->myProcLocks[partition]), - &proclock->procLink); + dlist_push_tail(&newproc->myProcLocks[partition], &proclock->procLink); PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock); } /* loop over PROCLOCKs within this partition */ @@ -3919,12 +3889,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) { LOCK *theLock = blocked_proc->waitLock; BlockedProcData *bproc; - SHM_QUEUE *procLocks; - PROCLOCK *proclock; - PROC_QUEUE *waitQueue; - PGPROC *queued_proc; + dlist_iter proclock_iter; + dlist_iter proc_iter; + dclist_head *waitQueue; int queue_size; - int i; /* Nothing to do if this proc is not blocked */ if (theLock == NULL) @@ -3942,11 +3910,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) */ /* Collect all PROCLOCKs associated with theLock */ - procLocks = &(theLock->procLocks); - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - while (proclock) + dlist_foreach(proclock_iter, &theLock->procLocks) { + PROCLOCK *proclock = + dlist_container(PROCLOCK, lockLink, proclock_iter.cur); PGPROC *proc = proclock->tag.myProc; LOCK *lock = proclock->tag.myLock; LockInstanceData *instance; @@ -3971,14 +3938,11 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) instance->leaderPid = proclock->groupLeader->pid; instance->fastpath = false; data->nlocks++; - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */ 
waitQueue = &(theLock->waitProcs); - queue_size = waitQueue->size; + queue_size = dclist_count(waitQueue); if (queue_size > data->maxpids - data->npids) { @@ -3989,9 +3953,9 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) } /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */ - queued_proc = (PGPROC *) waitQueue->links.next; - for (i = 0; i < queue_size; i++) + dclist_foreach(proc_iter, waitQueue) { + PGPROC *queued_proc = dlist_container(PGPROC, links, proc_iter.cur); if (queued_proc == blocked_proc) break; data->waiter_pids[data->npids++] = queued_proc->pid; @@ -4113,9 +4077,6 @@ GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode) void DumpLocks(PGPROC *proc) { - SHM_QUEUE *procLocks; - PROCLOCK *proclock; - LOCK *lock; int i; if (proc == NULL) @@ -4126,23 +4087,17 @@ DumpLocks(PGPROC *proc) for (i = 0; i < NUM_LOCK_PARTITIONS; i++) { - procLocks = &(proc->myProcLocks[i]); + dlist_head *procLocks = &proc->myProcLocks[i]; + dlist_iter iter; - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - - while (proclock) + dlist_foreach(iter, procLocks) { + PROCLOCK *proclock = dlist_container(PROCLOCK, procLink, iter.cur); + LOCK *lock = proclock->tag.myLock; + Assert(proclock->tag.myProc == proc); - - lock = proclock->tag.myLock; - PROCLOCK_PRINT("DumpLocks", proclock); LOCK_PRINT("DumpLocks", lock, 0); - - proclock = (PROCLOCK *) - SHMQueueNext(procLocks, &proclock->procLink, - offsetof(PROCLOCK, procLink)); } } } @@ -4267,8 +4222,8 @@ lock_twophase_recover(TransactionId xid, uint16 info, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->procLocks)); - ProcQueueInit(&(lock->waitProcs)); + dlist_init(&lock->procLocks); + dclist_init(&lock->waitProcs); lock->nRequested = 0; lock->nGranted = 0; MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); @@ -4310,7 +4265,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, * of shared memory, because there 
won't be anything to cause * anyone to release the lock object later. */ - Assert(SHMQueueEmpty(&(lock->procLocks))); + Assert(dlist_is_empty(&lock->procLocks)); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, @@ -4335,9 +4290,9 @@ lock_twophase_recover(TransactionId xid, uint16 info, proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ - SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); - SHMQueueInsertBefore(&(proc->myProcLocks[partition]), - &proclock->procLink); + dlist_push_tail(&lock->procLocks, &proclock->lockLink); + dlist_push_tail(&proc->myProcLocks[partition], + &proclock->procLink); PROCLOCK_PRINT("lock_twophase_recover: new", proclock); } else diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 0aa304c835c..5ffbd7b8195 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -15,8 +15,6 @@ /* * Interface (a): * ProcSleep(), ProcWakeup(), - * ProcQueueAlloc() -- create a shm queue for sleeping processes - * ProcQueueInit() -- create a queue without allocing memory * * Waiting for a lock causes the backend to be put to sleep. Whoever releases * the lock wakes the process up again (and gives it an error code so it knows @@ -173,10 +171,10 @@ InitProcGlobal(void) * Initialize the data structures. 
*/ ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; - ProcGlobal->freeProcs = NULL; - ProcGlobal->autovacFreeProcs = NULL; - ProcGlobal->bgworkerFreeProcs = NULL; - ProcGlobal->walsenderFreeProcs = NULL; + dlist_init(&ProcGlobal->freeProcs); + dlist_init(&ProcGlobal->autovacFreeProcs); + dlist_init(&ProcGlobal->bgworkerFreeProcs); + dlist_init(&ProcGlobal->walsenderFreeProcs); ProcGlobal->startupBufferPinWaitBufId = -1; ProcGlobal->walwriterLatch = NULL; ProcGlobal->checkpointerLatch = NULL; @@ -214,6 +212,8 @@ InitProcGlobal(void) for (i = 0; i < TotalProcs; i++) { + PGPROC *proc = &procs[i]; + /* Common initialization for all PGPROCs, regardless of type. */ /* @@ -223,11 +223,11 @@ InitProcGlobal(void) */ if (i < MaxBackends + NUM_AUXILIARY_PROCS) { - procs[i].sem = PGSemaphoreCreate(); - InitSharedLatch(&(procs[i].procLatch)); - LWLockInitialize(&(procs[i].fpInfoLock), LWTRANCHE_LOCK_FASTPATH); + proc->sem = PGSemaphoreCreate(); + InitSharedLatch(&(proc->procLatch)); + LWLockInitialize(&(proc->fpInfoLock), LWTRANCHE_LOCK_FASTPATH); } - procs[i].pgprocno = i; + proc->pgprocno = i; /* * Newly created PGPROCs for normal backends, autovacuum and bgworkers @@ -240,46 +240,42 @@ InitProcGlobal(void) if (i < MaxConnections) { /* PGPROC for normal backend, add to freeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; - ProcGlobal->freeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->freeProcs; + dlist_push_head(&ProcGlobal->freeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->freeProcs; } else if (i < MaxConnections + autovacuum_max_workers + 1) { /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; - ProcGlobal->autovacFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->autovacFreeProcs; + dlist_push_head(&ProcGlobal->autovacFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->autovacFreeProcs; } else if (i < 
MaxConnections + autovacuum_max_workers + 1 + max_worker_processes) { /* PGPROC for bgworker, add to bgworkerFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; - ProcGlobal->bgworkerFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs; + dlist_push_head(&ProcGlobal->bgworkerFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->bgworkerFreeProcs; } else if (i < MaxBackends) { /* PGPROC for walsender, add to walsenderFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs; - ProcGlobal->walsenderFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs; + dlist_push_head(&ProcGlobal->walsenderFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->walsenderFreeProcs; } /* Initialize myProcLocks[] shared memory queues. */ for (j = 0; j < NUM_LOCK_PARTITIONS; j++) - SHMQueueInit(&(procs[i].myProcLocks[j])); + dlist_init(&(proc->myProcLocks[j])); /* Initialize lockGroupMembers list. */ - dlist_init(&procs[i].lockGroupMembers); + dlist_init(&proc->lockGroupMembers); /* * Initialize the atomic variables, otherwise, it won't be safe to * access them for backends that aren't currently in use. 
*/ - pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO); - pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO); - pg_atomic_init_u64(&(procs[i].waitStart), 0); + pg_atomic_init_u32(&(proc->procArrayGroupNext), INVALID_PGPROCNO); + pg_atomic_init_u32(&(proc->clogGroupNext), INVALID_PGPROCNO); + pg_atomic_init_u64(&(proc->waitStart), 0); } /* @@ -300,7 +296,7 @@ InitProcGlobal(void) void InitProcess(void) { - PGPROC *volatile *procgloballist; + dlist_head *procgloballist; /* * ProcGlobal should be set up already (if we are a backend, we inherit @@ -333,11 +329,9 @@ InitProcess(void) set_spins_per_delay(ProcGlobal->spins_per_delay); - MyProc = *procgloballist; - - if (MyProc != NULL) + if (!dlist_is_empty(procgloballist)) { - *procgloballist = (PGPROC *) MyProc->links.next; + MyProc = (PGPROC*) dlist_pop_head_node(procgloballist); SpinLockRelease(ProcStructLock); } else @@ -378,7 +372,7 @@ InitProcess(void) * Initialize all fields of MyProc, except for those previously * initialized by InitProcGlobal. */ - SHMQueueElemInit(&(MyProc->links)); + dlist_node_init(&MyProc->links); MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->lxid = InvalidLocalTransactionId; MyProc->fpVXIDLock = false; @@ -408,7 +402,7 @@ InitProcess(void) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif MyProc->recoveryConflictPending = false; @@ -565,7 +559,7 @@ InitAuxiliaryProcess(void) * Initialize all fields of MyProc, except for those previously * initialized by InitProcGlobal. */ - SHMQueueElemInit(&(MyProc->links)); + dlist_node_init(&MyProc->links); MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->lxid = InvalidLocalTransactionId; MyProc->fpVXIDLock = false; @@ -590,7 +584,7 @@ InitAuxiliaryProcess(void) /* Last process should have released all locks. 
*/ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif @@ -658,16 +652,15 @@ GetStartupBufferPinWaitBufId(void) bool HaveNFreeProcs(int n) { - PGPROC *proc; + dlist_iter iter; SpinLockAcquire(ProcStructLock); - proc = ProcGlobal->freeProcs; - - while (n > 0 && proc != NULL) + dlist_foreach(iter, &ProcGlobal->freeProcs) { - proc = (PGPROC *) proc->links.next; n--; + if (n == 0) + break; } SpinLockRelease(ProcStructLock); @@ -730,7 +723,7 @@ LockErrorCleanup(void) partitionLock = LockHashPartitionLock(lockAwaited->hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); - if (MyProc->links.next != NULL) + if (!dlist_node_is_detached(&MyProc->links)) { /* We could not have been granted the lock yet */ RemoveFromWaitQueue(MyProc, lockAwaited->hashcode); @@ -803,7 +796,7 @@ static void ProcKill(int code, Datum arg) { PGPROC *proc; - PGPROC *volatile *procgloballist; + dlist_head *procgloballist; Assert(MyProc != NULL); @@ -816,7 +809,7 @@ ProcKill(int code, Datum arg) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif @@ -832,7 +825,7 @@ ProcKill(int code, Datum arg) /* * Detach from any lock group of which we are a member. If the leader - * exist before all other group members, its PGPROC will remain allocated + * exits before all other group members, its PGPROC will remain allocated * until the last group process exits; that process must return the * leader's PGPROC to the appropriate list. */ @@ -853,8 +846,7 @@ ProcKill(int code, Datum arg) /* Leader exited first; return its PGPROC. 
*/ SpinLockAcquire(ProcStructLock); - leader->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = leader; + dlist_push_head(procgloballist, &leader->links); SpinLockRelease(ProcStructLock); } } @@ -893,8 +885,7 @@ ProcKill(int code, Datum arg) Assert(dlist_is_empty(&proc->lockGroupMembers)); /* Return PGPROC structure (and semaphore) to appropriate freelist */ - proc->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = proc; + dlist_push_tail(procgloballist, &proc->links); } /* Update shared estimate of spins_per_delay */ @@ -986,44 +977,6 @@ AuxiliaryPidGetProc(int pid) return result; } -/* - * ProcQueue package: routines for putting processes to sleep - * and waking them up - */ - -/* - * ProcQueueAlloc -- alloc/attach to a shared memory process queue - * - * Returns: a pointer to the queue - * Side Effects: Initializes the queue if it wasn't there before - */ -#ifdef NOT_USED -PROC_QUEUE * -ProcQueueAlloc(const char *name) -{ - PROC_QUEUE *queue; - bool found; - - queue = (PROC_QUEUE *) - ShmemInitStruct(name, sizeof(PROC_QUEUE), &found); - - if (!found) - ProcQueueInit(queue); - - return queue; -} -#endif - -/* - * ProcQueueInit -- initialize a shared memory process queue - */ -void -ProcQueueInit(PROC_QUEUE *queue) -{ - SHMQueueInit(&(queue->links)); - queue->size = 0; -} - /* * ProcSleep -- put a process to sleep on the specified lock @@ -1049,8 +1002,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) PROCLOCK *proclock = locallock->proclock; uint32 hashcode = locallock->hashcode; LWLock *partitionLock = LockHashPartitionLock(hashcode); - PROC_QUEUE *waitQueue = &(lock->waitProcs); - SHM_QUEUE *waitQueuePos; + dclist_head *waitQueue = &lock->waitProcs; + PGPROC *insert_before = NULL; LOCKMASK myHeldLocks = MyProc->heldLocks; TimestampTz standbyWaitStart = 0; bool early_deadlock = false; @@ -1058,7 +1011,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) bool logged_recovery_conflict = false; ProcWaitStatus 
myWaitStatus; PGPROC *leader = MyProc->lockGroupLeader; - int i; /* * If group locking is in use, locks held by members of my locking group @@ -1072,18 +1024,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) */ if (leader != NULL) { - SHM_QUEUE *procLocks = &(lock->procLocks); - PROCLOCK *otherproclock; + dlist_iter iter; - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); - while (otherproclock != NULL) + dlist_foreach(iter, &lock->procLocks) { + PROCLOCK *otherproclock; + + otherproclock = dlist_container(PROCLOCK, lockLink, iter.cur); + if (otherproclock->groupLeader == leader) myHeldLocks |= otherproclock->holdMask; - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, &otherproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } } @@ -1104,15 +1054,14 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * we are only considering the part of the wait queue before my insertion * point. */ - if (myHeldLocks != 0 && waitQueue->size > 0) + if (myHeldLocks != 0 && !dclist_is_empty(waitQueue)) { LOCKMASK aheadRequests = 0; - SHM_QUEUE *proc_node; + dlist_iter iter; - proc_node = waitQueue->links.next; - for (i = 0; i < waitQueue->size; i++) + dclist_foreach(iter, waitQueue) { - PGPROC *proc = (PGPROC *) proc_node; + PGPROC *proc = dlist_container(PGPROC, links, iter.cur); /* * If we're part of the same locking group as this waiter, its @@ -1120,10 +1069,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * aheadRequests. */ if (leader != NULL && leader == proc->lockGroupLeader) - { - proc_node = proc->links.next; continue; - } + /* Must he wait for me? 
*/ if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks) { @@ -1151,31 +1098,23 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) GrantAwaitedLock(); return PROC_WAIT_STATUS_OK; } - /* Break out of loop to put myself before him */ + + /* Put myself into wait queue before conflicting process */ + insert_before = proc; break; } /* Nope, so advance to next waiter */ aheadRequests |= LOCKBIT_ON(proc->waitLockMode); - proc_node = proc->links.next; } - - /* - * If we iterated through the whole queue, cur points to the waitQueue - * head, so we will insert at tail of queue as desired. - */ - waitQueuePos = proc_node; - } - else - { - /* I hold no locks, so I can't push in front of anyone. */ - waitQueuePos = &waitQueue->links; } /* * Insert self into queue, at the position determined above. */ - SHMQueueInsertBefore(waitQueuePos, &MyProc->links); - waitQueue->size++; + if (insert_before) + dclist_insert_before(waitQueue, &insert_before->links, &MyProc->links); + else + dclist_push_tail(waitQueue, &MyProc->links); lock->waitMask |= LOCKBIT_ON(lockmode); @@ -1453,7 +1392,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) long secs; int usecs; long msecs; - SHM_QUEUE *procLocks; + dlist_iter proc_iter; PROCLOCK *curproclock; bool first_holder = true, first_waiter = true; @@ -1483,12 +1422,11 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LWLockAcquire(partitionLock, LW_SHARED); - procLocks = &(lock->procLocks); - curproclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (curproclock) + dlist_foreach(proc_iter, &lock->procLocks) { + curproclock = + dlist_container(PROCLOCK, lockLink, proc_iter.cur); + /* * we are a waiter if myProc->waitProcLock == curproclock; we * are a holder if it is NULL or something different @@ -1519,10 +1457,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) lockHoldersNum++; } - - curproclock = (PROCLOCK *) SHMQueueNext(procLocks, - 
&curproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } LWLockRelease(partitionLock); @@ -1657,7 +1591,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * ProcWakeup -- wake up a process by setting its latch. * * Also remove the process from the wait queue and set its links invalid. - * RETURN: the next process in the wait queue. * * The appropriate lock partition lock must be held by caller. * @@ -1666,23 +1599,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * to twiddle the lock's request counts too --- see RemoveFromWaitQueue. * Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK. */ -PGPROC * +void ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) { - PGPROC *retProc; + if (dlist_node_is_detached(&proc->links)) + return; - /* Proc should be sleeping ... */ - if (proc->links.prev == NULL || - proc->links.next == NULL) - return NULL; Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING); - /* Save next process before we zap the list link */ - retProc = (PGPROC *) proc->links.next; - /* Remove process from wait queue */ - SHMQueueDelete(&(proc->links)); - (proc->waitLock->waitProcs.size)--; + dclist_delete_from_thoroughly(&proc->waitLock->waitProcs, &proc->links); /* Clean up process' state and pass it the ok/fail signal */ proc->waitLock = NULL; @@ -1692,8 +1618,6 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) /* And awaken it */ SetLatch(&proc->procLatch); - - return retProc; } /* @@ -1706,20 +1630,16 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { - PROC_QUEUE *waitQueue = &(lock->waitProcs); - int queue_size = waitQueue->size; - PGPROC *proc; + dclist_head *waitQueue = &lock->waitProcs; LOCKMASK aheadRequests = 0; + dlist_mutable_iter miter; - Assert(queue_size >= 0); - - if (queue_size == 0) + if (dclist_is_empty(waitQueue)) return; - proc = (PGPROC *) waitQueue->links.next; - - while (queue_size-- > 0) + 
dclist_foreach_modify(miter, waitQueue) { + PGPROC *proc = dlist_container(PGPROC, links, miter.cur); LOCKMODE lockmode = proc->waitLockMode; /* @@ -1732,7 +1652,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { /* OK to waken */ GrantLock(lock, proc->waitProcLock, lockmode); - proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK); + ProcWakeup(proc, PROC_WAIT_STATUS_OK); /* * ProcWakeup removes proc from the lock's waiting process queue @@ -1742,15 +1662,10 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) } else { - /* - * Cannot wake this guy. Remember his request for later checks. - */ + /* Lock conflicts: don't wake, but remember for later checks. */ aheadRequests |= LOCKBIT_ON(lockmode); - proc = (PGPROC *) proc->links.next; } } - - Assert(waitQueue->size >= 0); } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index f8302f1ed15..c138745e42c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1844,7 +1844,6 @@ PROCESS_INFORMATION PROCLOCK PROCLOCKTAG PROC_HDR -PROC_QUEUE PSID PSID_AND_ATTRIBUTES PSQL_COMP_CASE -- 2.38.0
>From 9680b22ab7c28343c1c2d1de7d318bae2d667e71 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sat, 19 Nov 2022 15:18:28 -0800 Subject: [PATCH v2 4/9] Use dlist for syncrep queue --- src/include/replication/walsender_private.h | 3 +- src/include/storage/proc.h | 2 +- src/backend/replication/syncrep.c | 89 +++++++++------------ src/backend/replication/walsender.c | 2 +- src/backend/storage/lmgr/proc.c | 2 +- 5 files changed, 41 insertions(+), 57 deletions(-) diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7897c74589e..db801e9f5cf 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -13,6 +13,7 @@ #define _WALSENDER_PRIVATE_H #include "access/xlog.h" +#include "lib/ilist.h" #include "nodes/nodes.h" #include "replication/syncrep.h" #include "storage/latch.h" @@ -89,7 +90,7 @@ typedef struct * Synchronous replication queue with one queue per request type. * Protected by SyncRepLock. */ - SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; + dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; /* * Current location of the head of the queue. 
All waiters should have a diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 7005770da79..ba9bf1c9508 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -248,7 +248,7 @@ struct PGPROC */ XLogRecPtr waitLSN; /* waiting for this LSN or higher */ int syncRepState; /* wait state for sync rep */ - SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + dlist_node syncRepLinks; /* list link if process is in syncrep queue */ /* * All PROCLOCK objects for locks held or awaited by this backend are diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 1a022b11a06..2c2f1082e97 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) else mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(dlist_node_is_detached(&MyProc->syncRepLinks)); Assert(WalSndCtl != NULL); LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); @@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) * assertions, but better safe than sorry). */ pg_read_barrier(); - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(dlist_node_is_detached(&MyProc->syncRepLinks)); MyProc->syncRepState = SYNC_REP_NOT_WAITING; MyProc->waitLSN = 0; @@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) static void SyncRepQueueInsert(int mode) { - PGPROC *proc; + dlist_head *queue; + dlist_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); + queue = &WalSndCtl->SyncRepQueue[mode]; - while (proc) + dlist_reverse_foreach(iter, queue) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* - * Stop at the queue element that we should after to ensure the queue - * is ordered by LSN. 
+ * Stop at the queue element that we should insert after to ensure the + * queue is ordered by LSN. */ if (proc->waitLSN < MyProc->waitLSN) - break; - - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); + { + dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks); + return; + } } - if (proc) - SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); - else - SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); + /* + * If we get here, the list was either empty, or this process needs to be + * at the head. + */ + dlist_push_head(queue, &MyProc->syncRepLinks); } /* @@ -373,8 +374,8 @@ static void SyncRepCancelWait(void) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - SHMQueueDelete(&(MyProc->syncRepLinks)); + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) + dlist_delete_thoroughly(&MyProc->syncRepLinks); MyProc->syncRepState = SYNC_REP_NOT_WAITING; LWLockRelease(SyncRepLock); } @@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void) * First check if we are removed from the queue without the lock to not * slow down backend exit. 
*/ - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* maybe we have just been removed, so recheck */ - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - SHMQueueDelete(&(MyProc->syncRepLinks)); + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) + dlist_delete_thoroughly(&MyProc->syncRepLinks); LWLockRelease(SyncRepLock); } @@ -879,20 +880,17 @@ static int SyncRepWakeQueue(bool all, int mode) { volatile WalSndCtlData *walsndctl = WalSndCtl; - PGPROC *proc = NULL; - PGPROC *thisproc = NULL; int numprocs = 0; + dlist_mutable_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); Assert(SyncRepQueueIsOrderedByLSN(mode)); - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); - - while (proc) + dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* * Assume the queue is ordered by LSN */ @@ -900,18 +898,9 @@ SyncRepWakeQueue(bool all, int mode) return numprocs; /* - * Move to next proc, so we can delete thisproc from the queue. - * thisproc is valid, proc may be NULL after this. + * Remove from queue. */ - thisproc = proc; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); - - /* - * Remove thisproc from queue. - */ - SHMQueueDelete(&(thisproc->syncRepLinks)); + dlist_delete_thoroughly(&proc->syncRepLinks); /* * SyncRepWaitForLSN() reads syncRepState without holding the lock, so @@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode) * Set state to complete; see SyncRepWaitForLSN() for discussion of * the various states. 
*/ - thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; + proc->syncRepState = SYNC_REP_WAIT_COMPLETE; /* * Wake only when we have set state and removed from queue. */ - SetLatch(&(thisproc->procLatch)); + SetLatch(&(proc->procLatch)); numprocs++; } @@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void) static bool SyncRepQueueIsOrderedByLSN(int mode) { - PGPROC *proc = NULL; XLogRecPtr lastLSN; + dlist_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); lastLSN = 0; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); - - while (proc) + dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* * Check the queue is ordered by LSN and that multiple procs don't * have matching LSNs @@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode) return false; lastLSN = proc->waitLSN; - - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); } return true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a81ef6a2014..b5a40d2c439 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3270,7 +3270,7 @@ WalSndShmemInit(void) MemSet(WalSndCtl, 0, WalSndShmemSize()); for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); + dlist_init(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 5ffbd7b8195..39c6685f467 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -410,7 +410,7 @@ InitProcess(void) /* Initialize fields for sync rep */ MyProc->waitLSN = 0; MyProc->syncRepState = SYNC_REP_NOT_WAITING; - SHMQueueElemInit(&(MyProc->syncRepLinks)); + dlist_node_init(&MyProc->syncRepLinks); /* 
Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; -- 2.38.0
>From d9066b7a96bb78725e162de1645db486aa55fd67 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sat, 19 Nov 2022 15:25:16 -0800 Subject: [PATCH v2 5/9] Use dlists for predicate locking --- src/include/storage/predicate_internals.h | 48 +- src/backend/storage/lmgr/predicate.c | 691 +++++++--------------- 2 files changed, 239 insertions(+), 500 deletions(-) diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 2416d3c2248..4569a73b0a9 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -14,6 +14,7 @@ #ifndef PREDICATE_INTERNALS_H #define PREDICATE_INTERNALS_H +#include "lib/ilist.h" #include "storage/lock.h" #include "storage/lwlock.h" @@ -84,13 +85,14 @@ typedef struct SERIALIZABLEXACT SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or * no conflict out */ } SeqNo; - SHM_QUEUE outConflicts; /* list of write transactions whose data we + dlist_head outConflicts; /* list of write transactions whose data we * couldn't read. */ - SHM_QUEUE inConflicts; /* list of read transactions which couldn't + dlist_head inConflicts; /* list of read transactions which couldn't * see our write. 
*/ - SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */ - SHM_QUEUE finishedLink; /* list link in + dlist_head predicateLocks; /* list of associated PREDICATELOCK objects */ + dlist_node finishedLink; /* list link in * FinishedSerializableTransactions */ + dlist_node xactLink; /* PredXact->activeList/availableList */ /* * perXactPredicateListLock is only used in parallel queries: it protects @@ -103,7 +105,7 @@ typedef struct SERIALIZABLEXACT * for r/o transactions: list of concurrent r/w transactions that we could * potentially have conflicts with, and vice versa for r/w transactions */ - SHM_QUEUE possibleUnsafeConflicts; + dlist_head possibleUnsafeConflicts; TransactionId topXid; /* top level xid for the transaction, if one * exists; else invalid */ @@ -139,28 +141,10 @@ typedef struct SERIALIZABLEXACT */ #define SXACT_FLAG_PARTIALLY_RELEASED 0x00000800 -/* - * The following types are used to provide an ad hoc list for holding - * SERIALIZABLEXACT objects. An HTAB is overkill, since there is no need to - * access these by key -- there are direct pointers to these objects where - * needed. If a shared memory list is created, these types can probably be - * eliminated in favor of using the general solution. 
- */ -typedef struct PredXactListElementData -{ - SHM_QUEUE link; - SERIALIZABLEXACT sxact; -} PredXactListElementData; - -typedef struct PredXactListElementData *PredXactListElement; - -#define PredXactListElementDataSize \ - ((Size)MAXALIGN(sizeof(PredXactListElementData))) - typedef struct PredXactListData { - SHM_QUEUE availableList; - SHM_QUEUE activeList; + dlist_head availableList; + dlist_head activeList; /* * These global variables are maintained when registering and cleaning up @@ -187,7 +171,7 @@ typedef struct PredXactListData * seq no */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ - PredXactListElement element; + SERIALIZABLEXACT *element; } PredXactListData; typedef struct PredXactListData *PredXactList; @@ -208,8 +192,8 @@ typedef struct PredXactListData *PredXactList; */ typedef struct RWConflictData { - SHM_QUEUE outLink; /* link for list of conflicts out from a sxact */ - SHM_QUEUE inLink; /* link for list of conflicts in to a sxact */ + dlist_node outLink; /* link for list of conflicts out from a sxact */ + dlist_node inLink; /* link for list of conflicts in to a sxact */ SERIALIZABLEXACT *sxactOut; SERIALIZABLEXACT *sxactIn; } RWConflictData; @@ -221,7 +205,7 @@ typedef struct RWConflictData *RWConflict; typedef struct RWConflictPoolHeaderData { - SHM_QUEUE availableList; + dlist_head availableList; RWConflict element; } RWConflictPoolHeaderData; @@ -303,7 +287,7 @@ typedef struct PREDICATELOCKTARGET PREDICATELOCKTARGETTAG tag; /* unique identifier of lockable object */ /* data */ - SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with + dlist_head predicateLocks; /* list of PREDICATELOCK objects assoc. 
with * predicate lock target */ } PREDICATELOCKTARGET; @@ -336,9 +320,9 @@ typedef struct PREDICATELOCK PREDICATELOCKTAG tag; /* unique identifier of lock */ /* data */ - SHM_QUEUE targetLink; /* list link in PREDICATELOCKTARGET's list of + dlist_node targetLink; /* list link in PREDICATELOCKTARGET's list of * predicate locks */ - SHM_QUEUE xactLink; /* list link in SERIALIZABLEXACT's list of + dlist_node xactLink; /* list link in SERIALIZABLEXACT's list of * predicate locks */ SerCommitSeqNo commitSeqNo; /* only used for summarized predicate locks */ } PREDICATELOCK; diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e8120174d61..08dc2d0c316 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -260,7 +260,7 @@ #define NPREDICATELOCKTARGETENTS() \ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) -#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink))) +#define SxactIsOnFinishedList(sxact) (!dlist_node_is_detached(&(sxact)->finishedLink)) /* * Note that a sxact is marked "prepared" once it has passed @@ -392,7 +392,7 @@ static RWConflictPoolHeader RWConflictPool; static HTAB *SerializableXidHash; static HTAB *PredicateLockTargetHash; static HTAB *PredicateLockHash; -static SHM_QUEUE *FinishedSerializableTransactions; +static dlist_head *FinishedSerializableTransactions; /* * Tag for a dummy entry in PredicateLockTargetHash. 
By temporarily removing @@ -430,8 +430,6 @@ static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact; static SERIALIZABLEXACT *CreatePredXact(void); static void ReleasePredXact(SERIALIZABLEXACT *sxact); -static SERIALIZABLEXACT *FirstPredXact(void); -static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact); static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer); static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); @@ -580,69 +578,24 @@ SerializationNeededForWrite(Relation relation) static SERIALIZABLEXACT * CreatePredXact(void) { - PredXactListElement ptle; + SERIALIZABLEXACT *sxact; - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->availableList, - &PredXact->availableList, - offsetof(PredXactListElementData, link)); - if (!ptle) + if (dlist_is_empty(&PredXact->availableList)) return NULL; - SHMQueueDelete(&ptle->link); - SHMQueueInsertBefore(&PredXact->activeList, &ptle->link); - return &ptle->sxact; + sxact = dlist_container(SERIALIZABLEXACT, xactLink, + dlist_pop_head_node(&PredXact->availableList)); + dlist_push_tail(&PredXact->activeList, &sxact->xactLink); + return sxact; } static void ReleasePredXact(SERIALIZABLEXACT *sxact) { - PredXactListElement ptle; - Assert(ShmemAddrIsValid(sxact)); - ptle = (PredXactListElement) - (((char *) sxact) - - offsetof(PredXactListElementData, sxact) - + offsetof(PredXactListElementData, link)); - SHMQueueDelete(&ptle->link); - SHMQueueInsertBefore(&PredXact->availableList, &ptle->link); -} - -static SERIALIZABLEXACT * -FirstPredXact(void) -{ - PredXactListElement ptle; - - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->activeList, - &PredXact->activeList, - offsetof(PredXactListElementData, link)); - if (!ptle) - return NULL; - - return &ptle->sxact; -} - -static SERIALIZABLEXACT * -NextPredXact(SERIALIZABLEXACT *sxact) -{ - PredXactListElement ptle; - - Assert(ShmemAddrIsValid(sxact)); - - ptle = (PredXactListElement) - 
(((char *) sxact) - - offsetof(PredXactListElementData, sxact) - + offsetof(PredXactListElementData, link)); - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->activeList, - &ptle->link, - offsetof(PredXactListElementData, link)); - if (!ptle) - return NULL; - - return &ptle->sxact; + dlist_delete(&sxact->xactLink); + dlist_push_tail(&PredXact->availableList, &sxact->xactLink); } /*------------------------------------------------------------------------*/ @@ -653,30 +606,25 @@ NextPredXact(SERIALIZABLEXACT *sxact) static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer) { - RWConflict conflict; + dlist_iter iter; Assert(reader != writer); /* Check the ends of the purported conflict first. */ if (SxactIsDoomed(reader) || SxactIsDoomed(writer) - || SHMQueueEmpty(&reader->outConflicts) - || SHMQueueEmpty(&writer->inConflicts)) + || dlist_is_empty(&reader->outConflicts) + || dlist_is_empty(&writer->inConflicts)) return false; /* A conflict is possible; walk the list to find out. */ - conflict = (RWConflict) - SHMQueueNext(&reader->outConflicts, - &reader->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->outConflicts) { + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); + if (conflict->sxactIn == writer) return true; - conflict = (RWConflict) - SHMQueueNext(&reader->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); } /* No conflict found. 
*/ @@ -691,22 +639,19 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) Assert(reader != writer); Assert(!RWConflictExists(reader, writer)); - conflict = (RWConflict) - SHMQueueNext(&RWConflictPool->availableList, - &RWConflictPool->availableList, - offsetof(RWConflictData, outLink)); - if (!conflict) + if (dlist_is_empty(&RWConflictPool->availableList)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough elements in RWConflictPool to record a read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); - SHMQueueDelete(&conflict->outLink); + conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList); + dlist_delete(&conflict->outLink); conflict->sxactOut = reader; conflict->sxactIn = writer; - SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink); - SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink); + dlist_push_tail(&reader->outConflicts, &conflict->outLink); + dlist_push_tail(&writer->inConflicts, &conflict->inLink); } static void @@ -719,39 +664,33 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, Assert(SxactIsReadOnly(roXact)); Assert(!SxactIsReadOnly(activeXact)); - conflict = (RWConflict) - SHMQueueNext(&RWConflictPool->availableList, - &RWConflictPool->availableList, - offsetof(RWConflictData, outLink)); - if (!conflict) + if (dlist_is_empty(&RWConflictPool->availableList)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); - SHMQueueDelete(&conflict->outLink); + conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList); + dlist_delete(&conflict->outLink); conflict->sxactOut = activeXact; conflict->sxactIn = roXact; - SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts, - &conflict->outLink); - 
SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts, - &conflict->inLink); + dlist_push_tail(&activeXact->possibleUnsafeConflicts, &conflict->outLink); + dlist_push_tail(&roXact->possibleUnsafeConflicts, &conflict->inLink); } static void ReleaseRWConflict(RWConflict conflict) { - SHMQueueDelete(&conflict->inLink); - SHMQueueDelete(&conflict->outLink); - SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink); + dlist_delete(&conflict->inLink); + dlist_delete(&conflict->outLink); + dlist_push_tail(&RWConflictPool->availableList, &conflict->outLink); } static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact) { - RWConflict conflict, - nextConflict; + dlist_mutable_iter iter; Assert(SxactIsReadOnly(sxact)); Assert(!SxactIsROSafe(sxact)); @@ -762,23 +701,15 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact) * We know this isn't a safe snapshot, so we can stop looking for other * potential conflicts. */ - conflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &sxact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); Assert(!SxactIsReadOnly(conflict->sxactOut)); Assert(sxact == conflict->sxactIn); ReleaseRWConflict(conflict); - - conflict = nextConflict; } } @@ -1243,8 +1174,8 @@ InitPredicateLocks(void) { int i; - SHMQueueInit(&PredXact->availableList); - SHMQueueInit(&PredXact->activeList); + dlist_init(&PredXact->availableList); + dlist_init(&PredXact->activeList); PredXact->SxactGlobalXmin = InvalidTransactionId; PredXact->SxactGlobalXminCount = 0; PredXact->WritableSxactCount = 0; @@ -1252,27 +1183,26 @@ InitPredicateLocks(void) PredXact->CanPartialClearThrough = 0; PredXact->HavePartialClearedThrough = 0; requestSize = mul_size((Size) 
max_table_size, - PredXactListElementDataSize); + sizeof(SERIALIZABLEXACT)); PredXact->element = ShmemAlloc(requestSize); /* Add all elements to available list, clean. */ memset(PredXact->element, 0, requestSize); for (i = 0; i < max_table_size; i++) { - LWLockInitialize(&PredXact->element[i].sxact.perXactPredicateListLock, + LWLockInitialize(&PredXact->element[i].perXactPredicateListLock, LWTRANCHE_PER_XACT_PREDICATE_LIST); - SHMQueueInsertBefore(&(PredXact->availableList), - &(PredXact->element[i].link)); + dlist_push_tail(&PredXact->availableList, &PredXact->element[i].xactLink); } PredXact->OldCommittedSxact = CreatePredXact(); SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid); PredXact->OldCommittedSxact->prepareSeqNo = 0; PredXact->OldCommittedSxact->commitSeqNo = 0; PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0; - SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts); - SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts); - SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks); - SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink); - SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts); + dlist_init(&PredXact->OldCommittedSxact->outConflicts); + dlist_init(&PredXact->OldCommittedSxact->inConflicts); + dlist_init(&PredXact->OldCommittedSxact->predicateLocks); + dlist_node_init(&PredXact->OldCommittedSxact->finishedLink); + dlist_init(&PredXact->OldCommittedSxact->possibleUnsafeConflicts); PredXact->OldCommittedSxact->topXid = InvalidTransactionId; PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId; PredXact->OldCommittedSxact->xmin = InvalidTransactionId; @@ -1318,7 +1248,7 @@ InitPredicateLocks(void) { int i; - SHMQueueInit(&RWConflictPool->availableList); + dlist_init(&RWConflictPool->availableList); requestSize = mul_size((Size) max_table_size, RWConflictDataSize); RWConflictPool->element = ShmemAlloc(requestSize); @@ -1326,8 +1256,8 @@ InitPredicateLocks(void) 
memset(RWConflictPool->element, 0, requestSize); for (i = 0; i < max_table_size; i++) { - SHMQueueInsertBefore(&(RWConflictPool->availableList), - &(RWConflictPool->element[i].outLink)); + dlist_push_tail(&RWConflictPool->availableList, + &RWConflictPool->element[i].outLink); } } @@ -1335,13 +1265,13 @@ InitPredicateLocks(void) * Create or attach to the header for the list of finished serializable * transactions. */ - FinishedSerializableTransactions = (SHM_QUEUE *) + FinishedSerializableTransactions = (dlist_head *) ShmemInitStruct("FinishedSerializableTransactions", - sizeof(SHM_QUEUE), + sizeof(dlist_head), &found); Assert(found == IsUnderPostmaster); if (!found) - SHMQueueInit(FinishedSerializableTransactions); + dlist_init(FinishedSerializableTransactions); /* * Initialize the SLRU storage for old committed serializable @@ -1380,7 +1310,7 @@ PredicateLockShmemSize(void) max_table_size *= 10; size = add_size(size, PredXactListDataSize); size = add_size(size, mul_size((Size) max_table_size, - PredXactListElementDataSize)); + sizeof(SERIALIZABLEXACT))); /* transaction xid table */ size = add_size(size, hash_estimate_size(max_table_size, @@ -1393,7 +1323,7 @@ PredicateLockShmemSize(void) RWConflictDataSize)); /* Head for list of finished serializable transactions. */ - size = add_size(size, sizeof(SHM_QUEUE)); + size = add_size(size, sizeof(dlist_head)); /* Shared memory structures for SLRU tracking of old committed xids. */ size = add_size(size, sizeof(SerialControlData)); @@ -1516,7 +1446,7 @@ SummarizeOldestCommittedSxact(void) * that case, we have nothing to do here. The caller will find one of the * slots released by the other backend when it retries. */ - if (SHMQueueEmpty(FinishedSerializableTransactions)) + if (dlist_is_empty(FinishedSerializableTransactions)) { LWLockRelease(SerializableFinishedListLock); return; @@ -1526,11 +1456,9 @@ SummarizeOldestCommittedSxact(void) * Grab the first sxact off the finished list -- this will be the earliest * commit. 
Remove it from the list. */ - sxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - FinishedSerializableTransactions, - offsetof(SERIALIZABLEXACT, finishedLink)); - SHMQueueDelete(&(sxact->finishedLink)); + sxact = dlist_head_element(SERIALIZABLEXACT, finishedLink, + FinishedSerializableTransactions); + dlist_delete_thoroughly(&sxact->finishedLink); /* Add to SLRU summary information. */ if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact)) @@ -1584,7 +1512,7 @@ GetSafeSnapshot(Snapshot origSnapshot) * them marked us as conflicted. */ MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING; - while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) || + while (!(dlist_is_empty(&MySerializableXact->possibleUnsafeConflicts) || SxactIsROUnsafe(MySerializableXact))) { LWLockRelease(SerializableXactHashLock); @@ -1630,13 +1558,16 @@ int GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size) { int num_written = 0; - SERIALIZABLEXACT *sxact; + dlist_iter iter; + SERIALIZABLEXACT *sxact = NULL; LWLockAcquire(SerializableXactHashLock, LW_SHARED); /* Find blocked_pid's SERIALIZABLEXACT by linear search. */ - for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) + dlist_foreach(iter, &PredXact->activeList) { + sxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (sxact->pid == blocked_pid) break; } @@ -1644,21 +1575,13 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size) /* Did we find it, and is it currently waiting in GetSafeSnapshot? */ if (sxact != NULL && SxactIsDeferrableWaiting(sxact)) { - RWConflict possibleUnsafeConflict; - /* Traverse the list of possible unsafe conflicts collecting PIDs. 
*/ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &sxact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - - while (possibleUnsafeConflict != NULL && num_written < output_size) + dlist_foreach(iter, &sxact->possibleUnsafeConflicts) { + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, inLink, iter.cur); + output[num_written++] = possibleUnsafeConflict->sxactOut->pid; - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &possibleUnsafeConflict->inLink, - offsetof(RWConflictData, inLink)); } } @@ -1871,19 +1794,21 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo; sxact->prepareSeqNo = InvalidSerCommitSeqNo; sxact->commitSeqNo = InvalidSerCommitSeqNo; - SHMQueueInit(&(sxact->outConflicts)); - SHMQueueInit(&(sxact->inConflicts)); - SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + dlist_init(&(sxact->outConflicts)); + dlist_init(&(sxact->inConflicts)); + dlist_init(&(sxact->possibleUnsafeConflicts)); sxact->topXid = GetTopTransactionIdIfAny(); sxact->finishedBefore = InvalidTransactionId; sxact->xmin = snapshot->xmin; sxact->pid = MyProcPid; sxact->pgprocno = MyProc->pgprocno; - SHMQueueInit(&(sxact->predicateLocks)); - SHMQueueElemInit(&(sxact->finishedLink)); + dlist_init(&sxact->predicateLocks); + dlist_node_init(&sxact->finishedLink); sxact->flags = 0; if (XactReadOnly) { + dlist_iter iter; + sxact->flags |= SXACT_FLAG_READ_ONLY; /* @@ -1892,10 +1817,10 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, * transactions then this snapshot can be deemed safe (and we can run * without tracking predicate locks). 
*/ - for (othersxact = FirstPredXact(); - othersxact != NULL; - othersxact = NextPredXact(othersxact)) + dlist_foreach(iter, &PredXact->activeList) { + othersxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (!SxactIsCommitted(othersxact) && !SxactIsDoomed(othersxact) && !SxactIsReadOnly(othersxact)) @@ -2172,7 +2097,7 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) Assert(LWLockHeldByMe(SerializablePredicateListLock)); /* Can't remove it until no locks at this target. */ - if (!SHMQueueEmpty(&target->predicateLocks)) + if (!dlist_is_empty(&target->predicateLocks)) return; /* Actually remove the target. */ @@ -2200,28 +2125,20 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) { SERIALIZABLEXACT *sxact; PREDICATELOCK *predlock; + dlist_mutable_iter iter; LWLockAcquire(SerializablePredicateListLock, LW_SHARED); sxact = MySerializableXact; if (IsInParallelMode()) LWLockAcquire(&sxact->perXactPredicateListLock, LW_EXCLUSIVE); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + + dlist_foreach_modify(iter, &sxact->predicateLocks) { - SHM_QUEUE *predlocksxactlink; - PREDICATELOCK *nextpredlock; PREDICATELOCKTAG oldlocktag; PREDICATELOCKTARGET *oldtarget; PREDICATELOCKTARGETTAG oldtargettag; - predlocksxactlink = &(predlock->xactLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - predlocksxactlink, - offsetof(PREDICATELOCK, xactLink)); + predlock = dlist_container(PREDICATELOCK, xactLink, iter.cur); oldlocktag = predlock->tag; Assert(oldlocktag.myXact == sxact); @@ -2239,8 +2156,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(predlocksxactlink); - SHMQueueDelete(&(predlock->targetLink)); + dlist_delete(&predlock->xactLink); + dlist_delete(&predlock->targetLink); rmpredlock = 
hash_search_with_hash_value (PredicateLockHash, &oldlocktag, @@ -2255,8 +2172,6 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) DecrementParentLocks(&oldtargettag); } - - predlock = nextpredlock; } if (IsInParallelMode()) LWLockRelease(&sxact->perXactPredicateListLock); @@ -2473,7 +2388,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, errmsg("out of shared memory"), errhint("You might need to increase max_pred_locks_per_transaction."))); if (!found) - SHMQueueInit(&(target->predicateLocks)); + dlist_init(&target->predicateLocks); /* We've got the sxact and target, make sure they're joined. */ locktag.myTarget = target; @@ -2490,9 +2405,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, if (!found) { - SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink)); - SHMQueueInsertBefore(&(sxact->predicateLocks), - &(lock->xactLink)); + dlist_push_tail(&target->predicateLocks, &lock->targetLink); + dlist_push_tail(&sxact->predicateLocks, &lock->xactLink); lock->commitSeqNo = InvalidSerCommitSeqNo; } @@ -2664,30 +2578,22 @@ PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot, static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) { - PREDICATELOCK *predlock; - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; - bool found; + dlist_mutable_iter iter; Assert(LWLockHeldByMeInMode(SerializablePredicateListLock, LW_EXCLUSIVE)); Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash))); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - while (predlock) - { - predlocktargetlink = &(predlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); - SHMQueueDelete(&(predlock->xactLink)); - 
SHMQueueDelete(&(predlock->targetLink)); + dlist_foreach_modify(iter, &target->predicateLocks) + { + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); + bool found; + + dlist_delete(&(predlock->xactLink)); + dlist_delete(&(predlock->targetLink)); hash_search_with_hash_value (PredicateLockHash, @@ -2696,8 +2602,6 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) targettaghash), HASH_REMOVE, &found); Assert(found); - - predlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); @@ -2796,8 +2700,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, if (oldtarget) { PREDICATELOCKTARGET *newtarget; - PREDICATELOCK *oldpredlock; PREDICATELOCKTAG newpredlocktag; + dlist_mutable_iter iter; newtarget = hash_search_with_hash_value(PredicateLockTargetHash, &newtargettag, @@ -2813,7 +2717,7 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, /* If we created a new entry, initialize it */ if (!found) - SHMQueueInit(&(newtarget->predicateLocks)); + dlist_init(&newtarget->predicateLocks); newpredlocktag.myTarget = newtarget; @@ -2821,29 +2725,21 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, * Loop through all the locks on the old target, replacing them with * locks on the new target. 
*/ - oldpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldtarget->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - while (oldpredlock) + + dlist_foreach_modify(iter, &oldtarget->predicateLocks) { - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; + PREDICATELOCK *oldpredlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); PREDICATELOCK *newpredlock; SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo; - predlocktargetlink = &(oldpredlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); newpredlocktag.myXact = oldpredlock->tag.myXact; if (removeOld) { - SHMQueueDelete(&(oldpredlock->xactLink)); - SHMQueueDelete(&(oldpredlock->targetLink)); + dlist_delete(&(oldpredlock->xactLink)); + dlist_delete(&(oldpredlock->targetLink)); hash_search_with_hash_value (PredicateLockHash, @@ -2871,10 +2767,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, } if (!found) { - SHMQueueInsertBefore(&(newtarget->predicateLocks), - &(newpredlock->targetLink)); - SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks), - &(newpredlock->xactLink)); + dlist_push_tail(&(newtarget->predicateLocks), + &(newpredlock->targetLink)); + dlist_push_tail(&(newpredlocktag.myXact->predicateLocks), + &(newpredlock->xactLink)); newpredlock->commitSeqNo = oldCommitSeqNo; } else @@ -2886,14 +2782,12 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, Assert(newpredlock->commitSeqNo != 0); Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo) || (newpredlock->tag.myXact == OldCommittedSxact)); - - oldpredlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); if (removeOld) { - Assert(SHMQueueEmpty(&oldtarget->predicateLocks)); + Assert(dlist_is_empty(&oldtarget->predicateLocks)); 
RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash); } } @@ -3013,7 +2907,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat))) { - PREDICATELOCK *oldpredlock; + dlist_mutable_iter iter; /* * Check whether this is a target which needs attention. @@ -3048,29 +2942,21 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) heaptargettaghash, HASH_ENTER, &found); if (!found) - SHMQueueInit(&heaptarget->predicateLocks); + dlist_init(&heaptarget->predicateLocks); } /* * Loop through all the locks on the old target, replacing them with * locks on the new target. */ - oldpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldtarget->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); - while (oldpredlock) + dlist_foreach_modify(iter, &oldtarget->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *oldpredlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); PREDICATELOCK *newpredlock; SerCommitSeqNo oldCommitSeqNo; SERIALIZABLEXACT *oldXact; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldpredlock->targetLink), - offsetof(PREDICATELOCK, targetLink)); - /* * Remove the old lock first. This avoids the chance of running * out of lock structure entries for the hash table. 
@@ -3078,7 +2964,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) oldCommitSeqNo = oldpredlock->commitSeqNo; oldXact = oldpredlock->tag.myXact; - SHMQueueDelete(&(oldpredlock->xactLink)); + dlist_delete(&(oldpredlock->xactLink)); /* * No need for retail delete from oldtarget list, we're removing @@ -3104,10 +2990,10 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) &found); if (!found) { - SHMQueueInsertBefore(&(heaptarget->predicateLocks), - &(newpredlock->targetLink)); - SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks), - &(newpredlock->xactLink)); + dlist_push_tail(&(heaptarget->predicateLocks), + &(newpredlock->targetLink)); + dlist_push_tail(&(newpredlocktag.myXact->predicateLocks), + &(newpredlock->xactLink)); newpredlock->commitSeqNo = oldCommitSeqNo; } else @@ -3120,8 +3006,6 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo) || (newpredlock->tag.myXact == OldCommittedSxact)); } - - oldpredlock = nextpredlock; } hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE, @@ -3276,15 +3160,18 @@ PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, static void SetNewSxactGlobalXmin(void) { - SERIALIZABLEXACT *sxact; + dlist_iter iter; Assert(LWLockHeldByMe(SerializableXactHashLock)); PredXact->SxactGlobalXmin = InvalidTransactionId; PredXact->SxactGlobalXminCount = 0; - for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) + dlist_foreach(iter, &PredXact->activeList) { + SERIALIZABLEXACT *sxact = + dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (!SxactIsRolledBack(sxact) && !SxactIsCommitted(sxact) && sxact != OldCommittedSxact) @@ -3335,10 +3222,8 @@ void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) { bool needToClear; - RWConflict conflict, - nextConflict, - possibleUnsafeConflict; SERIALIZABLEXACT *roXact; + dlist_mutable_iter iter; /* * We can't trust XactReadOnly 
here, because a transaction which started @@ -3526,23 +3411,15 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * make us unsafe. Note that we use 'inLink' for the iteration as * opposed to 'outLink' for the r/w xacts. */ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &MySerializableXact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - while (possibleUnsafeConflict) + dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &possibleUnsafeConflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, inLink, iter.cur); Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut)); Assert(MySerializableXact == possibleUnsafeConflict->sxactIn); ReleaseRWConflict(possibleUnsafeConflict); - - possibleUnsafeConflict = nextConflict; } } @@ -3565,16 +3442,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to * previously committed transactions. 
*/ - conflict = (RWConflict) - SHMQueueNext(&MySerializableXact->outConflicts, - &MySerializableXact->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach_modify(iter, &MySerializableXact->outConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); if (isCommit && !SxactIsReadOnly(MySerializableXact) @@ -3590,31 +3461,21 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) || SxactIsCommitted(conflict->sxactIn) || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)) ReleaseRWConflict(conflict); - - conflict = nextConflict; } /* * Release all inConflicts from committed and read-only transactions. If * we're rolling back, clear them all. */ - conflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &MySerializableXact->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &MySerializableXact->inConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); if (!isCommit || SxactIsCommitted(conflict->sxactOut) || SxactIsReadOnly(conflict->sxactOut)) ReleaseRWConflict(conflict); - - conflict = nextConflict; } if (!topLevelIsDeclaredReadOnly) @@ -3625,16 +3486,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * conflict out. If any are waiting DEFERRABLE transactions, wake them * up if they are known safe or known unsafe. 
*/ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &MySerializableXact->possibleUnsafeConflicts, - offsetof(RWConflictData, outLink)); - while (possibleUnsafeConflict) + dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &possibleUnsafeConflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, outLink, iter.cur); roXact = possibleUnsafeConflict->sxactIn; Assert(MySerializableXact == possibleUnsafeConflict->sxactOut); @@ -3662,7 +3517,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * transaction can now safely release its predicate locks (but * that transaction's backend has to do that itself). */ - if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts)) + if (dlist_is_empty(&roXact->possibleUnsafeConflicts)) roXact->flags |= SXACT_FLAG_RO_SAFE; } @@ -3673,8 +3528,6 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) ProcSendSignal(roXact->pgprocno); - - possibleUnsafeConflict = nextConflict; } } @@ -3702,8 +3555,8 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) /* Add this to the list of transactions to check for later cleanup. */ if (isCommit) - SHMQueueInsertBefore(FinishedSerializableTransactions, - &MySerializableXact->finishedLink); + dlist_push_tail(FinishedSerializableTransactions, + &MySerializableXact->finishedLink); /* * If we're releasing a RO_SAFE transaction in parallel mode, we'll only @@ -3745,27 +3598,19 @@ ReleasePredicateLocksLocal(void) static void ClearOldPredicateLocks(void) { - SERIALIZABLEXACT *finishedSxact; - PREDICATELOCK *predlock; + dlist_mutable_iter iter; /* * Loop through finished transactions. 
They are in commit order, so we can * stop as soon as we find one that's still interesting. */ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE); - finishedSxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - FinishedSerializableTransactions, - offsetof(SERIALIZABLEXACT, finishedLink)); LWLockAcquire(SerializableXactHashLock, LW_SHARED); - while (finishedSxact) + dlist_foreach_modify(iter, FinishedSerializableTransactions) { - SERIALIZABLEXACT *nextSxact; + SERIALIZABLEXACT *finishedSxact = + dlist_container(SERIALIZABLEXACT, finishedLink, iter.cur); - nextSxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - &(finishedSxact->finishedLink), - offsetof(SERIALIZABLEXACT, finishedLink)); if (!TransactionIdIsValid(PredXact->SxactGlobalXmin) || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore, PredXact->SxactGlobalXmin)) @@ -3775,7 +3620,7 @@ ClearOldPredicateLocks(void) * took its snapshot. It's no longer interesting. */ LWLockRelease(SerializableXactHashLock); - SHMQueueDelete(&(finishedSxact->finishedLink)); + dlist_delete_thoroughly(&finishedSxact->finishedLink); ReleaseOneSerializableXact(finishedSxact, false, false); LWLockAcquire(SerializableXactHashLock, LW_SHARED); } @@ -3792,7 +3637,7 @@ ClearOldPredicateLocks(void) if (SxactIsReadOnly(finishedSxact)) { /* A read-only transaction can be removed entirely */ - SHMQueueDelete(&(finishedSxact->finishedLink)); + dlist_delete_thoroughly(&(finishedSxact->finishedLink)); ReleaseOneSerializableXact(finishedSxact, false, false); } else @@ -3813,7 +3658,6 @@ ClearOldPredicateLocks(void) /* Still interesting. */ break; } - finishedSxact = nextSxact; } LWLockRelease(SerializableXactHashLock); @@ -3821,20 +3665,12 @@ ClearOldPredicateLocks(void) * Loop through predicate locks on dummy transaction for summarized data. 
*/ LWLockAcquire(SerializablePredicateListLock, LW_SHARED); - predlock = (PREDICATELOCK *) - SHMQueueNext(&OldCommittedSxact->predicateLocks, - &OldCommittedSxact->predicateLocks, - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + dlist_foreach_modify(iter, &OldCommittedSxact->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); bool canDoPartialCleanup; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&OldCommittedSxact->predicateLocks, - &predlock->xactLink, - offsetof(PREDICATELOCK, xactLink)); - LWLockAcquire(SerializableXactHashLock, LW_SHARED); Assert(predlock->commitSeqNo != 0); Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo); @@ -3861,8 +3697,8 @@ ClearOldPredicateLocks(void) LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(&(predlock->targetLink)); - SHMQueueDelete(&(predlock->xactLink)); + dlist_delete(&(predlock->targetLink)); + dlist_delete(&(predlock->xactLink)); hash_search_with_hash_value(PredicateLockHash, &tag, PredicateLockHashCodeFromTargetHashCode(&tag, @@ -3872,8 +3708,6 @@ ClearOldPredicateLocks(void) LWLockRelease(partitionLock); } - - predlock = nextpredlock; } LWLockRelease(SerializablePredicateListLock); @@ -3903,10 +3737,8 @@ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, bool summarize) { - PREDICATELOCK *predlock; SERIALIZABLEXIDTAG sxidtag; - RWConflict conflict, - nextConflict; + dlist_mutable_iter iter; Assert(sxact != NULL); Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact)); @@ -3920,27 +3752,17 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, LWLockAcquire(SerializablePredicateListLock, LW_SHARED); if (IsInParallelMode()) LWLockAcquire(&sxact->perXactPredicateListLock, LW_EXCLUSIVE); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + dlist_foreach_modify(iter, 
&sxact->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); PREDICATELOCKTAG tag; - SHM_QUEUE *targetLink; PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; LWLock *partitionLock; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(predlock->xactLink), - offsetof(PREDICATELOCK, xactLink)); - tag = predlock->tag; - targetLink = &(predlock->targetLink); target = tag.myTarget; targettag = target->tag; targettaghash = PredicateLockTargetTagHashCode(&targettag); @@ -3948,7 +3770,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(targetLink); + dlist_delete(&predlock->targetLink); hash_search_with_hash_value(PredicateLockHash, &tag, PredicateLockHashCodeFromTargetHashCode(&tag, @@ -3978,10 +3800,10 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, } else { - SHMQueueInsertBefore(&(target->predicateLocks), - &(predlock->targetLink)); - SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks), - &(predlock->xactLink)); + dlist_push_tail(&target->predicateLocks, + &predlock->targetLink); + dlist_push_tail(&OldCommittedSxact->predicateLocks, + &predlock->xactLink); predlock->commitSeqNo = sxact->commitSeqNo; } } @@ -3989,15 +3811,13 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, RemoveTargetIfNoLongerUsed(target, targettaghash); LWLockRelease(partitionLock); - - predlock = nextpredlock; } /* * Rather than retail removal, just re-init the head after we've run * through the list. 
*/ - SHMQueueInit(&sxact->predicateLocks); + dlist_init(&sxact->predicateLocks); if (IsInParallelMode()) LWLockRelease(&sxact->perXactPredicateListLock); @@ -4009,38 +3829,25 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, /* Release all outConflicts (unless 'partial' is true) */ if (!partial) { - conflict = (RWConflict) - SHMQueueNext(&sxact->outConflicts, - &sxact->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->outConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); if (summarize) conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; ReleaseRWConflict(conflict); - conflict = nextConflict; } } /* Release all inConflicts. */ - conflict = (RWConflict) - SHMQueueNext(&sxact->inConflicts, - &sxact->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->inConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); + if (summarize) conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; ReleaseRWConflict(conflict); - conflict = nextConflict; } /* Finally, get rid of the xid and the record of the transaction itself. 
*/ @@ -4166,7 +3973,7 @@ CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot s errhint("The transaction might succeed if retried."))); if (SxactHasSummaryConflictIn(MySerializableXact) - || !SHMQueueEmpty(&MySerializableXact->inConflicts)) + || !dlist_is_empty(&MySerializableXact->inConflicts)) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to read/write dependencies among transactions"), @@ -4262,9 +4069,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) uint32 targettaghash; LWLock *partitionLock; PREDICATELOCKTARGET *target; - PREDICATELOCK *predlock; PREDICATELOCK *mypredlock = NULL; PREDICATELOCKTAG mypredlocktag; + dlist_mutable_iter iter; Assert(MySerializableXact != InvalidSerializableXact); @@ -4289,24 +4096,14 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) * Each lock for an overlapping transaction represents a conflict: a * rw-dependency in to this transaction. */ - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_SHARED); - while (predlock) + + dlist_foreach_modify(iter, &target->predicateLocks) { - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; - SERIALIZABLEXACT *sxact; + PREDICATELOCK *predlock = (PREDICATELOCK *) + dlist_container(PREDICATELOCK, targetLink, iter.cur); + SERIALIZABLEXACT *sxact = predlock->tag.myXact; - predlocktargetlink = &(predlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); - - sxact = predlock->tag.myXact; if (sxact == MySerializableXact) { /* @@ -4351,8 +4148,6 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) LWLockRelease(SerializableXactHashLock); LWLockAcquire(SerializableXactHashLock, LW_SHARED); } - - predlock = nextpredlock; } 
LWLockRelease(SerializableXactHashLock); LWLockRelease(partitionLock); @@ -4392,8 +4187,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) { Assert(rmpredlock == mypredlock); - SHMQueueDelete(&(mypredlock->targetLink)); - SHMQueueDelete(&(mypredlock->xactLink)); + dlist_delete(&(mypredlock->targetLink)); + dlist_delete(&(mypredlock->xactLink)); rmpredlock = (PREDICATELOCK *) hash_search_with_hash_value(PredicateLockHash, @@ -4563,7 +4358,7 @@ CheckTableForSerializableConflictIn(Relation relation) while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat))) { - PREDICATELOCK *predlock; + dlist_mutable_iter iter; /* * Check whether this is a target which needs attention. @@ -4576,26 +4371,16 @@ CheckTableForSerializableConflictIn(Relation relation) /* * Loop through locks for this target and flag conflicts. */ - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); - while (predlock) + dlist_foreach_modify(iter, &target->predicateLocks) { - PREDICATELOCK *nextpredlock; - - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(predlock->targetLink), - offsetof(PREDICATELOCK, targetLink)); + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); if (predlock->tag.myXact != MySerializableXact && !RWConflictExists(predlock->tag.myXact, MySerializableXact)) { FlagRWConflict(predlock->tag.myXact, MySerializableXact); } - - predlock = nextpredlock; } } @@ -4653,7 +4438,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) { bool failure; - RWConflict conflict; Assert(LWLockHeldByMe(SerializableXactHashLock)); @@ -4693,20 +4477,16 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, * to abort. 
*------------------------------------------------------------------------ */ - if (!failure) + if (!failure && SxactHasSummaryConflictOut(writer)) + failure = true; + else if (!failure) { - if (SxactHasSummaryConflictOut(writer)) - { - failure = true; - conflict = NULL; - } - else - conflict = (RWConflict) - SHMQueueNext(&writer->outConflicts, - &writer->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_iter iter; + + dlist_foreach(iter, &writer->outConflicts) { + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); SERIALIZABLEXACT *t2 = conflict->sxactIn; if (SxactIsPrepared(t2) @@ -4720,10 +4500,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, failure = true; break; } - conflict = (RWConflict) - SHMQueueNext(&writer->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); } } @@ -4745,30 +4521,27 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, if (SxactHasSummaryConflictIn(reader)) { failure = true; - conflict = NULL; } else - conflict = (RWConflict) - SHMQueueNext(&reader->inConflicts, - &reader->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) { - SERIALIZABLEXACT *t0 = conflict->sxactOut; + dlist_iter iter; - if (!SxactIsDoomed(t0) - && (!SxactIsCommitted(t0) - || t0->commitSeqNo >= writer->prepareSeqNo) - && (!SxactIsReadOnly(t0) - || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) + dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->inConflicts) { - failure = true; - break; + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); + SERIALIZABLEXACT *t0 = conflict->sxactOut; + + if (!SxactIsDoomed(t0) + && (!SxactIsCommitted(t0) + || t0->commitSeqNo >= writer->prepareSeqNo) + && (!SxactIsReadOnly(t0) + || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) + { + failure = true; + break; + } } - conflict = (RWConflict) - SHMQueueNext(&reader->inConflicts, - 
&conflict->inLink, - offsetof(RWConflictData, inLink)); } } @@ -4826,7 +4599,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, void PreCommit_CheckForSerializationFailure(void) { - RWConflict nearConflict; + dlist_iter near_iter; if (MySerializableXact == InvalidSerializableXact) return; @@ -4847,23 +4620,21 @@ PreCommit_CheckForSerializationFailure(void) errhint("The transaction might succeed if retried."))); } - nearConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &MySerializableXact->inConflicts, - offsetof(RWConflictData, inLink)); - while (nearConflict) + dlist_foreach(near_iter, &MySerializableXact->inConflicts) { + RWConflict nearConflict = + dlist_container(RWConflictData, inLink, near_iter.cur); + if (!SxactIsCommitted(nearConflict->sxactOut) && !SxactIsDoomed(nearConflict->sxactOut)) { - RWConflict farConflict; + dlist_iter far_iter; - farConflict = (RWConflict) - SHMQueueNext(&nearConflict->sxactOut->inConflicts, - &nearConflict->sxactOut->inConflicts, - offsetof(RWConflictData, inLink)); - while (farConflict) + dlist_foreach(far_iter, &nearConflict->sxactOut->inConflicts) { + RWConflict farConflict = + dlist_container(RWConflictData, inLink, far_iter.cur); + if (farConflict->sxactOut == MySerializableXact || (!SxactIsCommitted(farConflict->sxactOut) && !SxactIsReadOnly(farConflict->sxactOut) @@ -4887,17 +4658,8 @@ PreCommit_CheckForSerializationFailure(void) nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED; break; } - farConflict = (RWConflict) - SHMQueueNext(&nearConflict->sxactOut->inConflicts, - &farConflict->inLink, - offsetof(RWConflictData, inLink)); } } - - nearConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &nearConflict->inLink, - offsetof(RWConflictData, inLink)); } MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); @@ -4920,11 +4682,11 @@ PreCommit_CheckForSerializationFailure(void) void AtPrepare_PredicateLocks(void) { - PREDICATELOCK 
*predlock; SERIALIZABLEXACT *sxact; TwoPhasePredicateRecord record; TwoPhasePredicateXactRecord *xactRecord; TwoPhasePredicateLockRecord *lockRecord; + dlist_iter iter; sxact = MySerializableXact; xactRecord = &(record.data.xactRecord); @@ -4964,23 +4726,16 @@ AtPrepare_PredicateLocks(void) */ Assert(!IsParallelWorker() && !ParallelContextActive()); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - - while (predlock != NULL) + dlist_foreach(iter, &sxact->predicateLocks) { + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); + record.type = TWOPHASEPREDICATERECORD_LOCK; lockRecord->target = predlock->tag.myTarget->tag; RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0, &record, sizeof(record)); - - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(predlock->xactLink), - offsetof(PREDICATELOCK, xactLink)); } LWLockRelease(SerializablePredicateListLock); @@ -5092,10 +4847,10 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, * recovered xact started are still active, except possibly other * prepared xacts and we don't care whether those are RO_SAFE or not. */ - SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + dlist_init(&(sxact->possibleUnsafeConflicts)); - SHMQueueInit(&(sxact->predicateLocks)); - SHMQueueElemInit(&(sxact->finishedLink)); + dlist_init(&(sxact->predicateLocks)); + dlist_node_init(&sxact->finishedLink); sxact->topXid = xid; sxact->xmin = xactRecord->xmin; @@ -5113,8 +4868,8 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, * we'll conservatively assume that it had both a conflict in and a * conflict out, and represent that with the summary conflict flags. 
*/ - SHMQueueInit(&(sxact->outConflicts)); - SHMQueueInit(&(sxact->inConflicts)); + dlist_init(&(sxact->outConflicts)); + dlist_init(&(sxact->inConflicts)); sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; -- 2.38.0
>From 4d71eb31586ec9c022b2b060bf6a3e292d61dbbb Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Sat, 19 Nov 2022 15:26:35 -0800 Subject: [PATCH v2 6/9] Remove now unused SHMQueue* --- src/include/storage/shmem.h | 22 ---- src/backend/storage/ipc/Makefile | 1 - src/backend/storage/ipc/meson.build | 1 - src/backend/storage/ipc/shmqueue.c | 190 ---------------------------- src/tools/pgindent/typedefs.list | 1 - 5 files changed, 215 deletions(-) delete mode 100644 src/backend/storage/ipc/shmqueue.c diff --git a/src/include/storage/shmem.h b/src/include/storage/shmem.h index de9e7c6e73f..ba97f7a95fe 100644 --- a/src/include/storage/shmem.h +++ b/src/include/storage/shmem.h @@ -24,13 +24,6 @@ #include "utils/hsearch.h" -/* shmqueue.c */ -typedef struct SHM_QUEUE -{ - struct SHM_QUEUE *prev; - struct SHM_QUEUE *next; -} SHM_QUEUE; - /* shmem.c */ extern void InitShmemAccess(void *seghdr); extern void InitShmemAllocation(void); @@ -63,19 +56,4 @@ typedef struct Size allocated_size; /* # bytes actually allocated */ } ShmemIndexEnt; -/* - * prototypes for functions in shmqueue.c - */ -extern void SHMQueueInit(SHM_QUEUE *queue); -extern void SHMQueueElemInit(SHM_QUEUE *queue); -extern void SHMQueueDelete(SHM_QUEUE *queue); -extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem); -extern void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem); -extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, - Size linkOffset); -extern Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, - Size linkOffset); -extern bool SHMQueueEmpty(const SHM_QUEUE *queue); -extern bool SHMQueueIsDetached(const SHM_QUEUE *queue); - #endif /* SHMEM_H */ diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index df90c6b093f..6d5b921038c 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -21,7 +21,6 @@ OBJS = \ shm_mq.o \ shm_toc.o \ shmem.o \ - shmqueue.o \ 
signalfuncs.o \ sinval.o \ sinvaladt.o \ diff --git a/src/backend/storage/ipc/meson.build b/src/backend/storage/ipc/meson.build index 516bc1d0193..b24ede71ce4 100644 --- a/src/backend/storage/ipc/meson.build +++ b/src/backend/storage/ipc/meson.build @@ -11,7 +11,6 @@ backend_sources += files( 'shm_mq.c', 'shm_toc.c', 'shmem.c', - 'shmqueue.c', 'signalfuncs.c', 'sinval.c', 'sinvaladt.c', diff --git a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c deleted file mode 100644 index a22cf576e06..00000000000 --- a/src/backend/storage/ipc/shmqueue.c +++ /dev/null @@ -1,190 +0,0 @@ -/*------------------------------------------------------------------------- - * - * shmqueue.c - * shared memory linked lists - * - * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * - * IDENTIFICATION - * src/backend/storage/ipc/shmqueue.c - * - * NOTES - * - * Package for managing doubly-linked lists in shared memory. - * The only tricky thing is that SHM_QUEUE will usually be a field - * in a larger record. SHMQueueNext has to return a pointer - * to the record itself instead of a pointer to the SHMQueue field - * of the record. It takes an extra parameter and does some extra - * pointer arithmetic to do this correctly. - * - * NOTE: These are set up so they can be turned into macros some day. - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include "storage/shmem.h" - - -/* - * ShmemQueueInit -- make the head of a new queue point - * to itself - */ -void -SHMQueueInit(SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - queue->prev = queue->next = queue; -} - -/* - * SHMQueueIsDetached -- true if element is not currently - * in a queue. 
- */ -bool -SHMQueueIsDetached(const SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - return (queue->prev == NULL); -} - -/* - * SHMQueueElemInit -- clear an element's links - */ -void -SHMQueueElemInit(SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - queue->prev = queue->next = NULL; -} - -/* - * SHMQueueDelete -- remove an element from the queue and - * close the links - */ -void -SHMQueueDelete(SHM_QUEUE *queue) -{ - SHM_QUEUE *nextElem = queue->next; - SHM_QUEUE *prevElem = queue->prev; - - Assert(ShmemAddrIsValid(queue)); - Assert(ShmemAddrIsValid(nextElem)); - Assert(ShmemAddrIsValid(prevElem)); - - prevElem->next = queue->next; - nextElem->prev = queue->prev; - - queue->prev = queue->next = NULL; -} - -/* - * SHMQueueInsertBefore -- put elem in queue before the given queue - * element. Inserting "before" the queue head puts the elem - * at the tail of the queue. - */ -void -SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem) -{ - SHM_QUEUE *prevPtr = queue->prev; - - Assert(ShmemAddrIsValid(queue)); - Assert(ShmemAddrIsValid(elem)); - - elem->next = prevPtr->next; - elem->prev = queue->prev; - queue->prev = elem; - prevPtr->next = elem; -} - -/* - * SHMQueueInsertAfter -- put elem in queue after the given queue - * element. Inserting "after" the queue head puts the elem - * at the head of the queue. - */ -void -SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) -{ - SHM_QUEUE *nextPtr = queue->next; - - Assert(ShmemAddrIsValid(queue)); - Assert(ShmemAddrIsValid(elem)); - - elem->prev = nextPtr->prev; - elem->next = queue->next; - queue->next = elem; - nextPtr->prev = elem; -} - -/*-------------------- - * SHMQueueNext -- Get the next element from a queue - * - * To start the iteration, pass the queue head as both queue and curElem. - * Returns NULL if no more elements. - * - * Next element is at curElem->next. 
If SHMQueue is part of - * a larger structure, we want to return a pointer to the - * whole structure rather than a pointer to its SHMQueue field. - * For example, - * struct { - * int stuff; - * SHMQueue elem; - * } ELEMType; - * When this element is in a queue, prevElem->next points at struct.elem. - * We subtract linkOffset to get the correct start address of the structure. - * - * calls to SHMQueueNext should take these parameters: - * &(queueHead), &(queueHead), offsetof(ELEMType, elem) - * or - * &(queueHead), &(curElem->elem), offsetof(ELEMType, elem) - *-------------------- - */ -Pointer -SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) -{ - SHM_QUEUE *elemPtr = curElem->next; - - Assert(ShmemAddrIsValid(curElem)); - - if (elemPtr == queue) /* back to the queue head? */ - return NULL; - - return (Pointer) (((char *) elemPtr) - linkOffset); -} - -/*-------------------- - * SHMQueuePrev -- Get the previous element from a queue - * - * Same as SHMQueueNext, just starting at tail and moving towards head. - * All other comments and usage applies. - */ -Pointer -SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) -{ - SHM_QUEUE *elemPtr = curElem->prev; - - Assert(ShmemAddrIsValid(curElem)); - - if (elemPtr == queue) /* back to the queue head? 
*/ - return NULL; - - return (Pointer) (((char *) elemPtr) - linkOffset); -} - -/* - * SHMQueueEmpty -- true if queue head is only element, false otherwise - */ -bool -SHMQueueEmpty(const SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - - if (queue->prev == queue) - { - Assert(queue->next == queue); - return true; - } - return false; -} diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c138745e42c..47dbf1688be 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2369,7 +2369,6 @@ SERIALIZABLEXIDTAG SERVICE_STATUS SERVICE_STATUS_HANDLE SERVICE_TABLE_ENTRY -SHM_QUEUE SID_AND_ATTRIBUTES SID_IDENTIFIER_AUTHORITY SID_NAME_USE -- 2.38.0