24.01.2025 12:07, Japin Li wrote:
On Thu, 23 Jan 2025 at 21:44, Japin Li <japi...@hotmail.com> wrote:
On Thu, 23 Jan 2025 at 15:03, Yura Sokolov <y.soko...@postgrespro.ru> wrote:
23.01.2025 11:46, Japin Li wrote:
On Wed, 22 Jan 2025 at 22:44, Japin Li <japi...@hotmail.com> wrote:
On Wed, 22 Jan 2025 at 17:02, Yura Sokolov <y.soko...@postgrespro.ru> wrote:
I believe I know why it happens: I was in a hurry making v2 by
cherry-picking the internal version. I reverted some changes in
CalcCuckooPositions manually and forgot to add the modulo by
PREV_LINKS_HASH_CAPA, so two of the computed slot positions could point
past the end of the table.

Here's the fix:

          pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
-       pos->pos[1] = pos->pos[0] + 1;
+       pos->pos[1] = (pos->pos[0] + 1) % PREV_LINKS_HASH_CAPA;
          pos->pos[2] = (hash >> 16) % PREV_LINKS_HASH_CAPA;
-       pos->pos[3] = pos->pos[2] + 2;
+       pos->pos[3] = (pos->pos[2] + 2) % PREV_LINKS_HASH_CAPA;

Anyway, here's v3:
- the excess file "v0-0001-Increase..." is removed. I believe it was the
    source of the whitespace apply warnings.
- this mistake is fixed.
- clearer slot strategies, plus an "8 positions in two cache-lines" strategy.

You may play with switching PREV_LINKS_HASH_STRATEGY to 2 or 3 and see
whether it has a measurable effect.
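To switch the strategy, just define the macro ahead of the #ifndef fallback,
for example with a line like the one below near the top of xlog.c (or pass
the equivalent -DPREV_LINKS_HASH_STRATEGY=2 via CPPFLAGS; the value 2 here
is only an example):

    /* choose slot strategy 1, 2 or 3 before the #ifndef fallback picks the default */
    #define PREV_LINKS_HASH_STRATEGY 2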

Thanks for your quick fix.  I will retest it tomorrow.
Hi, Yura Sokolov
Here are my test results for the v3 patch:
| case                          | min        | avg        | max        |
|-------------------------------+------------+------------+------------|
| master (44b61efb79)           | 865,743.55 | 871,237.40 | 874,492.59 |
| v3                            | 857,020.58 | 860,180.11 | 864,355.00 |
| v3 PREV_LINKS_HASH_STRATEGY=2 | 853,187.41 | 855,796.36 | 858,436.44 |
| v3 PREV_LINKS_HASH_STRATEGY=3 | 863,131.97 | 864,272.91 | 865,396.42 |
It seems there is some performance decrease :( Or did I miss something?


Hi, Japin.
(Excuse me for the duplicate message; I found that the first time I had
sent it only to you.)

Never mind!

v3 (as well as v2) doesn't increase NUM_XLOGINSERT_LOCKS itself.
With only 8 in-progress inserters, a spin-lock is certainly better than any
more complex solution.

You need to compare "master" vs "master + NUM_XLOGINSERT_LOCKS=64" vs
"master + NUM_XLOGINSERT_LOCKS=64 + v3".

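To be explicit: by NUM_XLOGINSERT_LOCKS=64 I simply mean bumping the
compile-time constant and rebuilding; if I remember correctly it is still a
plain define in xlog.c, roughly:

    /* src/backend/access/transam/xlog.c; the stock value is 8 */
    #define NUM_XLOGINSERT_LOCKS 64
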
And even then I don't claim that "Lock-free reservation" gives any benefit.

That is why your benchmarking is very valuable! It could answer whether we
need such a sizeable patch, or whether there is no real problem at all.


Hi, Yura Sokolov

Here are the test results comparing different NUM_XLOGINSERT_LOCKS settings with and without the v3 patch.

| case                  | min          | avg          | max          | rate% |
|-----------------------+--------------+--------------+--------------+-------|
| master (4108440)      | 891,225.77   | 904,868.75   | 913,708.17   |       |
| lock 64               | 1,007,716.95 | 1,012,013.22 | 1,018,674.00 | 11.84 |
| lock 64 attempt 1     | 1,016,716.07 | 1,017,735.55 | 1,019,328.36 | 12.47 |
| lock 64 attempt 2     | 1,015,328.31 | 1,018,147.74 | 1,021,513.14 | 12.52 |
| lock 128              | 1,010,147.38 | 1,014,128.11 | 1,018,672.01 | 12.07 |
| lock 128 attempt 1    | 1,018,154.79 | 1,023,348.35 | 1,031,365.42 | 13.09 |
| lock 128 attempt 2    | 1,013,245.56 | 1,018,984.78 | 1,023,696.00 | 12.61 |
| lock 64 v3            | 1,010,893.30 | 1,022,787.25 | 1,029,200.26 | 13.03 |
| lock 64 attempt 1 v3  | 1,014,961.21 | 1,019,745.09 | 1,025,511.62 | 12.70 |
| lock 64 attempt 2 v3  | 1,015,690.73 | 1,018,365.46 | 1,020,200.57 | 12.54 |
| lock 128 v3           | 1,012,653.14 | 1,013,637.09 | 1,014,358.69 | 12.02 |
| lock 128 attempt 1 v3 | 1,008,027.57 | 1,016,849.87 | 1,024,597.15 | 12.38 |
| lock 128 attempt 2 v3 | 1,020,552.04 | 1,024,658.92 | 1,027,855.90 | 13.24 |
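Here rate% is the improvement of the avg column over master, e.g. for
"lock 64": 1,012,013.22 / 904,868.75 - 1 ≈ 11.84%.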

By the way, I think I made a mistake by removing the "pad" field in
XLogCtlInsert, and it could affect the results in a bad way.

So I've attached v4 with these changes:
- the "pad" field was restored to separate CurrBytePos from the following fields, and a static assert was added.
- the default PREV_LINKS_HASH_STRATEGY was changed to 3, since it shows less regression with the unmodified NUM_XLOGINSERT_LOCKS=8.

Though I'd ask you to test the "Dumb-lock-free..." patch from the previous
letter first, and only if it shows promising results, spend time on v4.

------

regards
Yura
From 189981c2e7dfe94ff86c4f9406e740b368aad79d Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Sun, 19 Jan 2025 17:40:28 +0300
Subject: [PATCH v4] Lock-free XLog Reservation using lock-free hash-table

Remove PrevBytePos to eliminate lock contention and allow atomic updates
to CurrBytePos.  Use a lock-free hash table based on 4-way Cuckoo hashing
to store the link to PrevBytePos.
---
 src/backend/access/transam/xlog.c | 582 +++++++++++++++++++++++++++---
 src/tools/pgindent/typedefs.list  |   2 +
 2 files changed, 530 insertions(+), 54 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bf3dbda901d..de9bbbfbbb6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -68,6 +68,8 @@
 #include "catalog/pg_database.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
+#include "common/hashfn.h"
+#include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -384,6 +386,94 @@ typedef union WALInsertLockPadded
 	char		pad[PG_CACHE_LINE_SIZE];
 } WALInsertLockPadded;
 
+/* #define WAL_LINK_64 0 */
+#ifndef WAL_LINK_64
+#ifdef PG_HAVE_ATOMIC_U64_SIMULATION
+#define WAL_LINK_64 0
+#else
+#define WAL_LINK_64 1
+#endif
+#endif
+
+/*
+ * It links the current position with the previous one.
+ * - CurrPosId is (CurrBytePos ^ (CurrBytePos>>32)).
+ *   Since CurrBytePos grows monotonically and is MAXALIGN-ed, CurrPosId
+ *   correctly identifies CurrBytePos for at least MAXALIGN*2^32 bytes of
+ *   WAL (32GB with MAXALIGN = 8).
+ * - CurrPosHigh is (CurrBytePos>>32); it is stored for a strong uniqueness check.
+ * - PrevSize is the difference between CurrBytePos and PrevBytePos.
+ */
+typedef struct
+{
+#if WAL_LINK_64
+	uint64		CurrPos;
+	uint64		PrevPos;
+#define WAL_PREV_EMPTY (~((uint64)0))
+#define WALLinkEmpty(l) ((l).PrevPos == WAL_PREV_EMPTY)
+#define WALLinkSamePos(a, b) ((a).CurrPos == (b).CurrPos)
+#define WALLinkCopyPrev(a, b) do {(a).PrevPos = (b).PrevPos;} while(0)
+#else
+	uint32		CurrPosId;
+	uint32		CurrPosHigh;
+	uint32		PrevSize;
+#define WALLinkEmpty(l) ((l).PrevSize == 0)
+#define WALLinkSamePos(a, b) ((a).CurrPosId == (b).CurrPosId && (a).CurrPosHigh == (b).CurrPosHigh)
+#define WALLinkCopyPrev(a, b) do {(a).PrevSize = (b).PrevSize;} while(0)
+#endif
+} WALPrevPosLinkVal;
+
+/*
+ * This is an element of the lock-free hash table.
+ * In 32-bit mode, the lowest bit of PrevSize is used as a lock, relying on the fact that it is a multiple of MAXALIGN.
+ * In 64-bit mode the locking protocol is more complex.
+ */
+typedef struct
+{
+#if WAL_LINK_64
+	pg_atomic_uint64 CurrPos;
+	pg_atomic_uint64 PrevPos;
+#else
+	pg_atomic_uint32 CurrPosId;
+	uint32		CurrPosHigh;
+	pg_atomic_uint32 PrevSize;
+	uint32		pad;			/* to align to 16 bytes */
+#endif
+} WALPrevPosLink;
+
+StaticAssertDecl(sizeof(WALPrevPosLink) == 16, "WALPrevPosLink should be 16 bytes");
+
+#define PREV_LINKS_HASH_CAPA (NUM_XLOGINSERT_LOCKS * 2)
+StaticAssertDecl(!(PREV_LINKS_HASH_CAPA & (PREV_LINKS_HASH_CAPA - 1)),
+				 "PREV_LINKS_HASH_CAPA should be power of two");
+StaticAssertDecl(PREV_LINKS_HASH_CAPA < UINT16_MAX,
+				 "PREV_LINKS_HASH_CAPA is too large");
+
+/*-----------
+ * PREV_LINKS_HASH_STRATEGY - the way slots are chosen in hash table
+ *   1 - 4 positions h1,h1+1,h2,h2+2 - guarantees at least 3 distinct points,
+ *     but may spread over 4 cache lines.
+ *   2 - 4 positions h,h^1,h^2,h^3 - 4 points in a single cache line.
+ *   3 - 8 positions h1,h1^1,h1^2,h1^3,h2,h2^1,h2^2,h2^3 - 8 distinct points
+ *     in two cache lines.
+ */
+#ifndef PREV_LINKS_HASH_STRATEGY
+#define PREV_LINKS_HASH_STRATEGY 3
+#endif
+
+#if PREV_LINKS_HASH_STRATEGY <= 2
+#define PREV_LINKS_LOOKUPS 4
+#else
+#define PREV_LINKS_LOOKUPS 8
+#endif
+
+struct WALPrevLinksLookups
+{
+	uint16		pos[PREV_LINKS_LOOKUPS];
+};
+
+#define SWAP_ONCE_IN 128
+
 /*
  * Session status of running backup, used for sanity checks in SQL-callable
  * functions to start and stop backups.
@@ -395,17 +485,18 @@ static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
  */
 typedef struct XLogCtlInsert
 {
-	slock_t		insertpos_lck;	/* protects CurrBytePos and PrevBytePos */
-
 	/*
 	 * CurrBytePos is the end of reserved WAL. The next record will be
-	 * inserted at that position. PrevBytePos is the start position of the
-	 * previously inserted (or rather, reserved) record - it is copied to the
-	 * prev-link of the next record. These are stored as "usable byte
-	 * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+	 * inserted at that position.
+	 *
+	 * The start position of the previously inserted (or rather, reserved)
+	 * record (it is copied to the prev-link of the next record) will be
+	 * stored in PrevLinksHash.
+	 *
+	 * These are stored as "usable byte positions" rather than XLogRecPtrs
+	 * (see XLogBytePosToRecPtr()).
 	 */
-	uint64		CurrBytePos;
-	uint64		PrevBytePos;
+	pg_atomic_uint64 CurrBytePos;
 
 	/*
 	 * Make sure the above heavily-contended spinlock and byte positions are
@@ -442,8 +533,37 @@ typedef struct XLogCtlInsert
 	 * WAL insertion locks.
 	 */
 	WALInsertLockPadded *WALInsertLocks;
+
+	/*
+	 * PrevLinksHash is a lock-free hash table based on Cuckoo algorithm.
+	 *
+	 * With PREV_LINKS_HASH_STRATEGY == 1 it is mostly 4-way: for every
+	 * element two positions h1 and h2 are computed, and their neighbours
+	 * h1+1 and h2+2 are used as well. This way, even on collision we have
+	 * 3 distinct positions, which gives us a ~75% fill rate without
+	 * unsolvable cycles (per Cuckoo hashing theory). But the chosen slots
+	 * may be in 4 distinct cache lines.
+	 *
+	 * With PREV_LINKS_HASH_STRATEGY == 3 it takes two buckets of 4 elements
+	 * each - 8 positions in total, but guaranteed to be in two cache
+	 * lines. It provides a very high fill rate - up to 90%.
+	 *
+	 * With PREV_LINKS_HASH_STRATEGY == 2 it takes only one bucket with 4
+	 * elements. Strictly speaking it is not Cuckoo hashing, but it should
+	 * work for our case.
+	 *
+	 * In any case, we rely on elements being deleted at the same rate as
+	 * they are added, so even unsolvable cycles will soon be broken by
+	 * concurrent deletions.
+	 */
+	WALPrevPosLink *PrevLinksHash;
+
 } XLogCtlInsert;
 
+StaticAssertDecl(offsetof(XLogCtlInsert, RedoRecPtr) / PG_CACHE_LINE_SIZE !=
+				 offsetof(XLogCtlInsert, CurrBytePos) / PG_CACHE_LINE_SIZE,
+				 "CurrBytePos and RedoRecPtr must be on different cache lines");
+
 /*
  * Total shared-memory state for XLOG.
  */
@@ -568,6 +688,9 @@ static XLogCtlData *XLogCtl = NULL;
 /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
 static WALInsertLockPadded *WALInsertLocks = NULL;
 
+/* same for XLogCtl->Insert.PrevLinksHash */
+static WALPrevPosLink *PrevLinksHash = NULL;
+
 /*
  * We maintain an image of pg_control in shared memory.
  */
@@ -700,6 +823,19 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
 								XLogRecData *rdata,
 								XLogRecPtr StartPos, XLogRecPtr EndPos,
 								TimeLineID tli);
+
+static void WALPrevPosLinkValCompose(WALPrevPosLinkVal *val, XLogRecPtr StartPos, XLogRecPtr PrevPos);
+static void WALPrevPosLinkValGetPrev(WALPrevPosLinkVal val, XLogRecPtr *PrevPos);
+static void CalcCuckooPositions(WALPrevPosLinkVal linkval, struct WALPrevLinksLookups *pos);
+
+static bool WALPrevPosLinkInsert(WALPrevPosLink *link, WALPrevPosLinkVal val);
+static bool WALPrevPosLinkConsume(WALPrevPosLink *link, WALPrevPosLinkVal *val);
+static bool WALPrevPosLinkSwap(WALPrevPosLink *link, WALPrevPosLinkVal *val);
+static void LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos,
+							   XLogRecPtr *PrevPtr);
+static void LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec);
+static XLogRecPtr ReadInsertCurrBytePos(void);
+
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -1086,6 +1222,341 @@ XLogInsertRecord(XLogRecData *rdata,
 	return EndPos;
 }
 
+static pg_attribute_always_inline void
+WALPrevPosLinkValCompose(WALPrevPosLinkVal *val, XLogRecPtr StartPos, XLogRecPtr PrevPos)
+{
+#if WAL_LINK_64
+	val->CurrPos = StartPos;
+	val->PrevPos = PrevPos;
+#else
+	val->CurrPosHigh = StartPos >> 32;
+	val->CurrPosId = StartPos ^ val->CurrPosHigh;
+	val->PrevSize = StartPos - PrevPos;
+#endif
+}
+
+static pg_attribute_always_inline void
+WALPrevPosLinkValGetPrev(WALPrevPosLinkVal val, XLogRecPtr *PrevPos)
+{
+#if WAL_LINK_64
+	*PrevPos = val.PrevPos;
+#else
+	XLogRecPtr	StartPos = val.CurrPosHigh;
+
+	StartPos ^= (StartPos << 32) | val.CurrPosId;
+	*PrevPos = StartPos - val.PrevSize;
+#endif
+}
+
+static pg_attribute_always_inline void
+CalcCuckooPositions(WALPrevPosLinkVal linkval, struct WALPrevLinksLookups *pos)
+{
+	uint32		hash;
+#if PREV_LINKS_HASH_STRATEGY == 3
+	uint32		offset;
+#endif
+
+
+#if WAL_LINK_64
+	hash = murmurhash32(linkval.CurrPos ^ (linkval.CurrPos >> 32));
+#else
+	hash = murmurhash32(linkval.CurrPosId);
+#endif
+
+#if PREV_LINKS_HASH_STRATEGY == 1
+	pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
+	pos->pos[1] = (pos->pos[0] + 1) % PREV_LINKS_HASH_CAPA;
+	pos->pos[2] = (hash >> 16) % PREV_LINKS_HASH_CAPA;
+	pos->pos[3] = (pos->pos[2] + 2) % PREV_LINKS_HASH_CAPA;
+#else
+	pos->pos[0] = hash % PREV_LINKS_HASH_CAPA;
+	pos->pos[1] = pos->pos[0] ^ 1;
+	pos->pos[2] = pos->pos[0] ^ 2;
+	pos->pos[3] = pos->pos[0] ^ 3;
+#if PREV_LINKS_HASH_STRATEGY == 3
+	/* use multiplication to compute 0 <= offset < PREV_LINKS_HASH_CAPA-4 */
+	offset = (hash / PREV_LINKS_HASH_CAPA) * (PREV_LINKS_HASH_CAPA - 4);
+	offset /= UINT32_MAX / PREV_LINKS_HASH_CAPA + 1;
+	/* add start of next bucket */
+	offset += (pos->pos[0] | 3) + 1;
+	/* get position in strictly other bucket */
+	pos->pos[4] = offset % PREV_LINKS_HASH_CAPA;
+	pos->pos[5] = pos->pos[4] ^ 1;
+	pos->pos[6] = pos->pos[4] ^ 2;
+	pos->pos[7] = pos->pos[4] ^ 3;
+#endif
+#endif
+}
+
+/*
+ * Attempt to write into empty link.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkInsert(WALPrevPosLink *link, WALPrevPosLinkVal val)
+{
+#if WAL_LINK_64
+	uint64		empty = WAL_PREV_EMPTY;
+
+	if (pg_atomic_read_u64(&link->PrevPos) != WAL_PREV_EMPTY)
+		return false;
+	if (!pg_atomic_compare_exchange_u64(&link->PrevPos, &empty, val.PrevPos))
+		return false;
+	/* we could ignore concurrent lock of CurrPos */
+	pg_atomic_write_u64(&link->CurrPos, val.CurrPos);
+	return true;
+#else
+	uint32		empty = 0;
+
+	/* first do a cheap read-only check */
+	if (pg_atomic_read_u32(&link->PrevSize) != 0)
+		return false;
+	if (!pg_atomic_compare_exchange_u32(&link->PrevSize, &empty, 1))
+		/* someone else occupied the entry */
+		return false;
+
+	pg_atomic_write_u32(&link->CurrPosId, val.CurrPosId);
+	link->CurrPosHigh = val.CurrPosHigh;
+	pg_write_barrier();
+	/* This write acts as unlock as well. */
+	pg_atomic_write_u32(&link->PrevSize, val.PrevSize);
+	return true;
+#endif
+}
+
+/*
+ * Attempt to consume matched link.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkConsume(WALPrevPosLink *link, WALPrevPosLinkVal *val)
+{
+#if WAL_LINK_64
+	uint64		oldCurr;
+
+	if (pg_atomic_read_u64(&link->CurrPos) != val->CurrPos)
+		return false;
+	/* lock against concurrent swapper */
+	oldCurr = pg_atomic_fetch_or_u64(&link->CurrPos, 1);
+	if (oldCurr & 1)
+		return false;			/* lock failed */
+	if (oldCurr != val->CurrPos)
+	{
+		/* link was swapped */
+		pg_atomic_write_u64(&link->CurrPos, oldCurr);
+		return false;
+	}
+	val->PrevPos = pg_atomic_read_u64(&link->PrevPos);
+	pg_atomic_write_u64(&link->PrevPos, WAL_PREV_EMPTY);
+
+	/*
+	 * concurrent inserter may already reuse this link, so we don't check
+	 * result of compare_exchange
+	 */
+	oldCurr |= 1;
+	pg_atomic_compare_exchange_u64(&link->CurrPos, &oldCurr, 0);
+	return true;
+#else
+	if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId)
+		return false;
+
+	/* Try lock */
+	val->PrevSize = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+	if (val->PrevSize & 1)
+		/* Lock failed */
+		return false;
+
+	if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId ||
+		link->CurrPosHigh != val->CurrPosHigh)
+	{
+		/* unlock with old value */
+		pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+		return false;
+	}
+
+	pg_atomic_write_u32(&link->CurrPosId, 0);
+	link->CurrPosHigh = 0;
+	pg_write_barrier();
+	/* This write acts as unlock as well. */
+	pg_atomic_write_u32(&link->PrevSize, 0);
+	return true;
+#endif
+}
+
+/*
+ * Attempt to swap the entry: remember the existing link and write ours.
+ * We may happen to consume an empty entry; the caller will detect this by
+ * checking the remembered value.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkSwap(WALPrevPosLink *link, WALPrevPosLinkVal *val)
+{
+#if WAL_LINK_64
+	uint64		oldCurr;
+	uint64		oldPrev;
+
+	/* lock against concurrent swapper or consumer */
+	oldCurr = pg_atomic_fetch_or_u64(&link->CurrPos, 1);
+	if (oldCurr & 1)
+		return false;			/* lock failed */
+	if (oldCurr == 0)
+	{
+		/* link was empty */
+		oldPrev = WAL_PREV_EMPTY;
+		/* but concurrent inserter may concurrently insert */
+		if (!pg_atomic_compare_exchange_u64(&link->PrevPos, &oldPrev, val->PrevPos))
+			return false;		/* concurrent inserter won. It will overwrite
+								 * CurrPos */
+		/* this write acts as unlock */
+		pg_atomic_write_u64(&link->CurrPos, val->CurrPos);
+		val->CurrPos = 0;
+		val->PrevPos = WAL_PREV_EMPTY;
+		return true;
+	}
+	oldPrev = pg_atomic_read_u64(&link->PrevPos);
+	pg_atomic_write_u64(&link->PrevPos, val->PrevPos);
+	pg_write_barrier();
+	/* write acts as unlock */
+	pg_atomic_write_u64(&link->CurrPos, val->CurrPos);
+	val->CurrPos = oldCurr;
+	val->PrevPos = oldPrev;
+	return true;
+#else
+	uint32		oldPrev;
+	uint32		oldCurId;
+	uint32		oldCurHigh;
+
+	/* Attempt to lock entry against concurrent consumer or swapper */
+	oldPrev = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+	if (oldPrev & 1)
+		/* Lock failed */
+		return false;
+
+	oldCurId = pg_atomic_read_u32(&link->CurrPosId);
+	oldCurHigh = link->CurrPosHigh;
+	pg_atomic_write_u32(&link->CurrPosId, val->CurrPosId);
+	link->CurrPosHigh = val->CurrPosHigh;
+	pg_write_barrier();
+	/* This write acts as unlock as well. */
+	pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+
+	val->CurrPosId = oldCurId;
+	val->CurrPosHigh = oldCurHigh;
+	val->PrevSize = oldPrev;
+	return true;
+#endif
+}
+
+/*
+ * Write new link (EndPos, StartPos) and find PrevPtr for StartPos.
+ *
+ * Links are stored in a lock-free Cuckoo-based hash table.
+ * We use mostly-4-way Cuckoo hashing, which provides a high fill rate without
+ * hard cycle collisions. We also rely on concurrent consumers of existing
+ * entries, so cycles will be broken in the meantime.
+ *
+ * Cuckoo hashing relies on re-insertion for balancing, so we occasionally
+ * swap an entry and try to insert the swapped one instead of ours.
+ */
+static void
+LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos, XLogRecPtr *PrevPtr)
+{
+	SpinDelayStatus spin_stat;
+	WALPrevPosLinkVal lookup;
+	WALPrevPosLinkVal insert;
+	struct WALPrevLinksLookups lookup_pos;
+	struct WALPrevLinksLookups insert_pos;
+	uint32		i;
+	uint32		rand = 0;
+	bool		inserted = false;
+	bool		found = false;
+
+	/* pass StartPos second time to set PrevSize = 0 */
+	WALPrevPosLinkValCompose(&lookup, StartPos, StartPos);
+	WALPrevPosLinkValCompose(&insert, EndPos, StartPos);
+
+	CalcCuckooPositions(lookup, &lookup_pos);
+	CalcCuckooPositions(insert, &insert_pos);
+
+	init_local_spin_delay(&spin_stat);
+
+	while (!inserted || !found)
+	{
+		for (i = 0; !found && i < PREV_LINKS_LOOKUPS; i++)
+			found = WALPrevPosLinkConsume(&PrevLinksHash[lookup_pos.pos[i]], &lookup);
+
+		if (inserted)
+		{
+			/*
+			 * we may sleep only after we have inserted our value, since
+			 * another backend may be waiting for it
+			 */
+			perform_spin_delay(&spin_stat);
+			goto next;
+		}
+
+		for (i = 0; !inserted && i < PREV_LINKS_LOOKUPS; i++)
+			inserted = WALPrevPosLinkInsert(&PrevLinksHash[insert_pos.pos[i]], insert);
+
+		if (inserted)
+			goto next;
+
+		rand = pg_prng_uint32(&pg_global_prng_state);
+		if (rand % SWAP_ONCE_IN != 0)
+			goto next;
+
+		i = rand / SWAP_ONCE_IN % PREV_LINKS_LOOKUPS;
+		if (!WALPrevPosLinkSwap(&PrevLinksHash[insert_pos.pos[i]], &insert))
+			goto next;
+
+		if (WALLinkEmpty(insert))
+			/* Lucky case: the entry became empty and we inserted into it */
+			inserted = true;
+		else if (WALLinkSamePos(lookup, insert))
+		{
+			/*
+			 * We happened to replace the entry we were looking for. No need
+			 * to insert it again.
+			 */
+			inserted = true;
+			Assert(!found);
+			found = true;
+			WALLinkCopyPrev(lookup, insert);
+			break;
+		}
+		else
+			CalcCuckooPositions(insert, &insert_pos);
+
+next:
+		pg_spin_delay();
+		pg_read_barrier();
+	}
+
+	WALPrevPosLinkValGetPrev(lookup, PrevPtr);
+}
+
+static pg_attribute_always_inline void
+LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec)
+{
+	WALPrevPosLinkVal insert;
+	struct WALPrevLinksLookups insert_pos;
+
+	WALPrevPosLinkValCompose(&insert, EndOfLog, LastRec);
+	CalcCuckooPositions(insert, &insert_pos);
+#if WAL_LINK_64
+	pg_atomic_write_u64(&PrevLinksHash[insert_pos.pos[0]].CurrPos, insert.CurrPos);
+	pg_atomic_write_u64(&PrevLinksHash[insert_pos.pos[0]].PrevPos, insert.PrevPos);
+#else
+	pg_atomic_write_u32(&PrevLinksHash[insert_pos.pos[0]].CurrPosId, insert.CurrPosId);
+	PrevLinksHash[insert_pos.pos[0]].CurrPosHigh = insert.CurrPosHigh;
+	pg_atomic_write_u32(&PrevLinksHash[insert_pos.pos[0]].PrevSize, insert.PrevSize);
+#endif
+}
+
+static pg_attribute_always_inline XLogRecPtr
+ReadInsertCurrBytePos(void)
+{
+	return pg_atomic_read_u64(&XLogCtl->Insert.CurrBytePos);
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1118,25 +1589,9 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	/* All (non xlog-switch) records should contain data. */
 	Assert(size > SizeOfXLogRecord);
 
-	/*
-	 * The duration the spinlock needs to be held is minimized by minimizing
-	 * the calculations that have to be done while holding the lock. The
-	 * current tip of reserved WAL is kept in CurrBytePos, as a byte position
-	 * that only counts "usable" bytes in WAL, that is, it excludes all WAL
-	 * page headers. The mapping between "usable" byte positions and physical
-	 * positions (XLogRecPtrs) can be done outside the locked region, and
-	 * because the usable byte position doesn't include any headers, reserving
-	 * X bytes from WAL is almost as simple as "CurrBytePos += X".
-	 */
-	SpinLockAcquire(&Insert->insertpos_lck);
-
-	startbytepos = Insert->CurrBytePos;
+	startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);
 	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
-
-	SpinLockRelease(&Insert->insertpos_lck);
+	LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);
 
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1172,26 +1627,24 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 	uint32		segleft;
 
 	/*
-	 * These calculations are a bit heavy-weight to be done while holding a
-	 * spinlock, but since we're holding all the WAL insertion locks, there
-	 * are no other inserters competing for it. GetXLogInsertRecPtr() does
-	 * compete for it, but that's not called very frequently.
+	 * Currently ReserveXLogSwitch runs while holding all the WAL insertion
+	 * locks exclusively, so there is no contention on CurrBytePos.  But we
+	 * still do a CAS loop to stay uniform.
+	 *
+	 * We will probably get rid of the exclusive lock in the future.
 	 */
-	SpinLockAcquire(&Insert->insertpos_lck);
 
-	startbytepos = Insert->CurrBytePos;
+repeat:
+	startbytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
 
 	ptr = XLogBytePosToEndRecPtr(startbytepos);
 	if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
 	{
-		SpinLockRelease(&Insert->insertpos_lck);
 		*EndPos = *StartPos = ptr;
 		return false;
 	}
 
 	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
-
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
 
@@ -1202,10 +1655,19 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 		*EndPos += segleft;
 		endbytepos = XLogRecPtrToBytePos(*EndPos);
 	}
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
 
-	SpinLockRelease(&Insert->insertpos_lck);
+	if (!pg_atomic_compare_exchange_u64(&Insert->CurrBytePos,
+										&startbytepos,
+										endbytepos))
+	{
+		/*
+		 * Don't use spin delay here: perform_spin_delay primary case is for
+		 * Don't use spin delay here: perform_spin_delay is primarily meant
+		 * for resolving single-core contention, but on a single core we will
+		 * succeed on the next attempt anyway.
+		goto repeat;
+	}
+	LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);
 
 	*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
@@ -1507,7 +1969,6 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	XLogRecPtr	inserted;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	int			i;
 
 	if (MyProc == NULL)
@@ -1522,9 +1983,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		return inserted;
 
 	/* Read the current insert position */
-	SpinLockAcquire(&Insert->insertpos_lck);
-	bytepos = Insert->CurrBytePos;
-	SpinLockRelease(&Insert->insertpos_lck);
+	bytepos = ReadInsertCurrBytePos();
 	reservedUpto = XLogBytePosToEndRecPtr(bytepos);
 
 	/*
@@ -4898,6 +5357,8 @@ XLOGShmemSize(void)
 
 	/* WAL insertion locks, plus alignment */
 	size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
+	/* PrevLinksHash; it reuses the alignment of the WAL insertion locks. */
+	size = add_size(size, mul_size(sizeof(WALPrevPosLink), PREV_LINKS_HASH_CAPA));
 	/* xlblocks array */
 	size = add_size(size, mul_size(sizeof(pg_atomic_uint64), XLOGbuffers));
 	/* extra alignment padding for XLOG I/O buffers */
@@ -4999,6 +5460,9 @@ XLOGShmemInit(void)
 		WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
 	}
 
+	PrevLinksHash = XLogCtl->Insert.PrevLinksHash = (WALPrevPosLink *) allocptr;
+	allocptr += sizeof(WALPrevPosLink) * PREV_LINKS_HASH_CAPA;
+
 	/*
 	 * Align the start of the page buffers to a full xlog block size boundary.
 	 * This simplifies some calculations in XLOG insertion. It is also
@@ -5017,12 +5481,24 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
-	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+	pg_atomic_init_u64(&XLogCtl->Insert.CurrBytePos, 0);
+
+	for (i = 0; i < PREV_LINKS_HASH_CAPA; i++)
+	{
+#if WAL_LINK_64
+		pg_atomic_init_u64(&PrevLinksHash[i].CurrPos, 0);
+		pg_atomic_init_u64(&PrevLinksHash[i].PrevPos, WAL_PREV_EMPTY);
+#else
+		pg_atomic_init_u32(&PrevLinksHash[i].CurrPosId, 0);
+		pg_atomic_init_u32(&PrevLinksHash[i].PrevSize, 0);
+#endif
+	}
 }
 
 /*
@@ -6018,8 +6494,13 @@ StartupXLOG(void)
 	 * previous incarnation.
 	 */
 	Insert = &XLogCtl->Insert;
-	Insert->PrevBytePos = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
-	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+	{
+		XLogRecPtr	endOfLog = XLogRecPtrToBytePos(EndOfLog);
+		XLogRecPtr	lastRec = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
+
+		pg_atomic_write_u64(&Insert->CurrBytePos, endOfLog);
+		LinkStartPrevPos(endOfLog, lastRec);
+	}
 
 	/*
 	 * Tricky point here: lastPage contains the *last* block that the LastRec
@@ -7005,7 +7486,7 @@ CreateCheckPoint(int flags)
 
 	if (shutdown)
 	{
-		XLogRecPtr	curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
+		XLogRecPtr	curInsert = XLogBytePosToRecPtr(ReadInsertCurrBytePos());
 
 		/*
 		 * Compute new REDO record ptr = location of next XLOG record.
@@ -9434,14 +9915,7 @@ register_persistent_abort_backup_handler(void)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	uint64		current_bytepos;
-
-	SpinLockAcquire(&Insert->insertpos_lck);
-	current_bytepos = Insert->CurrBytePos;
-	SpinLockRelease(&Insert->insertpos_lck);
-
-	return XLogBytePosToRecPtr(current_bytepos);
+	return XLogBytePosToRecPtr(ReadInsertCurrBytePos());
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d5aa5c295ae..118aa487adf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3123,6 +3123,8 @@ WALAvailability
 WALInsertLock
 WALInsertLockPadded
 WALOpenSegment
+WALPrevPosLink
+WALPrevPosLinkVal
 WALReadError
 WALSegmentCloseCB
 WALSegmentContext
-- 
2.43.0
