On Wed, Aug 02, 2023 at 01:16:02PM +0900, Kyotaro Horiguchi wrote:
> I believe this approach is sufficient to determine whether the error
> is OOM or not. If total_len is corrupted and has an excessively large
> value, it's highly unlikely that all subsequent pages for that length
> will be consistent.
>
> Do you have any thoughts on this?

This could be more flexible IMO, and actually in some cases
errormsg_fatal may be eaten if using the WAL prefetcher, as the error
message is reported by the caller of XLogPrefetcherReadRecord(), no?

Everything that has been discussed on this thread so far involves a
change in XLogReaderState, which induces an ABI breakage. For HEAD, we
are likely going in this direction, but if we are going to bite the
bullet we'd better be a bit more aggressive with the integration and
report an error code side-by-side with the error message returned by
XLogPrefetcherReadRecord(), XLogReadRecord() and XLogNextRecord(), so
that all of the callers can decide what they want to do on an invalid
record or just an OOM.

Attached is the idea of infrastructure I have in mind, as of 0001,
which adds an error code to report_invalid_record(). For now this
includes three error codes, reported alongside the error messages
generated, that can be expanded if need be: no error, OOM and invalid
data. The invalid data part may need to be much more verbose, and
could be improved to make this stuff "less scary" as the other thread
proposes, but what I have here would be enough to trigger a different
decision in the startup process depending on whether a record cannot
be fetched because of an OOM or for a different reason.

0002 is an example of a decision that can be taken in WAL replay if we
see an OOM, based on the error code received. One argument is that we
may want to improve emode_for_corrupt_record() so that it reacts better
on OOM, upgrading the emode wanted, but this needs more analysis
depending on the code path involved.

0003 is my previous trick to inject an OOM failure at replay. Reusing
the previous script, this would be enough to prevent an early redo
creating a loss of data.

Note that we have a few other things going on in the tree. As one
example, pg_walinspect would consider an OOM as the end of WAL. Not
critical, but still slightly incorrect: the end of WAL may not have
been reached yet, so it can report some incorrect information depending
on what the WAL reader faces. This could be improved with the
additions of 0001.

Thoughts or comments?
--
Michael
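For readers who want the gist of 0001 without going through the whole diff, here is a minimal standalone sketch of the idea: the reporting routine records an error code next to the message, and the routine that consumes the deferred error hands both back and resets the state. Only the XLogReaderError enum comes from the patch. MockReaderState, mock_report_invalid_record() and mock_consume_error() are hypothetical stand-ins written for this illustration, not PostgreSQL code. One deliberate difference, as an assumption of the sketch: it initializes *errorcode on every call, while XLogNextRecord() in 0001 as posted only writes it when a code is pending.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Same three values as the enum proposed in 0001. */
typedef enum XLogReaderError
{
	XLOG_READER_NONE = 0,
	XLOG_READER_OOM,			/* out-of-memory */
	XLOG_READER_INVALID_DATA	/* record data */
} XLogReaderError;

/* Hypothetical stand-in for XLogReaderState, error fields only. */
typedef struct MockReaderState
{
	char		errormsg_buf[256];
	bool		errormsg_deferred;
	XLogReaderError errorcode;
} MockReaderState;

/* Store the formatted message and its error code for later pickup. */
static void
mock_report_invalid_record(MockReaderState *state, XLogReaderError errorcode,
						   const char *fmt, ...)
{
	va_list		args;

	va_start(args, fmt);
	vsnprintf(state->errormsg_buf, sizeof(state->errormsg_buf), fmt, args);
	va_end(args);

	state->errormsg_deferred = true;
	state->errorcode = errorcode;
}

/*
 * Hand a pending error to the caller and reset the reader state.
 * Returns true if an error was pending.  Unlike the posted patch, the
 * output arguments are always initialized (assumption of this sketch).
 */
static bool
mock_consume_error(MockReaderState *state, char **errormsg,
				   XLogReaderError *errorcode)
{
	*errormsg = NULL;
	*errorcode = XLOG_READER_NONE;

	if (!state->errormsg_deferred)
		return false;

	if (state->errormsg_buf[0] != '\0')
		*errormsg = state->errormsg_buf;
	*errorcode = state->errorcode;

	state->errormsg_deferred = false;
	state->errorcode = XLOG_READER_NONE;
	return true;
}
```

A caller would zero-initialize a MockReaderState, let the reader side call mock_report_invalid_record() with XLOG_READER_OOM or XLOG_READER_INVALID_DATA, and then branch on the code returned by mock_consume_error() rather than parsing the message text.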
From acc80d3a6199dd7b28848579cb723e8af837a198 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Tue, 8 Aug 2023 16:13:56 +0900 Subject: [PATCH v1 1/3] Add infrastructure to report error codes in WAL reader This adds a field named errorcode to XLogReaderState, while the APIs in charge of reading the next WAL records report an error code in parallel of the error read. --- src/include/access/xlogprefetcher.h | 3 +- src/include/access/xlogreader.h | 16 +++- src/backend/access/transam/twophase.c | 3 +- src/backend/access/transam/xlogprefetcher.c | 5 +- src/backend/access/transam/xlogreader.c | 86 +++++++++++++++---- src/backend/access/transam/xlogrecovery.c | 3 +- src/backend/replication/logical/logical.c | 3 +- .../replication/logical/logicalfuncs.c | 3 +- src/backend/replication/slotfuncs.c | 3 +- src/backend/replication/walsender.c | 3 +- src/bin/pg_rewind/parsexlog.c | 9 +- src/bin/pg_waldump/pg_waldump.c | 3 +- contrib/pg_walinspect/pg_walinspect.c | 3 +- 13 files changed, 112 insertions(+), 31 deletions(-) diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h index 7dd7f20ad0..7f80ed922f 100644 --- a/src/include/access/xlogprefetcher.h +++ b/src/include/access/xlogprefetcher.h @@ -48,7 +48,8 @@ extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr); extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, - char **errmsg); + char **errmsg, + XLogReaderError *errorcode); extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index da32c7db77..24554de10f 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -58,6 +58,14 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; +/* Values for XLogReaderState.reason */ +typedef enum XLogReaderError +{ + XLOG_READER_NONE = 0, + XLOG_READER_OOM, /* out-of-memory */ + 
XLOG_READER_INVALID_DATA, /* record data */ +} XLogReaderError; + /* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, @@ -310,6 +318,8 @@ struct XLogReaderState /* Buffer to hold error message */ char *errormsg_buf; bool errormsg_deferred; + /* Error code when filling errormsg_buf */ + XLogReaderError errorcode; /* * Flag to indicate to XLogPageReadCB that it should not block waiting for @@ -355,11 +365,13 @@ typedef enum XLogPageReadResult /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - char **errormsg); + char **errormsg, + XLogReaderError *errorcode); /* Consume the next record or error. */ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state, - char **errormsg); + char **errormsg, + XLogReaderError *errorcode); /* Release the previously returned record, if necessary. */ extern XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c6af8cfd7e..79ed829e7a 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1400,6 +1400,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; + XLogReaderError errorcode; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.page_read = &read_local_xlog_page, @@ -1413,7 +1414,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errdetail("Failed while allocating a WAL reading processor."))); XLogBeginRead(xlogreader, lsn); - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg, &errorcode); if (record == NULL) { diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index 539928cb85..87ed7aa7b1 
100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -984,7 +984,8 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr) * tries to initiate I/O for blocks referenced in future WAL records. */ XLogRecord * -XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) +XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg, + XLogReaderError *errcode) { DecodedXLogRecord *record; XLogRecPtr replayed_up_to; @@ -1052,7 +1053,7 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) } /* Read the next record. */ - record = XLogNextRecord(prefetcher->reader, errmsg); + record = XLogNextRecord(prefetcher->reader, errmsg, errcode); if (!record) return NULL; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c9f9f6e98f..8fbf2d3513 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -41,8 +41,10 @@ #include "common/logging.h" #endif -static void report_invalid_record(XLogReaderState *state, const char *fmt,...) - pg_attribute_printf(2, 3); +static void report_invalid_record(XLogReaderState *state, + XLogReaderError errorcode, + const char *fmt,...) + pg_attribute_printf(3, 4); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen); @@ -70,7 +72,8 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, * the current record being read. */ static void -report_invalid_record(XLogReaderState *state, const char *fmt,...) +report_invalid_record(XLogReaderState *state, XLogReaderError errorcode, + const char *fmt,...) { va_list args; @@ -81,6 +84,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) 
va_end(args); state->errormsg_deferred = true; + state->errorcode = errorcode; } /* @@ -355,7 +359,8 @@ XLogReleasePreviousRecord(XLogReaderState *state) * valid until the next call to XLogNextRecord. */ DecodedXLogRecord * -XLogNextRecord(XLogReaderState *state, char **errormsg) +XLogNextRecord(XLogReaderState *state, char **errormsg, + XLogReaderError *errorcode) { /* Release the last record returned by XLogNextRecord(). */ XLogReleasePreviousRecord(state); @@ -367,7 +372,10 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) { if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; + if (state->errorcode != XLOG_READER_NONE) + *errorcode = state->errorcode; state->errormsg_deferred = false; + state->errorcode = XLOG_READER_NONE; } /* @@ -419,7 +427,8 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) * valid until the next call to XLogReadRecord. */ XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) +XLogReadRecord(XLogReaderState *state, char **errormsg, + XLogReaderError *errorcode) { DecodedXLogRecord *decoded; @@ -437,7 +446,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) XLogReadAhead(state, false /* nonblocking */ ); /* Consume the head record or error. 
*/ - decoded = XLogNextRecord(state, errormsg); + decoded = XLogNextRecord(state, errormsg, errorcode); if (decoded) { /* @@ -623,7 +632,9 @@ restart: } else if (targetRecOff < pageHeaderSize) { - report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u", + report_invalid_record(state, + XLOG_READER_INVALID_DATA, + "invalid record offset at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), pageHeaderSize, targetRecOff); goto err; @@ -632,7 +643,9 @@ restart: if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && targetRecOff == pageHeaderSize) { - report_invalid_record(state, "contrecord is requested by %X/%X", + report_invalid_record(state, + XLOG_READER_INVALID_DATA, + "contrecord is requested by %X/%X", LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -673,6 +686,7 @@ restart: if (total_len < SizeOfXLogRecord) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), (uint32) SizeOfXLogRecord, total_len); @@ -702,6 +716,7 @@ restart: /* We failed to allocate memory for an oversized record. 
*/ report_invalid_record(state, + XLOG_READER_OOM, "out of memory while trying to decode a record of length %u", total_len); goto err; } @@ -724,7 +739,9 @@ restart: !allocate_recordbuf(state, total_len)) { /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", + report_invalid_record(state, + XLOG_READER_OOM, + "record length %u at %X/%X too long", total_len, LSN_FORMAT_ARGS(RecPtr)); goto err; } @@ -773,6 +790,7 @@ restart: if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "there is no contrecord flag at %X/%X", LSN_FORMAT_ARGS(RecPtr)); goto err; @@ -786,6 +804,7 @@ restart: total_len != (pageHeader->xlp_rem_len + gotlen)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid contrecord length %u (expected %lld) at %X/%X", pageHeader->xlp_rem_len, ((long long) total_len) - gotlen, @@ -1116,6 +1135,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_tot_len < SizeOfXLogRecord) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid record length at %X/%X: expected at least %u, got %u", LSN_FORMAT_ARGS(RecPtr), (uint32) SizeOfXLogRecord, record->xl_tot_len); @@ -1124,6 +1144,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (!RmgrIdIsValid(record->xl_rmid)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid resource manager ID %u at %X/%X", record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; @@ -1137,6 +1158,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (!(record->xl_prev < RecPtr)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with incorrect prev-link %X/%X at %X/%X", LSN_FORMAT_ARGS(record->xl_prev), LSN_FORMAT_ARGS(RecPtr)); @@ -1153,6 +1175,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, if (record->xl_prev != PrevRecPtr) { report_invalid_record(state, + 
XLOG_READER_INVALID_DATA, "record with incorrect prev-link %X/%X at %X/%X", LSN_FORMAT_ARGS(record->xl_prev), LSN_FORMAT_ARGS(RecPtr)); @@ -1189,6 +1212,7 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) if (!EQ_CRC32C(record->xl_crc, crc)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "incorrect resource manager data checksum in record at %X/%X", LSN_FORMAT_ARGS(recptr)); return false; @@ -1223,6 +1247,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_magic, fname, @@ -1238,6 +1263,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_info, fname, @@ -1254,6 +1280,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, longhdr->xlp_sysid != state->system_identifier) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu", (unsigned long long) longhdr->xlp_sysid, (unsigned long long) state->system_identifier); @@ -1262,12 +1289,14 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: incorrect segment size in page header"); return false; } else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "WAL file is from different database system: incorrect XLOG_BLCKSZ in page 
header"); return false; } @@ -1280,6 +1309,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, /* hmm, first page of file doesn't have a long header? */ report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_info, fname, @@ -1300,6 +1330,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u", LSN_FORMAT_ARGS(hdr->xlp_pageaddr), fname, @@ -1326,6 +1357,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize); report_invalid_record(state, + XLOG_READER_INVALID_DATA, "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u", hdr->xlp_tli, state->latestPageTLI, @@ -1349,6 +1381,7 @@ XLogReaderResetError(XLogReaderState *state) { state->errormsg_buf[0] = '\0'; state->errormsg_deferred = false; + state->errorcode = XLOG_READER_NONE; } /* @@ -1369,6 +1402,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; char *errormsg; + XLogReaderError errorcode; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -1453,7 +1487,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * or we just jumped over the remaining data of a continuation. */ XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + while (XLogReadRecord(state, &errormsg, &errorcode) != NULL) { /* past the record we've found, break out */ if (RecPtr <= state->ReadRecPtr) @@ -1600,6 +1634,7 @@ ResetDecoder(XLogReaderState *state) /* Clear error state. 
*/ state->errormsg_buf[0] = '\0'; state->errormsg_deferred = false; + state->errorcode = XLOG_READER_NONE; } /* @@ -1732,6 +1767,7 @@ DecodeXLogRecord(XLogReaderState *state, if (block_id <= decoded->max_block_id) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "out-of-order block_id %u at %X/%X", block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1756,6 +1792,7 @@ DecodeXLogRecord(XLogReaderState *state, if (blk->has_data && blk->data_len == 0) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1763,6 +1800,7 @@ DecodeXLogRecord(XLogReaderState *state, if (!blk->has_data && blk->data_len != 0) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", (unsigned int) blk->data_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1799,6 +1837,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len == BLCKSZ)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, @@ -1815,6 +1854,7 @@ DecodeXLogRecord(XLogReaderState *state, (blk->hole_offset != 0 || blk->hole_length != 0)) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, @@ -1829,6 +1869,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len == BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X", (unsigned int) blk->bimg_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1844,6 +1885,7 @@ DecodeXLogRecord(XLogReaderState *state, blk->bimg_len != BLCKSZ) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED 
set, but block image length is %u at %X/%X", (unsigned int) blk->data_len, LSN_FORMAT_ARGS(state->ReadRecPtr)); @@ -1860,6 +1902,7 @@ DecodeXLogRecord(XLogReaderState *state, if (rlocator == NULL) { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1872,6 +1915,7 @@ DecodeXLogRecord(XLogReaderState *state, else { report_invalid_record(state, + XLOG_READER_INVALID_DATA, "invalid block_id %u at %X/%X", block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; @@ -1939,6 +1983,7 @@ DecodeXLogRecord(XLogReaderState *state, shortdata_err: report_invalid_record(state, + XLOG_READER_INVALID_DATA, "record with invalid length at %X/%X", LSN_FORMAT_ARGS(state->ReadRecPtr)); err: @@ -2049,6 +2094,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) !record->record->blocks[block_id].in_use) { report_invalid_record(record, + XLOG_READER_INVALID_DATA, "could not restore image at %X/%X with invalid block %d specified", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); @@ -2056,7 +2102,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) } if (!record->record->blocks[block_id].has_image) { - report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X with invalid state, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); return false; @@ -2083,7 +2131,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0) decomp_success = false; #else - report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X compressed with %s not supported by build, block %d", 
LSN_FORMAT_ARGS(record->ReadRecPtr), "LZ4", block_id); @@ -2100,7 +2150,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) if (ZSTD_isError(decomp_result)) decomp_success = false; #else - report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X compressed with %s not supported by build, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), "zstd", block_id); @@ -2109,7 +2161,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) } else { - report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not restore image at %X/%X compressed with unknown method, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); return false; @@ -2117,7 +2171,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) if (!decomp_success) { - report_invalid_record(record, "could not decompress image at %X/%X, block %d", + report_invalid_record(record, + XLOG_READER_INVALID_DATA, + "could not decompress image at %X/%X, block %d", LSN_FORMAT_ARGS(record->ReadRecPtr), block_id); return false; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index becc2bda62..06b00c7c46 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -3063,8 +3063,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, for (;;) { char *errormsg; + XLogReaderError errorcode; - record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg); + record = XLogPrefetcherReadRecord(xlogprefetcher, &errormsg, &errorcode); if (record == NULL) { /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..ea39bc5353 100644 --- 
a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -642,9 +642,10 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { XLogRecord *record; char *err = NULL; + XLogReaderError code; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, &err); + record = XLogReadRecord(ctx->reader, &err, &code); if (err) elog(ERROR, "could not find logical decoding starting point: %s", err); if (!record) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 55a24c02c9..e411543111 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -245,8 +245,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin { XLogRecord *record; char *errm = NULL; + XLogReaderError errcode; - record = XLogReadRecord(ctx->reader, &errm); + record = XLogReadRecord(ctx->reader, &errm, &errcode); if (errm) elog(ERROR, "could not find record for logical decoding: %s", errm); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6035cf4816..e09c641a0b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -505,12 +505,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) { char *errm = NULL; XLogRecord *record; + XLogReaderError errcode; /* * Read records. No changes are generated in fast_forward mode, * but snapbuilder/slot statuses are updated properly. 
*/ - record = XLogReadRecord(ctx->reader, &errm); + record = XLogReadRecord(ctx->reader, &errm, &errcode); if (errm) elog(ERROR, "could not find record while advancing replication slot: %s", errm); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d27ef2985d..39b10c7570 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3046,6 +3046,7 @@ XLogSendLogical(void) { XLogRecord *record; char *errm; + XLogReaderError errcode; /* * We'll use the current flush point to determine whether we've caught up. @@ -3063,7 +3064,7 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + record = XLogReadRecord(logical_decoding_ctx->reader, &errm, &errcode); /* xlog record was invalid */ if (errm != NULL) diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 27782237d0..2fcbf6f904 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -69,6 +69,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; + XLogReaderError errorcode; XLogPageReadPrivate private; private.tliIndex = tliIndex; @@ -82,7 +83,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, XLogBeginRead(xlogreader, startpoint); do { - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg, &errorcode); if (record == NULL) { @@ -127,6 +128,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; + XLogReaderError errorcode; XLogPageReadPrivate private; XLogRecPtr endptr; @@ -139,7 +141,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, pg_fatal("out of memory while allocating a WAL reading processor"); XLogBeginRead(xlogreader, ptr); - record = XLogReadRecord(xlogreader, 
&errormsg); + record = XLogReadRecord(xlogreader, &errormsg, &errorcode); if (record == NULL) { if (errormsg) @@ -174,6 +176,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogRecPtr searchptr; XLogReaderState *xlogreader; char *errormsg; + XLogReaderError errorcode; XLogPageReadPrivate private; /* @@ -204,7 +207,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, uint8 info; XLogBeginRead(xlogreader, searchptr); - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg, &errorcode); if (record == NULL) { diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index e8b5a6cd61..cc38e3752f 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -797,6 +797,7 @@ main(int argc, char **argv) XLogRecPtr first_record; char *waldir = NULL; char *errormsg; + XLogReaderError errorcode; static struct option long_options[] = { {"bkp-details", no_argument, NULL, 'b'}, @@ -1239,7 +1240,7 @@ main(int argc, char **argv) } /* try to read the next record */ - record = XLogReadRecord(xlogreader_state, &errormsg); + record = XLogReadRecord(xlogreader_state, &errormsg, &errorcode); if (!record) { if (!config.follow || private.endptr_reached) diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c index 796a74f322..dea9837611 100644 --- a/contrib/pg_walinspect/pg_walinspect.c +++ b/contrib/pg_walinspect/pg_walinspect.c @@ -147,8 +147,9 @@ ReadNextXLogRecord(XLogReaderState *xlogreader) { XLogRecord *record; char *errormsg; + XLogReaderError errorcode; - record = XLogReadRecord(xlogreader, &errormsg); + record = XLogReadRecord(xlogreader, &errormsg, &errorcode); if (record == NULL) { -- 2.40.1
From c5c0c5d3b5fc43760da0fa4ed9466077af573056 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 8 Aug 2023 16:22:18 +0900
Subject: [PATCH v1 2/3] Force a FATAL when facing OOM in WAL replay

---
 src/backend/access/transam/xlogrecovery.c | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 06b00c7c46..e3b156ec15 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3098,9 +3098,15 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			 * failed - in that case we already logged something. In
 			 * StandbyMode that only happens if we have been triggered, so we
 			 * shouldn't loop anymore in that case.
+			 *
+			 * If we failed because of an out-of-memory problem, just give up
+			 * and retry recovery later. It may be possible that the WAL record
+			 * to decode required a larger memory allocation than what the host
+			 * can offer.
 			 */
 			if (errormsg)
-				ereport(emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
+				ereport(errorcode == XLOG_READER_OOM ?
+						FATAL : emode_for_corrupt_record(emode, xlogreader->EndRecPtr),
 						(errmsg_internal("%s", errormsg) /* already translated */ ));
 		}
 
-- 
2.40.1
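The decision taken in 0002 boils down to a small mapping from the error code to the level used for the report. The sketch below isolates that mapping so it can be read, and tested, outside the server. The MOCK_* levels and choose_elevel() are hypothetical names invented for this example. The real levels come from elog.h, and the real fallback is whatever emode_for_corrupt_record() returns, which is simply passed in here as corrupt_emode.

```c
#include <assert.h>
#include <stddef.h>

/* Same three values as the enum proposed in 0001. */
typedef enum XLogReaderError
{
	XLOG_READER_NONE = 0,
	XLOG_READER_OOM,
	XLOG_READER_INVALID_DATA
} XLogReaderError;

/* Hypothetical severity levels, placeholders for the ones in elog.h. */
enum
{
	MOCK_NOTHING = 0,			/* no report at all */
	MOCK_LOG,
	MOCK_WARNING,
	MOCK_FATAL
};

/*
 * Pick the level of the report for a failed record read.
 *
 * errormsg is the message handed back by the reader, or NULL if it had
 * nothing to say.  corrupt_emode stands for the level the caller would
 * have used for a corrupted record.
 */
static int
choose_elevel(const char *errormsg, XLogReaderError errorcode,
			  int corrupt_emode)
{
	if (errormsg == NULL)
		return MOCK_NOTHING;

	/* An OOM says nothing about the end of WAL, so give up loudly. */
	if (errorcode == XLOG_READER_OOM)
		return MOCK_FATAL;

	return corrupt_emode;
}
```

Keeping the mapping in one helper like this is one possible way to make the OOM case reusable by other callers, but whether that belongs in emode_for_corrupt_record() itself is the open question raised above.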
From a8621af2716d92133d91ae372777743643f9b3be Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Tue, 1 Aug 2023 11:49:53 +0900
Subject: [PATCH v1 3/3] Tweak to force OOM behavior when replaying records

---
 src/backend/access/transam/xlogreader.c | 27 ++++++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 8fbf2d3513..b9dc5a780b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -556,6 +556,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	int			readOff;
 	DecodedXLogRecord *decoded;
 	char	   *errormsg;		/* not used */
+	bool		trigger_oom = false;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -705,7 +706,31 @@ restart:
 	decoded = XLogReadRecordAlloc(state,
 								  total_len,
 								  !nonblocking /* allow_oversized */ );
-	if (decoded == NULL)
+
+#ifndef FRONTEND
+	/*
+	 * Trick to emulate an OOM after a hardcoded number of records
+	 * replayed.
+	 */
+	{
+		struct stat fstat;
+		static int	counter = 0;
+
+		if (stat("/tmp/xlogreader_oom", &fstat) == 0)
+		{
+			counter++;
+			if (counter >= 100)
+			{
+				trigger_oom = true;
+
+				/* Reset counter, to not fail when shutting down WAL */
+				counter = 0;
+			}
+		}
+	}
+#endif
+
+	if (decoded == NULL || trigger_oom)
 	{
 		/*
 		 * There is no space in the decode buffer. The caller should help
-- 
2.40.1
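The injection trick of 0003 can be tried in isolation. The sketch below is a standalone variant, assuming a POSIX system for stat(): a trigger file arms the fault, a static counter counts the calls made while the file exists, and the fault fires once the threshold is reached, after which the counter is reset. should_inject_oom() and its two parameters are hypothetical names for this example. The patch hardcodes the path /tmp/xlogreader_oom and a threshold of 100 instead.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <sys/stat.h>

/*
 * Return true when a fake out-of-memory failure should be raised.
 *
 * Nothing happens unless trigger_path exists.  While it exists, each
 * call bumps a counter, and the call that reaches "threshold" reports
 * a failure and resets the counter so later calls succeed again.
 */
static bool
should_inject_oom(const char *trigger_path, int threshold)
{
	static int	counter = 0;
	struct stat st;

	/* No trigger file, no fault, and the counter is left alone. */
	if (stat(trigger_path, &st) != 0)
		return false;

	counter++;
	if (counter < threshold)
		return false;

	/* Reset so the failure does not repeat on every following call. */
	counter = 0;
	return true;
}
```

With a threshold of 3, for example, the first two calls after creating the trigger file return false, the third returns true, and the fourth returns false again. Removing the file disarms the fault without rebuilding anything, which is what makes this kind of hack convenient for a one-off reproduction script.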