Hi, Some of the discussions about improving the locking code, in particular the group locking / deadlock detector discussion with Robert, again made me look at lock.c. While looking at how much work it'd be to a) remove the PROCLOCK hashtable b) move more of the group locking logic into lock.c, rather than smarts in deadlock.c, I got sidetracked by all the verbose and hard to read SHM_QUEUE code.
Here's a *draft* patch series for removing all use of SHM_QUEUE, and subsequently removing SHM_QUEUE. There's a fair bit of polish needed, but I do think it makes the code considerably easier to read (particularly for predicate.c). The diffstat is nice too: src/include/lib/ilist.h | 132 +++++++++++++++++---- src/include/replication/walsender_private.h | 3 +- src/include/storage/lock.h | 10 +- src/include/storage/predicate_internals.h | 49 +++----- src/include/storage/proc.h | 18 +-- src/include/storage/shmem.h | 22 ---- src/backend/access/transam/twophase.c | 4 +- src/backend/lib/ilist.c | 8 +- src/backend/replication/syncrep.c | 89 ++++++-------- src/backend/replication/walsender.c | 2 +- src/backend/storage/ipc/Makefile | 1 - src/backend/storage/ipc/shmqueue.c | 190 ------------------------------ src/backend/storage/lmgr/deadlock.c | 76 +++++------- src/backend/storage/lmgr/lock.c | 129 ++++++++------------ src/backend/storage/lmgr/predicate.c | 692 +++++++++++++++++++++++++++++++++++------------------------------------------------------------------------ src/backend/storage/lmgr/proc.c | 197 +++++++++++++------------------ 16 files changed, 569 insertions(+), 1053 deletions(-) I don't want to invest a lot of time into this if there's not some agreement that this is a good direction to go into. So I'd appreciate a few cursory looks before spending more time. Overview: 0001: Add additional prev/next & detached node functions to ilist. I think the additional prev/next accessors are nice. I am less convinced by the 'detached' stuff, but it's used by some SHM_QUEUE users. I don't want to make the plain dlist_delete reset the node's prev/next pointers, it's not needed in the vast majority of cases... 0002: Use dlists instead of SHM_QUEUE for heavyweight locks. I mostly removed the odd reliance on PGPROC.links needing to be the first struct member - seems odd. 
I think we should rename PROC_QUEUE.links, elsewhere that's used for list membership nodes, so it's imo confusing/odd. 0003: Use dlist for syncrep queue. This seems fairly simple to me. 0004: Use dlists for predicate locking. Unfortunately pretty large. I think it's a huge improvement, but it's also subtle code. Wonder if there's something better to do here wrt OnConflict_CheckForSerializationFailure? 0005: Remove now unused SHMQueue*. 0006: Remove PROC_QUEUE.size. I'm not sure whether this is a good idea. I was looking primarily at that because I thought it'd allow us to remove PROC_QUEUE as a whole if we wanted to. But as PROC_QUEUE.size doesn't really seem to buy us much, we should perhaps just do something roughly like in the patch? Greetings, Andres Freund
>From 777808a9f51063dc31abcef54f49e6a327efcfb8 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 19 Feb 2020 12:06:08 -0800 Subject: [PATCH v1 1/6] Add additional prev/next & detached node functions to ilist. --- src/include/lib/ilist.h | 132 +++++++++++++++++++++++++++++++++------- src/backend/lib/ilist.c | 8 +-- 2 files changed, 113 insertions(+), 27 deletions(-) diff --git a/src/include/lib/ilist.h b/src/include/lib/ilist.h index 98db885f6ff..1ad9ceb033f 100644 --- a/src/include/lib/ilist.h +++ b/src/include/lib/ilist.h @@ -255,8 +255,8 @@ typedef struct slist_mutable_iter extern void slist_delete(slist_head *head, slist_node *node); #ifdef ILIST_DEBUG -extern void dlist_check(dlist_head *head); -extern void slist_check(slist_head *head); +extern void dlist_check(const dlist_head *head); +extern void slist_check(const slist_head *head); #else /* * These seemingly useless casts to void are here to keep the compiler quiet @@ -270,6 +270,11 @@ extern void slist_check(slist_head *head); /* doubly linked list implementation */ +/* prototypes dlist helpers */ +static inline void *dlist_element_off(dlist_node *node, size_t off); +static inline void *dlist_head_element_off(dlist_head *head, size_t off); +static inline void *dlist_tail_element_off(dlist_head *head, size_t off); + /* * Initialize a doubly linked list. * Previous state will be thrown away without any cleanup. @@ -280,13 +285,24 @@ dlist_init(dlist_head *head) head->head.next = head->head.prev = &head->head; } +/* + * Initialize a doubly linked element. + * + * This is only needed when dlist_node_is_detached() may be needed. + */ +static inline void +dlist_node_init(dlist_node *node) +{ + node->next = node->prev = NULL; +} + /* * Is the list empty? * * An empty list has either its first 'next' pointer set to NULL, or to itself. 
*/ static inline bool -dlist_is_empty(dlist_head *head) +dlist_is_empty(const dlist_head *head) { dlist_check(head); @@ -361,6 +377,20 @@ dlist_delete(dlist_node *node) node->next->prev = node->prev; } + +/* + * Like dlist_delete(), but also sets next/prev to NULL to signal not being in + * list. + */ +static inline void +dlist_delete_thoroughly(dlist_node *node) +{ + node->prev->next = node->next; + node->next->prev = node->prev; + node->next = NULL; + node->prev = NULL; +} + /* * Remove and return the first node from a list (there must be one). */ @@ -399,7 +429,7 @@ dlist_move_head(dlist_head *head, dlist_node *node) * Caution: unreliable if 'node' is not in the list. */ static inline bool -dlist_has_next(dlist_head *head, dlist_node *node) +dlist_has_next(const dlist_head *head, const dlist_node *node) { return node->next != &head->head; } @@ -409,11 +439,25 @@ -dlist_has_prev(dlist_head *head, dlist_node *node) +dlist_has_prev(const dlist_head *head, const dlist_node *node) { return node->prev != &head->head; } +/* + * Check if node is detached. A node is only detached if it either has been + * initialized with dlist_node_init(), or deleted with + * dlist_delete_thoroughly(). + */ +static inline bool +dlist_node_is_detached(const dlist_node *node) +{ + Assert((node->next == NULL && node->prev == NULL) || + (node->next != NULL && node->prev != NULL)); + + return node->next == NULL; +} + /* * Return the next node in the list (there must be one). */ @@ -434,14 +478,6 @@ dlist_prev_node(dlist_head *head, dlist_node *node) return node->prev; } -/* internal support function to get address of head element's struct */ -static inline void * -dlist_head_element_off(dlist_head *head, size_t off) -{ - Assert(!dlist_is_empty(head)); - return (char *) head->head.next - off; -} - /* * Return the first node in the list (there must be one).
*/ @@ -451,14 +487,6 @@ dlist_head_node(dlist_head *head) return (dlist_node *) dlist_head_element_off(head, 0); } -/* internal support function to get address of tail element's struct */ -static inline void * -dlist_tail_element_off(dlist_head *head, size_t off) -{ - Assert(!dlist_is_empty(head)); - return (char *) head->head.prev - off; -} - /* * Return the last node in the list (there must be one). */ @@ -497,6 +525,24 @@ dlist_tail_node(dlist_head *head) (AssertVariableIsOfTypeMacro(((type *) NULL)->membername, dlist_node), \ ((type *) dlist_tail_element_off(lhead, offsetof(type, membername)))) +/* + * Return the address of the previous element in the list. + * + * The node must have a previous node. + */ +#define dlist_prev_element(type, membername, node) \ + (AssertVariableIsOfTypeMacro(((type *) NULL)->membername, dlist_node), \ + ((type *) dlist_prev_element_off(node, offsetof(type, membername)))) + +/* + * Return the address of the next element in the list. + * + * The node must have a next node. + */ +#define dlist_next_element(type, membername, node) \ + (AssertVariableIsOfTypeMacro(((type *) NULL)->membername, dlist_node), \ + ((type *) dlist_next_element_off(node, offsetof(type, membername)))) + /* * Iterate through the list pointed at by 'lhead' storing the state in 'iter'. 
* @@ -543,6 +589,46 @@ dlist_tail_node(dlist_head *head) (iter).cur != (iter).end; \ (iter).cur = (iter).cur->prev) +/* internal support function to get address of element's struct */ +static inline void * +dlist_element_off(dlist_node *node, size_t off) +{ + return (char *) node - off; +} + +/* internal support function to get address of head element's struct */ +static inline void * +dlist_head_element_off(dlist_head *head, size_t off) +{ + Assert(!dlist_is_empty(head)); + return dlist_element_off(head->head.next, off); +} + +/* internal support function to get address of tail element's struct */ +static inline void * +dlist_tail_element_off(dlist_head *head, size_t off) +{ + Assert(!dlist_is_empty(head)); + return dlist_element_off(head->head.prev, off); +} + +/* internal support function to get address of prev element's struct */ +static inline void * +dlist_prev_element_off(dlist_node *node, size_t off) +{ + Assert(!dlist_node_is_detached(node) && + !dlist_node_is_detached(node->prev)); + return dlist_element_off(node->prev, off); +} + +/* internal support function to get address of next element's struct */ +static inline void * +dlist_next_element_off(dlist_node *node, size_t off) +{ + Assert(!dlist_node_is_detached(node) && + !dlist_node_is_detached(node->next)); + return dlist_element_off(node->next, off); +} /* singly linked list implementation */ @@ -560,7 +646,7 @@ slist_init(slist_head *head) * Is the list empty? */ static inline bool -slist_is_empty(slist_head *head) +slist_is_empty(const slist_head *head) { slist_check(head); @@ -608,7 +694,7 @@ slist_pop_head_node(slist_head *head) * Check whether 'node' has a following node.
*/ static inline bool -slist_has_next(slist_head *head, slist_node *node) +slist_has_next(const slist_head *head, const slist_node *node) { slist_check(head); diff --git a/src/backend/lib/ilist.c b/src/backend/lib/ilist.c index 9b02d546076..82752ab1a39 100644 --- a/src/backend/lib/ilist.c +++ b/src/backend/lib/ilist.c @@ -56,9 +56,9 @@ slist_delete(slist_head *head, slist_node *node) * Verify integrity of a doubly linked list */ void -dlist_check(dlist_head *head) +dlist_check(const dlist_head *head) { - dlist_node *cur; + const dlist_node *cur; if (head == NULL) elog(ERROR, "doubly linked list head address is NULL"); @@ -93,9 +93,9 @@ dlist_check(dlist_head *head) * Verify integrity of a singly linked list */ void -slist_check(slist_head *head) +slist_check(const slist_head *head) { - slist_node *cur; + const slist_node *cur; if (head == NULL) elog(ERROR, "singly linked list head address is NULL"); -- 2.25.0.114.g5b0ca878e0
>From 53f0dc9b79aa072e2b49d41be1dc63f3e838aa2b Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 19 Feb 2020 12:23:28 -0800 Subject: [PATCH v1 2/6] Use dlists instead of SHM_QUEUE for heavyweight locks. Todo: - Consider using singly linked list? - Consider removing PROC_QUEUE - the size doesn't seem important. --- src/include/storage/lock.h | 9 +- src/include/storage/proc.h | 16 +-- src/backend/access/transam/twophase.c | 4 +- src/backend/storage/lmgr/deadlock.c | 51 +++---- src/backend/storage/lmgr/lock.c | 121 ++++++---------- src/backend/storage/lmgr/proc.c | 192 +++++++++++--------------- 6 files changed, 161 insertions(+), 232 deletions(-) diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index bb8e4e6e5b7..3569f145092 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -18,6 +18,7 @@ #error "lock.h may not be included from frontend code" #endif +#include "lib/ilist.h" #include "storage/backendid.h" #include "storage/lockdefs.h" #include "storage/lwlock.h" @@ -28,7 +29,7 @@ typedef struct PGPROC PGPROC; typedef struct PROC_QUEUE { - SHM_QUEUE links; /* head of list of PGPROC objects */ + dlist_head links; /* list of PGPROC objects */ int size; /* number of entries in list */ } PROC_QUEUE; @@ -292,7 +293,7 @@ typedef struct LOCK /* data */ LOCKMASK grantMask; /* bitmask for lock types already granted */ LOCKMASK waitMask; /* bitmask for lock types awaited */ - SHM_QUEUE procLocks; /* list of PROCLOCK objects assoc. with lock */ + dlist_head procLocks; /* list of PROCLOCK objects assoc. 
with lock */ PROC_QUEUE waitProcs; /* list of PGPROC objects waiting on lock */ int requested[MAX_LOCKMODES]; /* counts of requested locks */ int nRequested; /* total of requested[] array */ @@ -353,8 +354,8 @@ typedef struct PROCLOCK PGPROC *groupLeader; /* proc's lock group leader, or proc itself */ LOCKMASK holdMask; /* bitmask for lock types currently held */ LOCKMASK releaseMask; /* bitmask for lock types to be released */ - SHM_QUEUE lockLink; /* list link in LOCK's list of proclocks */ - SHM_QUEUE procLink; /* list link in PGPROC's list of proclocks */ + dlist_node lockLink; /* list link in LOCK's list of proclocks */ + dlist_node procLink; /* list link in PGPROC's list of proclocks */ } PROCLOCK; #define PROCLOCK_LOCKMETHOD(proclock) \ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index d21780108bb..2ba37f250de 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -95,8 +95,8 @@ struct XidCache struct PGPROC { /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ - SHM_QUEUE links; /* list link if process is in a list */ - PGPROC **procgloballist; /* procglobal list that owns this PGPROC */ + dlist_node links; /* list link if process is in a list */ + dlist_head *procgloballist; /* procglobal list that owns this PGPROC */ PGSemaphore sem; /* ONE semaphore to sleep on */ int waitStatus; /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */ @@ -157,7 +157,7 @@ struct PGPROC * linked into one of these lists, according to the partition number of * their lock. 
*/ - SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS]; + dlist_head myProcLocks[NUM_LOCK_PARTITIONS]; struct XidCache subxids; /* cache for subtransaction XIDs */ @@ -250,13 +250,13 @@ typedef struct PROC_HDR /* Length of allProcs array */ uint32 allProcCount; /* Head of list of free PGPROC structures */ - PGPROC *freeProcs; + dlist_head freeProcs; /* Head of list of autovacuum's free PGPROC structures */ - PGPROC *autovacFreeProcs; + dlist_head autovacFreeProcs; /* Head of list of bgworker free PGPROC structures */ - PGPROC *bgworkerFreeProcs; + dlist_head bgworkerFreeProcs; /* Head of list of walsender free PGPROC structures */ - PGPROC *walsenderFreeProcs; + dlist_head walsenderFreeProcs; /* First pgproc waiting for group XID clear */ pg_atomic_uint32 procArrayGroupFirst; /* First pgproc waiting for group transaction status update */ @@ -318,7 +318,7 @@ extern void ProcReleaseLocks(bool isCommit); extern void ProcQueueInit(PROC_QUEUE *queue); extern int ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable); -extern PGPROC *ProcWakeup(PGPROC *proc, int waitStatus); +extern void ProcWakeup(PGPROC *proc, int waitStatus); extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void CheckDeadLockAlert(void); extern bool IsWaitingForLock(void); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 5adf956f413..39d7e5463c1 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -459,7 +459,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, /* Initialize the PGPROC entry */ MemSet(proc, 0, sizeof(PGPROC)); proc->pgprocno = gxact->pgprocno; - SHMQueueElemInit(&(proc->links)); + dlist_node_init(&proc->links); proc->waitStatus = STATUS_OK; /* We set up the gxact's VXID as InvalidBackendId/XID */ proc->lxid = (LocalTransactionId) xid; @@ -478,7 +478,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, 
proc->waitLock = NULL; proc->waitProcLock = NULL; for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - SHMQueueInit(&(proc->myProcLocks[i])); + dlist_init(&proc->myProcLocks[i]); /* subxid data must be filled later by GXactLoadSubxactData */ pgxact->overflowed = false; pgxact->nxids = 0; diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index f8c5df08e69..ca2abea07f1 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -216,9 +216,6 @@ InitDeadLockChecking(void) DeadLockState DeadLockCheck(PGPROC *proc) { - int i, - j; - /* Initialize to "no constraints" */ nCurConstraints = 0; nPossibleConstraints = 0; @@ -246,7 +243,7 @@ DeadLockCheck(PGPROC *proc) } /* Apply any needed rearrangements of wait queues */ - for (i = 0; i < nWaitOrders; i++) + for (int i = 0; i < nWaitOrders; i++) { LOCK *lock = waitOrders[i].lock; PGPROC **procs = waitOrders[i].procs; @@ -261,9 +258,9 @@ DeadLockCheck(PGPROC *proc) /* Reset the queue and re-add procs in the desired order */ ProcQueueInit(waitQueue); - for (j = 0; j < nProcs; j++) + for (int j = 0; j < nProcs; j++) { - SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links)); + dlist_push_tail(&waitQueue->links, &(procs[j]->links)); waitQueue->size++; } @@ -545,8 +542,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc, PGPROC *proc; LOCK *lock = checkProc->waitLock; PGXACT *pgxact; - PROCLOCK *proclock; - SHM_QUEUE *procLocks; + dlist_iter proclock_iter; LockMethod lockMethodTable; PROC_QUEUE *waitQueue; int queue_size; @@ -563,13 +559,9 @@ FindLockCycleRecurseMember(PGPROC *checkProc, * Scan for procs that already hold conflicting locks. These are "hard" * edges in the waits-for graph. 
*/ - procLocks = &(lock->procLocks); - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (proclock) + dlist_foreach(proclock_iter, &lock->procLocks) { + PROCLOCK *proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur); PGPROC *leader; proc = proclock->tag.myProc; @@ -629,9 +621,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc, } } } - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } /* @@ -704,6 +693,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc, else { PGPROC *lastGroupMember = NULL; + dlist_iter proc_iter; /* Use the true lock wait queue order */ waitQueue = &(lock->waitProcs); @@ -719,13 +709,14 @@ FindLockCycleRecurseMember(PGPROC *checkProc, lastGroupMember = checkProc; else { - proc = (PGPROC *) waitQueue->links.next; - queue_size = waitQueue->size; - while (queue_size-- > 0) + dlist_iter iter; + + dlist_foreach(iter, &waitQueue->links) { + proc = dlist_container(PGPROC, links, iter.cur); + if (proc->lockGroupLeader == checkProcLeader) lastGroupMember = proc; - proc = (PGPROC *) proc->links.next; } Assert(lastGroupMember != NULL); } @@ -733,12 +724,12 @@ FindLockCycleRecurseMember(PGPROC *checkProc, /* * OK, now rescan (or scan) the queue to identify the soft conflicts. */ - queue_size = waitQueue->size; - proc = (PGPROC *) waitQueue->links.next; - while (queue_size-- > 0) + dlist_foreach(proc_iter, &waitQueue->links) { PGPROC *leader; + proc = dlist_container(PGPROC, links, proc_iter.cur); + leader = proc->lockGroupLeader == NULL ? 
proc : proc->lockGroupLeader; @@ -772,8 +763,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc, return true; } } - - proc = (PGPROC *) proc->links.next; } } @@ -882,14 +871,16 @@ TopoSort(LOCK *lock, k, kk, last; + dlist_iter proc_iter; /* First, fill topoProcs[] array with the procs in their current order */ - proc = (PGPROC *) waitQueue->links.next; - for (i = 0; i < queue_size; i++) + i = 0; + dlist_foreach(proc_iter, &waitQueue->links) { - topoProcs[i] = proc; - proc = (PGPROC *) proc->links.next; + proc = dlist_container(PGPROC, links, proc_iter.cur); + topoProcs[i++] = proc; } + Assert(i == queue_size); /* * Scan the constraints, and for each proc in the array, generate a count diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 56dba09299d..01ac3c06c5e 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -1005,8 +1005,8 @@ LockAcquireExtended(const LOCKTAG *locktag, uint32 proclock_hashcode; proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); - SHMQueueDelete(&proclock->lockLink); - SHMQueueDelete(&proclock->procLink); + dlist_delete(&proclock->lockLink); + dlist_delete(&proclock->procLink); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), proclock_hashcode, @@ -1141,7 +1141,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->procLocks)); + dlist_init(&lock->procLocks); ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; @@ -1184,7 +1184,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, * of shared memory, because there won't be anything to cause * anyone to release the lock object later. 
*/ - Assert(SHMQueueEmpty(&(lock->procLocks))); + Assert(dlist_is_empty(&(lock->procLocks))); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, @@ -1217,9 +1217,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ - SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); - SHMQueueInsertBefore(&(proc->myProcLocks[partition]), - &proclock->procLink); + dlist_push_tail(&lock->procLocks, &proclock->lockLink); + dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else @@ -1349,9 +1348,8 @@ LockCheckConflicts(LockMethod lockMethodTable, int conflictMask = lockMethodTable->conflictTab[lockmode]; int conflictsRemaining[MAX_LOCKMODES]; int totalConflictsRemaining = 0; + dlist_iter proclock_iter; int i; - SHM_QUEUE *procLocks; - PROCLOCK *otherproclock; /* * first check for global conflicts: If no locks conflict with my request, @@ -1411,11 +1409,11 @@ LockCheckConflicts(LockMethod lockMethodTable, * shared memory state more complex (and larger) but it doesn't seem worth * it. */ - procLocks = &(lock->procLocks); - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); - while (otherproclock != NULL) + dlist_foreach(proclock_iter, &lock->procLocks) { + PROCLOCK *otherproclock = + dlist_container(PROCLOCK, lockLink, proclock_iter.cur); + if (proclock != otherproclock && proclock->groupLeader == otherproclock->groupLeader && (otherproclock->holdMask & conflictMask) != 0) @@ -1440,9 +1438,6 @@ LockCheckConflicts(LockMethod lockMethodTable, return false; } } - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, &otherproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } /* Nope, it's a real conflict. 
*/ @@ -1555,8 +1550,8 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock, uint32 proclock_hashcode; PROCLOCK_PRINT("CleanUpLock: deleting", proclock); - SHMQueueDelete(&proclock->lockLink); - SHMQueueDelete(&proclock->procLink); + dlist_delete(&proclock->lockLink); + dlist_delete(&proclock->procLink); proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), @@ -1573,7 +1568,7 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock, * object. */ LOCK_PRINT("CleanUpLock: deleting", lock, 0); - Assert(SHMQueueEmpty(&(lock->procLocks))); + Assert(dlist_is_empty(&lock->procLocks)); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, @@ -1837,7 +1832,7 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); /* Remove proc from lock's wait queue */ - SHMQueueDelete(&(proc->links)); + dlist_delete(&proc->links); waitLock->waitProcs.size--; /* Undo increments of request counts by waiting process */ @@ -2092,7 +2087,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) numLockModes; LOCALLOCK *locallock; LOCK *lock; - PROCLOCK *proclock; int partition; bool have_fast_path_lwlock = false; @@ -2249,8 +2243,8 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { LWLock *partitionLock; - SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); - PROCLOCK *nextplock; + dlist_head *procLocks = &MyProc->myProcLocks[partition]; + dlist_mutable_iter proclock_iter; partitionLock = LockHashPartitionLockByIndex(partition); @@ -2273,24 +2267,16 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) * locallock situation, we lose that guarantee for fast-path locks. * This is not ideal. 
*/ - if (SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)) == NULL) + if (dlist_is_empty(procLocks)) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - proclock; - proclock = nextplock) + dlist_foreach_modify(proclock_iter, procLocks) { + PROCLOCK *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur); bool wakeupNeeded = false; - /* Get link first, since we may unlink/delete this proclock */ - nextplock = (PROCLOCK *) - SHMQueueNext(procLocks, &proclock->procLink, - offsetof(PROCLOCK, procLink)); - Assert(proclock->tag.myProc == MyProc); lock = proclock->tag.myLock; @@ -2823,7 +2809,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) LockMethod lockMethodTable; LOCK *lock; LOCKMASK conflictMask; - SHM_QUEUE *procLocks; + dlist_iter proclock_iter; PROCLOCK *proclock; uint32 hashcode; LWLock *partitionLock; @@ -2971,14 +2957,10 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) /* * Examine each existing holder (or awaiter) of the lock. 
*/ - - procLocks = &(lock->procLocks); - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (proclock) + dlist_foreach(proclock_iter, &lock->procLocks) { + proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur); + if (conflictMask & proclock->holdMask) { PGPROC *proc = proclock->tag.myProc; @@ -3008,9 +2990,6 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) } } } - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } LWLockRelease(partitionLock); @@ -3328,8 +3307,8 @@ PostPrepare_Locks(TransactionId xid) for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { LWLock *partitionLock; - SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); - PROCLOCK *nextplock; + dlist_head *procLocks = &(MyProc->myProcLocks[partition]); + dlist_mutable_iter proclock_iter; partitionLock = LockHashPartitionLockByIndex(partition); @@ -3341,21 +3320,14 @@ PostPrepare_Locks(TransactionId xid) * another backend is adding something to our lists now. For safety, * though, we code this the same way as in LockReleaseAll. */ - if (SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)) == NULL) + if (dlist_is_empty(procLocks)) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - proclock; - proclock = nextplock) + dlist_foreach_modify(proclock_iter, procLocks) { - /* Get link first, since we may unlink/relink this proclock */ - nextplock = (PROCLOCK *) - SHMQueueNext(procLocks, &proclock->procLink, - offsetof(PROCLOCK, procLink)); + proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur); Assert(proclock->tag.myProc == MyProc); @@ -3393,7 +3365,7 @@ PostPrepare_Locks(TransactionId xid) * same hash partition, cf proclock_hash(). 
So the partition lock * we already hold is sufficient for this. */ - SHMQueueDelete(&proclock->procLink); + dlist_delete(&proclock->procLink); /* * Create the new hash key for the proclock. @@ -3419,8 +3391,7 @@ PostPrepare_Locks(TransactionId xid) elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks"); /* Re-link into the new proc's proclock list */ - SHMQueueInsertBefore(&(newproc->myProcLocks[partition]), - &proclock->procLink); + dlist_push_tail(&newproc->myProcLocks[partition], &proclock->procLink); PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock); } /* loop over PROCLOCKs within this partition */ @@ -3741,12 +3712,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) { LOCK *theLock = blocked_proc->waitLock; BlockedProcData *bproc; - SHM_QUEUE *procLocks; - PROCLOCK *proclock; + dlist_iter proclock_iter; + dlist_iter proc_iter; PROC_QUEUE *waitQueue; - PGPROC *proc; int queue_size; - int i; /* Nothing to do if this proc is not blocked */ if (theLock == NULL) @@ -3764,11 +3733,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) */ /* Collect all PROCLOCKs associated with theLock */ - procLocks = &(theLock->procLocks); - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - while (proclock) + dlist_foreach(proclock_iter, &theLock->procLocks) { + PROCLOCK *proclock = + dlist_container(PROCLOCK, lockLink, proclock_iter.cur); PGPROC *proc = proclock->tag.myProc; LOCK *lock = proclock->tag.myLock; LockInstanceData *instance; @@ -3793,9 +3761,6 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) instance->leaderPid = proclock->groupLeader->pid; instance->fastpath = false; data->nlocks++; - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */ @@ -3811,9 +3776,9 @@ 
GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) } /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */ - proc = (PGPROC *) waitQueue->links.next; - for (i = 0; i < queue_size; i++) + dlist_foreach(proc_iter, &waitQueue->links) { + PGPROC *proc = dlist_container(PGPROC, links, proc_iter.cur); if (proc == blocked_proc) break; data->waiter_pids[data->npids++] = proc->pid; @@ -4090,7 +4055,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->procLocks)); + dlist_init(&lock->procLocks); ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; @@ -4133,7 +4098,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, * of shared memory, because there won't be anything to cause * anyone to release the lock object later. */ - Assert(SHMQueueEmpty(&(lock->procLocks))); + Assert(dlist_is_empty(&lock->procLocks)); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, @@ -4158,9 +4123,9 @@ lock_twophase_recover(TransactionId xid, uint16 info, proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ - SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); - SHMQueueInsertBefore(&(proc->myProcLocks[partition]), - &proclock->procLink); + dlist_push_tail(&lock->procLocks, &proclock->lockLink); + dlist_push_tail(&proc->myProcLocks[partition], + &proclock->procLink); PROCLOCK_PRINT("lock_twophase_recover: new", proclock); } else diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index eb321f72ea4..5a157fc07d8 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -178,10 +178,10 @@ InitProcGlobal(void) * Initialize the data structures. 
*/ ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; - ProcGlobal->freeProcs = NULL; - ProcGlobal->autovacFreeProcs = NULL; - ProcGlobal->bgworkerFreeProcs = NULL; - ProcGlobal->walsenderFreeProcs = NULL; + dlist_init(&ProcGlobal->freeProcs); + dlist_init(&ProcGlobal->autovacFreeProcs); + dlist_init(&ProcGlobal->bgworkerFreeProcs); + dlist_init(&ProcGlobal->walsenderFreeProcs); ProcGlobal->startupProc = NULL; ProcGlobal->startupProcPid = 0; ProcGlobal->startupBufferPinWaitBufId = -1; @@ -218,6 +218,8 @@ InitProcGlobal(void) for (i = 0; i < TotalProcs; i++) { + PGPROC *proc = &procs[i]; + /* Common initialization for all PGPROCs, regardless of type. */ /* @@ -227,11 +229,11 @@ InitProcGlobal(void) */ if (i < MaxBackends + NUM_AUXILIARY_PROCS) { - procs[i].sem = PGSemaphoreCreate(); - InitSharedLatch(&(procs[i].procLatch)); - LWLockInitialize(&(procs[i].backendLock), LWTRANCHE_PROC); + proc->sem = PGSemaphoreCreate(); + InitSharedLatch(&(proc->procLatch)); + LWLockInitialize(&(proc->backendLock), LWTRANCHE_PROC); } - procs[i].pgprocno = i; + proc->pgprocno = i; /* * Newly created PGPROCs for normal backends, autovacuum and bgworkers @@ -244,45 +246,41 @@ InitProcGlobal(void) if (i < MaxConnections) { /* PGPROC for normal backend, add to freeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; - ProcGlobal->freeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->freeProcs; + dlist_push_head(&ProcGlobal->freeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->freeProcs; } else if (i < MaxConnections + autovacuum_max_workers + 1) { /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; - ProcGlobal->autovacFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->autovacFreeProcs; + dlist_push_head(&ProcGlobal->autovacFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->autovacFreeProcs; } else if (i < MaxConnections + 
autovacuum_max_workers + 1 + max_worker_processes) { /* PGPROC for bgworker, add to bgworkerFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; - ProcGlobal->bgworkerFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs; + dlist_push_head(&ProcGlobal->bgworkerFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->bgworkerFreeProcs; } else if (i < MaxBackends) { /* PGPROC for walsender, add to walsenderFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs; - ProcGlobal->walsenderFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs; + dlist_push_head(&ProcGlobal->walsenderFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->walsenderFreeProcs; } /* Initialize myProcLocks[] shared memory queues. */ for (j = 0; j < NUM_LOCK_PARTITIONS; j++) - SHMQueueInit(&(procs[i].myProcLocks[j])); + dlist_init(&(proc->myProcLocks[j])); /* Initialize lockGroupMembers list. */ - dlist_init(&procs[i].lockGroupMembers); + dlist_init(&proc->lockGroupMembers); /* * Initialize the atomic variables, otherwise, it won't be safe to * access them for backends that aren't currently in use. 
*/ - pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO); - pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO); + pg_atomic_init_u32(&(proc->procArrayGroupNext), INVALID_PGPROCNO); + pg_atomic_init_u32(&(proc->clogGroupNext), INVALID_PGPROCNO); } /* @@ -303,7 +301,7 @@ InitProcGlobal(void) void InitProcess(void) { - PGPROC *volatile *procgloballist; + dlist_head *procgloballist; /* * ProcGlobal should be set up already (if we are a backend, we inherit @@ -336,11 +334,9 @@ InitProcess(void) set_spins_per_delay(ProcGlobal->spins_per_delay); - MyProc = *procgloballist; - - if (MyProc != NULL) + if (!dlist_is_empty(procgloballist)) { - *procgloballist = (PGPROC *) MyProc->links.next; + MyProc = (PGPROC*) dlist_pop_head_node(procgloballist); SpinLockRelease(ProcStructLock); } else @@ -382,7 +378,7 @@ InitProcess(void) * Initialize all fields of MyProc, except for those previously * initialized by InitProcGlobal. */ - SHMQueueElemInit(&(MyProc->links)); + dlist_node_init(&MyProc->links); MyProc->waitStatus = STATUS_OK; MyProc->lxid = InvalidLocalTransactionId; MyProc->fpVXIDLock = false; @@ -411,7 +407,7 @@ InitProcess(void) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif MyProc->recoveryConflictPending = false; @@ -566,7 +562,7 @@ InitAuxiliaryProcess(void) * Initialize all fields of MyProc, except for those previously * initialized by InitProcGlobal. */ - SHMQueueElemInit(&(MyProc->links)); + dlist_node_init(&MyProc->links); MyProc->waitStatus = STATUS_OK; MyProc->lxid = InvalidLocalTransactionId; MyProc->fpVXIDLock = false; @@ -590,7 +586,7 @@ InitAuxiliaryProcess(void) /* Last process should have released all locks. 
*/ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif @@ -670,16 +666,15 @@ GetStartupBufferPinWaitBufId(void) bool HaveNFreeProcs(int n) { - PGPROC *proc; + dlist_iter iter; SpinLockAcquire(ProcStructLock); - proc = ProcGlobal->freeProcs; - - while (n > 0 && proc != NULL) + dlist_foreach(iter, &ProcGlobal->freeProcs) { - proc = (PGPROC *) proc->links.next; n--; + if (n == 0) + break; } SpinLockRelease(ProcStructLock); @@ -742,7 +737,7 @@ LockErrorCleanup(void) partitionLock = LockHashPartitionLock(lockAwaited->hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); - if (MyProc->links.next != NULL) + if (!dlist_node_is_detached(&MyProc->links)) { /* We could not have been granted the lock yet */ RemoveFromWaitQueue(MyProc, lockAwaited->hashcode); @@ -815,7 +810,7 @@ static void ProcKill(int code, Datum arg) { PGPROC *proc; - PGPROC *volatile *procgloballist; + dlist_head *procgloballist; Assert(MyProc != NULL); @@ -828,7 +823,7 @@ ProcKill(int code, Datum arg) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif @@ -851,7 +846,7 @@ ProcKill(int code, Datum arg) /* * Detach from any lock group of which we are a member. If the leader - * exist before all other group members, its PGPROC will remain allocated + * exits before all other group members, its PGPROC will remain allocated * until the last group process exits; that process must return the * leader's PGPROC to the appropriate list. */ @@ -872,8 +867,7 @@ ProcKill(int code, Datum arg) /* Leader exited first; return its PGPROC. 
*/ SpinLockAcquire(ProcStructLock); - leader->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = leader; + dlist_push_head(procgloballist, &leader->links); SpinLockRelease(ProcStructLock); } } @@ -907,8 +901,7 @@ ProcKill(int code, Datum arg) Assert(dlist_is_empty(&proc->lockGroupMembers)); /* Return PGPROC structure (and semaphore) to appropriate freelist */ - proc->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = proc; + dlist_push_tail(procgloballist, &proc->links); } /* Update shared estimate of spins_per_delay */ @@ -1037,7 +1030,7 @@ ProcQueueAlloc(const char *name) void ProcQueueInit(PROC_QUEUE *queue) { - SHMQueueInit(&(queue->links)); + dlist_init(&queue->links); queue->size = 0; } @@ -1068,12 +1061,11 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LWLock *partitionLock = LockHashPartitionLock(hashcode); PROC_QUEUE *waitQueue = &(lock->waitProcs); LOCKMASK myHeldLocks = MyProc->heldLocks; + PGPROC *insert_before = NULL; bool early_deadlock = false; bool allow_autovacuum_cancel = true; int myWaitStatus; - PGPROC *proc; PGPROC *leader = MyProc->lockGroupLeader; - int i; /* * If group locking is in use, locks held by members of my locking group @@ -1081,18 +1073,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) */ if (leader != NULL) { - SHM_QUEUE *procLocks = &(lock->procLocks); - PROCLOCK *otherproclock; + dlist_iter iter; - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); - while (otherproclock != NULL) + dlist_foreach(iter, &lock->procLocks) { + PROCLOCK *otherproclock; + + otherproclock = dlist_container(PROCLOCK, lockLink, iter.cur); + if (otherproclock->groupLeader == leader) myHeldLocks |= otherproclock->holdMask; - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, &otherproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } } @@ -1116,20 +1106,23 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) if (myHeldLocks != 0) { 
LOCKMASK aheadRequests = 0; + dlist_iter iter; - proc = (PGPROC *) waitQueue->links.next; - for (i = 0; i < waitQueue->size; i++) + // FIXME: Shouldn't we just use the correct offset math? + StaticAssertStmt(offsetof(PGPROC, links) == 0, "odd casting"); + + dlist_foreach(iter, &waitQueue->links) { + PGPROC *proc = dlist_container(PGPROC, links, iter.cur); + /* * If we're part of the same locking group as this waiter, its * locks neither conflict with ours nor contribute to * aheadRequests. */ if (leader != NULL && leader == proc->lockGroupLeader) - { - proc = (PGPROC *) proc->links.next; continue; - } + /* Must he wait for me? */ if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks) { @@ -1157,29 +1150,25 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) GrantAwaitedLock(); return STATUS_OK; } - /* Break out of loop to put myself before him */ + + /* Put myself into wait queue before conflicting process */ + insert_before = proc; break; } /* Nope, so advance to next waiter */ aheadRequests |= LOCKBIT_ON(proc->waitLockMode); - proc = (PGPROC *) proc->links.next; } - - /* - * If we fall out of loop normally, proc points to waitQueue head, so - * we will insert at tail of queue as desired. - */ - } - else - { - /* I hold no locks, so I can't push in front of anyone. */ - proc = (PGPROC *) &(waitQueue->links); } /* - * Insert self into queue, ahead of the given proc (or at tail of queue). + * Insert self into queue, ahead of the determined proc or at the tail. 
*/ - SHMQueueInsertBefore(&(proc->links), &(MyProc->links)); + if (insert_before) + dlist_insert_before(&insert_before->links, &MyProc->links); + else + dlist_push_tail(&waitQueue->links, &MyProc->links); + + waitQueue->size++; lock->waitMask |= LOCKBIT_ON(lockmode); @@ -1384,7 +1373,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) long secs; int usecs; long msecs; - SHM_QUEUE *procLocks; + dlist_iter proc_iter; PROCLOCK *proclock; bool first_holder = true, first_waiter = true; @@ -1414,12 +1403,11 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LWLockAcquire(partitionLock, LW_SHARED); - procLocks = &(lock->procLocks); - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (proclock) + dlist_foreach(proc_iter, &lock->procLocks) { + proclock = + dlist_container(PROCLOCK, lockLink, proc_iter.cur); + /* * we are a waiter if myProc->waitProcLock == proclock; we are * a holder if it is NULL or something different @@ -1450,9 +1438,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) lockHoldersNum++; } - - proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, - offsetof(PROCLOCK, lockLink)); } LWLockRelease(partitionLock); @@ -1577,7 +1562,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * ProcWakeup -- wake up a process by setting its latch. * * Also remove the process from the wait queue and set its links invalid. - * RETURN: the next process in the wait queue. * * The appropriate lock partition lock must be held by caller. * @@ -1586,22 +1570,17 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * to twiddle the lock's request counts too --- see RemoveFromWaitQueue. * Hence, in practice the waitStatus parameter must be STATUS_OK. */ -PGPROC * +void ProcWakeup(PGPROC *proc, int waitStatus) { - PGPROC *retProc; - /* Proc should be sleeping ... 
*/ - if (proc->links.prev == NULL || - proc->links.next == NULL) - return NULL; + if (dlist_node_is_detached(&proc->links)) + return; + Assert(proc->waitStatus == STATUS_WAITING); - /* Save next process before we zap the list link */ - retProc = (PGPROC *) proc->links.next; - /* Remove process from wait queue */ - SHMQueueDelete(&(proc->links)); + dlist_delete_thoroughly(&proc->links); (proc->waitLock->waitProcs.size)--; /* Clean up process' state and pass it the ok/fail signal */ @@ -1611,8 +1590,6 @@ ProcWakeup(PGPROC *proc, int waitStatus) /* And awaken it */ SetLatch(&proc->procLatch); - - return retProc; } /* @@ -1626,19 +1603,17 @@ void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { PROC_QUEUE *waitQueue = &(lock->waitProcs); - int queue_size = waitQueue->size; - PGPROC *proc; LOCKMASK aheadRequests = 0; + dlist_mutable_iter miter; - Assert(queue_size >= 0); + Assert(waitQueue->size >= 0); - if (queue_size == 0) + if (waitQueue->size == 0) return; - proc = (PGPROC *) waitQueue->links.next; - - while (queue_size-- > 0) + dlist_foreach_modify(miter, &waitQueue->links) { + PGPROC *proc = dlist_container(PGPROC, links, miter.cur); LOCKMODE lockmode = proc->waitLockMode; /* @@ -1651,7 +1626,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { /* OK to waken */ GrantLock(lock, proc->waitProcLock, lockmode); - proc = ProcWakeup(proc, STATUS_OK); + ProcWakeup(proc, STATUS_OK); /* * ProcWakeup removes proc from the lock's waiting process queue @@ -1661,11 +1636,8 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) } else { - /* - * Cannot wake this guy. Remember his request for later checks. - */ + /* Lock conflicts: don't wake, but remember for later checks. */ aheadRequests |= LOCKBIT_ON(lockmode); - proc = (PGPROC *) proc->links.next; } } -- 2.25.0.114.g5b0ca878e0
>From 08048c5fc255c70d8306bd573ccb827ea320f481 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 19 Feb 2020 12:33:09 -0800 Subject: [PATCH v1 3/6] Use dlist for syncrep queue. --- src/include/replication/walsender_private.h | 3 +- src/include/storage/proc.h | 2 +- src/backend/replication/syncrep.c | 89 +++++++++------------ src/backend/replication/walsender.c | 2 +- src/backend/storage/lmgr/proc.c | 2 +- 5 files changed, 41 insertions(+), 57 deletions(-) diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f0a47..c2f946c5c48 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -13,6 +13,7 @@ #define _WALSENDER_PRIVATE_H #include "access/xlog.h" +#include "lib/ilist.h" #include "nodes/nodes.h" #include "replication/syncrep.h" #include "storage/latch.h" @@ -96,7 +97,7 @@ typedef struct * Synchronous replication queue with one queue per request type. * Protected by SyncRepLock. */ - SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; + dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; /* * Current location of the head of the queue. 
All waiters should have a diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 2ba37f250de..de222f96681 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -150,7 +150,7 @@ struct PGPROC */ XLogRecPtr waitLSN; /* waiting for this LSN or higher */ int syncRepState; /* wait state for sync rep */ - SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + dlist_node syncRepLinks; /* list link if process is in syncrep queue */ /* * All PROCLOCK objects for locks held or awaited by this backend are diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index c284103b548..402d3e41fc6 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -168,7 +168,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) if (!SyncRepRequested()) return; - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(dlist_node_is_detached(&MyProc->syncRepLinks)); Assert(WalSndCtl != NULL); LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); @@ -304,7 +304,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) * assertions, but better safe than sorry). */ pg_read_barrier(); - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(dlist_node_is_detached(&MyProc->syncRepLinks)); MyProc->syncRepState = SYNC_REP_NOT_WAITING; MyProc->waitLSN = 0; @@ -325,31 +325,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) static void SyncRepQueueInsert(int mode) { - PGPROC *proc; + dlist_head *queue; + dlist_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); + queue = &WalSndCtl->SyncRepQueue[mode]; - while (proc) + dlist_reverse_foreach(iter, queue) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* - * Stop at the queue element that we should after to ensure the queue - * is ordered by LSN. 
+ * Stop at the queue element that we should insert after to ensure the + * queue is ordered by LSN. */ if (proc->waitLSN < MyProc->waitLSN) - break; - - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); + { + dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks); + return; + } } - if (proc) - SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); - else - SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); + /* + * If we get here, the list was either empty, or this process needs to be + * at the head. + */ + dlist_push_head(queue, &MyProc->syncRepLinks); } /* @@ -359,8 +360,8 @@ static void SyncRepCancelWait(void) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - SHMQueueDelete(&(MyProc->syncRepLinks)); + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) + dlist_delete_thoroughly(&MyProc->syncRepLinks); MyProc->syncRepState = SYNC_REP_NOT_WAITING; LWLockRelease(SyncRepLock); } @@ -372,13 +373,13 @@ SyncRepCleanupAtProcExit(void) * First check if we are removed from the queue without the lock to not * slow down backend exit. 
*/ - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* maybe we have just been removed, so recheck */ - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - SHMQueueDelete(&(MyProc->syncRepLinks)); + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) + dlist_delete_thoroughly(&MyProc->syncRepLinks); LWLockRelease(SyncRepLock); } @@ -1011,20 +1012,17 @@ static int SyncRepWakeQueue(bool all, int mode) { volatile WalSndCtlData *walsndctl = WalSndCtl; - PGPROC *proc = NULL; - PGPROC *thisproc = NULL; int numprocs = 0; + dlist_mutable_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); Assert(SyncRepQueueIsOrderedByLSN(mode)); - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); - - while (proc) + dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* * Assume the queue is ordered by LSN */ @@ -1032,18 +1030,9 @@ SyncRepWakeQueue(bool all, int mode) return numprocs; /* - * Move to next proc, so we can delete thisproc from the queue. - * thisproc is valid, proc may be NULL after this. + * Remove from queue. */ - thisproc = proc; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); - - /* - * Remove thisproc from queue. - */ - SHMQueueDelete(&(thisproc->syncRepLinks)); + dlist_delete_thoroughly(&proc->syncRepLinks); /* * SyncRepWaitForLSN() reads syncRepState without holding the lock, so @@ -1056,12 +1045,12 @@ SyncRepWakeQueue(bool all, int mode) * Set state to complete; see SyncRepWaitForLSN() for discussion of * the various states. 
*/ - thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; + proc->syncRepState = SYNC_REP_WAIT_COMPLETE; /* * Wake only when we have set state and removed from queue. */ - SetLatch(&(thisproc->procLatch)); + SetLatch(&(proc->procLatch)); numprocs++; } @@ -1115,19 +1104,17 @@ SyncRepUpdateSyncStandbysDefined(void) static bool SyncRepQueueIsOrderedByLSN(int mode) { - PGPROC *proc = NULL; XLogRecPtr lastLSN; + dlist_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); lastLSN = 0; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); - - while (proc) + dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* * Check the queue is ordered by LSN and that multiple procs don't * have matching LSNs @@ -1136,10 +1123,6 @@ SyncRepQueueIsOrderedByLSN(int mode) return false; lastLSN = proc->waitLSN; - - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); } return true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index abb533b9d03..2d2a0ceb503 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3021,7 +3021,7 @@ WalSndShmemInit(void) MemSet(WalSndCtl, 0, WalSndShmemSize()); for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); + dlist_init(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 5a157fc07d8..a4c338d7aff 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -415,7 +415,7 @@ InitProcess(void) /* Initialize fields for sync rep */ MyProc->waitLSN = 0; MyProc->syncRepState = SYNC_REP_NOT_WAITING; - SHMQueueElemInit(&(MyProc->syncRepLinks)); + dlist_node_init(&MyProc->syncRepLinks); /* 
Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; -- 2.25.0.114.g5b0ca878e0
>From df78ed14d4f56576d3ce67134b8a5c22b8b9c21f Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 19 Feb 2020 15:19:37 -0800 Subject: [PATCH v1 4/6] Use dlists for predicate locking. --- src/include/storage/predicate_internals.h | 49 +- src/backend/storage/lmgr/predicate.c | 692 +++++++--------------- 2 files changed, 241 insertions(+), 500 deletions(-) diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 591ac6f42aa..4d1b0e0ffa0 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -14,6 +14,7 @@ #ifndef PREDICATE_INTERNALS_H #define PREDICATE_INTERNALS_H +#include "lib/ilist.h" #include "storage/lock.h" #include "storage/lwlock.h" @@ -84,14 +85,16 @@ typedef struct SERIALIZABLEXACT SerCommitSeqNo lastCommitBeforeSnapshot; /* when not committed or * no conflict out */ } SeqNo; - SHM_QUEUE outConflicts; /* list of write transactions whose data we + dlist_head outConflicts; /* list of write transactions whose data we * couldn't read. */ - SHM_QUEUE inConflicts; /* list of read transactions which couldn't + dlist_head inConflicts; /* list of read transactions which couldn't * see our write. 
*/ - SHM_QUEUE predicateLocks; /* list of associated PREDICATELOCK objects */ - SHM_QUEUE finishedLink; /* list link in + dlist_head predicateLocks; /* list of associated PREDICATELOCK objects */ + dlist_node finishedLink; /* list link in * FinishedSerializableTransactions */ + dlist_node xactLink; /* PredXact->activeList/availableList */ + LWLock predicateLockListLock; /* protects predicateLocks in parallel * mode */ @@ -99,7 +102,7 @@ typedef struct SERIALIZABLEXACT * for r/o transactions: list of concurrent r/w transactions that we could * potentially have conflicts with, and vice versa for r/w transactions */ - SHM_QUEUE possibleUnsafeConflicts; + dlist_head possibleUnsafeConflicts; TransactionId topXid; /* top level xid for the transaction, if one * exists; else invalid */ @@ -134,28 +137,10 @@ typedef struct SERIALIZABLEXACT */ #define SXACT_FLAG_PARTIALLY_RELEASED 0x00000800 -/* - * The following types are used to provide an ad hoc list for holding - * SERIALIZABLEXACT objects. An HTAB is overkill, since there is no need to - * access these by key -- there are direct pointers to these objects where - * needed. If a shared memory list is created, these types can probably be - * eliminated in favor of using the general solution. 
- */ -typedef struct PredXactListElementData -{ - SHM_QUEUE link; - SERIALIZABLEXACT sxact; -} PredXactListElementData; - -typedef struct PredXactListElementData *PredXactListElement; - -#define PredXactListElementDataSize \ - ((Size)MAXALIGN(sizeof(PredXactListElementData))) - typedef struct PredXactListData { - SHM_QUEUE availableList; - SHM_QUEUE activeList; + dlist_head availableList; + dlist_head activeList; /* * These global variables are maintained when registering and cleaning up @@ -182,7 +167,7 @@ typedef struct PredXactListData * seq no */ SERIALIZABLEXACT *OldCommittedSxact; /* shared copy of dummy sxact */ - PredXactListElement element; + SERIALIZABLEXACT *element; } PredXactListData; typedef struct PredXactListData *PredXactList; @@ -203,8 +188,8 @@ typedef struct PredXactListData *PredXactList; */ typedef struct RWConflictData { - SHM_QUEUE outLink; /* link for list of conflicts out from a sxact */ - SHM_QUEUE inLink; /* link for list of conflicts in to a sxact */ + dlist_node outLink; /* link for list of conflicts out from a sxact */ + dlist_node inLink; /* link for list of conflicts in to a sxact */ SERIALIZABLEXACT *sxactOut; SERIALIZABLEXACT *sxactIn; } RWConflictData; @@ -216,7 +201,7 @@ typedef struct RWConflictData *RWConflict; typedef struct RWConflictPoolHeaderData { - SHM_QUEUE availableList; + dlist_head availableList; RWConflict element; } RWConflictPoolHeaderData; @@ -298,7 +283,7 @@ typedef struct PREDICATELOCKTARGET PREDICATELOCKTARGETTAG tag; /* unique identifier of lockable object */ /* data */ - SHM_QUEUE predicateLocks; /* list of PREDICATELOCK objects assoc. with + dlist_head predicateLocks; /* list of PREDICATELOCK objects assoc. 
with * predicate lock target */ } PREDICATELOCKTARGET; @@ -331,9 +316,9 @@ typedef struct PREDICATELOCK PREDICATELOCKTAG tag; /* unique identifier of lock */ /* data */ - SHM_QUEUE targetLink; /* list link in PREDICATELOCKTARGET's list of + dlist_node targetLink; /* list link in PREDICATELOCKTARGET's list of * predicate locks */ - SHM_QUEUE xactLink; /* list link in SERIALIZABLEXACT's list of + dlist_node xactLink; /* list link in SERIALIZABLEXACT's list of * predicate locks */ SerCommitSeqNo commitSeqNo; /* only used for summarized predicate locks */ } PREDICATELOCK; diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 654584b77af..695aeb1425a 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -259,7 +259,7 @@ #define NPREDICATELOCKTARGETENTS() \ mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) -#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink))) +#define SxactIsOnFinishedList(sxact) (!dlist_node_is_detached(&(sxact)->finishedLink)) /* * Note that a sxact is marked "prepared" once it has passed @@ -391,7 +391,7 @@ static RWConflictPoolHeader RWConflictPool; static HTAB *SerializableXidHash; static HTAB *PredicateLockTargetHash; static HTAB *PredicateLockHash; -static SHM_QUEUE *FinishedSerializableTransactions; +static dlist_head *FinishedSerializableTransactions; /* * Tag for a dummy entry in PredicateLockTargetHash. 
By temporarily removing @@ -429,8 +429,6 @@ static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact; static SERIALIZABLEXACT *CreatePredXact(void); static void ReleasePredXact(SERIALIZABLEXACT *sxact); -static SERIALIZABLEXACT *FirstPredXact(void); -static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact); static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer); static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer); @@ -579,69 +577,24 @@ SerializationNeededForWrite(Relation relation) static SERIALIZABLEXACT * CreatePredXact(void) { - PredXactListElement ptle; + SERIALIZABLEXACT *sxact; - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->availableList, - &PredXact->availableList, - offsetof(PredXactListElementData, link)); - if (!ptle) + if (dlist_is_empty(&PredXact->availableList)) return NULL; - SHMQueueDelete(&ptle->link); - SHMQueueInsertBefore(&PredXact->activeList, &ptle->link); - return &ptle->sxact; + sxact = dlist_container(SERIALIZABLEXACT, xactLink, + dlist_pop_head_node(&PredXact->availableList)); + dlist_push_tail(&PredXact->activeList, &sxact->xactLink); + return sxact; } static void ReleasePredXact(SERIALIZABLEXACT *sxact) { - PredXactListElement ptle; - Assert(ShmemAddrIsValid(sxact)); - ptle = (PredXactListElement) - (((char *) sxact) - - offsetof(PredXactListElementData, sxact) - + offsetof(PredXactListElementData, link)); - SHMQueueDelete(&ptle->link); - SHMQueueInsertBefore(&PredXact->availableList, &ptle->link); -} - -static SERIALIZABLEXACT * -FirstPredXact(void) -{ - PredXactListElement ptle; - - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->activeList, - &PredXact->activeList, - offsetof(PredXactListElementData, link)); - if (!ptle) - return NULL; - - return &ptle->sxact; -} - -static SERIALIZABLEXACT * -NextPredXact(SERIALIZABLEXACT *sxact) -{ - PredXactListElement ptle; - - Assert(ShmemAddrIsValid(sxact)); - - ptle = (PredXactListElement) - 
(((char *) sxact) - - offsetof(PredXactListElementData, sxact) - + offsetof(PredXactListElementData, link)); - ptle = (PredXactListElement) - SHMQueueNext(&PredXact->activeList, - &ptle->link, - offsetof(PredXactListElementData, link)); - if (!ptle) - return NULL; - - return &ptle->sxact; + dlist_delete(&sxact->xactLink); + dlist_push_tail(&PredXact->availableList, &sxact->xactLink); } /*------------------------------------------------------------------------*/ @@ -652,30 +605,25 @@ NextPredXact(SERIALIZABLEXACT *sxact) static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer) { - RWConflict conflict; + dlist_iter iter; Assert(reader != writer); /* Check the ends of the purported conflict first. */ if (SxactIsDoomed(reader) || SxactIsDoomed(writer) - || SHMQueueEmpty(&reader->outConflicts) - || SHMQueueEmpty(&writer->inConflicts)) + || dlist_is_empty(&reader->outConflicts) + || dlist_is_empty(&writer->inConflicts)) return false; /* A conflict is possible; walk the list to find out. */ - conflict = (RWConflict) - SHMQueueNext(&reader->outConflicts, - &reader->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->outConflicts) { + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); + if (conflict->sxactIn == writer) return true; - conflict = (RWConflict) - SHMQueueNext(&reader->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); } /* No conflict found. 
*/ @@ -690,22 +638,19 @@ SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) Assert(reader != writer); Assert(!RWConflictExists(reader, writer)); - conflict = (RWConflict) - SHMQueueNext(&RWConflictPool->availableList, - &RWConflictPool->availableList, - offsetof(RWConflictData, outLink)); - if (!conflict) + if (dlist_is_empty(&RWConflictPool->availableList)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough elements in RWConflictPool to record a read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); - SHMQueueDelete(&conflict->outLink); + conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList); + dlist_delete(&conflict->outLink); conflict->sxactOut = reader; conflict->sxactIn = writer; - SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink); - SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink); + dlist_push_tail(&reader->outConflicts, &conflict->outLink); + dlist_push_tail(&writer->inConflicts, &conflict->inLink); } static void @@ -718,39 +663,33 @@ SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, Assert(SxactIsReadOnly(roXact)); Assert(!SxactIsReadOnly(activeXact)); - conflict = (RWConflict) - SHMQueueNext(&RWConflictPool->availableList, - &RWConflictPool->availableList, - offsetof(RWConflictData, outLink)); - if (!conflict) + if (dlist_is_empty(&RWConflictPool->availableList)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"), errhint("You might need to run fewer transactions at a time or increase max_connections."))); - SHMQueueDelete(&conflict->outLink); + conflict = dlist_head_element(RWConflictData, outLink, &RWConflictPool->availableList); + dlist_delete(&conflict->outLink); conflict->sxactOut = activeXact; conflict->sxactIn = roXact; - SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts, - &conflict->outLink); - 
SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts, - &conflict->inLink); + dlist_push_tail(&activeXact->possibleUnsafeConflicts, &conflict->outLink); + dlist_push_tail(&roXact->possibleUnsafeConflicts, &conflict->inLink); } static void ReleaseRWConflict(RWConflict conflict) { - SHMQueueDelete(&conflict->inLink); - SHMQueueDelete(&conflict->outLink); - SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink); + dlist_delete(&conflict->inLink); + dlist_delete(&conflict->outLink); + dlist_push_tail(&RWConflictPool->availableList, &conflict->outLink); } static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact) { - RWConflict conflict, - nextConflict; + dlist_mutable_iter iter; Assert(SxactIsReadOnly(sxact)); Assert(!SxactIsROSafe(sxact)); @@ -761,23 +700,15 @@ FlagSxactUnsafe(SERIALIZABLEXACT *sxact) * We know this isn't a safe snapshot, so we can stop looking for other * potential conflicts. */ - conflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &sxact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->possibleUnsafeConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); Assert(!SxactIsReadOnly(conflict->sxactOut)); Assert(sxact == conflict->sxactIn); ReleaseRWConflict(conflict); - - conflict = nextConflict; } } @@ -1171,8 +1102,8 @@ InitPredicateLocks(void) { int i; - SHMQueueInit(&PredXact->availableList); - SHMQueueInit(&PredXact->activeList); + dlist_init(&PredXact->availableList); + dlist_init(&PredXact->activeList); PredXact->SxactGlobalXmin = InvalidTransactionId; PredXact->SxactGlobalXminCount = 0; PredXact->WritableSxactCount = 0; @@ -1180,27 +1111,26 @@ InitPredicateLocks(void) PredXact->CanPartialClearThrough = 0; PredXact->HavePartialClearedThrough = 0; requestSize = mul_size((Size) 
max_table_size, - PredXactListElementDataSize); + sizeof(SERIALIZABLEXACT)); PredXact->element = ShmemAlloc(requestSize); /* Add all elements to available list, clean. */ memset(PredXact->element, 0, requestSize); for (i = 0; i < max_table_size; i++) { - LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock, + LWLockInitialize(&PredXact->element[i].predicateLockListLock, LWTRANCHE_SXACT); - SHMQueueInsertBefore(&(PredXact->availableList), - &(PredXact->element[i].link)); + dlist_push_tail(&PredXact->availableList, &PredXact->element[i].xactLink); } PredXact->OldCommittedSxact = CreatePredXact(); SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid); PredXact->OldCommittedSxact->prepareSeqNo = 0; PredXact->OldCommittedSxact->commitSeqNo = 0; PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0; - SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts); - SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts); - SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks); - SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink); - SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts); + dlist_init(&PredXact->OldCommittedSxact->outConflicts); + dlist_init(&PredXact->OldCommittedSxact->inConflicts); + dlist_init(&PredXact->OldCommittedSxact->predicateLocks); + dlist_node_init(&PredXact->OldCommittedSxact->finishedLink); + dlist_init(&PredXact->OldCommittedSxact->possibleUnsafeConflicts); PredXact->OldCommittedSxact->topXid = InvalidTransactionId; PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId; PredXact->OldCommittedSxact->xmin = InvalidTransactionId; @@ -1246,7 +1176,7 @@ InitPredicateLocks(void) { int i; - SHMQueueInit(&RWConflictPool->availableList); + dlist_init(&RWConflictPool->availableList); requestSize = mul_size((Size) max_table_size, RWConflictDataSize); RWConflictPool->element = ShmemAlloc(requestSize); @@ -1254,8 +1184,8 @@ InitPredicateLocks(void) memset(RWConflictPool->element, 0, 
requestSize); for (i = 0; i < max_table_size; i++) { - SHMQueueInsertBefore(&(RWConflictPool->availableList), - &(RWConflictPool->element[i].outLink)); + dlist_push_tail(&RWConflictPool->availableList, + &RWConflictPool->element[i].outLink); } } @@ -1263,13 +1193,13 @@ InitPredicateLocks(void) * Create or attach to the header for the list of finished serializable * transactions. */ - FinishedSerializableTransactions = (SHM_QUEUE *) + FinishedSerializableTransactions = (dlist_head *) ShmemInitStruct("FinishedSerializableTransactions", - sizeof(SHM_QUEUE), + sizeof(dlist_head), &found); Assert(found == IsUnderPostmaster); if (!found) - SHMQueueInit(FinishedSerializableTransactions); + dlist_init(FinishedSerializableTransactions); /* * Initialize the SLRU storage for old committed serializable @@ -1308,7 +1238,7 @@ PredicateLockShmemSize(void) max_table_size *= 10; size = add_size(size, PredXactListDataSize); size = add_size(size, mul_size((Size) max_table_size, - PredXactListElementDataSize)); + sizeof(SERIALIZABLEXACT))); /* transaction xid table */ size = add_size(size, hash_estimate_size(max_table_size, @@ -1321,7 +1251,7 @@ PredicateLockShmemSize(void) RWConflictDataSize)); /* Head for list of finished serializable transactions. */ - size = add_size(size, sizeof(SHM_QUEUE)); + size = add_size(size, sizeof(dlist_head)); /* Shared memory structures for SLRU tracking of old committed xids. */ size = add_size(size, sizeof(OldSerXidControlData)); @@ -1444,7 +1374,7 @@ SummarizeOldestCommittedSxact(void) * that case, we have nothing to do here. The caller will find one of the * slots released by the other backend when it retries. */ - if (SHMQueueEmpty(FinishedSerializableTransactions)) + if (dlist_is_empty(FinishedSerializableTransactions)) { LWLockRelease(SerializableFinishedListLock); return; @@ -1454,11 +1384,9 @@ SummarizeOldestCommittedSxact(void) * Grab the first sxact off the finished list -- this will be the earliest * commit. Remove it from the list. 
 	 */
-	sxact = (SERIALIZABLEXACT *)
-		SHMQueueNext(FinishedSerializableTransactions,
-					 FinishedSerializableTransactions,
-					 offsetof(SERIALIZABLEXACT, finishedLink));
-	SHMQueueDelete(&(sxact->finishedLink));
+	sxact = dlist_head_element(SERIALIZABLEXACT, finishedLink,
+							   FinishedSerializableTransactions);
+	dlist_delete_thoroughly(&sxact->finishedLink);
 
 	/* Add to SLRU summary information. */
 	if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
@@ -1512,7 +1440,7 @@ GetSafeSnapshot(Snapshot origSnapshot)
 		 * them marked us as conflicted.
 		 */
 		MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
-		while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
+		while (!(dlist_is_empty(&MySerializableXact->possibleUnsafeConflicts) ||
 				 SxactIsROUnsafe(MySerializableXact)))
 		{
 			LWLockRelease(SerializableXactHashLock);
@@ -1558,13 +1486,16 @@ int
 GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
 {
 	int			num_written = 0;
-	SERIALIZABLEXACT *sxact;
+	dlist_iter	iter;
+	SERIALIZABLEXACT *sxact = NULL;
 
 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
 
 	/* Find blocked_pid's SERIALIZABLEXACT by linear search. */
-	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
+	dlist_foreach(iter, &PredXact->activeList)
 	{
-		if (sxact->pid == blocked_pid)
-			break;
+		SERIALIZABLEXACT *cand =
+			dlist_container(SERIALIZABLEXACT, xactLink, iter.cur);
+
+		/* Only set sxact on a match, so it stays NULL when not found. */
+		if (cand->pid == blocked_pid)
+		{
+			sxact = cand;
+			break;
+		}
 	}
@@ -1572,21 +1503,13 @@ GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
 	/* Did we find it, and is it currently waiting in GetSafeSnapshot? */
 	if (sxact != NULL && SxactIsDeferrableWaiting(sxact))
 	{
-		RWConflict	possibleUnsafeConflict;
-
 		/* Traverse the list of possible unsafe conflicts collecting PIDs.
 		 */
-		possibleUnsafeConflict = (RWConflict)
-			SHMQueueNext(&sxact->possibleUnsafeConflicts,
-						 &sxact->possibleUnsafeConflicts,
-						 offsetof(RWConflictData, inLink));
-
-		while (possibleUnsafeConflict != NULL && num_written < output_size)
+		dlist_foreach(iter, &sxact->possibleUnsafeConflicts)
 		{
+			RWConflict	possibleUnsafeConflict =
+				dlist_container(RWConflictData, inLink, iter.cur);
+
+			/* Preserve the original output_size bound on the caller's array. */
+			if (num_written >= output_size)
+				break;
+
 			output[num_written++] = possibleUnsafeConflict->sxactOut->pid;
-
-			possibleUnsafeConflict = (RWConflict)
-				SHMQueueNext(&sxact->possibleUnsafeConflicts,
-							 &possibleUnsafeConflict->inLink,
-							 offsetof(RWConflictData, inLink));
 		}
 	}
@@ -1799,18 +1722,20 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 	sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
 	sxact->prepareSeqNo = InvalidSerCommitSeqNo;
 	sxact->commitSeqNo = InvalidSerCommitSeqNo;
-	SHMQueueInit(&(sxact->outConflicts));
-	SHMQueueInit(&(sxact->inConflicts));
-	SHMQueueInit(&(sxact->possibleUnsafeConflicts));
+	dlist_init(&(sxact->outConflicts));
+	dlist_init(&(sxact->inConflicts));
+	dlist_init(&(sxact->possibleUnsafeConflicts));
 	sxact->topXid = GetTopTransactionIdIfAny();
 	sxact->finishedBefore = InvalidTransactionId;
 	sxact->xmin = snapshot->xmin;
 	sxact->pid = MyProcPid;
-	SHMQueueInit(&(sxact->predicateLocks));
-	SHMQueueElemInit(&(sxact->finishedLink));
+	dlist_init(&sxact->predicateLocks);
+	dlist_node_init(&sxact->finishedLink);
 	sxact->flags = 0;
 	if (XactReadOnly)
 	{
+		dlist_iter	iter;
+
 		sxact->flags |= SXACT_FLAG_READ_ONLY;
 
 		/*
@@ -1819,10 +1744,10 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 		 * transactions then this snapshot can be deemed safe (and we can run
 		 * without tracking predicate locks).
*/ - for (othersxact = FirstPredXact(); - othersxact != NULL; - othersxact = NextPredXact(othersxact)) + dlist_foreach(iter, &PredXact->activeList) { + othersxact = dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (!SxactIsCommitted(othersxact) && !SxactIsDoomed(othersxact) && !SxactIsReadOnly(othersxact)) @@ -2100,7 +2025,7 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); /* Can't remove it until no locks at this target. */ - if (!SHMQueueEmpty(&target->predicateLocks)) + if (!dlist_is_empty(&target->predicateLocks)) return; /* Actually remove the target. */ @@ -2128,28 +2053,20 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) { SERIALIZABLEXACT *sxact; PREDICATELOCK *predlock; + dlist_mutable_iter iter; LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); sxact = MySerializableXact; if (IsInParallelMode()) LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + + dlist_foreach_modify(iter, &sxact->predicateLocks) { - SHM_QUEUE *predlocksxactlink; - PREDICATELOCK *nextpredlock; PREDICATELOCKTAG oldlocktag; PREDICATELOCKTARGET *oldtarget; PREDICATELOCKTARGETTAG oldtargettag; - predlocksxactlink = &(predlock->xactLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - predlocksxactlink, - offsetof(PREDICATELOCK, xactLink)); + predlock = dlist_container(PREDICATELOCK, xactLink, iter.cur); oldlocktag = predlock->tag; Assert(oldlocktag.myXact == sxact); @@ -2167,8 +2084,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(predlocksxactlink); - SHMQueueDelete(&(predlock->targetLink)); + dlist_delete(&predlock->xactLink); + dlist_delete(&predlock->targetLink); rmpredlock = 
hash_search_with_hash_value (PredicateLockHash, &oldlocktag, @@ -2183,8 +2100,6 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) DecrementParentLocks(&oldtargettag); } - - predlock = nextpredlock; } if (IsInParallelMode()) LWLockRelease(&sxact->predicateLockListLock); @@ -2401,7 +2316,7 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, errmsg("out of shared memory"), errhint("You might need to increase max_pred_locks_per_transaction."))); if (!found) - SHMQueueInit(&(target->predicateLocks)); + dlist_init(&target->predicateLocks); /* We've got the sxact and target, make sure they're joined. */ locktag.myTarget = target; @@ -2418,9 +2333,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, if (!found) { - SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink)); - SHMQueueInsertBefore(&(sxact->predicateLocks), - &(lock->xactLink)); + dlist_push_tail(&target->predicateLocks, &lock->targetLink); + dlist_push_tail(&sxact->predicateLocks, &lock->xactLink); lock->commitSeqNo = InvalidSerCommitSeqNo; } @@ -2592,30 +2506,22 @@ PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot, static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) { - PREDICATELOCK *predlock; - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; - bool found; + dlist_mutable_iter iter; Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock, LW_EXCLUSIVE)); Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash))); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - while (predlock) - { - predlocktargetlink = &(predlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); - SHMQueueDelete(&(predlock->xactLink)); - 
SHMQueueDelete(&(predlock->targetLink)); + dlist_foreach_modify(iter, &target->predicateLocks) + { + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); + bool found; + + dlist_delete(&(predlock->xactLink)); + dlist_delete(&(predlock->targetLink)); hash_search_with_hash_value (PredicateLockHash, @@ -2624,8 +2530,6 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) targettaghash), HASH_REMOVE, &found); Assert(found); - - predlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); @@ -2724,8 +2628,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, if (oldtarget) { PREDICATELOCKTARGET *newtarget; - PREDICATELOCK *oldpredlock; PREDICATELOCKTAG newpredlocktag; + dlist_mutable_iter iter; newtarget = hash_search_with_hash_value(PredicateLockTargetHash, &newtargettag, @@ -2741,7 +2645,7 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, /* If we created a new entry, initialize it */ if (!found) - SHMQueueInit(&(newtarget->predicateLocks)); + dlist_init(&newtarget->predicateLocks); newpredlocktag.myTarget = newtarget; @@ -2749,29 +2653,21 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, * Loop through all the locks on the old target, replacing them with * locks on the new target. 
*/ - oldpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldtarget->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); - while (oldpredlock) + + dlist_foreach_modify(iter, &oldtarget->predicateLocks) { - SHM_QUEUE *predlocktargetlink; - PREDICATELOCK *nextpredlock; + PREDICATELOCK *oldpredlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); PREDICATELOCK *newpredlock; SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo; - predlocktargetlink = &(oldpredlock->targetLink); - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - predlocktargetlink, - offsetof(PREDICATELOCK, targetLink)); newpredlocktag.myXact = oldpredlock->tag.myXact; if (removeOld) { - SHMQueueDelete(&(oldpredlock->xactLink)); - SHMQueueDelete(&(oldpredlock->targetLink)); + dlist_delete(&(oldpredlock->xactLink)); + dlist_delete(&(oldpredlock->targetLink)); hash_search_with_hash_value (PredicateLockHash, @@ -2799,10 +2695,10 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, } if (!found) { - SHMQueueInsertBefore(&(newtarget->predicateLocks), - &(newpredlock->targetLink)); - SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks), - &(newpredlock->xactLink)); + dlist_push_tail(&(newtarget->predicateLocks), + &(newpredlock->targetLink)); + dlist_push_tail(&(newpredlocktag.myXact->predicateLocks), + &(newpredlock->xactLink)); newpredlock->commitSeqNo = oldCommitSeqNo; } else @@ -2814,14 +2710,12 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, Assert(newpredlock->commitSeqNo != 0); Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo) || (newpredlock->tag.myXact == OldCommittedSxact)); - - oldpredlock = nextpredlock; } LWLockRelease(SerializableXactHashLock); if (removeOld) { - Assert(SHMQueueEmpty(&oldtarget->predicateLocks)); + Assert(dlist_is_empty(&oldtarget->predicateLocks)); 
RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash); } } @@ -2941,7 +2835,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat))) { - PREDICATELOCK *oldpredlock; + dlist_mutable_iter iter; /* * Check whether this is a target which needs attention. @@ -2976,29 +2870,21 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) heaptargettaghash, HASH_ENTER, &found); if (!found) - SHMQueueInit(&heaptarget->predicateLocks); + dlist_init(&heaptarget->predicateLocks); } /* * Loop through all the locks on the old target, replacing them with * locks on the new target. */ - oldpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldtarget->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); - while (oldpredlock) + dlist_foreach_modify(iter, &oldtarget->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *oldpredlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); PREDICATELOCK *newpredlock; SerCommitSeqNo oldCommitSeqNo; SERIALIZABLEXACT *oldXact; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(oldtarget->predicateLocks), - &(oldpredlock->targetLink), - offsetof(PREDICATELOCK, targetLink)); - /* * Remove the old lock first. This avoids the chance of running * out of lock structure entries for the hash table. 
@@ -3006,7 +2892,7 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) oldCommitSeqNo = oldpredlock->commitSeqNo; oldXact = oldpredlock->tag.myXact; - SHMQueueDelete(&(oldpredlock->xactLink)); + dlist_delete(&(oldpredlock->xactLink)); /* * No need for retail delete from oldtarget list, we're removing @@ -3032,10 +2918,10 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) &found); if (!found) { - SHMQueueInsertBefore(&(heaptarget->predicateLocks), - &(newpredlock->targetLink)); - SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks), - &(newpredlock->xactLink)); + dlist_push_tail(&(heaptarget->predicateLocks), + &(newpredlock->targetLink)); + dlist_push_tail(&(newpredlocktag.myXact->predicateLocks), + &(newpredlock->xactLink)); newpredlock->commitSeqNo = oldCommitSeqNo; } else @@ -3048,8 +2934,6 @@ DropAllPredicateLocksFromTable(Relation relation, bool transfer) Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo) || (newpredlock->tag.myXact == OldCommittedSxact)); } - - oldpredlock = nextpredlock; } hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE, @@ -3204,15 +3088,18 @@ PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, static void SetNewSxactGlobalXmin(void) { - SERIALIZABLEXACT *sxact; + dlist_iter iter; Assert(LWLockHeldByMe(SerializableXactHashLock)); PredXact->SxactGlobalXmin = InvalidTransactionId; PredXact->SxactGlobalXminCount = 0; - for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact)) + dlist_foreach(iter, &PredXact->activeList) { + SERIALIZABLEXACT *sxact = + dlist_container(SERIALIZABLEXACT, xactLink, iter.cur); + if (!SxactIsRolledBack(sxact) && !SxactIsCommitted(sxact) && sxact != OldCommittedSxact) @@ -3263,10 +3150,8 @@ void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) { bool needToClear; - RWConflict conflict, - nextConflict, - possibleUnsafeConflict; SERIALIZABLEXACT *roXact; + dlist_mutable_iter iter; /* * We can't trust XactReadOnly 
here, because a transaction which started @@ -3454,23 +3339,15 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * make us unsafe. Note that we use 'inLink' for the iteration as * opposed to 'outLink' for the r/w xacts. */ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &MySerializableXact->possibleUnsafeConflicts, - offsetof(RWConflictData, inLink)); - while (possibleUnsafeConflict) + dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &possibleUnsafeConflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, inLink, iter.cur); Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut)); Assert(MySerializableXact == possibleUnsafeConflict->sxactIn); ReleaseRWConflict(possibleUnsafeConflict); - - possibleUnsafeConflict = nextConflict; } } @@ -3493,16 +3370,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * back clear them all. Set SXACT_FLAG_CONFLICT_OUT if any point to * previously committed transactions. 
*/ - conflict = (RWConflict) - SHMQueueNext(&MySerializableXact->outConflicts, - &MySerializableXact->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach_modify(iter, &MySerializableXact->outConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); if (isCommit && !SxactIsReadOnly(MySerializableXact) @@ -3518,31 +3389,21 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) || SxactIsCommitted(conflict->sxactIn) || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo)) ReleaseRWConflict(conflict); - - conflict = nextConflict; } /* * Release all inConflicts from committed and read-only transactions. If * we're rolling back, clear them all. */ - conflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &MySerializableXact->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &MySerializableXact->inConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); if (!isCommit || SxactIsCommitted(conflict->sxactOut) || SxactIsReadOnly(conflict->sxactOut)) ReleaseRWConflict(conflict); - - conflict = nextConflict; } if (!topLevelIsDeclaredReadOnly) @@ -3553,16 +3414,10 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * conflict out. If any are waiting DEFERRABLE transactions, wake them * up if they are known safe or known unsafe. 
*/ - possibleUnsafeConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &MySerializableXact->possibleUnsafeConflicts, - offsetof(RWConflictData, outLink)); - while (possibleUnsafeConflict) + dlist_foreach_modify(iter, &MySerializableXact->possibleUnsafeConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts, - &possibleUnsafeConflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict possibleUnsafeConflict = + dlist_container(RWConflictData, outLink, iter.cur); roXact = possibleUnsafeConflict->sxactIn; Assert(MySerializableXact == possibleUnsafeConflict->sxactOut); @@ -3590,7 +3445,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) * transaction can now safely release its predicate locks (but * that transaction's backend has to do that itself). */ - if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts)) + if (dlist_is_empty(&roXact->possibleUnsafeConflicts)) roXact->flags |= SXACT_FLAG_RO_SAFE; } @@ -3601,8 +3456,6 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) ProcSendSignal(roXact->pid); - - possibleUnsafeConflict = nextConflict; } } @@ -3630,8 +3483,8 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) /* Add this to the list of transactions to check for later cleanup. */ if (isCommit) - SHMQueueInsertBefore(FinishedSerializableTransactions, - &MySerializableXact->finishedLink); + dlist_push_tail(FinishedSerializableTransactions, + &MySerializableXact->finishedLink); /* * If we're releasing a RO_SAFE transaction in parallel mode, we'll only @@ -3673,27 +3526,19 @@ ReleasePredicateLocksLocal(void) static void ClearOldPredicateLocks(void) { - SERIALIZABLEXACT *finishedSxact; - PREDICATELOCK *predlock; + dlist_mutable_iter iter; /* * Loop through finished transactions. 
They are in commit order, so we can * stop as soon as we find one that's still interesting. */ LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE); - finishedSxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - FinishedSerializableTransactions, - offsetof(SERIALIZABLEXACT, finishedLink)); LWLockAcquire(SerializableXactHashLock, LW_SHARED); - while (finishedSxact) + dlist_foreach_modify(iter, FinishedSerializableTransactions) { - SERIALIZABLEXACT *nextSxact; + SERIALIZABLEXACT *finishedSxact = + dlist_container(SERIALIZABLEXACT, finishedLink, iter.cur); - nextSxact = (SERIALIZABLEXACT *) - SHMQueueNext(FinishedSerializableTransactions, - &(finishedSxact->finishedLink), - offsetof(SERIALIZABLEXACT, finishedLink)); if (!TransactionIdIsValid(PredXact->SxactGlobalXmin) || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore, PredXact->SxactGlobalXmin)) @@ -3703,7 +3548,7 @@ ClearOldPredicateLocks(void) * took its snapshot. It's no longer interesting. */ LWLockRelease(SerializableXactHashLock); - SHMQueueDelete(&(finishedSxact->finishedLink)); + dlist_delete_thoroughly(&finishedSxact->finishedLink); ReleaseOneSerializableXact(finishedSxact, false, false); LWLockAcquire(SerializableXactHashLock, LW_SHARED); } @@ -3720,7 +3565,7 @@ ClearOldPredicateLocks(void) if (SxactIsReadOnly(finishedSxact)) { /* A read-only transaction can be removed entirely */ - SHMQueueDelete(&(finishedSxact->finishedLink)); + dlist_delete_thoroughly(&(finishedSxact->finishedLink)); ReleaseOneSerializableXact(finishedSxact, false, false); } else @@ -3741,7 +3586,6 @@ ClearOldPredicateLocks(void) /* Still interesting. */ break; } - finishedSxact = nextSxact; } LWLockRelease(SerializableXactHashLock); @@ -3749,20 +3593,12 @@ ClearOldPredicateLocks(void) * Loop through predicate locks on dummy transaction for summarized data. 
*/ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); - predlock = (PREDICATELOCK *) - SHMQueueNext(&OldCommittedSxact->predicateLocks, - &OldCommittedSxact->predicateLocks, - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + dlist_foreach_modify(iter, &OldCommittedSxact->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); bool canDoPartialCleanup; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&OldCommittedSxact->predicateLocks, - &predlock->xactLink, - offsetof(PREDICATELOCK, xactLink)); - LWLockAcquire(SerializableXactHashLock, LW_SHARED); Assert(predlock->commitSeqNo != 0); Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo); @@ -3789,8 +3625,8 @@ ClearOldPredicateLocks(void) LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(&(predlock->targetLink)); - SHMQueueDelete(&(predlock->xactLink)); + dlist_delete(&(predlock->targetLink)); + dlist_delete(&(predlock->xactLink)); hash_search_with_hash_value(PredicateLockHash, &tag, PredicateLockHashCodeFromTargetHashCode(&tag, @@ -3800,8 +3636,6 @@ ClearOldPredicateLocks(void) LWLockRelease(partitionLock); } - - predlock = nextpredlock; } LWLockRelease(SerializablePredicateLockListLock); @@ -3831,10 +3665,8 @@ static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, bool summarize) { - PREDICATELOCK *predlock; SERIALIZABLEXIDTAG sxidtag; - RWConflict conflict, - nextConflict; + dlist_mutable_iter iter; Assert(sxact != NULL); Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact)); @@ -3848,27 +3680,18 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); if (IsInParallelMode()) LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - while (predlock) + + 
dlist_foreach_modify(iter, &sxact->predicateLocks) { - PREDICATELOCK *nextpredlock; + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); PREDICATELOCKTAG tag; - SHM_QUEUE *targetLink; PREDICATELOCKTARGET *target; PREDICATELOCKTARGETTAG targettag; uint32 targettaghash; LWLock *partitionLock; - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(predlock->xactLink), - offsetof(PREDICATELOCK, xactLink)); - tag = predlock->tag; - targetLink = &(predlock->targetLink); target = tag.myTarget; targettag = target->tag; targettaghash = PredicateLockTargetTagHashCode(&targettag); @@ -3876,7 +3699,7 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, LWLockAcquire(partitionLock, LW_EXCLUSIVE); - SHMQueueDelete(targetLink); + dlist_delete(&predlock->targetLink); hash_search_with_hash_value(PredicateLockHash, &tag, PredicateLockHashCodeFromTargetHashCode(&tag, @@ -3906,10 +3729,10 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, } else { - SHMQueueInsertBefore(&(target->predicateLocks), - &(predlock->targetLink)); - SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks), - &(predlock->xactLink)); + dlist_push_tail(&target->predicateLocks, + &predlock->targetLink); + dlist_push_tail(&OldCommittedSxact->predicateLocks, + &predlock->xactLink); predlock->commitSeqNo = sxact->commitSeqNo; } } @@ -3917,15 +3740,13 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, RemoveTargetIfNoLongerUsed(target, targettaghash); LWLockRelease(partitionLock); - - predlock = nextpredlock; } /* * Rather than retail removal, just re-init the head after we've run * through the list. 
*/ - SHMQueueInit(&sxact->predicateLocks); + dlist_init(&sxact->predicateLocks); if (IsInParallelMode()) LWLockRelease(&sxact->predicateLockListLock); @@ -3937,38 +3758,25 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, /* Release all outConflicts (unless 'partial' is true) */ if (!partial) { - conflict = (RWConflict) - SHMQueueNext(&sxact->outConflicts, - &sxact->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->outConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); if (summarize) conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; ReleaseRWConflict(conflict); - conflict = nextConflict; } } /* Release all inConflicts. */ - conflict = (RWConflict) - SHMQueueNext(&sxact->inConflicts, - &sxact->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) + dlist_foreach_modify(iter, &sxact->inConflicts) { - nextConflict = (RWConflict) - SHMQueueNext(&sxact->inConflicts, - &conflict->inLink, - offsetof(RWConflictData, inLink)); + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); + if (summarize) conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; ReleaseRWConflict(conflict); - conflict = nextConflict; } /* Finally, get rid of the xid and the record of the transaction itself. 
 */
@@ -4098,7 +3906,7 @@ CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot s
 				 errhint("The transaction might succeed if retried.")));
 
 		if (SxactHasSummaryConflictIn(MySerializableXact)
-			|| !SHMQueueEmpty(&MySerializableXact->inConflicts))
+			|| !dlist_is_empty(&MySerializableXact->inConflicts))
 			ereport(ERROR,
 					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 					 errmsg("could not serialize access due to read/write dependencies among transactions"),
@@ -4194,9 +4002,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 	uint32		targettaghash;
 	LWLock	   *partitionLock;
 	PREDICATELOCKTARGET *target;
-	PREDICATELOCK *predlock;
 	PREDICATELOCK *mypredlock = NULL;
 	PREDICATELOCKTAG mypredlocktag;
+	dlist_mutable_iter iter;
 
 	Assert(MySerializableXact != InvalidSerializableXact);
 
@@ -4221,24 +4029,14 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 	 * Each lock for an overlapping transaction represents a conflict: a
 	 * rw-dependency in to this transaction.
 	 */
-	predlock = (PREDICATELOCK *)
-		SHMQueueNext(&(target->predicateLocks),
-					 &(target->predicateLocks),
-					 offsetof(PREDICATELOCK, targetLink));
 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
-	while (predlock)
+
+	dlist_foreach_modify(iter, &target->predicateLocks)
 	{
-		SHM_QUEUE  *predlocktargetlink;
-		PREDICATELOCK *nextpredlock;
-		SERIALIZABLEXACT *sxact;
+		PREDICATELOCK *predlock =
+			dlist_container(PREDICATELOCK, targetLink, iter.cur);
+		SERIALIZABLEXACT *sxact = predlock->tag.myXact;
 
-		predlocktargetlink = &(predlock->targetLink);
-		nextpredlock = (PREDICATELOCK *)
-			SHMQueueNext(&(target->predicateLocks),
-						 predlocktargetlink,
-						 offsetof(PREDICATELOCK, targetLink));
-
-		sxact = predlock->tag.myXact;
 		if (sxact == MySerializableXact)
 		{
 			/*
@@ -4283,8 +4081,6 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 				LWLockRelease(SerializableXactHashLock);
 				LWLockAcquire(SerializableXactHashLock, LW_SHARED);
 			}
-
-		predlock = nextpredlock;
 	}
LWLockRelease(SerializableXactHashLock); LWLockRelease(partitionLock); @@ -4324,8 +4120,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) { Assert(rmpredlock == mypredlock); - SHMQueueDelete(&(mypredlock->targetLink)); - SHMQueueDelete(&(mypredlock->xactLink)); + dlist_delete(&(mypredlock->targetLink)); + dlist_delete(&(mypredlock->xactLink)); rmpredlock = (PREDICATELOCK *) hash_search_with_hash_value(PredicateLockHash, @@ -4495,7 +4291,7 @@ CheckTableForSerializableConflictIn(Relation relation) while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat))) { - PREDICATELOCK *predlock; + dlist_mutable_iter iter; /* * Check whether this is a target which needs attention. @@ -4508,26 +4304,16 @@ CheckTableForSerializableConflictIn(Relation relation) /* * Loop through locks for this target and flag conflicts. */ - predlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(target->predicateLocks), - offsetof(PREDICATELOCK, targetLink)); - while (predlock) + dlist_foreach_modify(iter, &target->predicateLocks) { - PREDICATELOCK *nextpredlock; - - nextpredlock = (PREDICATELOCK *) - SHMQueueNext(&(target->predicateLocks), - &(predlock->targetLink), - offsetof(PREDICATELOCK, targetLink)); + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, targetLink, iter.cur); if (predlock->tag.myXact != MySerializableXact && !RWConflictExists(predlock->tag.myXact, MySerializableXact)) { FlagRWConflict(predlock->tag.myXact, MySerializableXact); } - - predlock = nextpredlock; } } @@ -4585,7 +4371,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer) { bool failure; - RWConflict conflict; Assert(LWLockHeldByMe(SerializableXactHashLock)); @@ -4625,20 +4410,16 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, * to abort. 
*------------------------------------------------------------------------ */ - if (!failure) + if (!failure && SxactHasSummaryConflictOut(writer)) + failure = true; + else if (!failure) { - if (SxactHasSummaryConflictOut(writer)) - { - failure = true; - conflict = NULL; - } - else - conflict = (RWConflict) - SHMQueueNext(&writer->outConflicts, - &writer->outConflicts, - offsetof(RWConflictData, outLink)); - while (conflict) + dlist_iter iter; + + dlist_foreach(iter, &writer->outConflicts) { + RWConflict conflict = + dlist_container(RWConflictData, outLink, iter.cur); SERIALIZABLEXACT *t2 = conflict->sxactIn; if (SxactIsPrepared(t2) @@ -4652,10 +4433,6 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, failure = true; break; } - conflict = (RWConflict) - SHMQueueNext(&writer->outConflicts, - &conflict->outLink, - offsetof(RWConflictData, outLink)); } } @@ -4677,30 +4454,27 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, if (SxactHasSummaryConflictIn(reader)) { failure = true; - conflict = NULL; } else - conflict = (RWConflict) - SHMQueueNext(&reader->inConflicts, - &reader->inConflicts, - offsetof(RWConflictData, inLink)); - while (conflict) { - SERIALIZABLEXACT *t0 = conflict->sxactOut; + dlist_iter iter; - if (!SxactIsDoomed(t0) - && (!SxactIsCommitted(t0) - || t0->commitSeqNo >= writer->prepareSeqNo) - && (!SxactIsReadOnly(t0) - || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) + dlist_foreach(iter, &unconstify(SERIALIZABLEXACT *, reader)->inConflicts) { - failure = true; - break; + RWConflict conflict = + dlist_container(RWConflictData, inLink, iter.cur); + SERIALIZABLEXACT *t0 = conflict->sxactOut; + + if (!SxactIsDoomed(t0) + && (!SxactIsCommitted(t0) + || t0->commitSeqNo >= writer->prepareSeqNo) + && (!SxactIsReadOnly(t0) + || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo)) + { + failure = true; + break; + } } - conflict = (RWConflict) - SHMQueueNext(&reader->inConflicts, - 
&conflict->inLink, - offsetof(RWConflictData, inLink)); } } @@ -4758,7 +4532,7 @@ OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, void PreCommit_CheckForSerializationFailure(void) { - RWConflict nearConflict; + dlist_iter near_iter; if (MySerializableXact == InvalidSerializableXact) return; @@ -4779,23 +4553,21 @@ PreCommit_CheckForSerializationFailure(void) errhint("The transaction might succeed if retried."))); } - nearConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &MySerializableXact->inConflicts, - offsetof(RWConflictData, inLink)); - while (nearConflict) + dlist_foreach(near_iter, &MySerializableXact->inConflicts) { + RWConflict nearConflict = + dlist_container(RWConflictData, inLink, near_iter.cur); + if (!SxactIsCommitted(nearConflict->sxactOut) && !SxactIsDoomed(nearConflict->sxactOut)) { - RWConflict farConflict; + dlist_iter far_iter; - farConflict = (RWConflict) - SHMQueueNext(&nearConflict->sxactOut->inConflicts, - &nearConflict->sxactOut->inConflicts, - offsetof(RWConflictData, inLink)); - while (farConflict) + dlist_foreach(far_iter, &nearConflict->sxactOut->inConflicts) { + RWConflict farConflict = + dlist_container(RWConflictData, inLink, far_iter.cur); + if (farConflict->sxactOut == MySerializableXact || (!SxactIsCommitted(farConflict->sxactOut) && !SxactIsReadOnly(farConflict->sxactOut) @@ -4819,17 +4591,8 @@ PreCommit_CheckForSerializationFailure(void) nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED; break; } - farConflict = (RWConflict) - SHMQueueNext(&nearConflict->sxactOut->inConflicts, - &farConflict->inLink, - offsetof(RWConflictData, inLink)); } } - - nearConflict = (RWConflict) - SHMQueueNext(&MySerializableXact->inConflicts, - &nearConflict->inLink, - offsetof(RWConflictData, inLink)); } MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo); @@ -4852,11 +4615,11 @@ PreCommit_CheckForSerializationFailure(void) void AtPrepare_PredicateLocks(void) { - PREDICATELOCK 
*predlock; SERIALIZABLEXACT *sxact; TwoPhasePredicateRecord record; TwoPhasePredicateXactRecord *xactRecord; TwoPhasePredicateLockRecord *lockRecord; + dlist_iter iter; sxact = MySerializableXact; xactRecord = &(record.data.xactRecord); @@ -4896,23 +4659,16 @@ AtPrepare_PredicateLocks(void) */ Assert(!IsParallelWorker() && !ParallelContextActive()); - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(sxact->predicateLocks), - offsetof(PREDICATELOCK, xactLink)); - - while (predlock != NULL) + dlist_foreach(iter, &sxact->predicateLocks) { + PREDICATELOCK *predlock = + dlist_container(PREDICATELOCK, xactLink, iter.cur); + record.type = TWOPHASEPREDICATERECORD_LOCK; lockRecord->target = predlock->tag.myTarget->tag; RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0, &record, sizeof(record)); - - predlock = (PREDICATELOCK *) - SHMQueueNext(&(sxact->predicateLocks), - &(predlock->xactLink), - offsetof(PREDICATELOCK, xactLink)); } LWLockRelease(SerializablePredicateLockListLock); @@ -5022,10 +4778,10 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, * recovered xact started are still active, except possibly other * prepared xacts and we don't care whether those are RO_SAFE or not. */ - SHMQueueInit(&(sxact->possibleUnsafeConflicts)); + dlist_init(&(sxact->possibleUnsafeConflicts)); - SHMQueueInit(&(sxact->predicateLocks)); - SHMQueueElemInit(&(sxact->finishedLink)); + dlist_init(&(sxact->predicateLocks)); + dlist_node_init(&sxact->finishedLink); sxact->topXid = xid; sxact->xmin = xactRecord->xmin; @@ -5043,8 +4799,8 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, * we'll conservatively assume that it had both a conflict in and a * conflict out, and represent that with the summary conflict flags. 
*/ - SHMQueueInit(&(sxact->outConflicts)); - SHMQueueInit(&(sxact->inConflicts)); + dlist_init(&(sxact->outConflicts)); + dlist_init(&(sxact->inConflicts)); sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN; sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT; -- 2.25.0.114.g5b0ca878e0
>From ebccc235763516c6d548326063840f7d0e0b0363 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 19 Feb 2020 15:19:58 -0800 Subject: [PATCH v1 5/6] Remove now unused SHMQueue*. --- src/include/storage/shmem.h | 22 ---- src/backend/storage/ipc/Makefile | 1 - src/backend/storage/ipc/shmqueue.c | 190 ----------------------------- 3 files changed, 213 deletions(-) delete mode 100644 src/backend/storage/ipc/shmqueue.c diff --git a/src/include/storage/shmem.h b/src/include/storage/shmem.h index 0c1af892062..b6bdacadee2 100644 --- a/src/include/storage/shmem.h +++ b/src/include/storage/shmem.h @@ -24,13 +24,6 @@ #include "utils/hsearch.h" -/* shmqueue.c */ -typedef struct SHM_QUEUE -{ - struct SHM_QUEUE *prev; - struct SHM_QUEUE *next; -} SHM_QUEUE; - /* shmem.c */ extern void InitShmemAccess(void *seghdr); extern void InitShmemAllocation(void); @@ -63,19 +56,4 @@ typedef struct Size allocated_size; /* # bytes actually allocated */ } ShmemIndexEnt; -/* - * prototypes for functions in shmqueue.c - */ -extern void SHMQueueInit(SHM_QUEUE *queue); -extern void SHMQueueElemInit(SHM_QUEUE *queue); -extern void SHMQueueDelete(SHM_QUEUE *queue); -extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem); -extern void SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem); -extern Pointer SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, - Size linkOffset); -extern Pointer SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, - Size linkOffset); -extern bool SHMQueueEmpty(const SHM_QUEUE *queue); -extern bool SHMQueueIsDetached(const SHM_QUEUE *queue); - #endif /* SHMEM_H */ diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index df90c6b093f..6d5b921038c 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -21,7 +21,6 @@ OBJS = \ shm_mq.o \ shm_toc.o \ shmem.o \ - shmqueue.o \ signalfuncs.o \ sinval.o \ sinvaladt.o \ diff --git 
a/src/backend/storage/ipc/shmqueue.c b/src/backend/storage/ipc/shmqueue.c deleted file mode 100644 index d52b28f0fa7..00000000000 --- a/src/backend/storage/ipc/shmqueue.c +++ /dev/null @@ -1,190 +0,0 @@ -/*------------------------------------------------------------------------- - * - * shmqueue.c - * shared memory linked lists - * - * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * - * IDENTIFICATION - * src/backend/storage/ipc/shmqueue.c - * - * NOTES - * - * Package for managing doubly-linked lists in shared memory. - * The only tricky thing is that SHM_QUEUE will usually be a field - * in a larger record. SHMQueueNext has to return a pointer - * to the record itself instead of a pointer to the SHMQueue field - * of the record. It takes an extra parameter and does some extra - * pointer arithmetic to do this correctly. - * - * NOTE: These are set up so they can be turned into macros some day. - * - *------------------------------------------------------------------------- - */ -#include "postgres.h" - -#include "storage/shmem.h" - - -/* - * ShmemQueueInit -- make the head of a new queue point - * to itself - */ -void -SHMQueueInit(SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - queue->prev = queue->next = queue; -} - -/* - * SHMQueueIsDetached -- true if element is not currently - * in a queue. 
- */ -bool -SHMQueueIsDetached(const SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - return (queue->prev == NULL); -} - -/* - * SHMQueueElemInit -- clear an element's links - */ -void -SHMQueueElemInit(SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - queue->prev = queue->next = NULL; -} - -/* - * SHMQueueDelete -- remove an element from the queue and - * close the links - */ -void -SHMQueueDelete(SHM_QUEUE *queue) -{ - SHM_QUEUE *nextElem = queue->next; - SHM_QUEUE *prevElem = queue->prev; - - Assert(ShmemAddrIsValid(queue)); - Assert(ShmemAddrIsValid(nextElem)); - Assert(ShmemAddrIsValid(prevElem)); - - prevElem->next = queue->next; - nextElem->prev = queue->prev; - - queue->prev = queue->next = NULL; -} - -/* - * SHMQueueInsertBefore -- put elem in queue before the given queue - * element. Inserting "before" the queue head puts the elem - * at the tail of the queue. - */ -void -SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem) -{ - SHM_QUEUE *prevPtr = queue->prev; - - Assert(ShmemAddrIsValid(queue)); - Assert(ShmemAddrIsValid(elem)); - - elem->next = prevPtr->next; - elem->prev = queue->prev; - queue->prev = elem; - prevPtr->next = elem; -} - -/* - * SHMQueueInsertAfter -- put elem in queue after the given queue - * element. Inserting "after" the queue head puts the elem - * at the head of the queue. - */ -void -SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem) -{ - SHM_QUEUE *nextPtr = queue->next; - - Assert(ShmemAddrIsValid(queue)); - Assert(ShmemAddrIsValid(elem)); - - elem->prev = nextPtr->prev; - elem->next = queue->next; - queue->next = elem; - nextPtr->prev = elem; -} - -/*-------------------- - * SHMQueueNext -- Get the next element from a queue - * - * To start the iteration, pass the queue head as both queue and curElem. - * Returns NULL if no more elements. - * - * Next element is at curElem->next. 
If SHMQueue is part of - * a larger structure, we want to return a pointer to the - * whole structure rather than a pointer to its SHMQueue field. - * For example, - * struct { - * int stuff; - * SHMQueue elem; - * } ELEMType; - * When this element is in a queue, prevElem->next points at struct.elem. - * We subtract linkOffset to get the correct start address of the structure. - * - * calls to SHMQueueNext should take these parameters: - * &(queueHead), &(queueHead), offsetof(ELEMType, elem) - * or - * &(queueHead), &(curElem->elem), offsetof(ELEMType, elem) - *-------------------- - */ -Pointer -SHMQueueNext(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) -{ - SHM_QUEUE *elemPtr = curElem->next; - - Assert(ShmemAddrIsValid(curElem)); - - if (elemPtr == queue) /* back to the queue head? */ - return NULL; - - return (Pointer) (((char *) elemPtr) - linkOffset); -} - -/*-------------------- - * SHMQueuePrev -- Get the previous element from a queue - * - * Same as SHMQueueNext, just starting at tail and moving towards head. - * All other comments and usage applies. - */ -Pointer -SHMQueuePrev(const SHM_QUEUE *queue, const SHM_QUEUE *curElem, Size linkOffset) -{ - SHM_QUEUE *elemPtr = curElem->prev; - - Assert(ShmemAddrIsValid(curElem)); - - if (elemPtr == queue) /* back to the queue head? */ - return NULL; - - return (Pointer) (((char *) elemPtr) - linkOffset); -} - -/* - * SHMQueueEmpty -- true if queue head is only element, false otherwise - */ -bool -SHMQueueEmpty(const SHM_QUEUE *queue) -{ - Assert(ShmemAddrIsValid(queue)); - - if (queue->prev == queue) - { - Assert(queue->next == queue); - return true; - } - return false; -} -- 2.25.0.114.g5b0ca878e0
>From 7fad990773a868600c42786f7b6c8156adfc4a9c Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Wed, 19 Feb 2020 18:04:56 -0800 Subject: [PATCH v1 6/6] Remove PROC_QUEUE.size. It's not really needed, and it adds a bit of overhead to much more common paths than where it reduces costs. Perhaps we should just remove the PROC_QUEUE type now? --- src/include/storage/lock.h | 1 - src/backend/storage/lmgr/deadlock.c | 37 +++++++++++++---------------- src/backend/storage/lmgr/lock.c | 8 ++++--- src/backend/storage/lmgr/proc.c | 11 +-------- 4 files changed, 23 insertions(+), 34 deletions(-) diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 3569f145092..a1356b50287 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -30,7 +30,6 @@ typedef struct PGPROC PGPROC; typedef struct PROC_QUEUE { dlist_head links; /* list of PGPROC objects */ - int size; /* number of entries in list */ } PROC_QUEUE; /* GUC variables */ diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index ca2abea07f1..0fde8a95e40 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -87,7 +87,7 @@ static bool FindLockCycleRecurseMember(PGPROC *checkProc, int depth, EDGE *softEdges, int *nSoftEdges); static bool ExpandConstraints(EDGE *constraints, int nConstraints); static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints, - PGPROC **ordering); + int queue_size, PGPROC **ordering); #ifdef DEBUG_DEADLOCK static void PrintLockQueue(LOCK *lock, const char *info); @@ -250,8 +250,6 @@ DeadLockCheck(PGPROC *proc) int nProcs = waitOrders[i].nProcs; PROC_QUEUE *waitQueue = &(lock->waitProcs); - Assert(nProcs == waitQueue->size); - #ifdef DEBUG_DEADLOCK PrintLockQueue(lock, "DeadLockCheck:"); #endif @@ -261,7 +259,6 @@ DeadLockCheck(PGPROC *proc) for (int j = 0; j < nProcs; j++) { dlist_push_tail(&waitQueue->links, &(procs[j]->links)); - waitQueue->size++; } #ifdef 
DEBUG_DEADLOCK @@ -802,7 +799,8 @@ ExpandConstraints(EDGE *constraints, for (i = nConstraints; --i >= 0;) { LOCK *lock = constraints[i].lock; - + dlist_iter proc_iter; + int queue_size = 0; /* Did we already make a list for this lock? */ for (j = nWaitOrders; --j >= 0;) { @@ -814,8 +812,17 @@ ExpandConstraints(EDGE *constraints, /* No, so allocate a new list */ waitOrders[nWaitOrders].lock = lock; waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs; - waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; - nWaitOrderProcs += lock->waitProcs.size; + + /* Fill topoProcs[] array with the procs in their current order */ + dlist_foreach(proc_iter, &lock->waitProcs.links) + { + PGPROC *proc; + + proc = dlist_container(PGPROC, links, proc_iter.cur); + topoProcs[queue_size++] = proc; + } + waitOrders[nWaitOrders].nProcs = queue_size; + nWaitOrderProcs += waitOrders[nWaitOrders].nProcs; Assert(nWaitOrderProcs <= MaxBackends); /* @@ -823,8 +830,9 @@ ExpandConstraints(EDGE *constraints, * one, since they must be for different locks. 
*/ if (!TopoSort(lock, constraints, i + 1, - waitOrders[nWaitOrders].procs)) + queue_size, waitOrders[nWaitOrders].procs)) return false; + nWaitOrders++; } return true; @@ -860,10 +868,9 @@ static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints, + int queue_size, PGPROC **ordering) /* output argument */ { - PROC_QUEUE *waitQueue = &(lock->waitProcs); - int queue_size = waitQueue->size; PGPROC *proc; int i, j, @@ -871,16 +878,6 @@ TopoSort(LOCK *lock, k, kk, last; - dlist_iter proc_iter; - - /* First, fill topoProcs[] array with the procs in their current order */ - i = 0; - dlist_foreach(proc_iter, &waitQueue->links) - { - proc = dlist_container(PGPROC, links, proc_iter.cur); - topoProcs[i++] = proc; - } - Assert(i == queue_size); /* * Scan the constraints, and for each proc in the array, generate a count diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 01ac3c06c5e..ad8f7bca40a 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -1828,12 +1828,11 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) Assert(proc->waitStatus == STATUS_WAITING); Assert(proc->links.next != NULL); Assert(waitLock); - Assert(waitLock->waitProcs.size > 0); + Assert(!dlist_is_empty(&waitLock->waitProcs.links)); Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); /* Remove proc from lock's wait queue */ dlist_delete(&proc->links); - waitLock->waitProcs.size--; /* Undo increments of request counts by waiting process */ Assert(waitLock->nRequested > 0); @@ -3765,7 +3764,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */ waitQueue = &(theLock->waitProcs); - queue_size = waitQueue->size; + + queue_size = 0; + dlist_foreach(proc_iter, &waitQueue->links) + queue_size++; if (queue_size > data->maxpids - data->npids) { diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c 
index a4c338d7aff..33d2d358567 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -1031,7 +1031,6 @@ void ProcQueueInit(PROC_QUEUE *queue) { dlist_init(&queue->links); - queue->size = 0; } @@ -1168,9 +1167,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) else dlist_push_tail(&waitQueue->links, &MyProc->links); - - waitQueue->size++; - lock->waitMask |= LOCKBIT_ON(lockmode); /* Set up wait information in PGPROC object, too */ @@ -1581,7 +1577,6 @@ ProcWakeup(PGPROC *proc, int waitStatus) /* Remove process from wait queue */ dlist_delete_thoroughly(&proc->links); - (proc->waitLock->waitProcs.size)--; /* Clean up process' state and pass it the ok/fail signal */ proc->waitLock = NULL; @@ -1606,9 +1601,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) LOCKMASK aheadRequests = 0; dlist_mutable_iter miter; - Assert(waitQueue->size >= 0); - - if (waitQueue->size == 0) + if (dlist_is_empty(&waitQueue->links)) return; dlist_foreach_modify(miter, &waitQueue->links) @@ -1640,8 +1633,6 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) aheadRequests |= LOCKBIT_ON(lockmode); } } - - Assert(waitQueue->size >= 0); } /* -- 2.25.0.114.g5b0ca878e0