On Mon, 2024-02-12 at 17:44 -0800, Jeff Davis wrote:
> It looks like there's some renewed interest in this patch:
After rebasing (attached as 0001), I'm seeing some test failures. It
looks like the local LogwrtResult is not being updated in as many
places, and that's hitting the Assert that I recently added. The fix
is easy (attached as 0002), though it looks like we can remove the
non-shared LogwrtResult entirely.

Andres expressed some concern here:

https://www.postgresql.org/message-id/20210130020211.rtu5ir3dpjrbi...@alap3.anarazel.de

But then seemed to favor removing it here:

https://www.postgresql.org/message-id/20240213001150.4uqzh7tinuhvo...@awork3.anarazel.de

I'm inclined to think we can get rid of the non-shared copy.

A few other comments:

* Should GetFlushRecPtr()/GetXLogWriteRecPtr() use a read memory
  barrier?
* Why did you add pg_memory_barrier() right before a spinlock
  acquisition?
* Is it an invariant that Write >= Flush at all times? Are there
  guaranteed to be write barriers in the right place to ensure that?

I would also like it if we could add a new "Copy" pointer indicating
how much WAL data has been copied to the WAL buffers. That would be
set by WaitXLogInsertionsToFinish() so that subsequent calls are
cheap. I've attached a patch (0003) for illustration purposes. It adds
to the size of XLogCtlData, but that struct is fairly large already,
so I'm not sure if that's a problem.

If we do add this, there would be an invariant that Copy >= Write at
all times (a sketch checking both invariants is in the P.S. below).

Regards,
	Jeff Davis
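P.S. To make the invariant questions concrete, here is a minimal
sketch of a check (illustration only, not one of the attached patches;
it assumes the Copy pointer from 0003 and the atomic fields from 0001,
and the function name is made up). It reads the pointers in the
reverse of the order in which the write side advances them (Copy
first, Flush last), so a stale read can only make the comparisons more
conservative:

static void
AssertLogwrtResultInvariants(void)
{
	XLogRecPtr	flush;
	XLogRecPtr	write;
	XLogRecPtr	copy;

	/* read in the reverse of the order in which the pointers advance */
	flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
	pg_read_barrier();
	write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
	pg_read_barrier();
	copy = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);

	Assert(write >= flush);		/* nothing is flushed before it is written */
	Assert(copy >= write);		/* nothing is written before it is copied */
}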
From cd48125afc61faa46f15f444390ed01f64988320 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH v11j 1/3] Make XLogCtl->LogwrtResult accessible with atomics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, access to LogwrtResult is protected by a spinlock.  This
becomes severely contended in some scenarios, such as with a largish
replication flock: walsenders all calling GetFlushRecPtr repeatedly
cause the processor heat up to the point where eggs can be fried on
top.

This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult
with a struct containing a pair of atomically accessed variables.
Do so.

In a few places, we can adjust the exact location where the locals are
updated to account for the fact that we no longer need the spinlock.

Author: Álvaro Herrera <alvhe...@alvh.no-ip.org>
Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
---
 src/backend/access/transam/xlog.c | 106 ++++++++++++++----------------
 src/include/port/atomics.h        |  29 ++++++++
 src/tools/pgindent/typedefs.list  |   1 +
 3 files changed, 78 insertions(+), 58 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 50c347a679..46394c5d39 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -294,16 +294,13 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogWrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, each member of which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -326,6 +323,12 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 written out */
+	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -461,6 +464,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult; /* uses atomics */
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -478,12 +482,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -613,7 +611,7 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
 static XLogwrtResult LogwrtResult = {0, 0};
@@ -963,8 +961,6 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -1963,6 +1959,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -1982,17 +1979,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Before waiting, update LogwrtResult.Write and see if we still
+			 * need to write it or if someone else already did.
 			 */
+			LogwrtResult.Write =
+				pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2007,7 +2005,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write =
+					pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2291,7 +2290,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2506,6 +2505,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write,
+									LogwrtResult.Write);
+
 	/*
 	 * If asked to flush, do so
 	 */
@@ -2542,22 +2545,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
-	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
-	 * 'result' values.  This is not absolutely essential, but it saves some
-	 * code in a couple of places.
-	 */
-	{
-		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
-	}
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush,
+									LogwrtResult.Flush);
 }
 
 /*
@@ -2573,8 +2563,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	bool		wakeup = false;
 	XLogRecPtr	prevAsyncXactLSN;
 
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2782,14 +2772,14 @@ XLogFlush(XLogRecPtr record)
 	{
 		XLogRecPtr	insertpos;
 
-		/* read LogwrtResult and update local state */
+		/* Skip ahead write request pointer to latest written globally */
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2817,7 +2807,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2941,7 +2931,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2949,8 +2938,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -2966,6 +2957,7 @@ XLogBackgroundFlush(void)
 	{
 		if (openLogFile >= 0)
 		{
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 			{
@@ -3029,7 +3021,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3117,9 +3110,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -5935,10 +5926,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->LogwrtResult = LogwrtResult;
-
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
@@ -6377,9 +6371,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9287,9 +9279,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index bf151037f7..d7bdb74f30 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -514,6 +514,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d808aad8b0..ee0cbd0b73 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3126,6 +3126,7 @@ XLogRedoAction
 XLogSegNo
 XLogSource
 XLogStats
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPV
-- 
2.34.1
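(Illustration, not part of the patch series: the behavior of the new
pg_atomic_monotonic_advance_u64() primitive above can be demonstrated
with a standalone program using C11 atomics instead of port/atomics.h;
all names below are made up. The key property is that the shared value
only ever moves forward, even when callers race:)

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t shared_pos;	/* stands in for LogwrtResult.Write */

static void
monotonic_advance(_Atomic uint64_t *ptr, uint64_t target)
{
	uint64_t	currval = atomic_load(ptr);

	while (currval < target)
	{
		/* on failure, currval is reloaded with the value actually seen */
		if (atomic_compare_exchange_weak(ptr, &currval, target))
			break;
	}
}

int
main(void)
{
	monotonic_advance(&shared_pos, 100);	/* advances 0 -> 100 */
	monotonic_advance(&shared_pos, 50);		/* no-op: already past 50 */
	monotonic_advance(&shared_pos, 200);	/* advances 100 -> 200 */
	printf("final: %llu\n", (unsigned long long) atomic_load(&shared_pos));
	return 0;
}

(If a concurrent caller installs a larger value mid-loop, the failed
compare-exchange reloads currval and the loop exits; the value never
moves backward.)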
From 42001626af46b93a6eff69c0076dab65deaea912 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 16 Feb 2024 12:15:40 -0800
Subject: [PATCH v11j 2/3] fixup
---
 src/backend/access/transam/xlog.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46394c5d39..0a667a9dea 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -962,6 +962,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
 		SpinLockRelease(&XLogCtl->info_lck);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	}
 
 /*
@@ -6371,7 +6373,9 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
+	pg_read_barrier();
 	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
-- 
2.34.1
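(On the pg_read_barrier() added above: as I understand the intent, and
this is exactly the barrier question raised earlier, it pairs with the
full-barrier compare-exchange in pg_atomic_monotonic_advance_u64() so
that everything the writer did before publishing a new Flush value is
visible to a reader that observes that value. A standalone
release/acquire sketch with C11 atomics, all names made up:)

#include <stdatomic.h>
#include <stdint.h>

static uint64_t wal_data_upto;		/* stand-in for the WAL buffer contents */
static _Atomic uint64_t flush_ptr;	/* stand-in for LogwrtResult.Flush */

/* writer: make the data visible, then publish the new position */
static void
publish_flush(uint64_t upto)
{
	wal_data_upto = upto;		/* write the data ... */
	atomic_store_explicit(&flush_ptr, upto, memory_order_release);
}

/* reader: observe the position; data below it is then safe to read */
static uint64_t
get_flush_ptr(void)
{
	return atomic_load_explicit(&flush_ptr, memory_order_acquire);
}

int
main(void)
{
	publish_flush(4096);
	return (get_flush_ptr() == 4096 && wal_data_upto == 4096) ? 0 : 1;
}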
From f4197c3413fd9c10e956509506b0b0c6354794e1 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 16 Feb 2024 12:24:13 -0800
Subject: [PATCH v11j 3/3] Add WAL Copy pointer, representing data copied to
 WAL buffers.
---
 src/backend/access/transam/xlog.c | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0a667a9dea..41a7cb2fa5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -325,6 +325,7 @@ static bool doPageWrites;
 
 typedef struct XLogwrtAtomic
 {
+	pg_atomic_uint64 Copy;		/* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 Write;		/* last byte + 1 written out */
 	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
 } XLogwrtAtomic;
@@ -1495,6 +1496,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1503,6 +1505,12 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	pg_read_barrier();
+	copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1582,6 +1590,9 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Copy, finishedUpto);
+
 	return finishedUpto;
 }
@@ -1723,13 +1734,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	pg_read_barrier();
+	copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
+	if (startptr + count > copyptr)
+		ereport(WARNING,
+				(errmsg("request to read past end of generated WAL; request %X/%X, current position %X/%X",
+						LSN_FORMAT_ARGS(startptr + count), LSN_FORMAT_ARGS(copyptr))));
 
 	/*
 	 * Loop through the buffers without a lock.  For each buffer, atomically
@@ -5933,6 +5955,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Copy, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
-- 
2.34.1
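(Finally, a standalone model of the Copy-pointer fast path that 0003
adds to WaitXLogInsertionsToFinish(); C11 atomics, all names made up.
Once the shared Copy pointer has advanced past the requested position,
later calls return immediately, without taking insertpos_lck or
scanning the per-backend insertion locks:)

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t copy_ptr;	/* stand-in for LogwrtResult.Copy */

/* stand-in for the expensive scan of the WAL insertion locks */
static uint64_t
scan_insertion_locks(uint64_t upto)
{
	return upto;				/* pretend all insertions finished */
}

static uint64_t
wait_insertions_to_finish(uint64_t upto)
{
	uint64_t	copied = atomic_load_explicit(&copy_ptr,
											  memory_order_acquire);
	uint64_t	finished;

	if (upto <= copied)
		return copied;			/* fast path: known to be copied already */

	finished = scan_insertion_locks(upto);	/* slow path */

	/* monotonically advance the shared Copy pointer */
	while (copied < finished &&
		   !atomic_compare_exchange_weak(&copy_ptr, &copied, finished))
		;

	return finished;
}

int
main(void)
{
	printf("%llu\n", (unsigned long long) wait_insertions_to_finish(100));
	printf("%llu\n", (unsigned long long) wait_insertions_to_finish(80));
	return 0;
}

(The second call takes the fast path: Copy is already at 100, so it
returns without rescanning.)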