On Mon, Sep 5, 2022 at 5:34 PM Kyotaro Horiguchi
<horikyota....@gmail.com> wrote:
> At Mon, 05 Sep 2022 14:15:27 +0900 (JST), Kyotaro Horiguchi 
> <horikyota....@gmail.com> wrote in
> me> +1 for showing any message for the failure, but I think we shouldn't
> me> hide an existing message if any.
>
> At Mon, 5 Sep 2022 16:54:07 +1200, Thomas Munro <thomas.mu...@gmail.com> 
> wrote in
> > On reflection, it'd be better not to clobber any pre-existing error
> > there, but report one only if there isn't one already queued.  I've
> > done that in this version, which I'm planning to do a bit more testing
> > on and commit soonish if there are no comments/objections, especially
> > for that part.
>
> It looks fine in this regard.  I still think that the message looks
> somewhat internal.

Thanks for looking!

> me> And the error messages around are
> me> just telling that "<some error happened> at RecPtr". So I think
> me> "missing contrecord at RecPtr" is sufficient here.

Ok, I've updated it like that.

> > I'll have to check whether a doc change is necessary somewhere to
> > advertise that maintenance_io_concurrency=0 turns off prefetching, but
> > IIRC that's kinda already implied.
> >
> > I've tested quite a lot of scenarios including make check-world with
> > maintenance_io_concurrency = 0, 1, 10, 1000, and ALTER SYSTEM for all
> > relevant GUCs on a standby running large pgbench to check expected
> > effect on pg_stat_recovery_prefetch view and generate system calls.
>
> +       if (likely(record = prefetcher->reader->record))
>
> Isn't this confusing a bit?
>
>
> +       if (likely(record = prefetcher->reader->record))
> +       {
> +               XLogRecPtr      replayed_up_to = record->next_lsn;
> +
> +               XLogReleasePreviousRecord(prefetcher->reader);
> +
>
> The likely condition is the prerequisite for
> XLogReleasePreviousRecord.  But is is a little hard to read the
> condition as "in case no previous record exists".  Since there is one
> in most cases, can't call XLogReleasePreviousRecord() unconditionally
> then the function returns true when the previous record exists and
> false if not?

We also need the LSN that is past that record.
XLogReleasePreviousRecord() could return it (or we could use
reader->EndRecPtr I suppose).  Thoughts on this version?
From efa095529b3615f30b710a4b42bd0904b187af53 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 5 Sep 2022 12:07:24 +1200
Subject: [PATCH v3] Fix recovery_prefetch with low maintenance_io_concurrency.

The LSN-based logic for knowing when IOs complete ran operations in the
wrong order.  We should process completed IOs first before trying to
start more, so that it is always possible to decode one more record when
the decoded record queue is empty, even if maintenance_io_concurrency is
set so low that a single earlier WAL record might have saturated the IO
queue.

That thinko was hidden because the effect of maintenance_io_concurrency
was arbitrarily clamped to be at least 2.  Fix the ordering, and also
remove that clamp.  We need a special case for 0, which is now treated
the same as recovery_prefetch=off, but otherwise the number is used
directly.  This allows for testing with 1, which would have made the
problem obvious in simple test scenarios.

Also add an explicit error message for missing contrecords.  It's needed
for correctness with the resulting slightly more aggressive prefetching,
to avoid losing track of that state, but it's also a bit strange that we
didn't have an error message already.

Back-patch to 15.

Reported-by: Justin Pryzby <pry...@telsasoft.com>
Reviewed-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Discussion: https://postgr.es/m/20220831140128.GS31833%40telsasoft.com
---
 src/backend/access/transam/xlogprefetcher.c | 54 ++++++++++++++-------
 src/backend/access/transam/xlogreader.c     | 22 +++++++--
 src/include/access/xlogreader.h             |  2 +-
 3 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 9aa56411d5..91bfba30ec 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -72,7 +72,9 @@
 int			recovery_prefetch = RECOVERY_PREFETCH_TRY;
 
 #ifdef USE_PREFETCH
-#define RecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
+#define RecoveryPrefetchEnabled() \
+		(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
+		 maintenance_io_concurrency > 0)
 #else
 #define RecoveryPrefetchEnabled() false
 #endif
@@ -985,6 +987,7 @@ XLogRecord *
 XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 {
 	DecodedXLogRecord *record;
+	XLogRecPtr	replayed_up_to;
 
 	/*
 	 * See if it's time to reset the prefetching machinery, because a relevant
@@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 
 		if (RecoveryPrefetchEnabled())
 		{
-			max_inflight = Max(maintenance_io_concurrency, 2);
+			Assert(maintenance_io_concurrency > 0);
+			max_inflight = maintenance_io_concurrency;
 			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
 		}
 		else
@@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	}
 
 	/*
-	 * Release last returned record, if there is one.  We need to do this so
-	 * that we can check for empty decode queue accurately.
+	 * Release last returned record, if there is one, and perform tasks that
+	 * are we can do now that the caller has replayed it.
 	 */
-	XLogReleasePreviousRecord(prefetcher->reader);
+	replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
 
-	/* If there's nothing queued yet, then start prefetching. */
+	/*
+	 * Can we drop any filters yet?  If we were waiting for a relation to be
+	 * created or extended, it is now OK to access blocks in the covered
+	 * range.
+	 */
+	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
+
+	/*
+	 * All IO initiated by earlier WAL is now completed.  This might trigger
+	 * further prefetching.
+	 */
+	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
+
+	/*
+	 * If there's nothing queued yet, then start prefetching to cause at least
+	 * one record to be queued.
+	 */
 	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
+	{
+		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
+		Assert(lrq_completed(prefetcher->streaming_read) == 0);
 		lrq_prefetch(prefetcher->streaming_read);
+	}
 
 	/* Read the next record. */
 	record = XLogNextRecord(prefetcher->reader, errmsg);
@@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	Assert(record == prefetcher->reader->record);
 
 	/*
-	 * Can we drop any prefetch filters yet, given the record we're about to
-	 * return?  This assumes that any records with earlier LSNs have been
-	 * replayed, so if we were waiting for a relation to be created or
-	 * extended, it is now OK to access blocks in the covered range.
+	 * If maintenance_io_concurrency is set very low, we might have started
+	 * prefetching some but not all of the blocks referenced in the record
+	 * we're about to return.  Forget about the rest of the blocks in this
+	 * record by dropping the prefetcher's reference to it.
 	 */
-	XLogPrefetcherCompleteFilters(prefetcher, record->lsn);
+	if (record == prefetcher->record)
+		prefetcher->record = NULL;
 
 	/*
 	 * See if it's time to compute some statistics, because enough WAL has
@@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
 		XLogPrefetcherComputeStats(prefetcher);
 
-	/*
-	 * The caller is about to replay this record, so we can now report that
-	 * all IO initiated because of early WAL must be finished. This may
-	 * trigger more readahead.
-	 */
-	lrq_complete_lsn(prefetcher->streaming_read, record->lsn);
-
 	Assert(record == prefetcher->reader->record);
 
 	return &record->header;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index cdcacc7803..c1c2d9ca90 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -276,21 +276,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
 
 /*
  * See if we can release the last record that was returned by
- * XLogNextRecord(), if any, to free up space.
+ * XLogNextRecord(), if any, to free up space.  Returns the LSN
+ * past the end of the previous record.
  */
-void
+XLogRecPtr
 XLogReleasePreviousRecord(XLogReaderState *state)
 {
 	DecodedXLogRecord *record;
+	XLogRecPtr		next_lsn;
 
 	if (!state->record)
-		return;
+		return InvalidXLogRecPtr;
 
 	/*
 	 * Remove it from the decoded record queue.  It must be the oldest item
 	 * decoded, decode_queue_head.
 	 */
 	record = state->record;
+	next_lsn = record->next_lsn;
 	Assert(record == state->decode_queue_head);
 	state->record = NULL;
 	state->decode_queue_head = record->next;
@@ -336,6 +339,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
 			state->decode_buffer_tail = state->decode_buffer;
 		}
 	}
+
+	return next_lsn;
 }
 
 /*
@@ -907,6 +912,17 @@ err:
 		 */
 		state->abortedRecPtr = RecPtr;
 		state->missingContrecPtr = targetPagePtr;
+
+		/*
+		 * If we got here without reporting an error, report one now so that
+		 * XLogPrefetcherReadRecord() doesn't bring us back a second time and
+		 * clobber the above state.  Otherwise, the existing error takes
+		 * precedence.
+		 */
+		if (!state->errormsg_buf[0])
+			report_invalid_record(state,
+								  "missing contrecord at %X/%X",
+								  LSN_FORMAT_ARGS(RecPtr));
 	}
 
 	if (decoded && decoded->oversized)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 97b6f00114..6afec33d41 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
 										 char **errormsg);
 
 /* Release the previously returned record, if necessary. */
-extern void XLogReleasePreviousRecord(XLogReaderState *state);
+extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);
 
 /* Try to read ahead, if there is data and space. */
 extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,
-- 
2.30.2

Reply via email to