On Mon, 2024-02-12 at 17:44 -0800, Jeff Davis wrote:
> It looks like there's some renewed interest in this patch:

After rebasing (attached as 0001), I'm seeing some test failures. It
looks like the local LogwrtResult is not being updated in as many
places, and that's hitting the Assert that I recently added. The fix is
easy (attached as 0002).

Though it looks like we can remove the non-shared LogwrtResult
entirely. Andres expressed some concern here:

https://www.postgresql.org/message-id/20210130020211.rtu5ir3dpjrbi...@alap3.anarazel.de

But then seemed to favor removing it here:

https://www.postgresql.org/message-id/20240213001150.4uqzh7tinuhvo...@awork3.anarazel.de

I'm inclined to think we can get rid of the non-shared copy.

A few other comments:

 * Should GetFlushRecPtr()/GetXLogWriteRecPtr() use a read memory
barrier?
 * Why did you add pg_memory_barrier() right before a spinlock
acquisition?
 * Is it an invariant that Write >= Flush at all times? Are there
guaranteed to be write barriers in the right place to ensure that?
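
To make the ordering question concrete, here is a minimal standalone sketch
in C11 <stdatomic.h>, not PostgreSQL code. It assumes the invariant holds
from the reader's side if writers publish Write before Flush with release
semantics, and readers load Flush before Write with acquire semantics. All
names (wal_result, advance_release, publish, read_result) are hypothetical,
and the C11 orderings are only an analogy for the pg_atomic_* and barrier
primitives in the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogwrtAtomic; not the PostgreSQL type. */
typedef struct
{
	_Atomic uint64_t write;		/* last byte + 1 written out */
	_Atomic uint64_t flush;		/* last byte + 1 flushed */
} wal_result;

static void
wal_result_init(wal_result *r, uint64_t start)
{
	atomic_init(&r->write, start);
	atomic_init(&r->flush, start);
}

/* Advance *ptr to at least target; never moves the value backwards. */
static void
advance_release(_Atomic uint64_t *ptr, uint64_t target)
{
	uint64_t	cur = atomic_load_explicit(ptr, memory_order_relaxed);

	/* On CAS failure, cur is refreshed with the value currently stored. */
	while (cur < target &&
		   !atomic_compare_exchange_weak_explicit(ptr, &cur, target,
												  memory_order_release,
												  memory_order_relaxed))
		;
}

/* Writer side: publish Write first, then Flush (assumed Flush <= Write). */
static void
publish(wal_result *r, uint64_t write, uint64_t flush)
{
	assert(write >= flush);
	advance_release(&r->write, write);
	advance_release(&r->flush, flush);
}

/*
 * Reader side: load Flush first, then Write.  The acquire load of Flush
 * orders the later load of Write after whatever the publishing writer had
 * already seen or stored in Write, so the pair satisfies write >= flush.
 */
static void
read_result(wal_result *r, uint64_t *write, uint64_t *flush)
{
	*flush = atomic_load_explicit(&r->flush, memory_order_acquire);
	*write = atomic_load_explicit(&r->write, memory_order_acquire);
}
```

In this sketch a reader that loads the two values in the opposite order, or
with relaxed loads, could observe Flush > Write even though no writer ever
stored such a pair. That is the kind of case the barrier questions above are
meant to rule out.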

I would also like it if we could add a new "Copy" pointer indicating
how much WAL data has been copied to the WAL buffers. That would be set
by WaitXLogInsertionsToFinish() so that subsequent calls are cheap.
I've attached a patch (0003) for illustration purposes. It adds to the size
of XLogCtlData, but it's fairly large already, so I'm not sure if
that's a problem. If we do add this, there would be an invariant that
Copy >= Write at all times.
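
As a rough illustration of why a cached Copy pointer makes repeated calls
cheap, the following standalone C11 sketch separates a fast path (one atomic
read) from a slow path (a stand-in for scanning the insertion locks). It is
not the 0003 patch itself: copy_ptr, inserted_upto, slow_scan and
wait_insertions_to_finish are hypothetical names, and the slow path is
simulated by a counter.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint64_t copy_ptr;	/* hypothetical shared Copy pointer */
static uint64_t inserted_upto;		/* what the simulated slow scan finds */
static int	slow_scans;				/* counts how often the slow path runs */

/* Stand-in for the expensive scan over in-progress insertions. */
static uint64_t
slow_scan(void)
{
	slow_scans++;
	return inserted_upto;
}

static uint64_t
wait_insertions_to_finish(uint64_t upto)
{
	uint64_t	cur = atomic_load(&copy_ptr);
	uint64_t	finished;

	/* Fast path: everything up to 'upto' is already known to be copied. */
	if (upto <= cur)
		return cur;

	finished = slow_scan();

	/* Monotonically advance the cached pointer; never move it backwards. */
	cur = atomic_load(&copy_ptr);
	while (cur < finished &&
		   !atomic_compare_exchange_weak(&copy_ptr, &cur, finished))
		;

	return finished;
}
```

After one slow call has advanced the pointer, any later call asking for a
position at or below it returns without touching the slow path. Only a
request beyond the cached pointer pays for another scan.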

Regards,
        Jeff Davis

From cd48125afc61faa46f15f444390ed01f64988320 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH v11j 1/3] Make XLogCtl->LogwrtResult accessible with atomics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, access to LogwrtResult is protected by a spinlock.  This
becomes severely contended in some scenarios, such as with a largish
replication flock: walsenders all calling GetFlushRecPtr repeatedly
cause the processor to heat up to the point where eggs can be fried on top.

This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult
with a struct containing a pair of atomically accessed variables. Do so.
In a few places, we can adjust the exact location where the locals are
updated to account for the fact that we no longer need the spinlock.

Author: Álvaro Herrera <alvhe...@alvh.no-ip.org>
Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
---
 src/backend/access/transam/xlog.c | 106 ++++++++++++++----------------
 src/include/port/atomics.h        |  29 ++++++++
 src/tools/pgindent/typedefs.list  |   1 +
 3 files changed, 78 insertions(+), 58 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 50c347a679..46394c5d39 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -294,16 +294,13 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, each member of which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -326,6 +323,12 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 written out */
+	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -461,6 +464,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult; /* uses atomics */
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -478,12 +482,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -613,7 +611,7 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
 static XLogwrtResult LogwrtResult = {0, 0};
@@ -963,8 +961,6 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -1963,6 +1959,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -1982,17 +1979,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Before waiting, update LogwrtResult.Write and see if we still need
+			 * to write it or if someone else already did.
 			 */
+			LogwrtResult.Write =
+				pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2007,7 +2005,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write =
+					pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2291,7 +2290,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2506,6 +2505,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write,
+									LogwrtResult.Write);
+
 	/*
 	 * If asked to flush, do so
 	 */
@@ -2542,22 +2545,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
-	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
-	 * 'result' values.  This is not absolutely essential, but it saves some
-	 * code in a couple of places.
-	 */
-	{
-		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
-	}
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush,
+									LogwrtResult.Flush);
 }
 
 /*
@@ -2573,8 +2563,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	bool		wakeup = false;
 	XLogRecPtr	prevAsyncXactLSN;
 
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2782,14 +2772,14 @@ XLogFlush(XLogRecPtr record)
 	{
 		XLogRecPtr	insertpos;
 
-		/* read LogwrtResult and update local state */
+		/* Skip ahead write request pointer to latest written globally */
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2817,7 +2807,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2941,7 +2931,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2949,8 +2938,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -2966,6 +2957,7 @@ XLogBackgroundFlush(void)
 	{
 		if (openLogFile >= 0)
 		{
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 			{
@@ -3029,7 +3021,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3117,9 +3110,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -5935,10 +5926,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->LogwrtResult = LogwrtResult;
-
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
@@ -6377,9 +6371,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9287,9 +9279,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index bf151037f7..d7bdb74f30 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -514,6 +514,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d808aad8b0..ee0cbd0b73 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3126,6 +3126,7 @@ XLogRedoAction
 XLogSegNo
 XLogSource
 XLogStats
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPV
-- 
2.34.1

From 42001626af46b93a6eff69c0076dab65deaea912 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 16 Feb 2024 12:15:40 -0800
Subject: [PATCH v11j 2/3] fixup

---
 src/backend/access/transam/xlog.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46394c5d39..0a667a9dea 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -962,6 +962,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
 		SpinLockRelease(&XLogCtl->info_lck);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	}
 
 	/*
@@ -6371,7 +6373,9 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
+	pg_read_barrier();
 	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
-- 
2.34.1

From f4197c3413fd9c10e956509506b0b0c6354794e1 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 16 Feb 2024 12:24:13 -0800
Subject: [PATCH v11j 3/3] Add WAL Copy pointer, representing data copied to
 WAL buffers.

---
 src/backend/access/transam/xlog.c | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0a667a9dea..41a7cb2fa5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -325,6 +325,7 @@ static bool doPageWrites;
 
 typedef struct XLogwrtAtomic
 {
+	pg_atomic_uint64 Copy;		/* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 Write;		/* last byte + 1 written out */
 	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
 } XLogwrtAtomic;
@@ -1495,6 +1496,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1503,6 +1505,12 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	pg_read_barrier();
+	copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1582,6 +1590,9 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Copy, finishedUpto);
+
 	return finishedUpto;
 }
 
@@ -1723,13 +1734,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	pg_read_barrier();
+	copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
+	if (startptr + count > copyptr)
+		ereport(WARNING,
+				(errmsg("request to read past end of generated WAL; request %X/%X, current position %X/%X",
+						LSN_FORMAT_ARGS(startptr + count), LSN_FORMAT_ARGS(copyptr))));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -5933,6 +5955,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Copy, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
-- 
2.34.1
