On Tue, Sep 26, 2023 at 03:48:07PM +0900, Michael Paquier wrote:
> By the way, anything that I am proposing here cannot be backpatched
> because of the infrastructure changes required in walreader.c, so I am
> going to create a second thread with something that could be
> backpatched (yeah, likely FATALs on OOM to stop recovery from doing
> something bad)..

The patch set has been rebased on top of 6b18b3fe2c2f, which switched
the OOMs in xlogreader.c to fail hard.  The patch set has nothing new,
except that 0001 is now a revert of 6b18b3fe2c2f to switch xlogreader.c
back to using soft errors on OOMs.

If there's no interest in this patch set after the next CF, I'm OK to
drop it.  The state of HEAD is at least correct in the OOM cases now.
--
Michael
From aa5377d221371f6be8729a27f1df18aa9c4a48e2 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 3 Oct 2023 16:12:14 +0900
Subject: [PATCH v5 1/4] Revert "Fail hard on out-of-memory failures in
 xlogreader.c"

This reverts commit 6b18b3fe2c2, putting back the code of xlogreader.c
to handle OOMs as soft failures.
---
 src/backend/access/transam/xlogreader.c | 47 ++++++++++++++++++++-----
 1 file changed, 39 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a1363e3b8f..a17263df20 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -43,7 +43,7 @@
 
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
 			pg_attribute_printf(2, 3);
-static void allocate_recordbuf(XLogReaderState *state, uint32 reclength);
+static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
 							 int reqLen);
 static void XLogReaderInvalReadState(XLogReaderState *state);
@@ -155,7 +155,14 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	 * Allocate an initial readRecordBuf of minimal size, which can later be
 	 * enlarged if necessary.
 	 */
-	allocate_recordbuf(state, 0);
+	if (!allocate_recordbuf(state, 0))
+	{
+		pfree(state->errormsg_buf);
+		pfree(state->readBuf);
+		pfree(state);
+		return NULL;
+	}
+
 	return state;
 }
 
@@ -177,6 +184,7 @@ XLogReaderFree(XLogReaderState *state)
 
 /*
  * Allocate readRecordBuf to fit a record of at least the given length.
+ * Returns true if successful, false if out of memory.
  *
  * readRecordBufSize is set to the new buffer size.
  *
@@ -188,7 +196,7 @@ XLogReaderFree(XLogReaderState *state)
  * Note: This routine should *never* be called for xl_tot_len until the header
  * of the record has been fully validated.
  */
-static void
+static bool
 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 {
 	uint32		newSize = reclength;
@@ -198,8 +206,15 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
-	state->readRecordBuf = (char *) palloc(newSize);
+	state->readRecordBuf =
+		(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
+	if (state->readRecordBuf == NULL)
+	{
+		state->readRecordBufSize = 0;
+		return false;
+	}
 	state->readRecordBufSize = newSize;
+	return true;
 }
 
 /*
@@ -490,7 +505,9 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi
 	/* Not enough space in the decode buffer.  Are we allowed to allocate? */
 	if (allow_oversized)
 	{
-		decoded = palloc(required_space);
+		decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
+		if (decoded == NULL)
+			return NULL;
 		decoded->oversized = true;
 		return decoded;
 	}
@@ -798,7 +815,13 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				if (!allocate_recordbuf(state, total_len))
+				{
+					/* We treat this as a "bogus data" condition */
+					report_invalid_record(state, "record length %u at %X/%X too long",
+										  total_len, LSN_FORMAT_ARGS(RecPtr));
+					goto err;
+				}
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
@@ -854,8 +877,16 @@ restart:
 		decoded = XLogReadRecordAlloc(state,
 									  total_len,
 									  true /* allow_oversized */ );
-		/* allocation should always happen under allow_oversized */
-		Assert(decoded != NULL);
+		if (decoded == NULL)
+		{
+			/*
+			 * We failed to allocate memory for an oversized record.  As
+			 * above, we currently treat this as a "bogus data" condition.
+			 */
+			report_invalid_record(state,
+								  "out of memory while trying to decode a record of length %u", total_len);
+			goto err;
+		}
 	}
 
 	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
-- 
2.42.0

From 970f1374272c058be51eedf77585d98b925b25c0 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 26 Sep 2023 15:40:05 +0900
Subject: [PATCH v5 2/4] Add infrastructure to report error codes in WAL reader

This commit moves the error state coming from WAL readers into a new
structure that includes the existing pointer to the error message
buffer and also gains an error code that is fed back to the callers of
the following routines:
XLogPrefetcherReadRecord()
XLogReadRecord()
XLogNextRecord()
DecodeXLogRecord()

This will help in improving the decisions to take during recovery
depending on the failure mode reported.
---
 src/include/access/xlogprefetcher.h           |   2 +-
 src/include/access/xlogreader.h               |  33 +++-
 src/backend/access/transam/twophase.c         |   8 +-
 src/backend/access/transam/xlog.c             |   6 +-
 src/backend/access/transam/xlogprefetcher.c   |   4 +-
 src/backend/access/transam/xlogreader.c       | 170 ++++++++++++------
 src/backend/access/transam/xlogrecovery.c     |  14 +-
 src/backend/access/transam/xlogutils.c        |   2 +-
 src/backend/replication/logical/logical.c     |   9 +-
 .../replication/logical/logicalfuncs.c        |   9 +-
 src/backend/replication/slotfuncs.c           |   8 +-
 src/backend/replication/walsender.c           |   8 +-
 src/bin/pg_rewind/parsexlog.c                 |  24 +--
 src/bin/pg_waldump/pg_waldump.c               |  10 +-
 contrib/pg_walinspect/pg_walinspect.c         |  11 +-
 src/tools/pgindent/typedefs.list              |   2 +
 16 files changed, 201 insertions(+), 119 deletions(-)

diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
index 7dd7f20ad0..5563ad1a67 100644
--- a/src/include/access/xlogprefetcher.h
+++ b/src/include/access/xlogprefetcher.h
@@ -48,7 +48,7 @@ extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher,
 									XLogRecPtr recPtr);
 
 extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher,
-											char **errmsg);
+											XLogReaderError *errordata);
 
 extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher);
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index da32c7db77..06664dc6fb 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -58,6 +58,24 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Values for XLogReaderError.errorcode */
+typedef enum XLogReaderErrorCode
+{
+	XLOG_READER_NO_ERROR = 0,
+	XLOG_READER_OOM,			/* out-of-memory */
+	XLOG_READER_INVALID_DATA,	/* record data */
+} XLogReaderErrorCode;
+
+/* Error status generated by a WAL reader on failure */
+typedef struct XLogReaderError
+{
+	/* Buffer to hold error message */
+	char	   *message;
+	/* Error code when filling *message */
+	XLogReaderErrorCode code;
+} XLogReaderError;
+
+
 /* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
@@ -307,9 +325,9 @@ struct XLogReaderState
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
-	/* Buffer to hold error message */
-	char	   *errormsg_buf;
-	bool		errormsg_deferred;
+	/* Error state data */
+	XLogReaderError errordata;
+	bool		errordata_deferred;
 
 	/*
 	 * Flag to indicate to XLogPageReadCB that it should not block waiting for
@@ -324,7 +342,8 @@ struct XLogReaderState
 static inline bool
 XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
 {
-	return (state->decode_queue_head != NULL) || state->errormsg_deferred;
+	return (state->decode_queue_head != NULL) ||
+		state->errordata_deferred;
 }
 
 /* Get a new XLogReader */
@@ -355,11 +374,11 @@ typedef enum XLogPageReadResult
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Consume the next record or error. */
 extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
-										 char **errormsg);
+										 XLogReaderError *errordata);
 
 /* Release the previously returned record, if necessary. */
 extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state);
@@ -399,7 +418,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state,
 							 DecodedXLogRecord *decoded,
 							 XLogRecord *record,
 							 XLogRecPtr lsn,
-							 char **errormsg);
+							 XLogReaderError *errordata);
 
 /*
  * Macros that provide access to parts of the record most recently returned by
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..08bd6586ec 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1399,7 +1399,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
 									XL_ROUTINE(.page_read = &read_local_xlog_page,
@@ -1413,15 +1413,15 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
 	XLogBeginRead(xlogreader, lsn);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read two-phase state from WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(lsn), errormsg)));
+							LSN_FORMAT_ARGS(lsn), errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fcbde10529..56dd9f5b64 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -953,7 +953,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		DecodedXLogRecord *decoded;
 		StringInfoData buf;
 		StringInfoData recordBuf;
-		char	   *errormsg = NULL;
+		XLogReaderError errordata = {0};
 		MemoryContext oldCxt;
 
 		oldCxt = MemoryContextSwitchTo(walDebugCxt);
@@ -987,10 +987,10 @@ XLogInsertRecord(XLogRecData *rdata,
 								   decoded,
 								   record,
 								   EndPos,
-								   &errormsg))
+								   &errordata))
 		{
 			appendStringInfo(&buf, "error decoding record: %s",
-							 errormsg ? errormsg : "no error message");
+							 errordata.message ? errordata.message : "no error message");
 		}
 		else
 		{
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 539928cb85..92d691ca49 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -984,7 +984,7 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
  * tries to initiate I/O for blocks referenced in future WAL records.
  */
 XLogRecord *
-XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
+XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, XLogReaderError *errdata)
 {
 	DecodedXLogRecord *record;
 	XLogRecPtr	replayed_up_to;
@@ -1052,7 +1052,7 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
 	}
 
 	/* Read the next record. */
-	record = XLogNextRecord(prefetcher->reader, errmsg);
+	record = XLogNextRecord(prefetcher->reader, errdata);
 	if (!record)
 		return NULL;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a17263df20..fd1413b6d3 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -41,8 +41,10 @@
 #include "common/logging.h"
 #endif
 
-static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
-			pg_attribute_printf(2, 3);
+static void report_invalid_record(XLogReaderState *state,
+								  XLogReaderErrorCode errorcode,
+								  const char *fmt,...)
+			pg_attribute_printf(3, 4);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
 							 int reqLen);
@@ -66,21 +68,23 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
 
 /*
- * Construct a string in state->errormsg_buf explaining what's wrong with
+ * Construct a string in state->errordata.message explaining what's wrong with
  * the current record being read.
  */
 static void
-report_invalid_record(XLogReaderState *state, const char *fmt,...)
+report_invalid_record(XLogReaderState *state, XLogReaderErrorCode errorcode,
+					  const char *fmt,...)
 {
 	va_list		args;
 
 	fmt = _(fmt);
 
 	va_start(args, fmt);
-	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
+	vsnprintf(state->errordata.message, MAX_ERRORMSG_LEN, fmt, args);
 	va_end(args);
 
-	state->errormsg_deferred = true;
+	state->errordata_deferred = true;
+	state->errordata.code = errorcode;
 }
 
 /*
@@ -141,15 +145,16 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
-	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
-										  MCXT_ALLOC_NO_OOM);
-	if (!state->errormsg_buf)
+	state->errordata.message = palloc_extended(MAX_ERRORMSG_LEN + 1,
+											   MCXT_ALLOC_NO_OOM);
+	if (!state->errordata.message)
 	{
 		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
 	}
-	state->errormsg_buf[0] = '\0';
+	state->errordata.message[0] = '\0';
+	state->errordata.code = XLOG_READER_NO_ERROR;
 
 	/*
 	 * Allocate an initial readRecordBuf of minimal size, which can later be
@@ -157,7 +162,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	 */
 	if (!allocate_recordbuf(state, 0))
 	{
-		pfree(state->errormsg_buf);
+		pfree(state->errordata.message);
 		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
@@ -175,7 +180,7 @@ XLogReaderFree(XLogReaderState *state)
 	if (state->decode_buffer && state->free_decode_buffer)
 		pfree(state->decode_buffer);
 
-	pfree(state->errormsg_buf);
+	pfree(state->errordata.message);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
 	pfree(state->readBuf);
@@ -335,23 +340,27 @@ XLogReleasePreviousRecord(XLogReaderState *state)
  *
  * On success, a record is returned.
  *
- * The returned record (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogNextRecord.
+ * The returned record (or errordata->message) points to an internal buffer
+ * that's valid until the next call to XLogNextRecord.
  */
 DecodedXLogRecord *
-XLogNextRecord(XLogReaderState *state, char **errormsg)
+XLogNextRecord(XLogReaderState *state, XLogReaderError *errordata)
 {
 	/* Release the last record returned by XLogNextRecord(). */
 	XLogReleasePreviousRecord(state);
 
 	if (state->decode_queue_head == NULL)
 	{
-		*errormsg = NULL;
-		if (state->errormsg_deferred)
+		errordata->message = NULL;
+		errordata->code = XLOG_READER_NO_ERROR;
+		if (state->errordata_deferred)
 		{
-			if (state->errormsg_buf[0] != '\0')
-				*errormsg = state->errormsg_buf;
-			state->errormsg_deferred = false;
+			if (state->errordata.message[0] != '\0')
+				errordata->message = state->errordata.message;
+			if (state->errordata.code != XLOG_READER_NO_ERROR)
+				errordata->code = state->errordata.code;
+			state->errordata_deferred = false;
+			state->errordata.code = XLOG_READER_NO_ERROR;
 		}
 
 		/*
@@ -381,7 +390,8 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
 	state->ReadRecPtr = state->record->lsn;
 	state->EndRecPtr = state->record->next_lsn;
 
-	*errormsg = NULL;
+	errordata->message = NULL;
+	errordata->code = XLOG_READER_NO_ERROR;
 
 	return state->record;
 }
@@ -393,17 +403,17 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
  * to XLogReadRecord().
  *
  * If the page_read callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * returned.  The callback is expected to have reported the error;
+ * errordata->message is set to NULL.
  *
  * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * *errordata is set with details of the failure.
  *
- * The returned pointer (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogReadRecord.
+ * The returned pointer (or errordata->message) points to an internal
+ * buffer that's valid until the next call to XLogReadRecord.
  */
 XLogRecord *
-XLogReadRecord(XLogReaderState *state, char **errormsg)
+XLogReadRecord(XLogReaderState *state, XLogReaderError *errordata)
 {
 	DecodedXLogRecord *decoded;
 
@@ -421,7 +431,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		XLogReadAhead(state, false /* nonblocking */ );
 
 	/* Consume the head record or error. */
-	decoded = XLogNextRecord(state, errormsg);
+	decoded = XLogNextRecord(state, errordata);
 	if (decoded)
 	{
 		/*
@@ -530,7 +540,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	bool		gotheader;
 	int			readOff;
 	DecodedXLogRecord *decoded;
-	char	   *errormsg;		/* not used */
+	XLogReaderError errordata = {0};	/* not used */
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -540,7 +550,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	randAccess = false;
 
 	/* reset error state */
-	state->errormsg_buf[0] = '\0';
+	state->errordata.message[0] = '\0';
+	state->errordata.code = XLOG_READER_NO_ERROR;
 	decoded = NULL;
 
 	state->abortedRecPtr = InvalidXLogRecPtr;
@@ -607,7 +618,9 @@ restart:
 	}
 	else if (targetRecOff < pageHeaderSize)
 	{
-		report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u",
+		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
+							  "invalid record offset at %X/%X: expected at least %u, got %u",
 							  LSN_FORMAT_ARGS(RecPtr),
 							  pageHeaderSize, targetRecOff);
 		goto err;
@@ -616,7 +629,9 @@ restart:
 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
 		targetRecOff == pageHeaderSize)
 	{
-		report_invalid_record(state, "contrecord is requested by %X/%X",
+		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
+							  "contrecord is requested by %X/%X",
 							  LSN_FORMAT_ARGS(RecPtr));
 		goto err;
 	}
@@ -657,6 +672,7 @@ restart:
 		if (total_len < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
 								  (uint32) SizeOfXLogRecord, total_len);
@@ -746,6 +762,7 @@ restart:
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "there is no contrecord flag at %X/%X",
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
@@ -759,6 +776,7 @@ restart:
 				total_len != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
 									  ((long long) total_len) - gotlen,
@@ -817,8 +835,10 @@ restart:
 				memcpy(save_copy, state->readRecordBuf, gotlen);
 				if (!allocate_recordbuf(state, total_len))
 				{
-					/* We treat this as a "bogus data" condition */
-					report_invalid_record(state, "record length %u at %X/%X too long",
+					/* We treat this as an out-of-memory error */
+					report_invalid_record(state,
+										  XLOG_READER_OOM,
+										  "record length %u at %X/%X too long",
 										  total_len, LSN_FORMAT_ARGS(RecPtr));
 					goto err;
 				}
@@ -881,15 +901,16 @@ restart:
 		{
 			/*
 			 * We failed to allocate memory for an oversized record.  As
-			 * above, we currently treat this as a "bogus data" condition.
+			 * above, we currently treat this as an out-of-memory error.
 			 */
 			report_invalid_record(state,
+								  XLOG_READER_OOM,
 								  "out of memory while trying to decode a record of length %u", total_len);
 			goto err;
 		}
 	}
 
-	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
+	if (DecodeXLogRecord(state, decoded, record, RecPtr, &errordata))
 	{
 		/* Record the location of the next record. */
 		decoded->next_lsn = state->NextRecPtr;
@@ -938,7 +959,7 @@ err:
 		 * queued so that XLogPrefetcherReadRecord() doesn't bring us back a
 		 * second time and clobber the above state.
 		 */
-		state->errormsg_deferred = true;
+		state->errordata_deferred = true;
 	}
 
 	if (decoded && decoded->oversized)
@@ -951,9 +972,9 @@ err:
 	XLogReaderInvalReadState(state);
 
 	/*
-	 * If an error was written to errmsg_buf, it'll be returned to the caller
-	 * of XLogReadRecord() after all successfully decoded records from the
-	 * read queue.
+	 * If an error was written to errordata.message, it'll be returned to the
+	 * caller of XLogReadRecord() after all successfully decoded records from
+	 * the read queue.
 	 */
 
 	return XLREAD_FAIL;
@@ -972,7 +993,7 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
 {
 	XLogPageReadResult result;
 
-	if (state->errormsg_deferred)
+	if (state->errordata_deferred)
 		return NULL;
 
 	result = XLogDecodeNextRecord(state, nonblocking);
@@ -990,8 +1011,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
  * via the page_read() callback.
  *
  * Returns XLREAD_FAIL if the required page cannot be read for some
- * reason; errormsg_buf is set in that case (unless the error occurs in the
- * page_read callback).
+ * reason; errordata.message is set in that case (unless the error occurs in
+ * the page_read callback).
  *
  * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
  * waiting.  This can be returned only if the installed page_read callback
@@ -1136,6 +1157,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid record length at %X/%X: expected at least %u, got %u",
 							  LSN_FORMAT_ARGS(RecPtr),
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
@@ -1144,6 +1166,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (!RmgrIdIsValid(record->xl_rmid))
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid resource manager ID %u at %X/%X",
 							  record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
 		return false;
@@ -1157,6 +1180,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 		if (!(record->xl_prev < RecPtr))
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "record with incorrect prev-link %X/%X at %X/%X",
 								  LSN_FORMAT_ARGS(record->xl_prev),
 								  LSN_FORMAT_ARGS(RecPtr));
@@ -1173,6 +1197,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 		if (record->xl_prev != PrevRecPtr)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "record with incorrect prev-link %X/%X at %X/%X",
 								  LSN_FORMAT_ARGS(record->xl_prev),
 								  LSN_FORMAT_ARGS(RecPtr));
@@ -1211,6 +1236,7 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
 	if (!EQ_CRC32C(record->xl_crc, crc))
 	{
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "incorrect resource manager data checksum in record at %X/%X",
 							  LSN_FORMAT_ARGS(recptr));
 		return false;
@@ -1245,6 +1271,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_magic,
 							  fname,
@@ -1260,6 +1287,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_info,
 							  fname,
@@ -1276,6 +1304,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 			longhdr->xlp_sysid != state->system_identifier)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
 								  (unsigned long long) longhdr->xlp_sysid,
 								  (unsigned long long) state->system_identifier);
@@ -1284,12 +1313,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: incorrect segment size in page header");
 			return false;
 		}
 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
 			return false;
 		}
@@ -1302,6 +1333,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 
 		/* hmm, first page of file doesn't have a long header? */
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
 							  hdr->xlp_info,
 							  fname,
@@ -1322,6 +1354,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
+							  XLOG_READER_INVALID_DATA,
 							  "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u",
 							  LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
 							  fname,
@@ -1348,6 +1381,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u",
 								  hdr->xlp_tli,
 								  state->latestPageTLI,
@@ -1369,8 +1403,9 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 void
 XLogReaderResetError(XLogReaderState *state)
 {
-	state->errormsg_buf[0] = '\0';
-	state->errormsg_deferred = false;
+	state->errordata.message[0] = '\0';
+	state->errordata_deferred = false;
+	state->errordata.code = XLOG_READER_NO_ERROR;
 }
 
 /*
@@ -1390,7 +1425,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	tmpRecPtr;
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
 
@@ -1475,7 +1510,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * or we just jumped over the remaining data of a continuation.
 	 */
 	XLogBeginRead(state, tmpRecPtr);
-	while (XLogReadRecord(state, &errormsg) != NULL)
+	while (XLogReadRecord(state, &errordata) != NULL)
 	{
 		/* past the record we've found, break out */
 		if (RecPtr <= state->ReadRecPtr)
@@ -1620,8 +1655,9 @@ ResetDecoder(XLogReaderState *state)
 	state->decode_buffer_head = state->decode_buffer;
 
 	/* Clear error state. */
-	state->errormsg_buf[0] = '\0';
-	state->errormsg_deferred = false;
+	state->errordata.message[0] = '\0';
+	state->errordata_deferred = false;
+	state->errordata.code = XLOG_READER_NO_ERROR;
 }
 
 /*
@@ -1671,7 +1707,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				 DecodedXLogRecord *decoded,
 				 XLogRecord *record,
 				 XLogRecPtr lsn,
-				 char **errormsg)
+				 XLogReaderError *errordata)
 {
 	/*
 	 * read next _size bytes from record buffer, but check for overrun first.
@@ -1754,6 +1790,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (block_id <= decoded->max_block_id)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "out-of-order block_id %u at %X/%X",
 									  block_id,
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1778,6 +1815,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (blk->has_data && blk->data_len == 0)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
 				goto err;
@@ -1785,6 +1823,7 @@ DecodeXLogRecord(XLogReaderState *state,
 			if (!blk->has_data && blk->data_len != 0)
 			{
 				report_invalid_record(state,
+									  XLOG_READER_INVALID_DATA,
 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
 									  (unsigned int) blk->data_len,
 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1821,6 +1860,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					 blk->bimg_len == BLCKSZ))
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
 										  (unsigned int) blk->hole_offset,
 										  (unsigned int) blk->hole_length,
@@ -1837,6 +1877,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					(blk->hole_offset != 0 || blk->hole_length != 0))
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
 										  (unsigned int) blk->hole_offset,
 										  (unsigned int) blk->hole_length,
@@ -1851,6 +1892,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len == BLCKSZ)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
 										  (unsigned int) blk->bimg_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1866,6 +1908,7 @@ DecodeXLogRecord(XLogReaderState *state,
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
@@ -1882,6 +1925,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				if (rlocator == NULL)
 				{
 					report_invalid_record(state,
+										  XLOG_READER_INVALID_DATA,
 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -1894,6 +1938,7 @@ DecodeXLogRecord(XLogReaderState *state,
 		else
 		{
 			report_invalid_record(state,
+								  XLOG_READER_INVALID_DATA,
 								  "invalid block_id %u at %X/%X",
 								  block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
 			goto err;
@@ -1961,10 +2006,12 @@ DecodeXLogRecord(XLogReaderState *state,
 
 shortdata_err:
 	report_invalid_record(state,
+						  XLOG_READER_INVALID_DATA,
 						  "record with invalid length at %X/%X",
 						  LSN_FORMAT_ARGS(state->ReadRecPtr));
 err:
-	*errormsg = state->errormsg_buf;
+	errordata->message = state->errordata.message;
+	errordata->code = state->errordata.code;
 
 	return false;
 }
@@ -2071,6 +2118,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		!record->record->blocks[block_id].in_use)
 	{
 		report_invalid_record(record,
+							  XLOG_READER_INVALID_DATA,
 							  "could not restore image at %X/%X with invalid block %d specified",
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
@@ -2078,7 +2126,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	}
 	if (!record->record->blocks[block_id].has_image)
 	{
-		report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d",
+		report_invalid_record(record,
+							  XLOG_READER_INVALID_DATA,
+							  "could not restore image at %X/%X with invalid state, block %d",
 							  LSN_FORMAT_ARGS(record->ReadRecPtr),
 							  block_id);
 		return false;
@@ -2105,7 +2155,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "LZ4",
 								  block_id);
@@ -2122,7 +2174,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 			if (ZSTD_isError(decomp_result))
 				decomp_success = false;
 #else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with %s not supported by build, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  "zstd",
 								  block_id);
@@ -2131,7 +2185,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		}
 		else
 		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not restore image at %X/%X compressed with unknown method, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
@@ -2139,7 +2195,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 		if (!decomp_success)
 		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
+			report_invalid_record(record,
+								  XLOG_READER_INVALID_DATA,
+								  "could not decompress image at %X/%X, block %d",
 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
 								  block_id);
 			return false;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..68100bfa4a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -2454,7 +2454,7 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		if (!RestoreBlockImage(record, block_id, primary_image_masked))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * If masking function is defined, mask both the primary and replay
@@ -3062,9 +3062,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 
 	for (;;)
 	{
-		char	   *errormsg;
+		XLogReaderError errordata = {0};
 
-		record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg);
+		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
 			/*
@@ -3098,9 +3098,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * StandbyMode that only happens if we have been triggered, so we
 			 * shouldn't loop anymore in that case.
 			 */
-			if (errormsg)
+			if (errordata.message)
 				ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-						(errmsg_internal("%s", errormsg) /* already translated */ ));
+						(errmsg_internal("%s", errordata.message) /* already translated */ ));
 		}
 
 		/*
@@ -3385,9 +3385,9 @@ retry:
 		 * Emit this error right now then retry this page immediately. Use
 		 * errmsg_internal() because the message was already translated.
 		 */
-		if (xlogreader->errormsg_buf[0])
+		if (xlogreader->errordata.message[0])
 			ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
-					(errmsg_internal("%s", xlogreader->errormsg_buf)));
+					(errmsg_internal("%s", xlogreader->errordata.message)));
 
 		/* reset any error XLogReaderValidatePageHeader() might have set */
 		XLogReaderResetError(xlogreader);
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 43f7b31205..a50fc9cb97 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -395,7 +395,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
 		if (!RestoreBlockImage(record, block_id, page))
 			ereport(ERROR,
 					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg_internal("%s", record->errormsg_buf)));
+					 errmsg_internal("%s", record->errordata.message)));
 
 		/*
 		 * The page may be uninitialized. If so, we can't set the LSN because
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187..f48feab944 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -641,12 +641,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	for (;;)
 	{
 		XLogRecord *record;
-		char	   *err = NULL;
+		XLogReaderError errordata = {0};
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, &err);
-		if (err)
-			elog(ERROR, "could not find logical decoding starting point: %s", err);
+		record = XLogReadRecord(ctx->reader, &errordata);
+		if (errordata.message)
+			elog(ERROR, "could not find logical decoding starting point: %s",
+				 errordata.message);
 		if (!record)
 			elog(ERROR, "could not find logical decoding starting point");
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 197169d6b0..ca372e5f66 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,11 +244,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		while (ctx->reader->EndRecPtr < end_of_wal)
 		{
 			XLogRecord *record;
-			char	   *errm = NULL;
+			XLogReaderError errordata = {0};
 
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record for logical decoding: %s", errm);
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
+				elog(ERROR, "could not find record for logical decoding: %s",
+					 errordata.message);
 
 			/*
 			 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..4fa4e6bfed 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -503,17 +503,17 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		/* Decode at least one record, until we run out of records */
 		while (ctx->reader->EndRecPtr < moveto)
 		{
-			char	   *errm = NULL;
 			XLogRecord *record;
+			XLogReaderError errordata = {0};
 
 			/*
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
+			record = XLogReadRecord(ctx->reader, &errordata);
+			if (errordata.message)
 				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
+					 errordata.message);
 
 			/*
 			 * Process the record.  Storage-level changes are ignored in
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e250b0567e..55109bfa51 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3045,7 +3045,7 @@ static void
 XLogSendLogical(void)
 {
 	XLogRecord *record;
-	char	   *errm;
+	XLogReaderError errordata = {0};
 
 	/*
 	 * We'll use the current flush point to determine whether we've caught up.
@@ -3063,12 +3063,12 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+	record = XLogReadRecord(logical_decoding_ctx->reader, &errordata);
 
 	/* xlog record was invalid */
-	if (errm != NULL)
+	if (errordata.message != NULL)
 		elog(ERROR, "could not find record while sending logically-decoded data: %s",
-			 errm);
+			 errordata.message);
 
 	if (record != NULL)
 	{
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 0233ece88b..e30fb311f8 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -68,7 +68,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	private.tliIndex = tliIndex;
@@ -82,16 +82,16 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	XLogBeginRead(xlogreader, startpoint);
 	do
 	{
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
 			XLogRecPtr	errptr = xlogreader->EndRecPtr;
 
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not read WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(errptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not read WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(errptr));
@@ -126,7 +126,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 {
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 	XLogRecPtr	endptr;
 
@@ -139,12 +139,12 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 		pg_fatal("out of memory while allocating a WAL reading processor");
 
 	XLogBeginRead(xlogreader, ptr);
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 	if (record == NULL)
 	{
-		if (errormsg)
+		if (errordata.message)
 			pg_fatal("could not read WAL record at %X/%X: %s",
-					 LSN_FORMAT_ARGS(ptr), errormsg);
+					 LSN_FORMAT_ARGS(ptr), errordata.message);
 		else
 			pg_fatal("could not read WAL record at %X/%X",
 					 LSN_FORMAT_ARGS(ptr));
@@ -173,7 +173,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	XLogRecord *record;
 	XLogRecPtr	searchptr;
 	XLogReaderState *xlogreader;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 	XLogPageReadPrivate private;
 
 	/*
@@ -204,14 +204,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 		uint8		info;
 
 		XLogBeginRead(xlogreader, searchptr);
-		record = XLogReadRecord(xlogreader, &errormsg);
+		record = XLogReadRecord(xlogreader, &errordata);
 
 		if (record == NULL)
 		{
-			if (errormsg)
+			if (errordata.message)
 				pg_fatal("could not find previous WAL record at %X/%X: %s",
 						 LSN_FORMAT_ARGS(searchptr),
-						 errormsg);
+						 errordata.message);
 			else
 				pg_fatal("could not find previous WAL record at %X/%X",
 						 LSN_FORMAT_ARGS(searchptr));
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index a3535bdfa9..880c93b51b 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -512,7 +512,7 @@ XLogRecordSaveFPWs(XLogReaderState *record, const char *savepath)
 
 		/* Full page exists, so let's save it */
 		if (!RestoreBlockImage(record, block_id, page))
-			pg_fatal("%s", record->errormsg_buf);
+			pg_fatal("%s", record->errordata.message);
 
 		(void) XLogRecGetBlockTagExtended(record, block_id,
 										  &rnode, &fork, &blk, NULL);
@@ -800,7 +800,7 @@ main(int argc, char **argv)
 	XLogRecord *record;
 	XLogRecPtr	first_record;
 	char	   *waldir = NULL;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
 	static struct option long_options[] = {
 		{"bkp-details", no_argument, NULL, 'b'},
@@ -1243,7 +1243,7 @@ main(int argc, char **argv)
 		}
 
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, &errormsg);
+		record = XLogReadRecord(xlogreader_state, &errordata);
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
@@ -1308,10 +1308,10 @@ main(int argc, char **argv)
 	if (time_to_stop)
 		exit(0);
 
-	if (errormsg)
+	if (errordata.message)
 		pg_fatal("error in WAL record at %X/%X: %s",
 				 LSN_FORMAT_ARGS(xlogreader_state->ReadRecPtr),
-				 errormsg);
+				 errordata.message);
 
 	XLogReaderFree(xlogreader_state);
 
diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..e7d30554ed 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -146,9 +146,9 @@ static XLogRecord *
 ReadNextXLogRecord(XLogReaderState *xlogreader)
 {
 	XLogRecord *record;
-	char	   *errormsg;
+	XLogReaderError errordata = {0};
 
-	record = XLogReadRecord(xlogreader, &errormsg);
+	record = XLogReadRecord(xlogreader, &errordata);
 
 	if (record == NULL)
 	{
@@ -161,11 +161,12 @@ ReadNextXLogRecord(XLogReaderState *xlogreader)
 		if (private_data->end_of_wal)
 			return NULL;
 
-		if (errormsg)
+		if (errordata.message)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr),
+							errordata.message)));
 		else
 			ereport(ERROR,
 					(errcode_for_file_access(),
@@ -384,7 +385,7 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 			if (!RestoreBlockImage(record, block_id, page))
 				ereport(ERROR,
 						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg_internal("%s", record->errormsg_buf)));
+						 errmsg_internal("%s", record->errordata.message)));
 
 			block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
 			SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8de90c4958..9cb62b00d5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3081,6 +3081,8 @@ XLogPageReadResult
 XLogPrefetchStats
 XLogPrefetcher
 XLogPrefetcherFilter
+XLogReaderError
+XLogReaderErrorCode
 XLogReaderRoutine
 XLogReaderState
 XLogRecData
-- 
2.42.0

From 134f203907083f557075afda695b778af627b318 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 26 Sep 2023 15:23:37 +0900
Subject: [PATCH v5 3/4] Make WAL replay more robust on OOM failures

This takes advantage of the new error facility for WAL readers, allowing
WAL replay to loop when an out-of-memory failure happens while reading a
record.  Such failures were the origin of potential data loss scenarios.
This makes crash recovery more robust by acting the same way as a standby
here: each time a record cannot be read because of an OOM, loop and try
to read the record again.
---
 src/backend/access/transam/xlogrecovery.c | 75 ++++++++++++++++-------
 1 file changed, 52 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 68100bfa4a..ed5ac06938 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3067,29 +3067,50 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 		record = XLogPrefetcherReadRecord(xlogprefetcher, &errordata);
 		if (record == NULL)
 		{
-			/*
-			 * When we find that WAL ends in an incomplete record, keep track
-			 * of that record.  After recovery is done, we'll write a record
-			 * to indicate to downstream WAL readers that that portion is to
-			 * be ignored.
-			 *
-			 * However, when ArchiveRecoveryRequested = true, we're going to
-			 * switch to a new timeline at the end of recovery. We will only
-			 * copy WAL over to the new timeline up to the end of the last
-			 * complete record, so if we did this, we would later create an
-			 * overwrite contrecord in the wrong place, breaking everything.
-			 */
-			if (!ArchiveRecoveryRequested &&
-				!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			switch (errordata.code)
 			{
-				abortedRecPtr = xlogreader->abortedRecPtr;
-				missingContrecPtr = xlogreader->missingContrecPtr;
-			}
+				case XLOG_READER_NO_ERROR:
+					/* Possible when XLogPageRead() has failed */
+					Assert(!errordata.message);
+					/* FALLTHROUGH */
 
-			if (readFile >= 0)
-			{
-				close(readFile);
-				readFile = -1;
+				case XLOG_READER_INVALID_DATA:
+
+					/*
+					 * When we find that WAL ends in an incomplete record,
+					 * keep track of that record.  After recovery is done,
+					 * we'll write a record to indicate to downstream WAL
+					 * readers that that portion is to be ignored.
+					 *
+					 * However, when ArchiveRecoveryRequested = true, we're
+					 * going to switch to a new timeline at the end of
+					 * recovery. We will only copy WAL over to the new
+					 * timeline up to the end of the last complete record, so
+					 * if we did this, we would later create an overwrite
+					 * contrecord in the wrong place, breaking everything.
+					 */
+					if (!ArchiveRecoveryRequested &&
+						!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+					{
+						abortedRecPtr = xlogreader->abortedRecPtr;
+						missingContrecPtr = xlogreader->missingContrecPtr;
+					}
+
+					if (readFile >= 0)
+					{
+						close(readFile);
+						readFile = -1;
+					}
+					break;
+				case XLOG_READER_OOM:
+
+					/*
+					 * If we failed because of an out-of-memory problem, just
+					 * give up and retry recovery later.  It may be possible
+					 * that the WAL record to decode required a larger memory
+					 * allocation than what the host can offer.
+					 */
+					break;
 			}
 
 			/*
@@ -3147,9 +3168,12 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * WAL from the archive, even if pg_wal is completely empty, but
 			 * we'd have no idea how far we'd have to replay to reach
 			 * consistency.  So err on the safe side and give up.
+			 *
+			 * It may be possible that the record was not decoded because of
+			 * an out-of-memory failure.  In this case, just loop.
 			 */
 			if (!InArchiveRecovery && ArchiveRecoveryRequested &&
-				!fetching_ckpt)
+				!fetching_ckpt && errordata.code != XLOG_READER_OOM)
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
@@ -3173,9 +3197,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 				continue;
 			}
 
-			/* In standby mode, loop back to retry. Otherwise, give up. */
+			/*
+			 * In standby mode, or if reading the WAL record failed on an
+			 * out-of-memory error, loop back and retry.  Otherwise, give up.
+			 */
 			if (StandbyMode && !CheckForStandbyTrigger())
 				continue;
+			else if (errordata.code == XLOG_READER_OOM)
+				continue;
 			else
 				return NULL;
 		}
-- 
2.42.0

From 1b2c50fd98a062d0b4617f8d618cdca4d6428e5a Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 26 Sep 2023 15:23:50 +0900
Subject: [PATCH v5 4/4] Tweak to force OOM behavior when replaying records

---
 src/backend/access/transam/xlogreader.c | 31 +++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index fd1413b6d3..854f584e30 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -541,6 +541,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	int			readOff;
 	DecodedXLogRecord *decoded;
 	XLogReaderError errordata = {0};	/* not used */
+	bool        trigger_oom = false;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -690,7 +691,29 @@ restart:
 	decoded = XLogReadRecordAlloc(state,
 								  total_len,
 								  false /* allow_oversized */ );
-	if (decoded == NULL && nonblocking)
+
+#ifndef FRONTEND
+	/*
+	 * Trick to emulate an OOM after a hardcoded number of records
+	 * replayed.
+	 */
+	{
+		struct stat fstat;
+		static int counter = 0;
+		if (stat("/tmp/xlogreader_oom", &fstat) == 0)
+		{
+			counter++;
+			if (counter >= 100)
+			{
+				trigger_oom = true;
+				/* Reset counter, to not fail when shutting down WAL */
+				counter = 0;
+			}
+		}
+	}
+#endif
+
+	if ((decoded == NULL || trigger_oom) && nonblocking)
 	{
 		/*
 		 * There is no space in the circular decode buffer, and the caller is
@@ -833,7 +856,7 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				if (!allocate_recordbuf(state, total_len))
+				if (!allocate_recordbuf(state, total_len) || trigger_oom)
 				{
 					/* We treat this as an out-of-memory error */
 					report_invalid_record(state,
@@ -891,13 +914,13 @@ restart:
 	 * If we got here without a DecodedXLogRecord, it means we needed to
 	 * validate total_len before trusting it, but by now now we've done that.
 	 */
-	if (decoded == NULL)
+	if (decoded == NULL || trigger_oom)
 	{
 		Assert(!nonblocking);
 		decoded = XLogReadRecordAlloc(state,
 									  total_len,
 									  true /* allow_oversized */ );
-		if (decoded == NULL)
+		if (decoded == NULL || trigger_oom)
 		{
 			/*
 			 * We failed to allocate memory for an oversized record.  As
-- 
2.42.0

Attachment: signature.asc
Description: PGP signature

Reply via email to