On Wed, Aug 09, 2023 at 05:00:49PM +0900, Kyotaro Horiguchi wrote:
> Looks fine.

Okay, I've updated the patch accordingly.  I'll look at 0001 again
at the beginning of next week.

> While it's a kind of bug in total, we encountered a case where an
> excessively large xl_tot_len actually came from a corrupted
> record. [1]

Right, I remember this one.  I think that Thomas was pretty much right
that this could be caused because of a lack of zeroing in the WAL
pages.

> I'm glad to see this infrastructure comes in, and I'm on board with
> retrying due to an OOM. However, I think we really need official steps
> to wrap up recovery when there is a truly broken, oversized
> xl_tot_len.

There are a few options on the table, only doable once the WAL reader
provides the error state to the startup process:
1) Retry a few times and FATAL.
2) Just FATAL immediately and don't wait.
3) Retry and hope for the best that the host calms down.
I have not seen this being much of a problem in the field, so
perhaps option 2 with the structure of 0002 and a FATAL when we catch
XLOG_READER_OOM in the switch would be enough.  At least that's enough
for the cases we've seen.  I'll think a bit more about it, as well.

Yeah, agreed.  That's orthogonal to the issue reported by Ethan,
unfortunately, where he was able to trigger the issue of this thread
by manipulating the sizing of a host after producing a record larger
than what the host could afford after the resizing :/
--
Michael
From 1274883c4d06dec8876ec60ad983749b7fae3946 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 17:39:52 +0900
Subject: [PATCH v3 1/3] Add infrastructure to report error codes in WAL reader

This commit moves the error state coming from WAL readers into a new
structure that includes the existing pointer to the error message
buffer, and that also gains an error code fed back to the callers of
the following routines:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

This will help in improving the decisions to take during recovery
depending on the type of failure reported.
---
 src/include/access/xlogprefetcher.h           |   2 +-
 src/include/access/xlogreader.h               |  33 +++-
 src/backend/access/transam/twophase.c         |   8 +-
 src/backend/access/transam/xlog.c             |   6 +-
 src/backend/access/transam/xlogprefetcher.c   |   4 +-
 src/backend/access/transam/xlogreader.c       | 167 ++++++++++++------
 src/backend/access/transam/xlogrecovery.c     |  14 +-
 src/backend/access/transam/xlogutils.c        |   2 +-
 src/backend/replication/logical/logical.c     |   9 +-
 .../replication/logical/logicalfuncs.c        |   9 +-
 src/backend/replication/slotfuncs.c           |   8 +-
 src/backend/replication/walsender.c           |   8 +-
 src/bin/pg_rewind/parsexlog.c                 |  24 +--
 src/bin/pg_waldump/pg_waldump.c               |  10 +-
 contrib/pg_walinspect/pg_walinspect.c         |  11 +-
 src/tools/pgindent/typedefs.list              |   2 +
 16 files changed, 200 insertions(+), 117 deletions(-)

diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
index 7dd7f20ad0..5563ad1a67 100644
--- a/src/include/access/xlogprefetcher.h
+++ b/src/include/access/xlogprefetcher.h
@@ -48,7 +48,7 @@ extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
 									XLogRecPtr recPtr);
 
 extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
-											char **errmsg);
+											XLogReaderError *errordata);
 
 extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index da32c7db77..06664dc6fb 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -58,6 +58,24 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Values for XLogReaderError.errorcode */
+typedef enum XLogReaderErrorCode
+{
+	XLOG_READER_NO_ERROR = 0,
+	XLOG_READER_OOM,			/* out-of-memory */
+	XLOG_READER_INVALID_DATA,	/* invalid record data */
+} XLogReaderErrorCode;
+
+/* Error status generated by a WAL reader on failure */
+typedef struct XLogReaderError
+{
+	/* Buffer to hold error message */
+	char	   *message;
+	/* Error code when filling *message */
+	XLogReaderErrorCode code;
+} XLogReaderError;
+
+
 /* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
@@ -307,9 +325,9 @@ struct XLogReaderState
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
-	/* Buffer to hold error message */
-	char	   *errormsg_buf;
-	bool		errormsg_deferred;
+	/* Error state data */
+	XLogReaderError errordata;
+	bool		errordata_deferred;
 
 	/*
 	 * Flag to indicate to XLogPageReadCB that it should not block waiting for
@@ -324,7 +342,8 @@ struct XLogReaderState
 static inline bool
 XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
 {
-	return (state->decode_queue_head != NULL) || state->errormsg_deferred;
+	return (state->decode_queue_head != NULL) ||
+		state->errordata_deferred;
 }
 
 /* Get a new XLogReader */
@@ -355,11 +374,11 @@ typedef enum XLogPageReadResult
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Consume the next record or error. */
 extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Release the previously returned record, if necessary. */
 extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);
@@ -399,7 +418,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state,
 							 DecodedXLogRecord *decoded,
 							 XLogRecord *record,
 							 XLogRecPtr lsn,
-							 char **errormsg);
+							 XLogReaderError *errordata);
 
 /*
  * Macros that provide access to parts of the record most recently returned by
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..08bd6586ec 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1399,7 +1399,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
 									XL_ROUTINE(.page_read = &read_local_xlog_page,
@@ -1413,15 +1413,15 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
 	XLogBeginRead(xlogreader, lsn);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read two-phase state from WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(lsn), errormsg)));
+							LSN_FORMAT_ARGS(lsn), errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60c0b7ec3a..d16acd5e49 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -953,7 +953,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		DecodedXLogRecord *decoded;
 		StringInfoData buf;
 		StringInfoData recordBuf;
-		char	   *errormsg = NULL;
+		XLogReaderError	errordata = {0};
 		MemoryContext oldCxt;
 
 		oldCxt = MemoryContextSwitchTo(walDebugCxt);
@@ -987,10 +987,10 @@ XLogInsertRecord(XLogRecData *rdata,
 								   decoded,
 								   record,
 								   EndPos,
-								   &errormsg))
+								   &errordata))
 		{
 			appendStringInfo(&buf, "error decoding record: %s",
-							 errormsg ? errormsg : "no error message");
+							 errordata.message ? errordata.message : "no error message");
 		}
 		else
 		{
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 539928cb85..92d691ca49 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -984,7 +984,7 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
  * tries to initiate I/O for blocks referenced in future WAL records.
  */
 XLogRecord *
-XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
+XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, XLogReaderError *errdata)
 {
 	DecodedXLogRecord *record;
 	XLogRecPtr	replayed_up_to;
@@ -1052,7 +1052,7 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	}
 
 	/* Read the next record. */
-	record = XLogNextRecord(prefetcher->reader, errmsg);
+	record = XLogNextRecord(prefetcher->reader, errdata);
 	if (!record)
 		return NULL;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c9f9f6e98f..c29b8ff387 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -41,8 +41,10 @@
 #include "common/logging.h"
 #endif
 
-static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
-			pg_attribute_printf(2, 3);
+static void report_invalid_record(XLogReaderState *state,
+								  XLogReaderErrorCode errorcode,
+								  const char *fmt,...)
+			pg_attribute_printf(3, 4);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
 							 int reqLen);
@@ -66,21 +68,23 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
 
 /*
- * Construct a string in state->errormsg_buf explaining what's wrong with
+ * Construct a string in state->errordata.message explaining what's wrong with
  * the current record being read.
  */
 static void
-report_invalid_record(XLogReaderState *state, const char *fmt,...)
+report_invalid_record(XLogReaderState *state, XLogReaderErrorCode errorcode,
+					  const char *fmt,...)
 {
 	va_list		args;
 
 	fmt = _(fmt);
 
 	va_start(args, fmt);
-	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
+	vsnprintf(state->errordata.message, MAX_ERRORMSG_LEN, fmt, args);
 	va_end(args);
 
-	state->errormsg_deferred = true;
+	state->errordata_deferred = true;
+	state->errordata.code = errorcode;
 }
 
 /*
@@ -141,15 +145,16 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
-	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
-										  MCXT_ALLOC_NO_OOM);
-	if (!state->errormsg_buf)
+	state->errordata.message = palloc_extended(MAX_ERRORMSG_LEN + 1,
+											   MCXT_ALLOC_NO_OOM);
+	if (!state->errordata.message)
 	{
 		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
 	}
-	state->errormsg_buf[0] = '\0';
+	state->errordata.message[0] = '\0';
+	state->errordata.code = XLOG_READER_NO_ERROR;
 
 	/*
 	 * Allocate an initial readRecordBuf of minimal size, which can later be
@@ -157,7 +162,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	 */
 	if (!allocate_recordbuf(state, 0))
 	{
-		pfree(state->errormsg_buf);
+		pfree(state->errordata.message);
 		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
@@ -175,7 +180,7 @@ XLogReaderFree(XLogReaderState *state)
 	if (state->decode_buffer && state->free_decode_buffer)
 		pfree(state->decode_buffer);
 
-	pfree(state->errormsg_buf);
+	pfree(state->errordata.message);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
 	pfree(state->readBuf);
@@ -351,23 +356,27 @@ XLogReleasePreviousRecord(XLogReaderState *state)
  *
  * On success, a record is returned.
  *
- * The returned record (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogNextRecord.
+ * The returned record (or errordata->message) points to an internal buffer
+ * that's valid until the next call to XLogNextRecord.
  */
 DecodedXLogRecord *
-XLogNextRecord(XLogReaderState *state, char **errormsg)
+XLogNextRecord(XLogReaderState *state, XLogReaderError *errordata)
 {
 	/* Release the last record returned by XLogNextRecord(). */
 	XLogReleasePreviousRecord(state);
 
 	if (state->decode_queue_head == NULL)
 	{
-		*errormsg = NULL;
-		if (state->errormsg_deferred)
+		errordata->message = NULL;
+		errordata->code = XLOG_READER_NO_ERROR;
+		if (state->errordata_deferred)
 		{
-			if (state->errormsg_buf[0] != '\0')
-				*errormsg = state->errormsg_buf;
-			state->errormsg_deferred = false;
+			if (state->errordata.message[0] != '\0')
+				errordata->message = state->errordata.message;
+			if (state->errordata.code != XLOG_READER_NO_ERROR)
+				errordata->code = state->errordata.code;
+			state->errordata_deferred = false;
+			state->errordata.code = XLOG_READER_NO_ERROR;
 		}
 
 		/*
@@ -397,7 +406,8 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
 	state->ReadRecPtr = state->record->lsn;
 	state->EndRecPtr = state->record->next_lsn;
 
-	*errormsg = NULL;
+	errordata->message = NULL;
+	errordata->code = XLOG_READER_NO_ERROR;
 
 	return state->record;
 }
@@ -409,17 +419,17 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
  * to XLogReadRecord().
  *
  * If the page_read callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * returned.  The callback is expected to have reported the error;
+ * errordata->message is set to NULL.
  *
  * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * *errordata is set with details of the failure.
  *
- * The returned pointer (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogReadRecord.
+ * The returned pointer (or errordata->message) points to an internal
+ * buffer that's valid until the next call to XLogReadRecord.
  */
 XLogRecord *
-XLogReadRecord(XLogReaderState *state, char **errormsg)
+XLogReadRecord(XLogReaderState *state, XLogReaderError *errordata)
 {
 	DecodedXLogRecord *decoded;
 
@@ -437,7 +447,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		XLogReadAhead(state, false /* nonblocking */ );
 
 	/* Consume the head record or error. */
-	decoded = XLogNextRecord(state, errormsg);
+	decoded = XLogNextRecord(state, errordata);
 	if (decoded)
 	{
 		/*
@@ -546,7 +556,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	bool		gotheader;
 	int			readOff;
 	DecodedXLogRecord *decoded;
-	char	   *errormsg;		/* not used */
+	XLogReaderError errordata = {0};		/* not used */
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -556,7 +566,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	randAccess = false;
 
 	/* reset error state */
-	state->errormsg_buf[0] = '\0';
+	state->errordata.message[0] = '\0';
+	state->errordata.code = XLOG_READER_NO_ERROR;
 	decoded = NULL;
 
 	state->abortedRecPtr = InvalidXLogRecPtr;
@@ -623,7 +634,9 @@ restart:
 	}
 	else if (targetRecOff < pageHeaderSize)
 	{
-		report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u",
+		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
+							  "invalid record offset at %X/%X: expected at least %u, got %u",
 							  LSN_FORMAT_ARGS(RecPtr),
 							  pageHeaderSize, targetRecOff);
 		goto err;
@@ -632,7 +645,9 @@ restart:
 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
 		targetRecOff == pageHeaderSize)
 	{
-		report_invalid_record(state, "contrecord is requested by %X/%X",
+		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
+							  "contrecord is requested by %X/%X",
 							  LSN_FORMAT_ARGS(RecPtr));
 		goto err;
 	}
@@ -673,6 +688,7 @@ restart:
 		if (total_len < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
 								  (uint32) SizeOfXLogRecord, total_len);
@@ -691,6 +707,7 @@ restart:
 	decoded = XLogReadRecordAlloc(state,
 								  total_len,
 								  !nonblocking /* allow_oversized */ );
+
 	if (decoded == NULL)
 	{
 		/*
@@ -702,6 +719,7 @@ restart:
 
 		/* We failed to allocate memory for an oversized record. */
 		report_invalid_record(state,
+							  XLOG_READER_OOM,
 							  "out of memory while trying to decode a record of length %u", total_len);
 		goto err;
 	}
@@ -724,7 +742,9 @@ restart:
 			!allocate_recordbuf(state, total_len))
 		{
 			/* We treat this as a "bogus data" condition */
-			report_invalid_record(state, "record length %u at %X/%X too long",
+			report_invalid_record(state,
+								  XLOG_READER_OOM,
+								  "record length %u at %X/%X too long",
 								  total_len, LSN_FORMAT_ARGS(RecPtr));
 			goto err;
 		}
@@ -773,6 +793,7 @@ restart:
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "there is no contrecord flag at %X/%X",
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
@@ -786,6 +807,7 @@ restart:
 				total_len != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
 									  ((long long) total_len) - gotlen,
@@ -867,7 +889,7 @@ restart:
 		state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
 	}
 
-	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
+	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errordata))
 	{
 		/* Record the location of the next record. */
 		decoded->next_lsn = state->NextRecPtr;
@@ -918,7 +940,7 @@ err:
 		 * queued so that XLogPrefetcherReadRecord() doesn't bring us back a
 		 * second time and clobber the above state.
 		 */
-		state->errormsg_deferred = true;
+		state->errordata_deferred = true;
 	}
 
 	if (decoded && decoded->oversized)
@@ -931,9 +953,9 @@ err:
 	XLogReaderInvalReadState(state);
 
 	/*
-	 * If an error was written to errmsg_buf, it'll be returned to the caller
-	 * of XLogReadRecord() after all successfully decoded records from the
-	 * read queue.
+	 * If an error was written to errordata.message, it'll be returned to the
+	 * caller of XLogReadRecord() after all successfully decoded records from
+	 * the read queue.
 	 */
 
 	return XLREAD_FAIL;
@@ -952,7 +974,7 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
 {
 	XLogPageReadResult result;
 
-	if (state->errormsg_deferred)
+	if (state->errordata_deferred)
 		return NULL;
 
 	result = XLogDecodeNextRecord(state, nonblocking);
@@ -970,8 +992,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
  * via the page_read() callback.
  *
  * Returns XLREAD_FAIL if the required page cannot be read for some
- * reason; errormsg_buf is set in that case (unless the error occurs in the
- * page_read callback).
+ * reason; errordata.message is set in that case (unless the error occurs in
+ * the page_read callback).
  *
  * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
  * waiting.  This can be returned only if the installed page_read callback
@@ -1116,6 +1138,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid record length at %X/%X: expected at least %u, got %u",
 							  LSN_FORMAT_ARGS(RecPtr),
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
@@ -1124,6 +1147,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (!RmgrIdIsValid(record->xl_rmid))
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid resource manager ID %u at %X/%X",
 							  record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
 		return false;
@@ -1137,6 +1161,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 		if (!(record->xl_prev < RecPtr))
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "record with incorrect prev-link %X/%X at %X/%X",
 								  LSN_FORMAT_ARGS(record->xl_prev),
 								  LSN_FORMAT_ARGS(RecPtr));
@@ -1153,6 +1178,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 		if (record->xl_prev != PrevRecPtr)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "record with incorrect prev-link %X/%X at %X/%X",
 								  LSN_FORMAT_ARGS(record->xl_prev),
 								  LSN_FORMAT_ARGS(RecPtr));
@@ -1189,6 +1215,7 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
 	if (!EQ_CRC32C(record->xl_crc, crc))
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "incorrect resource manager data checksum in record at %X/%X",
 							  LSN_FORMAT_ARGS(recptr));
 		return false;
@@ -1223,6 +1250,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_magic,
 							  fname,
@@ -1238,6 +1266,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_info,
 							  fname,
@@ -1254,6 +1283,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 			longhdr->xlp_sysid != state->system_identifier)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
 								  (unsigned long long) longhdr->xlp_sysid,
 								  (unsigned long long) state->system_identifier);
@@ -1262,12 +1292,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: incorrect segment size in page header");
 			return false;
 		}
 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
 			return false;
 		}
@@ -1280,6 +1312,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 
 		/* hmm, first page of file doesn't have a long header? */
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_info,
 							  fname,
@@ -1300,6 +1333,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u",
 							  LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
 							  fname,
@@ -1326,6 +1360,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u",
 								  hdr->xlp_tli,
 								  state->latestPageTLI,
@@ -1347,8 +1382,9 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 void
 XLogReaderResetError(XLogReaderState *state)
 {
-	state->errormsg_buf[0] = '\0';
-	state->errormsg_deferred = false;
+	state->errordata.message[0] = '\0';
+	state->errordata_deferred = false;
+	state->errordata.code = XLOG_READER_NO_ERROR;
 }
 
 /*
@@ -1368,7 +1404,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	tmpRecPtr;
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
 
@@ -1453,7 +1489,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * or we just jumped over the remaining data of a continuation.
 	 */
 	XLogBeginRead(state, tmpRecPtr);
-	while (XLogReadRecord(state, &errormsg) != NULL)
+	while (XLogReadRecord(state, &errordata) != NULL)
 	{
 		/* past the record we've found, break out */
 		if (RecPtr <= state->ReadRecPtr)
@@ -1598,8 +1634,9 @@ ResetDecoder(XLogReaderState *state)
 	state->decode_buffer_head = state->decode_buffer;
 
 	/* Clear error state. */
-	state->errormsg_buf[0] = '\0';
-	state->errormsg_deferred = false;
+	state->errordata.message[0] = '\0';
+	state->errordata_deferred = false;
+	state->errordata.code = XLOG_READER_NO_ERROR;
 }
 
 /*
@@ -1649,7 +1686,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				 DecodedXLogRecord *decoded,
 				 XLogRecord *record,
 				 XLogRecPtr lsn,
-				 char **errormsg)
+				 XLogReaderError *errordata)
 {
 	/*
 	 * read next _size bytes from record buffer, but check for overrun first.
@@ -1732,6 +1769,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (block_id <= decoded->max_block_id)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "out-of-order block_id %u at %X/%X",
 									  block_id,
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1756,6 +1794,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (blk->has_data && blk->data_len == 0)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
 				goto err;
@@ -1763,6 +1802,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (!blk->has_data && blk->data_len != 0)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
 									  (unsigned int) blk->data_len,
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1799,6 +1839,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					 blk->bimg_len == BLCKSZ))
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
 										  (unsigned int) blk->hole_offset,
 										  (unsigned int) blk->hole_length,
@@ -1815,6 +1856,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					(blk->hole_offset != 0 || blk->hole_length != 0))
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
 										  (unsigned int) blk->hole_offset,
 										  (unsigned int) blk->hole_length,
@@ -1829,6 +1871,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len == BLCKSZ)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
 										  (unsigned int) blk->bimg_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1844,6 +1887,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1860,6 +1904,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				if (rlocator == NULL)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -1872,6 +1917,7 @@ DecodeXLogRecord(XLogReaderState *state,
 		else
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "invalid block_id %u at %X/%X",
 								  block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
 			goto err;
@@ -1939,10 +1985,12 @@ DecodeXLogRecord(XLogReaderState *state,
 
 shortdata_err:
 	report_invalid_record(state,
+						  XLOG_READER_INVALID_DATA,
 						  "record with invalid length at %X/%X",
 						  LSN_FORMAT_ARGS(state->ReadRecPtr));
 err:
-	*errormsg = state->errormsg_buf;
+	errordata->message = state->errordata.message;
+	errordata->code = state->errordata.code;
 
 	return false;
 }
@@ -2049,6 +2097,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		!record->record->blocks[block_id].in_use)
 	{
 		report_invalid_record(record,
+							  XLOG_READER_INVALID_DATA,
 							  "could not restore image at %X/%X with invalid block %d specified",
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
@@ -2056,7 +2105,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	}
 	if (!record->record->blocks[block_id].has_image)
 	{
-		report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d",
+		report_invalid_record(record,
+							  XLOG_READER_INVALID_DATA,
+							  "could not restore image at %X/%X with invalid state, block %d",
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
 		return false;
@@ -2083,7 +2134,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "LZ4",
 								  block_id);
@@ -2100,7 +2153,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 			if (ZSTD_isError(decomp_result))
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "zstd",
 								  block_id);
@@ -2109,7 +2164,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		}
 		else
 		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with unknown method, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
@@ -2117,7 +2174,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 		if (!decomp_success)
 		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not decompress image at %X/%X, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..68100bfa4a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -2454,7 +2454,7 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		if (!RestoreBlockImage(record, block_id, primary_image_masked))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * If masking function is defined, mask both the primary and replay
@@ -3062,9 +3062,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 
 	for (;;)
 	{
-		char	   *errormsg;
+		XLogReaderError errordata = {0};
 
-		record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
+		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
 			/*
@@ -3098,9 +3098,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * StandbyMode that only happens if we have been triggered, so we
 			 * shouldn't loop anymore in that case.
 			 */
-			if (errormsg)
+			if (errordata.message)
 				ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-						(errmsg_internal("%s", errormsg) /* already translated */ ));
+						(errmsg_internal("%s", errordata.message) /* already translated */ ));
 		}
 
 		/*
@@ -3385,9 +3385,9 @@ retry:
 		 * Emit this error right now then retry this page immediately. Use
 		 * errmsg_internal() because the message was already translated.
 		 */
-		if (xlogreader->errormsg_buf[0])
+		if (xlogreader->errordata.message[0])
 			ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-					(errmsg_internal("%s", xlogreader->errormsg_buf)));
+					(errmsg_internal("%s", xlogreader->errordata.message)));
 
 		/* reset any error XLogReaderValidatePageHeader() might have set */
 		XLogReaderResetError(xlogreader);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index e174a2a891..5c64454e7e 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -395,7 +395,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 		if (!RestoreBlockImage(record, block_id, page))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * The page may be uninitialized. If so, we can't set the LSN because
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..f48feab944 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -641,12 +641,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	for (;;)
 	{
 		XLogRecord *record;
-		char	   *err = NULL;
+		XLogReaderError errordata = {0};
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, &err);
-		if (err)
-			elog(ERROR, "could not find logical decoding starting point: %s", err);
+		record = XLogReadRecord(ctx->reader, &errordata);
+		if (errordata.message)
+			elog(ERROR, "could not find logical decoding starting point: %s",
+				 errordata.message);
 		if (!record)
 			elog(ERROR, "could not find logical decoding starting point");
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 55a24c02c9..e7f74809e3 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,11 +244,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		while (ctx->reader->EndRecPtr < end_of_wal)
 		{
 			XLogRecord *record;
-			char	   *errm = NULL;
+			XLogReaderError errordata = {0};
 
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record for logical decoding: %s", errm);
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
+				elog(ERROR, "could not find record for logical decoding: %s",
+					 errordata.message);
 
 			/*
 			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..4fa4e6bfed 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -503,17 +503,17 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		/* Decode at least one record, until we run out of records */
 		while (ctx->reader->EndRecPtr < moveto)
 		{
-			char	   *errm = NULL;
 			XLogRecord *record;
+			XLogReaderError errordata = {0};
 
 			/*
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
 				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
+					 errordata.message);
 
 			/*
 			 * Process the record.  Storage-level changes are ignored in
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d27ef2985d..d05c60f09f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3045,7 +3045,7 @@ static void
 XLogSendLogical(void)
 {
 	XLogRecord *record;
-	char	   *errm;
+	XLogReaderError errordata = {0};
 
 	/*
 	 * We'll use the current flush point to determine whether we've caught up.
@@ -3063,12 +3063,12 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+	record = XLogReadRecord(logical_decoding_ctx->reader, &errordata);
 
 	/* xlog record was invalid */
-	if (errm != NULL)
+	if (errordata.message != NULL)
 		elog(ERROR, "could not find record while sending logically-decoded data: %s",
-			 errm);
+			 errordata.message);
 
 	if (record != NULL)
 	{
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 27782237d0..2705d9bf45 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -68,7 +68,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	private.tliIndex = tliIndex;
@@ -82,16 +82,16 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	XLogBeginRead(xlogreader, startpoint);
 	do
 	{
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
 			XLogRecPtr	errptr = xlogreader->EndRecPtr;
 
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not read WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(errptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not read WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(errptr));
@@ -126,7 +126,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 	XLogRecPtr	endptr;
 
@@ -139,12 +139,12 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 		pg_fatal("out of memory while allocating a WAL reading processor");
 
 	XLogBeginRead(xlogreader, ptr);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			pg_fatal("could not read WAL record at %X/%X: %s",
-					 LSN_FORMAT_ARGS(ptr), errormsg);
+					 LSN_FORMAT_ARGS(ptr), errordata.message);
 		else
 			pg_fatal("could not read WAL record at %X/%X",
 					 LSN_FORMAT_ARGS(ptr));
@@ -173,7 +173,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	XLogRecord *record;
 	XLogRecPtr	searchptr;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	/*
@@ -204,14 +204,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 		uint8		info;
 
 		XLogBeginRead(xlogreader, searchptr);
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not find previous WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(searchptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not find previous WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(searchptr));
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e8b5a6cd61..4129ba901b 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -508,7 +508,7 @@ XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath)
 
 		/* Full page exists, so let's save it */
 		if (!RestoreBlockImage(record, block_id, page))
-			pg_fatal("%s", record->errormsg_buf);
+			pg_fatal("%s", record->errordata.message);
 
 		(void) XLogRecGetBlockTagExtended(record, block_id,
 										  &rnode, &fork, &blk, NULL);
@@ -796,7 +796,7 @@ main(int argc, char **argv)
 	XLogRecord *record;
 	XLogRecPtr	first_record;
 	char	   *waldir = NULL;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	static struct option long_options[] = {
 		{"bkp-details", no_argument, NULL, 'b'},
@@ -1239,7 +1239,7 @@ main(int argc, char **argv)
 		}
 
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, &errormsg);
+		record = XLogReadRecord(xlogreader_state, &errordata);
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
@@ -1304,10 +1304,10 @@ main(int argc, char **argv)
 	if (time_to_stop)
 		exit(0);
 
-	if (errormsg)
+	if (errordata.message)
 		pg_fatal("error in WAL record at %X/%X: %s",
 				 LSN_FORMAT_ARGS(xlogreader_state->ReadRecPtr),
-				 errormsg);
+				 errordata.message);
 
 	XLogReaderFree(xlogreader_state);
 
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..e7d30554ed 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -146,9 +146,9 @@ static XLogRecord *
 ReadNextXLogRecord(XLogReaderState *xlogreader)
 {
 	XLogRecord *record;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
@@ -161,11 +161,12 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
 		if (private_data->end_of_wal)
 			return NULL;
 
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr),
+							errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
@@ -384,7 +385,7 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 			if (!RestoreBlockImage(record, block_id, page))
 				ereport(ERROR,
 						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg_internal("%s", record->errormsg_buf)));
+						 errmsg_internal("%s", record->errordata.message)));
 
 			block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
 			SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 66823bc2a7..53ce72c4c2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3077,6 +3077,8 @@ XLogPageReadResult
 XLogPrefetchStats
 XLogPrefetcher
 XLogPrefetcherFilter
+XLogReaderError
+XLogReaderErrorCode
 XLogReaderRoutine
 XLogReaderState
 XLogRecData
-- 
2.40.1

From 56a11dc5cc8986c1a2fef9cb14508dd4dae82566 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 17:41:41 +0900
Subject: [PATCH v3 2/3] Make WAL replay more robust on OOM failures

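This takes advantage of the new error facility for WAL readers, allowing
WAL replay to loop and retry when an out-of-memory failure happens while
reading a record.  Previously, such a failure was treated like the end of
WAL, which could end recovery early and was the origin of potential data
loss scenarios.  WAL replay now acts like a standby here and retries
instead, making it more robust.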
---
 src/backend/access/transam/xlogrecovery.c | 75 ++++++++++++++++-------
 1 file changed, 52 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 68100bfa4a..a1149439e9 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3067,29 +3067,50 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
-			/*
-			 * When we find that WAL ends in an incomplete record, keep track
-			 * of that record.  After recovery is done, we'll write a record
-			 * to indicate to downstream WAL readers that that portion is to
-			 * be ignored.
-			 *
-			 * However, when ArchiveRecoveryRequested = true, we're going to
-			 * switch to a new timeline at the end of recovery. We will only
-			 * copy WAL over to the new timeline up to the end of the last
-			 * complete record, so if we did this, we would later create an
-			 * overwrite contrecord in the wrong place, breaking everything.
-			 */
-			if (!ArchiveRecoveryRequested &&
-				!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			switch (errordata.code)
 			{
-				abortedRecPtr = xlogreader->abortedRecPtr;
-				missingContrecPtr = xlogreader->missingContrecPtr;
-			}
+				case XLOG_READER_NO_ERROR:
+					/* Possible when XLogPageRead() has failed */
+					Assert(!errordata.message);
+					/* FALLTHROUGH */
 
-			if (readFile >= 0)
-			{
-				close(readFile);
-				readFile = -1;
+				case XLOG_READER_INVALID_DATA:
+
+					/*
+					 * When we find that WAL ends in an incomplete record,
+					 * keep track of that record.  After recovery is done,
+					 * we'll write a record to indicate to downstream WAL
+					 * readers that that portion is to be ignored.
+					 *
+					 * However, when ArchiveRecoveryRequested = true, we're
+					 * going to switch to a new timeline at the end of
+					 * recovery. We will only copy WAL over to the new
+					 * timeline up to the end of the last complete record, so
+					 * if we did this, we would later create an overwrite
+					 * contrecord in the wrong place, breaking everything.
+					 */
+					if (!ArchiveRecoveryRequested &&
+						!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+					{
+						abortedRecPtr = xlogreader->abortedRecPtr;
+						missingContrecPtr = xlogreader->missingContrecPtr;
+					}
+
+					if (readFile >= 0)
+					{
+						close(readFile);
+						readFile = -1;
+					}
+					break;
+				case XLOG_READER_OOM:
+
+					/*
+					 * If we failed because of an out-of-memory problem, do
+					 * not treat this as the end of WAL; the read is retried
+					 * below.  The WAL record to decode may have required a
+					 * larger memory allocation than what the host can offer.
+					 */
+					break;
 			}
 
 			/*
@@ -3147,9 +3168,12 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * WAL from the archive, even if pg_wal is completely empty, but
 			 * we'd have no idea how far we'd have to replay to reach
 			 * consistency.  So err on the safe side and give up.
+			 *
+			 * The record may also have failed to decode on an out-of-memory
+			 * error.  In that case, skip this and loop back to retry below.
 			 */
 			if (!InArchiveRecovery && ArchiveRecoveryRequested &&
-				!fetching_ckpt)
+				!fetching_ckpt && errordata.code != XLOG_READER_OOM)
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
@@ -3173,9 +3197,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 				continue;
 			}
 
-			/* In standby mode, loop back to retry. Otherwise, give up. */
+			/*
+			 * In standby mode, or if reading the WAL record failed on an
+			 * out-of-memory error, loop back to retry.  Otherwise, give up.
+			 */
 			if (StandbyMode && !CheckForStandbyTrigger())
 				continue;
+			else if (errordata.code == XLOG_READER_OOM)
+				continue;
 			else
 				return NULL;
 		}
-- 
2.40.1

From 62406086b3d2046a3ce1d7d84d51e6ce4721b885 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 9 Aug 2023 14:53:44 +0900
Subject: [PATCH v3 3/3] Tweak to force OOM behavior when replaying records

---
 src/backend/access/transam/xlogreader.c | 26 ++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c29b8ff387..ed43360f78 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -557,6 +557,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	int			readOff;
 	DecodedXLogRecord *decoded;
 	XLogReaderError errordata = {0};		/* not used */
+	bool		trigger_oom = false;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -708,7 +709,30 @@ restart:
 								  total_len,
 								  !nonblocking /* allow_oversized */ );
 
-	if (decoded == NULL)
+#ifndef FRONTEND
+
+	/*
+	 * Trick to emulate an OOM after a hardcoded number of records replayed.
+	 */
+	{
+		struct stat fstat;
+		static int	counter = 0;
+
+		if (stat("/tmp/xlogreader_oom", &fstat) == 0)
+		{
+			counter++;
+			if (counter >= 100)
+			{
+				trigger_oom = true;
+
+				/* Reset counter, to not fail when shutting down WAL */
+				counter = 0;
+			}
+		}
+	}
+#endif
+
+	if (decoded == NULL || trigger_oom)
 	{
 		/*
 		 * There is no space in the decode buffer.  The caller should help
-- 
2.40.1

Attachment: signature.asc
Description: PGP signature

Reply via email to