At Tue, 01 Aug 2023 15:28:54 +0900 (JST), Kyotaro Horiguchi 
<horikyota....@gmail.com> wrote in 
> I thoght that the failure on a stanby results in continuing to retry
> reading the next record. However, I found that there's a case where
> start process stops in response to OOM [1].

I've examined the calls to
MemoryContextAllocExtended(..,MCXT_ALLOC_NO_OOM). In server recovery
path, XLogDecodeNextRecord is the only function that uses it.

So, there doesn't seem to be a problem here. I proceeded to test the
idea of only varifying headers after an allocation failure, and I've
attached a PoC.

- allocate_recordbuf() ensures a minimum of SizeOfXLogRecord bytes
  when it reutnrs false, indicating an allocation failure.

- If allocate_recordbuf() returns false, XLogDecodeNextRecord()
  continues to read pages and perform header checks until the
  total_len reached, but not copying data (except for the logical
  record header, when the first page didn't store the entire header).

- If all relevant WAL pages are consistent, ReadRecord concludes with
  an 'out of memory' ERROR, which then escalates to FATAL.

I believe this approach is sufficient to determine whether the error
is OOM or not. If total_len is currupted and has an excessively large
value, it's highly unlikely that all subsequent pages for that length
will be consistent.

Do you have any thoughts on this?

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlogreader.c 
b/src/backend/access/transam/xlogreader.c
index c9f9f6e98f..a6c57d50bf 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -150,6 +150,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
                return NULL;
        }
        state->errormsg_buf[0] = '\0';
+       state->errormsg_fatal =false;
 
        /*
         * Allocate an initial readRecordBuf of minimal size, which can later be
@@ -196,6 +197,7 @@ XLogReaderFree(XLogReaderState *state)
 static bool
 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 {
+       static char fallback_buf[SizeOfXLogRecord];
        uint32          newSize = reclength;
 
        newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
@@ -224,9 +226,12 @@ allocate_recordbuf(XLogReaderState *state, uint32 
reclength)
                pfree(state->readRecordBuf);
        state->readRecordBuf =
                (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
+
        if (state->readRecordBuf == NULL)
        {
-               state->readRecordBufSize = 0;
+               /* failed to allocate buffer. use the fallback buffer instead */
+               state->readRecordBufSize = SizeOfXLogRecord;
+               state->readRecordBuf = fallback_buf;
                return false;
        }
        state->readRecordBufSize = newSize;
@@ -547,6 +552,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool 
nonblocking)
        int                     readOff;
        DecodedXLogRecord *decoded;
        char       *errormsg;           /* not used */
+       bool            verify_only;
 
        /*
         * randAccess indicates whether to verify the previous-record pointer of
@@ -589,6 +595,7 @@ XLogDecodeNextRecord(XLogReaderState *state, bool 
nonblocking)
        }
 
 restart:
+       verify_only = false;
        state->nonblocking = nonblocking;
        state->currRecPtr = RecPtr;
        assembled = false;
@@ -702,8 +709,12 @@ restart:
 
                /* We failed to allocate memory for an oversized record. */
                report_invalid_record(state,
-                                                         "out of memory while 
trying to decode a record of length %u", total_len);
-               goto err;
+                                                         "out of memory while 
trying to decode a record of length %u at %X/%X",
+                                                         total_len, 
LSN_FORMAT_ARGS(RecPtr));
+               if (gotheader)
+                       state->errormsg_fatal = true;
+                                       
+               verify_only = true;
        }
 
        len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
@@ -719,19 +730,19 @@ restart:
 
                /*
                 * Enlarge readRecordBuf as needed.
+                * If failed, continue only performing verification.
                 */
-               if (total_len > state->readRecordBufSize &&
+               if (!verify_only &&
+                       total_len > state->readRecordBufSize &&
                        !allocate_recordbuf(state, total_len))
-               {
-                       /* We treat this as a "bogus data" condition */
-                       report_invalid_record(state, "record length %u at %X/%X 
too long",
-                                                                 total_len, 
LSN_FORMAT_ARGS(RecPtr));
-                       goto err;
-               }
+                       verify_only = true;
 
                /* Copy the first fragment of the record from the first page. */
-               memcpy(state->readRecordBuf,
-                          state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+               Assert(!verify_only || gotheader || len < SizeOfXLogRecord);
+               if (!verify_only || !gotheader)
+                       memcpy(state->readRecordBuf,
+                                  state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+                       
                buffer = state->readRecordBuf + len;
                gotlen = len;
 
@@ -811,13 +822,23 @@ restart:
                                readOff = ReadPageInternal(state, targetPagePtr,
                                                                                
   pageHeaderSize + len);
 
-                       memcpy(buffer, (char *) contdata, len);
+                       if (!verify_only)
+                               memcpy(buffer, (char *) contdata, len);
+                               
                        buffer += len;
                        gotlen += len;
 
                        /* If we just reassembled the record header, validate 
it. */
                        if (!gotheader)
                        {
+                               if (verify_only)
+                               {
+                                       Assert(gotlen - len < SizeOfXLogRecord 
&&
+                                                  gotlen + len >= 
SizeOfXLogRecord);
+                                       memcpy(buffer, (char *) contdata,
+                                                  SizeOfXLogRecord - (gotlen - 
len));
+                               }
+                                       
                                record = (XLogRecord *) state->readRecordBuf;
                                if (!ValidXLogRecordHeader(state, RecPtr, 
state->DecodeRecPtr,
                                                                                
   record, randAccess))
@@ -828,6 +849,15 @@ restart:
 
                Assert(gotheader);
 
+               if (verify_only)
+               {
+                       report_invalid_record(state,
+                                                                 "out of 
memory while trying to decode a record of length %u at %X/%X",
+                                                                 total_len, 
LSN_FORMAT_ARGS(RecPtr));
+                       state->errormsg_fatal = true;
+                       goto err;
+               }
+
                record = (XLogRecord *) state->readRecordBuf;
                if (!ValidXLogRecord(state, record, RecPtr))
                        goto err;
diff --git a/src/backend/access/transam/xlogrecovery.c 
b/src/backend/access/transam/xlogrecovery.c
index becc2bda62..f37d85254f 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3099,8 +3099,16 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
                         * shouldn't loop anymore in that case.
                         */
                        if (errormsg)
-                               ereport(emode_for_corrupt_record(emode, 
xlogreader->EndRecPtr),
+                       {
+                               int tmp_emode =
+                                       emode_for_corrupt_record(emode, 
xlogreader->EndRecPtr);
+
+                               if (xlogreader->errormsg_fatal)
+                                       tmp_emode = ERROR;
+                               ereport(tmp_emode,
                                                (errmsg_internal("%s", 
errormsg) /* already translated */ ));
+                       }
+                       
                }
 
                /*
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index da32c7db77..a2fca907e4 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -310,7 +310,8 @@ struct XLogReaderState
        /* Buffer to hold error message */
        char       *errormsg_buf;
        bool            errormsg_deferred;
-
+       bool            errormsg_fatal;
+       
        /*
         * Flag to indicate to XLogPageReadCB that it should not block waiting 
for
         * data.

Reply via email to