On Fri, Mar 11, 2022 at 6:31 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> Thanks for your review of 0001!  It gave me a few things to think
> about and some good improvements.

And just in case it's useful, here's what changed between v21 and v22:
diff --git a/src/backend/access/transam/xlogreader.c 
b/src/backend/access/transam/xlogreader.c
index 86a7b4c5c8..0d0c556b7c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -90,8 +90,8 @@ XLogReaderSetDecodeBuffer(XLogReaderState *state, void 
*buffer, size_t size)
 
        state->decode_buffer = buffer;
        state->decode_buffer_size = size;
-       state->decode_buffer_head = buffer;
        state->decode_buffer_tail = buffer;
+       state->decode_buffer_head = buffer;
 }
 
 /*
@@ -271,7 +271,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
 
 /*
  * See if we can release the last record that was returned by
- * XLogNextRecord(), to free up space.
+ * XLogNextRecord(), if any, to free up space.
  */
 void
 XLogReleasePreviousRecord(XLogReaderState *state)
@@ -283,16 +283,16 @@ XLogReleasePreviousRecord(XLogReaderState *state)
 
        /*
         * Remove it from the decoded record queue.  It must be the oldest item
-        * decoded, decode_queue_tail.
+        * decoded, decode_queue_head.
         */
        record = state->record;
-       Assert(record == state->decode_queue_tail);
+       Assert(record == state->decode_queue_head);
        state->record = NULL;
-       state->decode_queue_tail = record->next;
+       state->decode_queue_head = record->next;
 
-       /* It might also be the newest item decoded, decode_queue_head. */
-       if (state->decode_queue_head == record)
-               state->decode_queue_head = NULL;
+       /* It might also be the newest item decoded, decode_queue_tail. */
+       if (state->decode_queue_tail == record)
+               state->decode_queue_tail = NULL;
 
        /* Release the space. */
        if (unlikely(record->oversized))
@@ -302,11 +302,11 @@ XLogReleasePreviousRecord(XLogReaderState *state)
        }
        else
        {
-               /* It must be the tail record in the decode buffer. */
-               Assert(state->decode_buffer_tail == (char *) record);
+               /* It must be the head (oldest) record in the decode buffer. */
+               Assert(state->decode_buffer_head == (char *) record);
 
                /*
-                * We need to update tail to point to the next record that is 
in the
+                * We need to update head to point to the next record that is 
in the
                 * decode buffer, if any, being careful to skip oversized ones
                 * (they're not in the decode buffer).
                 */
@@ -316,8 +316,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
 
                if (record)
                {
-                       /* Adjust tail to release space up to the next record. 
*/
-                       state->decode_buffer_tail = (char *) record;
+                       /* Adjust head to release space up to the next record. 
*/
+                       state->decode_buffer_head = (char *) record;
                }
                else
                {
@@ -327,8 +327,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
                         * we'll keep overwriting the same piece of memory if 
we're not
                         * doing any prefetching.
                         */
-                       state->decode_buffer_tail = state->decode_buffer;
                        state->decode_buffer_head = state->decode_buffer;
+                       state->decode_buffer_tail = state->decode_buffer;
                }
        }
 }
@@ -351,7 +351,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
        /* Release the last record returned by XLogNextRecord(). */
        XLogReleasePreviousRecord(state);
 
-       if (state->decode_queue_tail == NULL)
+       if (state->decode_queue_head == NULL)
        {
                *errormsg = NULL;
                if (state->errormsg_deferred)
@@ -376,7 +376,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg)
         * XLogRecXXX(xlogreader) macros, which work with the decoder rather 
than
         * the record for historical reasons.
         */
-       state->record = state->decode_queue_tail;
+       state->record = state->decode_queue_head;
 
        /*
         * Update the pointers to the beginning and one-past-the-end of this
@@ -428,12 +428,12 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
        if (!XLogReaderHasQueuedRecordOrError(state))
                XLogReadAhead(state, false /* nonblocking */ );
 
-       /* Consume the tail record or error. */
+       /* Consume the head record or error. */
        decoded = XLogNextRecord(state, errormsg);
        if (decoded)
        {
                /*
-                * XLogReadRecord() returns a pointer to the record's header, 
not the
+                * This function returns a pointer to the record's header, not 
the
                 * actual decoded record.  The caller will access the decoded 
record
                 * through the XLogRecGetXXX() macros, which reach the decoded
                 * recorded as xlogreader->record.
@@ -451,6 +451,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
  * decoded record wouldn't fit in the decode buffer and must eventually be
  * freed explicitly.
  *
+ * The caller is responsible for adjusting decode_buffer_tail with the real
+ * size after successfully decoding a record into this space.  This way, if
+ * decoding fails, then there is nothing to undo unless the 'oversized' flag
+ * was set and pfree() must be called.
+ *
  * Return NULL if there is no space in the decode buffer and allow_oversized
  * is false, or if memory allocation fails for an oversized buffer.
  */
@@ -470,21 +475,23 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t 
xl_tot_len, bool allow_oversi
                state->decode_buffer_tail = state->decode_buffer;
                state->free_decode_buffer = true;
        }
-       if (state->decode_buffer_head >= state->decode_buffer_tail)
+
+       /* Try to allocate space in the circular decode buffer. */
+       if (state->decode_buffer_tail >= state->decode_buffer_head)
        {
-               /* Empty, or head is to the right of tail. */
-               if (state->decode_buffer_head + required_space <=
+               /* Empty, or tail is to the right of head. */
+               if (state->decode_buffer_tail + required_space <=
                        state->decode_buffer + state->decode_buffer_size)
                {
-                       /* There is space between head and end. */
-                       decoded = (DecodedXLogRecord *) 
state->decode_buffer_head;
+                       /* There is space between tail and end. */
+                       decoded = (DecodedXLogRecord *) 
state->decode_buffer_tail;
                        decoded->oversized = false;
                        return decoded;
                }
                else if (state->decode_buffer + required_space <
-                                state->decode_buffer_tail)
+                                state->decode_buffer_head)
                {
-                       /* There is space between start and tail. */
+                       /* There is space between start and head. */
                        decoded = (DecodedXLogRecord *) state->decode_buffer;
                        decoded->oversized = false;
                        return decoded;
@@ -492,12 +499,12 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t 
xl_tot_len, bool allow_oversi
        }
        else
        {
-               /* Head is to the left of tail. */
-               if (state->decode_buffer_head + required_space <
-                       state->decode_buffer_tail)
+               /* Tail is to the left of head. */
+               if (state->decode_buffer_tail + required_space <
+                       state->decode_buffer_head)
                {
-                       /* There is space between head and tail. */
-                       decoded = (DecodedXLogRecord *) 
state->decode_buffer_head;
+                       /* There is space between tail and head. */
+                       decoded = (DecodedXLogRecord *) 
state->decode_buffer_tail;
                        decoded->oversized = false;
                        return decoded;
                }
@@ -513,7 +520,7 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t 
xl_tot_len, bool allow_oversi
                return decoded;
        }
 
-       return decoded;
+       return NULL;
 }
 
 static XLogPageReadResult
@@ -748,7 +755,6 @@ restart:
                        if (pageHeader->xlp_info & 
XLP_FIRST_IS_OVERWRITE_CONTRECORD)
                        {
                                state->overwrittenRecPtr = RecPtr;
-                               //ResetDecoder(state);
                                RecPtr = targetPagePtr;
                                goto restart;
                        }
@@ -865,18 +871,18 @@ restart:
                        /* The new decode buffer head must be MAXALIGNed. */
                        Assert(decoded->size == MAXALIGN(decoded->size));
                        if ((char *) decoded == state->decode_buffer)
-                               state->decode_buffer_head = 
state->decode_buffer + decoded->size;
+                               state->decode_buffer_tail = 
state->decode_buffer + decoded->size;
                        else
-                               state->decode_buffer_head += decoded->size;
+                               state->decode_buffer_tail += decoded->size;
                }
 
                /* Insert it into the queue of decoded records. */
-               Assert(state->decode_queue_head != decoded);
-               if (state->decode_queue_head)
-                       state->decode_queue_head->next = decoded;
-               state->decode_queue_head = decoded;
-               if (!state->decode_queue_tail)
-                       state->decode_queue_tail = decoded;
+               Assert(state->decode_queue_tail != decoded);
+               if (state->decode_queue_tail)
+                       state->decode_queue_tail->next = decoded;
+               state->decode_queue_tail = decoded;
+               if (!state->decode_queue_head)
+                       state->decode_queue_head = decoded;
                return XLREAD_SUCCESS;
        }
        else
@@ -935,8 +941,8 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
        result = XLogDecodeNextRecord(state, nonblocking);
        if (result == XLREAD_SUCCESS)
        {
-               Assert(state->decode_queue_head != NULL);
-               return state->decode_queue_head;
+               Assert(state->decode_queue_tail != NULL);
+               return state->decode_queue_tail;
        }
 
        return NULL;
@@ -946,8 +952,14 @@ XLogReadAhead(XLogReaderState *state, bool nonblocking)
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
  * via the page_read() callback.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Returns XLREAD_FAIL if the required page cannot be read for some
+ * reason; errormsg_buf is set in that case (unless the error occurs in the
+ * page_read callback).
+ *
+ * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
+ * waiting.  This can be returned only if the installed page_read callback
+ * respects the state->nonblocking flag, and cannot read the requested data
+ * immediately.
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -1334,6 +1346,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr 
RecPtr)
 
        Assert(!XLogRecPtrIsInvalid(RecPtr));
 
+       /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
+       state->nonblocking = false;
+
        /*
         * skip over potential continuation data, keeping in mind that it may 
span
         * multiple pages
@@ -1544,19 +1559,19 @@ ResetDecoder(XLogReaderState *state)
        DecodedXLogRecord *r;
 
        /* Reset the decoded record queue, freeing any oversized records. */
-       while ((r = state->decode_queue_tail))
+       while ((r = state->decode_queue_head) != NULL)
        {
-               state->decode_queue_tail = r->next;
+               state->decode_queue_head = r->next;
                if (r->oversized)
                        pfree(r);
        }
-       state->decode_queue_head = NULL;
        state->decode_queue_tail = NULL;
+       state->decode_queue_head = NULL;
        state->record = NULL;
 
        /* Reset the decode buffer to empty. */
-       state->decode_buffer_head = state->decode_buffer;
        state->decode_buffer_tail = state->decode_buffer;
+       state->decode_buffer_head = state->decode_buffer;
 
        /* Clear error state. */
        state->errormsg_buf[0] = '\0';
diff --git a/src/backend/access/transam/xlogutils.c 
b/src/backend/access/transam/xlogutils.c
index 44d9313422..ea22577b41 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -373,7 +373,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
         * going to initialize it. And vice versa.
         */
        zeromode = (mode == RBM_ZERO_AND_LOCK || mode == 
RBM_ZERO_AND_CLEANUP_LOCK);
-       willinit = (record->record->blocks[block_id].flags & 
BKPBLOCK_WILL_INIT) != 0;
+       willinit = (XLogRecGetBlock(record, block_id)->flags & 
BKPBLOCK_WILL_INIT) != 0;
        if (willinit && !zeromode)
                elog(PANIC, "block with WILL_INIT flag in WAL record must be 
zeroed by redo routine");
        if (!willinit && zeromode)
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index c129df44ac..a33ad034c0 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -403,14 +403,13 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 
*rec_len, uint32 *fpi_len)
         * Calculate the amount of FPI data in the record.
         *
         * XXX: We peek into xlogreader's private decoded backup blocks for the
-        * bimg_len indicating the length of FPI data. It doesn't seem worth it 
to
-        * add an accessor macro for this.
+        * bimg_len indicating the length of FPI data.
         */
        *fpi_len = 0;
        for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
        {
                if (XLogRecHasBlockImage(record, block_id))
-                       *fpi_len += record->record->blocks[block_id].bimg_len;
+                       *fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
        }
 
        /*
@@ -552,7 +551,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, 
XLogReaderState *record)
                                   blk);
                        if (XLogRecHasBlockImage(record, block_id))
                        {
-                               uint8           bimg_info = 
record->record->blocks[block_id].bimg_info;
+                               uint8           bimg_info = 
XLogRecGetBlock(record, block_id)->bimg_info;
 
                                if (BKPIMAGE_COMPRESSED(bimg_info))
                                {
@@ -569,11 +568,11 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, 
XLogReaderState *record)
                                                   "compression saved: %u, 
method: %s",
                                                   
XLogRecBlockImageApply(record, block_id) ?
                                                   "" : " for WAL verification",
-                                                  
record->record->blocks[block_id].hole_offset,
-                                                  
record->record->blocks[block_id].hole_length,
+                                                  XLogRecGetBlock(record, 
block_id)->hole_offset,
+                                                  XLogRecGetBlock(record, 
block_id)->hole_length,
                                                   BLCKSZ -
-                                                  
record->record->blocks[block_id].hole_length -
-                                                  
record->record->blocks[block_id].bimg_len,
+                                                  XLogRecGetBlock(record, 
block_id)->hole_length -
+                                                  XLogRecGetBlock(record, 
block_id)->bimg_len,
                                                   method);
                                }
                                else
@@ -581,8 +580,8 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, 
XLogReaderState *record)
                                        printf(" (FPW%s); hole: offset: %u, 
length: %u",
                                                   
XLogRecBlockImageApply(record, block_id) ?
                                                   "" : " for WAL verification",
-                                                  
record->record->blocks[block_id].hole_offset,
-                                                  
record->record->blocks[block_id].hole_length);
+                                                  XLogRecGetBlock(record, 
block_id)->hole_offset,
+                                                  XLogRecGetBlock(record, 
block_id)->hole_length);
                                }
                        }
                        putchar('\n');
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 86a26a9231..8446050225 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -249,16 +249,16 @@ struct XLogReaderState
        char       *decode_buffer;
        size_t          decode_buffer_size;
        bool            free_decode_buffer; /* need to free? */
-       char       *decode_buffer_head; /* write head */
-       char       *decode_buffer_tail; /* read head */
+       char       *decode_buffer_head; /* data is read from the head */
+       char       *decode_buffer_tail; /* new data is written at the tail */
 
        /*
         * Queue of records that have been decoded.  This is a linked list that
         * usually consists of consecutive records in decode_buffer, but may 
also
         * contain oversized records allocated with palloc().
         */
-       DecodedXLogRecord *decode_queue_head;   /* newest decoded record */
-       DecodedXLogRecord *decode_queue_tail;   /* oldest decoded record */
+       DecodedXLogRecord *decode_queue_head;   /* oldest decoded record */
+       DecodedXLogRecord *decode_queue_tail;   /* newest decoded record */
 
        /*
         * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at 
least
@@ -350,7 +350,7 @@ extern XLogRecPtr XLogFindNextRecord(XLogReaderState 
*state, XLogRecPtr RecPtr);
 #endif                                                 /* FRONTEND */
 
 /* Return values from XLogPageReadCB. */
-typedef enum XLogPageReadResultResult
+typedef enum XLogPageReadResult
 {
        XLREAD_SUCCESS = 0,                     /* record is successfully read 
*/
        XLREAD_FAIL = -1,                       /* failed during reading a 
record */

Reply via email to