Rebased again.

Regards,
Zhiguo

On 4/30/2025 10:55 PM, Yura Sokolov wrote:
Just a rebase.

From 4e3fbbd66382c6a6dfd4abae802eda8e79d8d892 Mon Sep 17 00:00:00 2001
From: Zhiguo Zhou <[email protected]>
Date: Mon, 27 Oct 2025 17:11:30 +0800
Subject: [PATCH] Lock-free XLog Reservation using lock-free hash-table

Remove PrevBytePos to eliminate lock contention, allowing atomic updates
to CurrBytePos. Use a lock-free hash table based on 4-way cuckoo hashing
to store the link to PrevBytePos.
---
 src/backend/access/transam/xlog.c | 587 +++++++++++++++++++++++++++---
 src/tools/pgindent/typedefs.list  |   2 +
 2 files changed, 532 insertions(+), 57 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index eceab341255..1c9830f89af 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -68,6 +68,8 @@
 #include "catalog/pg_database.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
+#include "common/hashfn.h"
+#include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -385,6 +387,94 @@ typedef union WALInsertLockPadded
        char            pad[PG_CACHE_LINE_SIZE];
 } WALInsertLockPadded;
 
+/* #define WAL_LINK_64 0 */
+#ifndef WAL_LINK_64
+#ifdef PG_HAVE_ATOMIC_U64_SIMULATION
+#define WAL_LINK_64 0
+#else
+#define WAL_LINK_64 1
+#endif
+#endif
+
+/*
+ * Links the current position with the previous one.
+ * - CurrPosId is (CurrBytePos ^ (CurrBytePos>>32)).
+ *   Since CurrBytePos grows monotonically and is MAXALIGN-ed, CurrPosId
+ *   correctly identifies CurrBytePos for at least 8*2^32 bytes = 32GB of WAL.
+ * - CurrPosHigh is (CurrBytePos>>32); it is stored for a strong uniqueness
+ *   check.
+ * - PrevSize is the difference between CurrBytePos and PrevBytePos.
+ */
+typedef struct
+{
+#if WAL_LINK_64
+       uint64          CurrPos;
+       uint64          PrevPos;
+#define WAL_PREV_EMPTY (~((uint64)0))
+#define WALLinkEmpty(l) ((l).PrevPos == WAL_PREV_EMPTY)
+#define WALLinkSamePos(a, b) ((a).CurrPos == (b).CurrPos)
+#define WALLinkCopyPrev(a, b) do {(a).PrevPos = (b).PrevPos;} while(0)
+#else
+       uint32          CurrPosId;
+       uint32          CurrPosHigh;
+       uint32          PrevSize;
+#define WALLinkEmpty(l) ((l).PrevSize == 0)
+#define WALLinkSamePos(a, b) ((a).CurrPosId == (b).CurrPosId && (a).CurrPosHigh == (b).CurrPosHigh)
+#define WALLinkCopyPrev(a, b) do {(a).PrevSize = (b).PrevSize;} while(0)
+#endif
+} WALPrevPosLinkVal;
+
+/*
+ * This is an element of the lock-free hash table.
+ * In 32-bit mode the lowest bit of PrevSize is used as a lock, relying on the
+ * fact that it is MAXALIGN-ed.
+ * In 64-bit mode the lock protocol is more complex.
+ */
+typedef struct
+{
+#if WAL_LINK_64
+       pg_atomic_uint64 CurrPos;
+       pg_atomic_uint64 PrevPos;
+#else
+       pg_atomic_uint32 CurrPosId;
+       uint32          CurrPosHigh;
+       pg_atomic_uint32 PrevSize;
+       uint32          pad;                    /* to align to 16 bytes */
+#endif
+} WALPrevPosLink;
+
+StaticAssertDecl(sizeof(WALPrevPosLink) == 16, "WALPrevPosLink should be 16 bytes");
+
+#define PREV_LINKS_HASH_CAPA (NUM_XLOGINSERT_LOCKS * 2)
+StaticAssertDecl(!(PREV_LINKS_HASH_CAPA & (PREV_LINKS_HASH_CAPA - 1)),
+                                "PREV_LINKS_HASH_CAPA should be power of two");
+StaticAssertDecl(PREV_LINKS_HASH_CAPA < UINT16_MAX,
+                                "PREV_LINKS_HASH_CAPA is too large");
+
+/*-----------
+ * PREV_LINKS_HASH_STRATEGY - the way slots are chosen in the hash table
+ *   1 - 4 positions h1,h1+1,h2,h2+2 - guarantees at least 3 distinct points,
+ *     but may spread over 4 cache lines.
+ *   2 - 4 positions h,h^1,h^2,h^3 - 4 points in a single cache line.
+ *   3 - 8 positions h1,h1^1,h1^2,h1^3,h2,h2^1,h2^2,h2^3 - 8 distinct points
+ *     in two cache lines.
+ */
+#ifndef PREV_LINKS_HASH_STRATEGY
+#define PREV_LINKS_HASH_STRATEGY 3
+#endif
+
+#if PREV_LINKS_HASH_STRATEGY <= 2
+#define PREV_LINKS_LOOKUPS 4
+#else
+#define PREV_LINKS_LOOKUPS 8
+#endif
+
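+/*
+ * Candidate slot indexes in PrevLinksHash for a single key, filled in by
+ * CalcCuckooPositions() according to PREV_LINKS_HASH_STRATEGY.
+ */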
+struct WALPrevLinksLookups
+{
+       uint16          pos[PREV_LINKS_LOOKUPS];
+};
+
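+/*
+ * On average, one out of every SWAP_ONCE_IN failed insertion rounds triggers
+ * a cuckoo-style displacement of an occupied slot (see LinkAndFindPrevPos()).
+ */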
+#define SWAP_ONCE_IN 128
+
 /*
  * Session status of running backup, used for sanity checks in SQL-callable
  * functions to start and stop backups.
@@ -396,17 +486,18 @@ static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
  */
 typedef struct XLogCtlInsert
 {
-       slock_t         insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
-
        /*
         * CurrBytePos is the end of reserved WAL. The next record will be
-        * inserted at that position. PrevBytePos is the start position of the
-        * previously inserted (or rather, reserved) record - it is copied to the
-        * prev-link of the next record. These are stored as "usable byte
-        * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+        * inserted at that position.
+        *
+        * The start position of the previously inserted (or rather, reserved)
+        * record (it is copied to the prev-link of the next record) will be
+        * stored in PrevLinksHash.
+        *
+        * These are stored as "usable byte positions" rather than XLogRecPtrs
+        * (see XLogBytePosToRecPtr()).
         */
-       uint64          CurrBytePos;
-       uint64          PrevBytePos;
+       pg_atomic_uint64 CurrBytePos;
 
        /*
         * Make sure the above heavily-contended spinlock and byte positions are
@@ -443,8 +534,37 @@ typedef struct XLogCtlInsert
         * WAL insertion locks.
         */
        WALInsertLockPadded *WALInsertLocks;
+
+       /*
+        * PrevLinksHash is a lock-free hash table based on the cuckoo hashing
+        * algorithm.
+        *
+        * With PREV_LINKS_HASH_STRATEGY == 1 it is mostly 4-way: for every
+        * element two positions h1 and h2 are computed, and the neighbours
+        * h1+1 and h2+2 are used as well. This way, even on collision we have
+        * at least 3 distinct positions, which gives ~75% fill rate without
+        * unsolvable cycles (per cuckoo hashing theory). But the chosen slots
+        * may lie in 4 distinct cache lines.
+        *
+        * With PREV_LINKS_HASH_STRATEGY == 3 (the default) it uses two buckets
+        * of 4 elements each - 8 positions in total, guaranteed to lie in two
+        * cache lines. It provides a very high fill rate - up to 90%.
+        *
+        * With PREV_LINKS_HASH_STRATEGY == 2 it uses only one bucket with 4
+        * elements. Strictly speaking this is not cuckoo hashing, but it should
+        * work for our case.
+        *
+        * Crucially, we rely on the fact that elements are deleted at the same
+        * rate as they are added, so even unsolvable cycles will soon be broken
+        * up by concurrent deletions.
+        */
+       WALPrevPosLink *PrevLinksHash;
+
 } XLogCtlInsert;
 
+StaticAssertDecl(offsetof(XLogCtlInsert, RedoRecPtr) / PG_CACHE_LINE_SIZE !=
+                                offsetof(XLogCtlInsert, CurrBytePos) / PG_CACHE_LINE_SIZE,
+                                "CurrBytePos and RedoRecPtr must be on different cache lines");
+
 /*
  * Total shared-memory state for XLOG.
  */
@@ -568,6 +688,9 @@ static XLogCtlData *XLogCtl = NULL;
 /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
 static WALInsertLockPadded *WALInsertLocks = NULL;
 
+/* same for XLogCtl->Insert.PrevLinksHash */
+static WALPrevPosLink *PrevLinksHash = NULL;
+
 /*
  * We maintain an image of pg_control in shared memory.
  */
@@ -700,6 +823,19 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
                                                                XLogRecData *rdata,
                                                                XLogRecPtr StartPos, XLogRecPtr EndPos,
                                                                TimeLineID tli);
+
+static void WALPrevPosLinkValCompose(WALPrevPosLinkVal *val, XLogRecPtr StartPos, XLogRecPtr PrevPos);
+static void WALPrevPosLinkValGetPrev(WALPrevPosLinkVal val, XLogRecPtr *PrevPos);
+static void CalcCuckooPositions(WALPrevPosLinkVal linkval, struct WALPrevLinksLookups *pos);
+
+static bool WALPrevPosLinkInsert(WALPrevPosLink *link, WALPrevPosLinkVal val);
+static bool WALPrevPosLinkConsume(WALPrevPosLink *link, WALPrevPosLinkVal *val);
+static bool WALPrevPosLinkSwap(WALPrevPosLink *link, WALPrevPosLinkVal *val);
+static void LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos,
+                                                          XLogRecPtr *PrevPtr);
+static void LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec);
+static XLogRecPtr ReadInsertCurrBytePos(void);
+
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
                                                                          XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -1089,6 +1225,341 @@ XLogInsertRecord(XLogRecData *rdata,
        return EndPos;
 }
 
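+/*
+ * Pack the pair (StartPos, PrevPos) into a WALPrevPosLinkVal.  In 64-bit mode
+ * both positions are stored verbatim; in 32-bit mode only the folded id of
+ * StartPos and the distance back to PrevPos are kept.
+ */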
+static pg_attribute_always_inline void
+WALPrevPosLinkValCompose(WALPrevPosLinkVal *val, XLogRecPtr StartPos, XLogRecPtr PrevPos)
+{
+#if WAL_LINK_64
+       val->CurrPos = StartPos;
+       val->PrevPos = PrevPos;
+#else
+       val->CurrPosHigh = StartPos >> 32;
+       val->CurrPosId = StartPos ^ val->CurrPosHigh;
+       val->PrevSize = StartPos - PrevPos;
+#endif
+}
+
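+/*
+ * Recover the previous position from a link value taken from the hash table
+ * (the inverse of WALPrevPosLinkValCompose for the prev-link part).
+ */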
+static pg_attribute_always_inline void
+WALPrevPosLinkValGetPrev(WALPrevPosLinkVal val, XLogRecPtr *PrevPos)
+{
+#if WAL_LINK_64
+       *PrevPos = val.PrevPos;
+#else
+       XLogRecPtr      StartPos = val.CurrPosHigh;
+
+       StartPos ^= (StartPos << 32) | val.CurrPosId;
+       *PrevPos = StartPos - val.PrevSize;
+#endif
+}
+
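+/*
+ * Compute the candidate hash-table slots for a link value according to the
+ * selected PREV_LINKS_HASH_STRATEGY (see the description above).
+ */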
+static pg_attribute_always_inline void
+CalcCuckooPositions(WALPrevPosLinkVal linkval, struct WALPrevLinksLookups *pos)
+{
+       uint32          hash;
+#if PREV_LINKS_HASH_STRATEGY == 3
+       uint32          offset;
+#endif
+
+#if WAL_LINK_64
+       hash = murmurhash32(linkval.CurrPos ^ (linkval.CurrPos >> 32));
+#else
+       hash = murmurhash32(linkval.CurrPosId);
+#endif
+
+#if PREV_LINKS_HASH_STRATEGY == 1
+       pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
+       pos->pos[1] = (pos->pos[0] + 1) % PREV_LINKS_HASH_CAPA;
+       pos->pos[2] = (hash >> 16) % PREV_LINKS_HASH_CAPA;
+       pos->pos[3] = (pos->pos[2] + 2) % PREV_LINKS_HASH_CAPA;
+#else
+       pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
+       pos->pos[1] = pos->pos[0] ^ 1;
+       pos->pos[2] = pos->pos[0] ^ 2;
+       pos->pos[3] = pos->pos[0] ^ 3;
+#if PREV_LINKS_HASH_STRATEGY == 3
+       /* use multiplication to compute 0 <= offset < PREV_LINKS_HASH_CAPA-4 */
+       offset = (hash / PREV_LINKS_HASH_CAPA) * (PREV_LINKS_HASH_CAPA - 4);
+       offset /= UINT32_MAX / PREV_LINKS_HASH_CAPA + 1;
+       /* add start of next bucket */
+       offset += (pos->pos[0] | 3) + 1;
+       /* get position in strictly other bucket */
+       pos->pos[4] = offset % PREV_LINKS_HASH_CAPA;
+       pos->pos[5] = pos->pos[4] ^ 1;
+       pos->pos[6] = pos->pos[4] ^ 2;
+       pos->pos[7] = pos->pos[4] ^ 3;
+#endif
+#endif
+}
+
+/*
+ * Attempt to write into an empty link.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkInsert(WALPrevPosLink *link, WALPrevPosLinkVal val)
+{
+#if WAL_LINK_64
+       uint64          empty = WAL_PREV_EMPTY;
+
+       if (pg_atomic_read_u64(&link->PrevPos) != WAL_PREV_EMPTY)
+               return false;
+       if (!pg_atomic_compare_exchange_u64(&link->PrevPos, &empty, val.PrevPos))
+               return false;
+       /* we can ignore a concurrent lock of CurrPos here */
+       pg_atomic_write_u64(&link->CurrPos, val.CurrPos);
+       return true;
+#else
+       uint32          empty = 0;
+
+       /* first, a cheap read-only check */
+       if (pg_atomic_read_u32(&link->PrevSize) != 0)
+               return false;
+       if (!pg_atomic_compare_exchange_u32(&link->PrevSize, &empty, 1))
+               /* someone else occupied the entry */
+               return false;
+
+       pg_atomic_write_u32(&link->CurrPosId, val.CurrPosId);
+       link->CurrPosHigh = val.CurrPosHigh;
+       pg_write_barrier();
+       /* This write acts as unlock as well. */
+       pg_atomic_write_u32(&link->PrevSize, val.PrevSize);
+       return true;
+#endif
+}
+
+/*
+ * Attempt to consume a matching link.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkConsume(WALPrevPosLink *link, WALPrevPosLinkVal *val)
+{
+#if WAL_LINK_64
+       uint64          oldCurr;
+
+       if (pg_atomic_read_u64(&link->CurrPos) != val->CurrPos)
+               return false;
+       /* lock against concurrent swapper */
+       oldCurr = pg_atomic_fetch_or_u64(&link->CurrPos, 1);
+       if (oldCurr & 1)
+               return false;                   /* lock failed */
+       if (oldCurr != val->CurrPos)
+       {
+               /* link was swapped */
+               pg_atomic_write_u64(&link->CurrPos, oldCurr);
+               return false;
+       }
+       val->PrevPos = pg_atomic_read_u64(&link->PrevPos);
+       pg_atomic_write_u64(&link->PrevPos, WAL_PREV_EMPTY);
+
+       /*
+        * A concurrent inserter may have already reused this link, so we don't
+        * check the result of compare_exchange.
+        */
+       oldCurr |= 1;
+       pg_atomic_compare_exchange_u64(&link->CurrPos, &oldCurr, 0);
+       return true;
+#else
+       if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId)
+               return false;
+
+       /* Try lock */
+       val->PrevSize = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+       if (val->PrevSize & 1)
+               /* Lock failed */
+               return false;
+
+       if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId ||
+               link->CurrPosHigh != val->CurrPosHigh)
+       {
+               /* unlock with old value */
+               pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+               return false;
+       }
+
+       pg_atomic_write_u32(&link->CurrPosId, 0);
+       link->CurrPosHigh = 0;
+       pg_write_barrier();
+       /* This write acts as unlock as well. */
+       pg_atomic_write_u32(&link->PrevSize, 0);
+       return true;
+#endif
+}
+
+/*
+ * Attempt to swap the entry: remember the existing link and write ours.
+ * We may happen to consume an empty entry; the caller detects that by
+ * checking the remembered value.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkSwap(WALPrevPosLink *link, WALPrevPosLinkVal *val)
+{
+#if WAL_LINK_64
+       uint64          oldCurr;
+       uint64          oldPrev;
+
+       /* lock against concurrent swapper or consumer */
+       oldCurr = pg_atomic_fetch_or_u64(&link->CurrPos, 1);
+       if (oldCurr & 1)
+               return false;                   /* lock failed */
+       if (oldCurr == 0)
+       {
+               /* link was empty */
+               oldPrev = WAL_PREV_EMPTY;
+               /* but a concurrent inserter may still slip in */
+               if (!pg_atomic_compare_exchange_u64(&link->PrevPos, &oldPrev, val->PrevPos))
+                       return false;           /* concurrent inserter won; it
+                                                                * will overwrite CurrPos */
+               /* this write acts as unlock */
+               pg_atomic_write_u64(&link->CurrPos, val->CurrPos);
+               val->CurrPos = 0;
+               val->PrevPos = WAL_PREV_EMPTY;
+               return true;
+       }
+       oldPrev = pg_atomic_read_u64(&link->PrevPos);
+       pg_atomic_write_u64(&link->PrevPos, val->PrevPos);
+       pg_write_barrier();
+       /* write acts as unlock */
+       pg_atomic_write_u64(&link->CurrPos, val->CurrPos);
+       val->CurrPos = oldCurr;
+       val->PrevPos = oldPrev;
+       return true;
+#else
+       uint32          oldPrev;
+       uint32          oldCurId;
+       uint32          oldCurHigh;
+
+       /* Attempt to lock entry against concurrent consumer or swapper */
+       oldPrev = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+       if (oldPrev & 1)
+               /* Lock failed */
+               return false;
+
+       oldCurId = pg_atomic_read_u32(&link->CurrPosId);
+       oldCurHigh = link->CurrPosHigh;
+       pg_atomic_write_u32(&link->CurrPosId, val->CurrPosId);
+       link->CurrPosHigh = val->CurrPosHigh;
+       pg_write_barrier();
+       /* This write acts as unlock as well. */
+       pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+
+       val->CurrPosId = oldCurId;
+       val->CurrPosHigh = oldCurHigh;
+       val->PrevSize = oldPrev;
+       return true;
+#endif
+}
+
+/*
+ * Write the new link (EndPos, StartPos) and find PrevPtr for StartPos.
+ *
+ * Links are stored in a lock-free cuckoo-based hash table.
+ * We use mostly-4-way cuckoo hashing, which provides a high fill rate without
+ * hard cycle collisions. We also rely on concurrent consumers of existing
+ * entries, so cycles are broken in the meantime.
+ *
+ * Cuckoo hashing relies on re-insertion for balancing, so we occasionally
+ * swap out an entry and try to insert the swapped-out entry instead of ours.
+ */
+static void
+LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos, XLogRecPtr *PrevPtr)
+{
+       SpinDelayStatus spin_stat;
+       WALPrevPosLinkVal lookup;
+       WALPrevPosLinkVal insert;
+       struct WALPrevLinksLookups lookup_pos;
+       struct WALPrevLinksLookups insert_pos;
+       uint32          i;
+       uint32          rand = 0;
+       bool            inserted = false;
+       bool            found = false;
+
+       /* pass StartPos a second time to set PrevSize = 0 */
+       WALPrevPosLinkValCompose(&lookup, StartPos, StartPos);
+       WALPrevPosLinkValCompose(&insert, EndPos, StartPos);
+
+       CalcCuckooPositions(lookup, &lookup_pos);
+       CalcCuckooPositions(insert, &insert_pos);
+
+       init_local_spin_delay(&spin_stat);
+
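+       /*
+        * Loop until we have both consumed the link keyed by StartPos (left by
+        * the previous inserter) and published our own link keyed by EndPos.
+        */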
+       while (!inserted || !found)
+       {
+               for (i = 0; !found && i < PREV_LINKS_LOOKUPS; i++)
+                       found = WALPrevPosLinkConsume(&PrevLinksHash[lookup_pos.pos[i]], &lookup);
+
+               if (inserted)
+               {
+                       /*
+                        * We may sleep only after we have inserted our value,
+                        * since another backend may be waiting for it.
+                        */
+                       perform_spin_delay(&spin_stat);
+                       goto next;
+               }
+
+               for (i = 0; !inserted && i < PREV_LINKS_LOOKUPS; i++)
+                       inserted = WALPrevPosLinkInsert(&PrevLinksHash[insert_pos.pos[i]], insert);
+
+               if (inserted)
+                       goto next;
+
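+               /*
+                * All candidate slots are occupied.  Most of the time just spin
+                * and retry, since concurrent consumers free slots quickly; once
+                * in a while displace an occupied slot cuckoo-style and retry
+                * with the displaced entry.
+                */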
+               rand = pg_prng_uint32(&pg_global_prng_state);
+               if (rand % SWAP_ONCE_IN != 0)
+                       goto next;
+
+               i = rand / SWAP_ONCE_IN % PREV_LINKS_LOOKUPS;
+               if (!WALPrevPosLinkSwap(&PrevLinksHash[insert_pos.pos[i]], &insert))
+                       goto next;
+
+               if (WALLinkEmpty(insert))
+                       /* Lucky case: the entry was empty and our value got inserted */
+                       inserted = true;
+               else if (WALLinkSamePos(lookup, insert))
+               {
+                       /*
+                        * We happened to replace the very entry we were looking
+                        * for.  No need to insert it again.
+                        */
+                       inserted = true;
+                       Assert(!found);
+                       found = true;
+                       WALLinkCopyPrev(lookup, insert);
+                       break;
+               }
+               else
+                       CalcCuckooPositions(insert, &insert_pos);
+
+next:
+               pg_spin_delay();
+               pg_read_barrier();
+       }
+
+       WALPrevPosLinkValGetPrev(lookup, PrevPtr);
+}
+
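+/*
+ * Seed PrevLinksHash at startup with the link from EndOfLog back to LastRec,
+ * so that the first record inserted afterwards can find its prev-link.  Plain
+ * writes suffice here since there are no concurrent inserters yet.
+ */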
+static pg_attribute_always_inline void
+LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec)
+{
+       WALPrevPosLinkVal insert;
+       struct WALPrevLinksLookups insert_pos;
+
+       WALPrevPosLinkValCompose(&insert, EndOfLog, LastRec);
+       CalcCuckooPositions(insert, &insert_pos);
+#if WAL_LINK_64
+       pg_atomic_write_u64(&PrevLinksHash[insert_pos.pos[0]].CurrPos, insert.CurrPos);
+       pg_atomic_write_u64(&PrevLinksHash[insert_pos.pos[0]].PrevPos, insert.PrevPos);
+#else
+       pg_atomic_write_u32(&PrevLinksHash[insert_pos.pos[0]].CurrPosId, insert.CurrPosId);
+       PrevLinksHash[insert_pos.pos[0]].CurrPosHigh = insert.CurrPosHigh;
+       pg_atomic_write_u32(&PrevLinksHash[insert_pos.pos[0]].PrevSize, insert.PrevSize);
+#endif
+}
+
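+/* Read the current insert byte position with a plain atomic read. */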
+static pg_attribute_always_inline XLogRecPtr
+ReadInsertCurrBytePos(void)
+{
+       return pg_atomic_read_u64(&XLogCtl->Insert.CurrBytePos);
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1121,25 +1592,9 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
        /* All (non xlog-switch) records should contain data. */
        Assert(size > SizeOfXLogRecord);
 
-       /*
-        * The duration the spinlock needs to be held is minimized by minimizing
-        * the calculations that have to be done while holding the lock. The
-        * current tip of reserved WAL is kept in CurrBytePos, as a byte position
-        * that only counts "usable" bytes in WAL, that is, it excludes all WAL
-        * page headers. The mapping between "usable" byte positions and physical
-        * positions (XLogRecPtrs) can be done outside the locked region, and
-        * because the usable byte position doesn't include any headers, reserving
-        * X bytes from WAL is almost as simple as "CurrBytePos += X".
-        */
-       SpinLockAcquire(&Insert->insertpos_lck);
-
-       startbytepos = Insert->CurrBytePos;
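+       /*
+        * Reserve the space with a single atomic fetch-add on CurrBytePos; the
+        * prev-link for StartPos is then resolved through the lock-free hash
+        * table.
+        */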
+       startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);
        endbytepos = startbytepos + size;
-       prevbytepos = Insert->PrevBytePos;
-       Insert->CurrBytePos = endbytepos;
-       Insert->PrevBytePos = startbytepos;
-
-       SpinLockRelease(&Insert->insertpos_lck);
+       LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);
 
        *StartPos = XLogBytePosToRecPtr(startbytepos);
        *EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1175,26 +1630,23 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
        uint32          segleft;
 
        /*
-        * These calculations are a bit heavy-weight to be done while holding a
-        * spinlock, but since we're holding all the WAL insertion locks, there
-        * are no other inserters competing for it. GetXLogInsertRecPtr() does
-        * compete for it, but that's not called very frequently.
+        * Currently this function runs while all WAL insertion locks are held
+        * exclusively, so there is no contention on CurrBytePos, but we still
+        * use a CAS loop for uniformity.
+        *
+        * We will probably get rid of the exclusive locking in the future.
         */
-       SpinLockAcquire(&Insert->insertpos_lck);
-
-       startbytepos = Insert->CurrBytePos;
+repeat:
+       startbytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
 
        ptr = XLogBytePosToEndRecPtr(startbytepos);
        if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
        {
-               SpinLockRelease(&Insert->insertpos_lck);
                *EndPos = *StartPos = ptr;
                return false;
        }
 
        endbytepos = startbytepos + size;
-       prevbytepos = Insert->PrevBytePos;
-
        *StartPos = XLogBytePosToRecPtr(startbytepos);
        *EndPos = XLogBytePosToEndRecPtr(endbytepos);
 
@@ -1205,10 +1657,18 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
                *EndPos += segleft;
                endbytepos = XLogRecPtrToBytePos(*EndPos);
        }
-       Insert->CurrBytePos = endbytepos;
-       Insert->PrevBytePos = startbytepos;
-
-       SpinLockRelease(&Insert->insertpos_lck);
+       if (!pg_atomic_compare_exchange_u64(&Insert->CurrBytePos,
+                                                                               &startbytepos,
+                                                                               endbytepos))
+       {
+               /*
+                * Don't use spin delay here: perform_spin_delay() is primarily
+                * meant for single-core contention, but on a single core we will
+                * succeed on the next attempt.
+                */
+               goto repeat;
+       }
+       LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);
 
        *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
@@ -1510,7 +1970,6 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
        XLogRecPtr      inserted;
        XLogRecPtr      reservedUpto;
        XLogRecPtr      finishedUpto;
-       XLogCtlInsert *Insert = &XLogCtl->Insert;
        int                     i;
 
        if (MyProc == NULL)
@@ -1525,9 +1984,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
                return inserted;
 
        /* Read the current insert position */
-       SpinLockAcquire(&Insert->insertpos_lck);
-       bytepos = Insert->CurrBytePos;
-       SpinLockRelease(&Insert->insertpos_lck);
+       bytepos = ReadInsertCurrBytePos();
        reservedUpto = XLogBytePosToEndRecPtr(bytepos);
 
        /*
@@ -4940,6 +5397,8 @@ XLOGShmemSize(void)
 
        /* WAL insertion locks, plus alignment */
        size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
+       /* PrevLinksHash piggy-backs on the alignment of the WAL insertion locks. */
+       size = add_size(size, mul_size(sizeof(WALPrevPosLink), PREV_LINKS_HASH_CAPA));
        /* xlblocks array */
        size = add_size(size, mul_size(sizeof(pg_atomic_uint64), XLOGbuffers));
        /* extra alignment padding for XLOG I/O buffers */
@@ -4994,8 +5453,9 @@ XLOGShmemInit(void)
                /* both should be present or neither */
                Assert(foundCFile && foundXLog);
 
-               /* Initialize local copy of WALInsertLocks */
+               /* Initialize local copy of WALInsertLocks and PrevLinksHash */
                WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+               PrevLinksHash = XLogCtl->Insert.PrevLinksHash;
 
                if (localControlFile)
                        pfree(localControlFile);
@@ -5041,6 +5501,9 @@ XLOGShmemInit(void)
                WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
        }
 
+       PrevLinksHash = XLogCtl->Insert.PrevLinksHash = (WALPrevPosLink *) allocptr;
+       allocptr += sizeof(WALPrevPosLink) * PREV_LINKS_HASH_CAPA;
+
        /*
         * Align the start of the page buffers to a full xlog block size boundary.
         * This simplifies some calculations in XLOG insertion. It is also
@@ -5059,12 +5522,24 @@ XLOGShmemInit(void)
        XLogCtl->InstallXLogFileSegmentActive = false;
        XLogCtl->WalWriterSleeping = false;
 
-       SpinLockInit(&XLogCtl->Insert.insertpos_lck);
        SpinLockInit(&XLogCtl->info_lck);
        pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
        pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
        pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
        pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+       pg_atomic_init_u64(&XLogCtl->Insert.CurrBytePos, 0);
+
+       for (i = 0; i < PREV_LINKS_HASH_CAPA; i++)
+       {
+#if WAL_LINK_64
+               pg_atomic_init_u64(&PrevLinksHash[i].CurrPos, 0);
+               pg_atomic_init_u64(&PrevLinksHash[i].PrevPos, WAL_PREV_EMPTY);
+#else
+               pg_atomic_init_u32(&PrevLinksHash[i].CurrPosId, 0);
+               pg_atomic_init_u32(&PrevLinksHash[i].PrevSize, 0);
+#endif
+       }
 }
 
 /*
@@ -6064,8 +6539,13 @@ StartupXLOG(void)
         * previous incarnation.
         */
        Insert = &XLogCtl->Insert;
-       Insert->PrevBytePos = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
-       Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+       {
+               XLogRecPtr      endOfLog = XLogRecPtrToBytePos(EndOfLog);
+               XLogRecPtr      lastRec = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
+
+               pg_atomic_write_u64(&Insert->CurrBytePos, endOfLog);
+               LinkStartPrevPos(endOfLog, lastRec);
+       }
 
        /*
         * Tricky point here: lastPage contains the *last* block that the LastRec
@@ -7057,7 +7537,7 @@ CreateCheckPoint(int flags)
 
        if (shutdown)
        {
-               XLogRecPtr      curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
+               XLogRecPtr      curInsert = XLogBytePosToRecPtr(ReadInsertCurrBytePos());
 
                /*
                 * Compute new REDO record ptr = location of next XLOG record.
@@ -9478,14 +9958,7 @@ register_persistent_abort_backup_handler(void)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-       XLogCtlInsert *Insert = &XLogCtl->Insert;
-       uint64          current_bytepos;
-
-       SpinLockAcquire(&Insert->insertpos_lck);
-       current_bytepos = Insert->CurrBytePos;
-       SpinLockRelease(&Insert->insertpos_lck);
-
-       return XLogBytePosToRecPtr(current_bytepos);
+       return XLogBytePosToRecPtr(ReadInsertCurrBytePos());
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 43fe3bcd593..8dda440df50 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3240,6 +3240,8 @@ WALAvailability
 WALInsertLock
 WALInsertLockPadded
 WALOpenSegment
+WALPrevPosLink
+WALPrevPosLinkVal
 WALReadError
 WALSegmentCloseCB
 WALSegmentContext
-- 
2.43.0
