On 12/07/2019 10:10, Kyotaro Horiguchi wrote:
At Tue, 28 May 2019 04:45:24 -0700, Andres Freund <and...@anarazel.de> wrote in 
<20190528114524.dvj6ymap2virl...@alap3.anarazel.de>
Hi,

On 2019-04-18 21:02:57 +0900, Kyotaro HORIGUCHI wrote:
Hello. As mentioned before [1], the read_page callback in
XLogReaderState is a cause of headaches. Adding more
remote-control machinery to xlog readers makes things messier [2].

I refactored the XLOG reading functions so that we don't need the
callback. In short, ReadRecord now calls XLogPageRead directly
with the attached patch set.

|     while (XLogReadRecord(xlogreader, RecPtr, &record, &errormsg)
|            == XLREAD_NEED_DATA)
|         XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess);

On the other hand, XLogReadRecord became a bit complex. The patch
makes XLogReadRecord a state machine. I'm not confident that the
additional complexity is worth it. Anyway I'll register this
to the next CF.

Just FYI, to me this doesn't clearly enough look like an improvement,
for a change of this size.

Thanks for the opinion. I kinda agree about size but it is a
decision between "having multiple callbacks called under the
hood" vs "just calling a series of functions".  I think the
patched XLogReadRecord is easy to use in many situations.

It would be better if I could completely refactor the function
without the syntax tricks but I think the current patch is still
smaller and clearer than overhauling it.

I like the idea of refactoring XLogReadRecord() to not use a callback, and return an XLREAD_NEED_DATA value instead. It feels like a nicer, easier-to-use interface, given that all the page-read functions need quite a bit of state and internal logic themselves. I remember feeling that it would be a nicer interface when we originally extracted xlogreader.c into a reusable module, but I didn't want to make such big changes to XLogReadRecord() at that point.

I don't much like the "continuation" style of implementing the state machine. Nothing wrong with such a style in principle, but we don't do that anywhere else, and the macros seem like overkill, and turning the local variables static is pretty ugly. But I think XLogReadRecord() could be rewritten into a more traditional state machine.

I started hacking on that, to get an idea of what it would look like, and came up with the attached patch, to be applied on top of all your patches. It's still very messy and needs quite a lot of cleanup before it can be committed, but I think the resulting switch-case state machine in XLogReadRecord() is quite straightforward at a high level, with four states.
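
To illustrate the general shape, here is a toy sketch. It is not code from the patch, and every name in it (ToyReader, toy_feed, toy_read and so on) is made up for the example. It reads a record consisting of a one-byte length followed by that many bytes, keeps all resumable state in the reader struct rather than in static locals, and returns TOY_NEED_DATA whenever the caller has to supply more input:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical toy reader; none of these names exist in xlogreader.c. */
typedef enum { TOY_SUCCESS, TOY_NEED_DATA, TOY_FAIL } ToyResult;
typedef enum { TOY_NEED_LEN, TOY_NEED_BODY } ToyState;

typedef struct ToyReader
{
	ToyState	state;			/* where to resume on the next call */
	const unsigned char *buf;	/* input most recently supplied by caller */
	size_t		buflen;			/* number of bytes available in buf */
	size_t		pos;			/* number of bytes of buf consumed so far */
	size_t		reclen;			/* length of the record being assembled */
	size_t		gotlen;			/* bytes of the record copied so far */
	unsigned char rec[16];		/* the assembled record */
} ToyReader;

/* Caller side: make a new chunk of input available to the reader. */
static void
toy_feed(ToyReader *r, const unsigned char *data, size_t len)
{
	r->buf = data;
	r->buflen = len;
	r->pos = 0;
}

/* Resume reading; call again after toy_feed() on TOY_NEED_DATA. */
static ToyResult
toy_read(ToyReader *r)
{
	assert(r != NULL);

	switch (r->state)
	{
		case TOY_NEED_LEN:
			if (r->pos >= r->buflen)
				return TOY_NEED_DATA;
			r->reclen = r->buf[r->pos++];
			if (r->reclen > sizeof(r->rec))
				return TOY_FAIL;	/* record too long for this toy */
			r->gotlen = 0;
			r->state = TOY_NEED_BODY;
			/* fall through */

		case TOY_NEED_BODY:
			{
				size_t		avail = r->buflen - r->pos;
				size_t		want = r->reclen - r->gotlen;
				size_t		n = avail < want ? avail : want;

				/* copy whatever part of the body is available */
				memcpy(r->rec + r->gotlen, r->buf + r->pos, n);
				r->gotlen += n;
				r->pos += n;
				if (r->gotlen < r->reclen)
					return TOY_NEED_DATA;
				r->state = TOY_NEED_LEN;	/* ready for the next record */
				return TOY_SUCCESS;
			}
	}
	return TOY_FAIL;			/* not reached */
}
```

Because the progress counters live in the struct, the function can return at any point and pick up where it left off, without macros or static variables. The real XLogReadRecord() has more states and more validation, of course.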

I made some further changes to the XLogReadRecord() interface:

* If you pass a valid RecPtr (i.e. the starting point to read from) argument to XLogReadRecord(), it always restarts reading from that record, even if it was in the middle of reading another record previously. (Perhaps it would be more convenient to provide a separate function to set the starting point, and remove the RecPtr argument from XLogReadRecord altogether?)

* XLogReaderState->readBuf is now allocated and controlled by the caller, not by xlogreader.c itself. When XLogReadRecord() needs data, the caller makes the data available in readBuf, which can point to the same buffer in all calls, or the caller may allocate a new buffer, or it may point to a part of a larger buffer, whatever is convenient for the caller. (Currently, all callers just allocate an XLOG_BLCKSZ-sized buffer, though.) The caller also sets readPagePtr, readLen and readPageTLI to tell XLogReadRecord() what's in the buffer. So all these read* fields are now set by the caller; XLogReadRecord() only reads them.

* In your patch, if XLogReadRecord() was called with state->readLen == -1, XLogReadRecord() returned an error. That seemed a bit silly; if an error happened while reading the data, why call XLogReadRecord() at all? You could just report the error directly. So I removed that.
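
The caller-side contract described in the three points above can be sketched with a self-contained mock. This is only an illustration under simplifying assumptions: MockReader, mock_read_record and mock_fetch are invented names, a "record" is a single byte, and the "WAL" is a plain in-memory array. The field names loadPagePtr, loadLen, readBuf, readPagePtr and readLen mirror the ones in the patch, but nothing here calls the real xlogreader code:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MOCK_PAGE_SIZE		8
#define MOCK_INVALID_PTR	((uint64_t) 0)

typedef enum { MOCK_SUCCESS, MOCK_NEED_DATA } MockResult;

typedef struct MockReader
{
	/* set by the reader: which data it needs */
	uint64_t	loadPagePtr;
	int			loadLen;
	/* set by the caller: what is currently in the buffer */
	const char *readBuf;
	uint64_t	readPagePtr;
	int			readLen;
	/* reader-private: position of the next record */
	uint64_t	recPtr;
} MockReader;

/* Mock reader: a "record" is one byte at the current position. */
static MockResult
mock_read_record(MockReader *r, uint64_t startPtr, char *record)
{
	uint64_t	pagePtr;
	int			off;

	assert(record != NULL);

	/* A valid start pointer always restarts reading from that position. */
	if (startPtr != MOCK_INVALID_PTR)
		r->recPtr = startPtr;

	pagePtr = r->recPtr - r->recPtr % MOCK_PAGE_SIZE;
	off = (int) (r->recPtr % MOCK_PAGE_SIZE);

	if (r->readBuf == NULL || r->readPagePtr != pagePtr || r->readLen <= off)
	{
		/* Tell the caller which page, and how much of it, we need. */
		r->loadPagePtr = pagePtr;
		r->loadLen = off + 1;
		return MOCK_NEED_DATA;
	}

	*record = r->readBuf[off];
	r->recPtr++;
	return MOCK_SUCCESS;
}

/* Caller side: owns the buffer and reports read errors itself. */
static int
mock_fetch(MockReader *r, const char *wal, size_t wal_len,
		   uint64_t lsn, char *record)
{
	uint64_t	ptr = lsn;

	while (mock_read_record(r, ptr, record) == MOCK_NEED_DATA)
	{
		size_t		avail;

		/* Read failed: report it directly, don't call the reader again. */
		if (r->loadPagePtr + (uint64_t) r->loadLen > (uint64_t) wal_len)
			return -1;

		/* Point readBuf at the data and describe what it contains. */
		avail = wal_len - (size_t) r->loadPagePtr;
		r->readBuf = wal + r->loadPagePtr;
		r->readPagePtr = r->loadPagePtr;
		r->readLen = avail < MOCK_PAGE_SIZE ? (int) avail : MOCK_PAGE_SIZE;

		/* Pass the start position only on the first call. */
		ptr = MOCK_INVALID_PTR;
	}
	return 0;
}
```

Note how the loop passes the start position only once and then switches to the invalid pointer, which is the same pattern the patch uses in XlogReadTwoPhaseData() and ReadRecord().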

I'm not sure how intelligible this patch is in its current state. But I think the general idea is good. I plan to clean it up further next week, but feel free to work on it before that, either based on this patch or by starting afresh from your patch set.

- Heikki
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e7a086b71e8..8f07450f503 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1385,6 +1385,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
+	XLogRecPtr ptr;
 
 	xlogreader = XLogReaderAllocate(wal_segment_size);
 	if (!xlogreader)
@@ -1392,10 +1393,15 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
+	xlogreader->readBuf = palloc(XLOG_BLCKSZ);
 
-	while (XLogReadRecord(xlogreader, lsn, &record, &errormsg) ==
+	ptr = lsn;
+	while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) ==
 		   XLREAD_NEED_DATA)
+	{
 		read_local_xlog_page(xlogreader);
+		ptr = InvalidXLogRecPtr;
+	}
 
 	if (record == NULL)
 		ereport(ERROR,
@@ -1418,6 +1424,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	*buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
 	memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
 
+	pfree(xlogreader->readBuf);
 	XLogReaderFree(xlogreader);
 }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 12336247ed5..7d19ad7e80b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4248,9 +4248,20 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	{
 		char	   *errormsg;
 
-		while (XLogReadRecord(xlogreader, RecPtr, &record, &errormsg)
-			   == XLREAD_NEED_DATA)
-			XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess);
+		if (XLogReadRecord(xlogreader, RecPtr, &record, &errormsg)
+			== XLREAD_NEED_DATA)
+		{
+			do
+			{
+				XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess);
+				if (xlogreader->readLen == -1)
+				{
+					record = NULL;
+					break;
+				}
+			} while (XLogReadRecord(xlogreader, InvalidXLogRecPtr, &record, &errormsg)
+					 == XLREAD_NEED_DATA);
+		}
 
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
@@ -6344,6 +6355,7 @@ StartupXLOG(void)
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
+	xlogreader->readBuf = palloc(XLOG_BLCKSZ);
 	xlogreader->system_identifier = ControlFile->system_identifier;
 
 	/*
@@ -7715,6 +7727,7 @@ StartupXLOG(void)
 		close(readFile);
 		readFile = -1;
 	}
+	pfree(xlogreader->readBuf);
 	XLogReaderFree(xlogreader);
 
 	/*
@@ -11512,7 +11525,7 @@ XLogPageRead(XLogReaderState *xlogreader,
 {
 	XLogRecPtr targetPagePtr	= xlogreader->loadPagePtr;
 	int reqLen					= xlogreader->loadLen;
-	XLogRecPtr targetRecPtr		= xlogreader->currRecPtr;
+	XLogRecPtr targetRecPtr		= xlogreader->ReadRecPtr;
 	char *readBuf				= xlogreader->readBuf;
 	TimeLineID *readTLI			= &xlogreader->readPageTLI;
 	uint32		targetPageOff;
@@ -11567,6 +11580,7 @@ retry:
 			readLen = 0;
 			readSource = 0;
 
+			xlogreader->readPagePtr = InvalidXLogRecPtr;
 			xlogreader->readLen = -1;
 			return;
 		}
@@ -11663,6 +11677,7 @@ retry:
 		goto next_record_is_invalid;
 	}
 
+	xlogreader->readPagePtr = targetPagePtr;
 	xlogreader->readLen = readLen;
 	return;
 
@@ -11679,6 +11694,7 @@ next_record_is_invalid:
 	if (StandbyMode)
 		goto retry;
 
+	xlogreader->readPagePtr = InvalidXLogRecPtr;
 	xlogreader->readLen = -1;
 	return;
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 0201b34e372..0837544bd2a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -30,93 +30,10 @@
 #include "utils/memutils.h"
 #endif
 
-/*
- * Use computed-goto-based state dispatch when computed gotos are available.
- * But use a separate symbol so that it's easy to adjust locally in this file
- * for development and testing.
- */
-#ifdef HAVE_COMPUTED_GOTO
-#define XLR_USE_COMPUTED_GOTO
-#endif							/* HAVE_COMPUTED_GOTO */
-
-/*
- * The state machine functions relies on static local variables. They cannot
- * be reentered after non-local exit using ereport/elog for consistency. The
- * assertion macros protect the functions from reenter after non-local exit.
- */
-#ifdef USE_ASSERT_CHECKING
-#define XLR_REENT_PROTECT_ENTER() \
-	do { Assert(!__xlr_running); __xlr_running = true; } while (0)
-#define XLR_REENT_PROTECT_LEAVE()\
-	do { __xlr_running = false; } while (0)
-#else
-#define XLR_REENT_PROTECT_ENTER()
-#define XLR_REENT_PROTECT_LEAVE()
-#endif
-
-/*
- * Macros for state dispatch.
- *
- * XLR_SWITCH - prologue code for state machine including switch itself.
- * XLR_CASE - labels the implementation of named state.
- * XLR_LEAVE - leave the function and return here at the next call.
- * XLR_RETURN - return from the function and set state to initial state.
- * XLR_END - just hides the closing brace if not in use.
- */
-#if defined(XLR_USE_COMPUTED_GOTO)
-#define XLR_SWITCH(name)										\
-	static bool __xlr_running PG_USED_FOR_ASSERTS_ONLY = false; \
-	static void *__xlr_init_state = &&name;								\
-	static void *__xlr_state = &&name;									\
-	do {																\
-		XLR_REENT_PROTECT_ENTER();										\
-		goto *__xlr_state;												\
-		XLR_CASE(name);													\
-	} while (0)
-#define XLR_CASE(name)		name:
-#define XLR_LEAVE(name, code)											\
-	do {																\
-		__xlr_state = (&&name);											\
-		XLR_REENT_PROTECT_LEAVE();										\
-		return (code);													\
-		XLR_CASE(name);													\
-	} while (0)
-#define XLR_RETURN(code)										\
-	do {														\
-		__xlr_state = __xlr_init_state;							\
-		XLR_REENT_PROTECT_LEAVE();								\
-		return (code);											\
-	} while (0)
-#define XLR_SWITCH_END()
-#else							/* !XLR_USE_COMPUTED_GOTO */
-#define XLR_SWITCH(name)												\
-	static bool __xlr_running = false PG_USED_FOR_ASSERTS_ONLY;			\
-	static int __xlr_init_state = name;									\
-	static int __xlr_state = name;										\
-	XLR_REENT_PROTECT_ENTER();											\
-	switch (__xlr_state) {												\
-	XLR_CASE(name)
-#define XLR_CASE(name)		case name:
-#define XLR_LEAVE(name, code)											\
-	do {																\
-		__xlr_state = (name);											\
-		XLR_REENT_PROTECT_LEAVE();										\
-		return (code);													\
-		XLR_CASE(name);													\
-	} while (0)
-#define XLR_RETURN(code)						\
-	do {										\
-		__xlr_state = __xlr_init_state;									\
-		XLR_REENT_PROTECT_LEAVE();										\
-		return (code);													\
-	} while (0)
-#define XLR_SWITCH_END()	}
-#endif							/* XLR_USE_COMPUTED_GOTO */
-
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 
 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
-								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
+								  XLogRecPtr PrevRecPtr, XLogRecord *record);
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 							XLogRecPtr recptr);
 static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
@@ -162,28 +79,12 @@ XLogReaderAllocate(int wal_segment_size)
 
 	state->max_block_id = -1;
 
-	/*
-	 * Permanently allocate readBuf.  We do it this way, rather than just
-	 * making a static array, for two reasons: (1) no need to waste the
-	 * storage in most instantiations of the backend; (2) a static char array
-	 * isn't guaranteed to have any particular alignment, whereas
-	 * palloc_extended() will provide MAXALIGN'd storage.
-	 */
-	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
-											  MCXT_ALLOC_NO_OOM);
-	if (!state->readBuf)
-	{
-		pfree(state);
-		return NULL;
-	}
-
 	state->wal_segment_size = wal_segment_size;
 	/* All members are initialized to zeroes above */
 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
 										  MCXT_ALLOC_NO_OOM);
 	if (!state->errormsg_buf)
 	{
-		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
 	}
@@ -196,7 +97,6 @@ XLogReaderAllocate(int wal_segment_size)
 	if (!allocate_recordbuf(state, 0))
 	{
 		pfree(state->errormsg_buf);
-		pfree(state->readBuf);
 		pfree(state);
 		return NULL;
 	}
@@ -220,7 +120,6 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
-	pfree(state->readBuf);
 	pfree(state);
 }
 
@@ -281,319 +180,396 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
  * This function runs a state machine and may need to call several times until
  * a record is read.
  *
- * At the initial state, if called with valid pRecPtr, try to read a record at
- * that position.  If invalid pRecPtr is given try to read a record just after
+ * At the initial state, if called with valid RecPtr, try to read a record at
+ * that position.  If invalid RecPtr is given try to read a record just after
  * the last one previously read.
  *
  * When a record is successfully read, returns XLREAD_SUCCESS with result
- * record being stored in *record then the state machine is reset to initial
- * state.
+ * record being stored in *record.
  *
- * Returns XLREAD_NEED_DATA if needs more data fed.  In that case loadPagePtr
- * and loadLen in state is set to inform the required WAL data. The caller
- * shall read in the requested data into readBuf and set readLen and
- * readPageTLI to the length of the data actually read and the TLI for the
- * data read in respectively. In case of failure the caller shall call the
- * function setting readLen to -1 and storing error message in errormsg_buf to
- * inform error.
+ * Returns XLREAD_NEED_DATA if more data is needed to finish reading the current
+ * record.  In that case, state->loadPagePtr and state->loadLen are set to inform
+ * the caller the WAL position and minimum length of data needed. The caller
+ * shall read in the requested data and set state->readBuf to point to a buffer
+ * containing it. The caller must also set state->readPagePtr, state->readPageTLI,
+ * and state->readLen to indicate the starting position of the read data (which
+ * must equal loadPagePtr), the timeline that it was read from, and the length
+ * of data that is now available (which must be >= loadLen), respectively.
  *
- * If the reading fails for some reasons including caller-side error mentioned
- * above, returns XLREAD_FAIL with *record being set to NULL. *errormsg is set
- * to a string with details of the failure. The state machine is reset to
- * initial state.
+ * If invalid data is encountered, XLogReadRecord returns XLREAD_FAIL with *record
+ * being set to NULL. *errormsg is set to a string with details of the failure.
  *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  *
- * Note: This function is not reentrant. The state is maintained internally in
+ * Note: This function is not reentrant (FIXME: is that still true?).
+ * The state is maintained internally in
  * the function. DO NOT non-local exit (ereport) from inside of this function.
  */
-XLogReadRecordResult
-XLogReadRecord(XLogReaderState *state, XLogRecPtr pRecPtr,
-			   XLogRecord **record, char **errormsg)
+enum XLogReadRecordState
 {
-	/*
-	 * This function is a state machine that can exit and reenter at any place
-	 * marked as XLR_LEAVE. All internal state needs to be preserved through
-	 * multiple calls.
-	 */
-	static XLogRecPtr	targetPagePtr;
-	static bool			randAccess;
-	static uint32		len,
-						total_len;
-	static uint32		targetRecOff;
-	static uint32		pageHeaderSize;
-	static bool			gotheader;
-	static XLogRecPtr	RecPtr;
-
-	XLR_SWITCH(XLREAD_STATE_INIT);
-
-	RecPtr = pRecPtr;
-
-	/*
-	 * randAccess indicates whether to verify the previous-record pointer of
-	 * the record we're reading.  We only do this if we're reading
-	 * sequentially, which is what we initially assume.
+	/* between records */
+	XLREAD_NEXT_RECORD,
+	/* ReadRecPtr has been set to point to beginning of record, nothing has
+	 * been read yet.
 	 */
-	randAccess = false;
+	XLREAD_NEED_TOT_LEN,
+	/* We have read the length of the record (xl_tot_len), but that's all.
+	 * recordRemainLen has been set accordingly. */
+	XLREAD_NEED_FIRST_FRAGMENT,
+	/* We have read a record partially, but need to read one or more continuation
+	 * records to complete it. recordGotLen indicates how much we have
+	 * read already, and recordRemainLen indicates how much is left. readRecordBuf
+	 * holds the partially assembled record. recordContRecPtr points to the beginning
+	 * of the next page where to continue. */
+	XLREAD_NEED_CONTINUATION
+};
 
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+			   XLogRecord **record, char **errormsg)
+{
 	/* reset error state */
 	*errormsg = NULL;
 	state->errormsg_buf[0] = '\0';
 
-	ResetDecoder(state);
-
-	if (RecPtr == InvalidXLogRecPtr)
+	if (RecPtr != InvalidXLogRecPtr)
 	{
-		/* No explicit start point; read the record after the one we just read */
-		RecPtr = state->EndRecPtr;
-
-		if (state->ReadRecPtr == InvalidXLogRecPtr)
-			randAccess = true;
+		ResetDecoder(state);
 
-		/*
-		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
-		 * at a page boundary, no more records can fit on the current page. We
-		 * must skip over the page header, but we can't do that until we've
-		 * read in the page, since the header size is variable.
-		 */
-	}
-	else
-	{
 		/*
 		 * Caller supplied a position to start at.
 		 *
 		 * In this case, the passed-in record pointer should already be
 		 * pointing to a valid record starting position.
 		 */
-		Assert(XRecOffIsValid(RecPtr));
-		randAccess = true;
-	}
-
-	state->currRecPtr = RecPtr;
-
-	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
-	targetRecOff = RecPtr % XLOG_BLCKSZ;
+		state->ReadRecPtr = RecPtr;
 
-	/*
-	 * Read the page containing the record into state->readBuf. Request enough
-	 * byte to cover the whole record header, or at least the part of it that
-	 * fits on the same page.
-	 */
-	while (XLogNeedData(state, targetPagePtr,
-						Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)))
-		XLR_LEAVE(XLREAD_STATE_PAGE, XLREAD_NEED_DATA);
+		/*
+		 * We cannot verify the previous-record pointer when we're seeking to a
+		 * particular record. Reset PrevRecPtr so that we won't try doing that.
+		 */
+		state->PrevRecPtr = InvalidXLogRecPtr;
 
-	if (state->readLen < 0)
-		goto err;
+		state->EndRecPtr = InvalidXLogRecPtr; 		/* to be tidy */
 
-	/*
-	 * We have loaded at least the page header, so we can examine it now.
-	 */
-	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
-	if (targetRecOff == 0)
-	{
 		/*
-		 * At page start, so skip over page header.
+		 * Reset verifiedPagePtr to force reading the page again. (Not sure if this
+		 * is needed, but seems better to play it safe.)
 		 */
-		RecPtr += pageHeaderSize;
-		targetRecOff = pageHeaderSize;
-	}
-	else if (targetRecOff < pageHeaderSize)
-	{
-		report_invalid_record(state, "invalid record offset at %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		goto err;
+		state->verifiedPagePtr = InvalidXLogRecPtr;
+		state->verifiedPageLen = 0;
+
+		state->readRecordState = XLREAD_NEED_TOT_LEN;
 	}
 
-	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-		targetRecOff == pageHeaderSize)
+again:
+	switch (state->readRecordState)
 	{
-		report_invalid_record(state, "contrecord is requested by %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		goto err;
-	}
+		case XLREAD_NEXT_RECORD:
+			ResetDecoder(state);
 
-	/* XLogNeedData has verified the page header */
-	Assert(pageHeaderSize <= state->readLen);
+			/*
+			 * Read the record after the one we just read. (Or the first record,
+			 * if this is the first call. In that case, EndRecPtr was set to the
+			 * desired starting point above.)
+			 */
 
-	/*
-	 * Read the record length.
-	 *
-	 * NB: Even though we use an XLogRecord pointer here, the whole record
-	 * header might not fit on this page. xl_tot_len is the first field of the
-	 * struct, so it must be on this page (the records are MAXALIGNed), but we
-	 * cannot access any other fields until we've verified that we got the
-	 * whole header.
-	 */
-	*record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = (*record)->xl_tot_len;
+			/*
+			 * EndRecPtr is pointing to end+1 of the previous WAL record.  If we're
+			 * at a page boundary, no more records can fit on the current page. We
+			 * must skip over the page header on the next page, but we can't do that until we've
+			 * read in the page, since the header size is variable.
+			 */
+			state->PrevRecPtr = state->ReadRecPtr;
+			state->ReadRecPtr = state->EndRecPtr;
 
-	/*
-	 * If the whole record header is on this page, validate it immediately.
-	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-	 * rest of the header after reading it from the next page.  The xl_tot_len
-	 * check is necessary here to ensure that we enter the "Need to reassemble
-	 * record" code path below; otherwise we might fail to apply
-	 * ValidXLogRecordHeader at all.
-	 */
-	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-	{
-		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, *record,
-								   randAccess))
-			goto err;
-		gotheader = true;
-	}
-	else
-	{
-		/* XXX: more validation should be done here */
-		if (total_len < SizeOfXLogRecord)
-		{
-			report_invalid_record(state,
-								  "invalid record length at %X/%X: wanted %u, got %u",
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
-								  (uint32) SizeOfXLogRecord, total_len);
-			goto err;
-		}
-		gotheader = false;
-	}
+			state->readRecordState = XLREAD_NEED_TOT_LEN;
+			/* fall through */
 
-	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
-	{
-		/* Need to reassemble record */
-		static char	   *contdata;
-		static XLogPageHeader pageHeader;
-		static char	   *buffer;
-		static uint32	gotlen;
+		case XLREAD_NEED_TOT_LEN:
+			{
+				uint32		total_len;
+				uint32		pageHeaderSize;
+				XLogRecPtr	targetPagePtr;
+				uint32		targetRecOff;
 
-		/*
-		 * Enlarge readRecordBuf as needed.
-		 */
-		if (total_len > state->readRecordBufSize &&
-			!allocate_recordbuf(state, total_len))
-		{
-			/* We treat this as a "bogus data" condition */
-			report_invalid_record(state, "record length %u at %X/%X too long",
-								  total_len,
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-			goto err;
-		}
+				/*
+				 * Read the page containing the record into state->readBuf. Request enough
+				 * byte to cover the whole record header, or at least the part of it that
+				 * fits on the same page.
+				 */
+				targetPagePtr = state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+				targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
 
-		/* Copy the first fragment of the record from the first page. */
-		memcpy(state->readRecordBuf,
-			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
-		buffer = state->readRecordBuf + len;
-		gotlen = len;
+				if (XLogNeedData(state, targetPagePtr,
+								 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)))
+					return XLREAD_NEED_DATA;
 
-		do
-		{
-			/* Calculate pointer to beginning of next page */
-			targetPagePtr += XLOG_BLCKSZ;
+				if (state->readLen < 0)
+					goto err;
 
-			/* Wait for the next page to become available */
-			while (XLogNeedData(state, targetPagePtr,
-								Min(total_len - gotlen + SizeOfXLogShortPHD,
-									XLOG_BLCKSZ)))
-				XLR_LEAVE(XLREAD_STATE_CONTPAGE, XLREAD_NEED_DATA);
+				/*
+				 * We have loaded at least the page header, so we can examine it now.
+				 * (XLogNeedData() ensures that, even if we ask for fewer bytes)
+				 */
+				pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+				if (targetRecOff == 0)
+				{
+					/*
+					 * At page start, so skip over page header. In the XLogNeedData()
+					 * call above, we requested for SizeOfXLogRecord bytes, but that
+					 * was bogus because that didn't include the page header. So loop
+					 * back to check again if we have the actual record header in
+					 * the buffer already. (XXX: this is a bit silly. Perhaps
+					 * pass XLogNeedData() another argument to indicate if the requested
+					 * length includes the size of the page header or not.)
+					 */
+					state->ReadRecPtr += pageHeaderSize;
+					goto again;
+				}
+				else if (targetRecOff < pageHeaderSize)
+				{
+					report_invalid_record(state, "invalid record offset at %X/%X",
+										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+					goto err;
+				}
 
-			if (state->readLen < 0)
-				goto err;
+				if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+					targetRecOff == pageHeaderSize)
+				{
+					report_invalid_record(state, "contrecord is requested by %X/%X",
+										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+					goto err;
+				}
 
-			Assert(SizeOfXLogShortPHD <= state->readLen);
+				/* XLogNeedData has verified the page header */
+				Assert(pageHeaderSize <= state->readLen);
 
-			/* Check that the continuation on next page looks valid */
-			pageHeader = (XLogPageHeader) state->readBuf;
-			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
-			{
-				report_invalid_record(state,
-									  "there is no contrecord flag at %X/%X",
-									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-				goto err;
+				/*
+				 * Read the record length.
+				 *
+				 * NB: Even though we use an XLogRecord pointer here, the whole record
+				 * header might not fit on this page. xl_tot_len is the first field of the
+				 * struct, so it must be on this page (the records are MAXALIGNed), but we
+				 * cannot access any other fields until we've verified that we got the
+				 * whole header.
+				 */
+				*record = (XLogRecord *) (state->readBuf + state->ReadRecPtr % XLOG_BLCKSZ);
+				total_len = (*record)->xl_tot_len;
+
+				/*
+				 * If the whole record header is on this page, validate it immediately.
+				 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+				 * rest of the header after reading it from the next page.  The xl_tot_len
+				 * check is necessary here to ensure that we enter the "Need to reassemble
+				 * record" code path below; otherwise we might fail to apply
+				 * ValidXLogRecordHeader at all.
+				 */
+				if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+				{
+					if (!ValidXLogRecordHeader(state, state->ReadRecPtr, state->PrevRecPtr, *record))
+						goto err;
+				}
+				else
+				{
+					/* XXX: more validation should be done here */
+					if (total_len < SizeOfXLogRecord)
+					{
+						report_invalid_record(state,
+											  "invalid record length at %X/%X: wanted %u, got %u",
+											  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr,
+											  (uint32) SizeOfXLogRecord, total_len);
+						goto err;
+					}
+				}
+
+				/*
+				 * Wait for the rest of the record, or the part of the record that fit on the
+				 * first page if crossed a page boundary, to become available.
+				 */
+				state->recordGotLen = 0;
+				state->recordRemainLen = total_len;
+				state->readRecordState = XLREAD_NEED_FIRST_FRAGMENT;
 			}
+			/* fall through */
 
-			/*
-			 * Cross-check that xlp_rem_len agrees with how much of the record
-			 * we expect there to be left.
-			 */
-			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+		case XLREAD_NEED_FIRST_FRAGMENT:
 			{
-				report_invalid_record(state,
-									  "invalid contrecord length %u at %X/%X",
-									  pageHeader->xlp_rem_len,
-									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-				goto err;
-			}
+				uint32		total_len = state->recordRemainLen;
+				uint32		pageRemain;
+				uint32		len;
+				XLogRecPtr	targetPagePtr;
 
-			/* Append the continuation from this page to the buffer */
-			pageHeaderSize = XLogPageHeaderSize(pageHeader);
+				/* Wait for the rest of the record on the first page to become available */
+				targetPagePtr = state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
 
-			if (state->readLen < pageHeaderSize)
-			{
-				while (XLogNeedData(state, targetPagePtr, pageHeaderSize))
-					XLR_LEAVE(XLREAD_STATE_CONTPAGE_HEADER, XLREAD_NEED_DATA);
-			}
+				pageRemain = XLOG_BLCKSZ - state->ReadRecPtr % XLOG_BLCKSZ;
+				len = Min(pageRemain, total_len);
 
-			Assert(pageHeaderSize <= state->readLen);
+				if (XLogNeedData(state, targetPagePtr, len))
+					return XLREAD_NEED_DATA;
 
-			contdata = (char *) state->readBuf + pageHeaderSize;
-			len = XLOG_BLCKSZ - pageHeaderSize;
-			if (pageHeader->xlp_rem_len < len)
-				len = pageHeader->xlp_rem_len;
+				if (state->readLen < 0)
+					goto err;
 
-			if (state->readLen < pageHeaderSize + len)
-			{
-				if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len))
-					XLR_LEAVE(XLREAD_STATE_CONTRECORD, XLREAD_NEED_DATA);
+				if (state->recordRemainLen <= len)
+				{
+					/* Record does not cross a page boundary */
+					if (!ValidXLogRecord(state, *record, state->ReadRecPtr))
+						goto err;
+					/* We already checked the header earlier */
+					state->EndRecPtr = state->ReadRecPtr + MAXALIGN(total_len);
+					state->readRecordState = XLREAD_NEXT_RECORD;
+					*record = (XLogRecord *) (state->readBuf + state->ReadRecPtr % XLOG_BLCKSZ);
+					break;
+				}
+				else
+				{
+					/*
+					 * The record continues on the next page. Need to reassemble
+					 * record
+					 */
+					/* Enlarge readRecordBuf as needed. */
+					if (total_len > state->readRecordBufSize &&
+						!allocate_recordbuf(state, total_len))
+					{
+						/* We treat this as a "bogus data" condition */
+						report_invalid_record(state, "record length %u at %X/%X too long",
+											  total_len,
+											  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+						goto err;
+					}
+
+					/* Copy the first fragment of the record from the first page. */
+					memcpy(state->readRecordBuf,
+						   state->readBuf + state->ReadRecPtr % XLOG_BLCKSZ, len);
+					state->recordGotLen += len;
+					state->recordRemainLen -= len;
+
+					/* Calculate pointer to beginning of next page */
+					state->recordContRecPtr = state->ReadRecPtr + len;
+					Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
+
+					state->readRecordState = XLREAD_NEED_CONTINUATION;
+				}
 			}
+			/* fall through */
 
-			memcpy(buffer, (char *) contdata, len);
-			buffer += len;
-			gotlen += len;
-
-			/* If we just reassembled the record header, validate it. */
-			if (!gotheader)
+		case XLREAD_NEED_CONTINUATION:
 			{
-				*record = (XLogRecord *) state->readRecordBuf;
-				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
-										   *record, randAccess))
+				XLogRecPtr	targetPagePtr;
+				char	   *contdata;
+				XLogPageHeader pageHeader;
+				uint32		pageHeaderSize;
+				uint32		len;
+
+				/* Wait for the next page to become available */
+				targetPagePtr = state->recordContRecPtr;
+				if (XLogNeedData(state, targetPagePtr,
+								 Min(state->recordRemainLen + SizeOfXLogShortPHD,
+									 XLOG_BLCKSZ)))
+					return XLREAD_NEED_DATA;
+
+				if (state->readLen < 0)
 					goto err;
-				gotheader = true;
-			}
-		} while (gotlen < total_len);
 
-		Assert(gotheader);
+				Assert(SizeOfXLogShortPHD <= state->readLen);
 
-		*record = (XLogRecord *) state->readRecordBuf;
-		if (!ValidXLogRecord(state, *record, RecPtr))
-			goto err;
+				/* Check that the continuation on next page looks valid */
+				pageHeader = (XLogPageHeader) state->readBuf;
+				if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+				{
+					/* XXX: should this report recordContRecPtr rather than beginning of the
+					 * record? */
+					report_invalid_record(state,
+										  "there is no contrecord flag at %X/%X",
+										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+					goto err;
+				}
 
-		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
-		state->ReadRecPtr = RecPtr;
-		state->EndRecPtr = targetPagePtr + pageHeaderSize
-			+ MAXALIGN(pageHeader->xlp_rem_len);
-	}
-	else
-	{
-		/* Wait for the record data to become available */
-		while (XLogNeedData(state, targetPagePtr,
-							Min(targetRecOff + total_len, XLOG_BLCKSZ)))
-			XLR_LEAVE(XLREAD_STATE_RECORD, XLREAD_NEED_DATA);
+				/*
+				 * Cross-check that xlp_rem_len agrees with how much of the record
+				 * we expect there to be left.
+				 */
+				if (pageHeader->xlp_rem_len == 0 ||
+					pageHeader->xlp_rem_len != state->recordRemainLen)
+				{
+					/* XXX: should this report recordContRecPtr rather than beginning of the
+					 * record? */
+					report_invalid_record(state,
+										  "invalid contrecord length %u at %X/%X",
+										  pageHeader->xlp_rem_len,
+										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+					goto err;
+				}
 
-		if (state->readLen < 0)
-			goto err;
+				/* Append the continuation from this page to the buffer */
+				pageHeaderSize = XLogPageHeaderSize(pageHeader);
+				/* XLogNeedData should have ensured that the whole page header was read */
+				Assert(pageHeaderSize <= state->readLen);
 
-		/* Record does not cross a page boundary */
-		if (!ValidXLogRecord(state, *record, RecPtr))
-			goto err;
+				contdata = (char *) state->readBuf + pageHeaderSize;
+				len = XLOG_BLCKSZ - pageHeaderSize;
+				if (pageHeader->xlp_rem_len < len)
+					len = pageHeader->xlp_rem_len;
+
+				if (state->readLen < pageHeaderSize + len)
+				{
+					/*
+					 * Read the rest of the page containing the record, if we
+					 * didn't get it already. (If we didn't get it yet, we'll
+					 * read the page header again on next invocation. In
+					 * practice, this should happen very rarely, assuming that
+					 * the caller makes the whole page available to us even
+					 * when we request just a part of it. XXX: This would also
+					 * become less ugly, if we passed a flag to XLogNeedData()
+					 * on the first call to tell it that the remaining length
+					 * includes the page header)
+					 */
+					if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len))
+						return XLREAD_NEED_DATA;
+
+					if (state->readLen < 0)
+						goto err;
+				}
 
-		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+				memcpy(state->readRecordBuf + state->recordGotLen,
+					   (char *) contdata, len);
+				state->recordGotLen += len;
+				state->recordRemainLen -= len;
 
-		state->ReadRecPtr = RecPtr;
-	}
+				/* If we just reassembled the record header, validate it. */
+				if (state->recordGotLen >= sizeof(XLogRecord) &&
+					state->recordGotLen - len < sizeof(XLogRecord))
+				{
+					*record = (XLogRecord *) state->readRecordBuf;
+					if (!ValidXLogRecordHeader(state, state->ReadRecPtr, state->PrevRecPtr,
+											   *record))
+						goto err;
+				}
+
+				if (state->recordRemainLen > 0)
+				{
+					/* Calculate pointer to beginning of next page, and continue */
+					state->recordContRecPtr = targetPagePtr + XLOG_BLCKSZ;
+					goto again;
+				}
+				else
+				{
+					*record = (XLogRecord *) state->readRecordBuf;
+					if (!ValidXLogRecord(state, *record, state->ReadRecPtr))
+						goto err;
+
+					pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+					state->EndRecPtr = targetPagePtr + pageHeaderSize
+						+ MAXALIGN(pageHeader->xlp_rem_len);
 
-	XLR_SWITCH_END();
+					state->readRecordState = XLREAD_NEXT_RECORD;
+					break;
+				}
+			}
+	}
 
 	/*
 	 * Special processing if it's an XLOG SWITCH record
@@ -606,11 +582,14 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr pRecPtr,
 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
 	}
 
+	state->loadPagePtr = InvalidXLogRecPtr;
+	state->loadLen = 0;
+
 	if (DecodeXLogRecord(state, *record, errormsg))
-		XLR_RETURN(XLREAD_SUCCESS);
+		return XLREAD_SUCCESS;
 
 	*record = NULL;
-	XLR_RETURN(XLREAD_FAIL);
+	return XLREAD_FAIL;
 
 err:
 
@@ -624,7 +603,9 @@ err:
 		*errormsg = state->errormsg_buf;
 
 	*record = NULL;
-	XLR_RETURN(XLREAD_FAIL);
+	state->loadPagePtr = InvalidXLogRecPtr;
+	state->loadLen = 0;
+	return XLREAD_FAIL;
 }
 
 /*
@@ -649,21 +630,58 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * marked as XLR_LEAVE. All internal state is preserved through multiple
 	 * calls.
 	 */
-	static uint32		targetPageOff;
-	static XLogSegNo	targetSegNo;
-	static XLogPageHeader hdr;
+	uint32		targetPageOff;
+	XLogSegNo	targetSegNo;
+	XLogPageHeader hdr;
+
+	/* check whether we have all the requested data already */
+	if (pageptr == state->verifiedPagePtr && reqLen <= state->verifiedPageLen)
+		return false;
+
+	/* Data was not in our verified buffer. But perhaps the caller just placed
+	 * it there, and we just haven't verified it yet? */
+	if (state->loadLen > 0 && state->readPagePtr == state->loadPagePtr && state->readLen >= state->loadLen)
+	{
+		Assert(reqLen >= SizeOfXLogShortPHD);
+
+		/* Do we have enough data to check the header length? */
+		hdr = (XLogPageHeader) state->readBuf;
+
+		/* still not enough */
+		if (state->readLen < XLogPageHeaderSize(hdr))
+		{
+			state->loadPagePtr = pageptr;
+			state->loadLen = XLogPageHeaderSize(hdr);
+			return true;
+		}
+
+		/*
+		 * Now that we know we have the full header, validate it.
+		 */
+		if (!XLogReaderValidatePageHeader(state, state->readPagePtr, (char *) state->readBuf))
+		{
+			XLogReaderInvalReadState(state);
+			state->readLen = -1;
 
-	XLR_SWITCH (XLND_STATE_INIT);
+			return false;
+		}
+
+		state->verifiedPagePtr = state->readPagePtr;
+		state->verifiedPageLen = state->readLen;
+		/* XXX: Assert that we read the first page in the segment first */
+		XLByteToSeg(state->verifiedPagePtr, state->verifiedSegNo, state->wal_segment_size);
+		state->loadPagePtr = InvalidXLogRecPtr;
+		state->loadLen = 0;
+
+		/* we might have the requested data now. */
+		if (pageptr == state->verifiedPagePtr && reqLen <= state->verifiedPageLen)
+			return false;
+	}
 
 	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
 	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
 	Assert((pageptr % XLOG_BLCKSZ) == 0);
 
-	/* check whether we have all the requested data already */
-	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
-		reqLen <= state->readLen)
-		XLR_RETURN(false);
-
 	/*
 	 * Data is not in our buffer.
 	 *
@@ -676,74 +694,26 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->readSegNo && targetPageOff != 0)
+	if (targetSegNo != state->verifiedSegNo && targetPageOff != 0)
 	{
 		state->loadPagePtr = pageptr - targetPageOff;
 		state->loadLen = XLOG_BLCKSZ;
-		XLR_LEAVE(XLND_STATE_SEGHEADER, true);
-
-		if (state->readLen < 0)
-			goto err;
-
-		/* we can be sure to have enough WAL available, we scrolled back */
-		Assert(state->readLen == XLOG_BLCKSZ);
-
-		if (!XLogReaderValidatePageHeader(state, state->loadPagePtr,
-										  state->readBuf))
-			goto err;
+		return true;
+		/* On the next invocation, we'll verify the page header on the first page.
+		 * Then we'll see that the targetSegNo now matches verifiedSegNo, and will
+		 * not come back here, but will request the actual target page.
+		 */
 	}
-
-	/*
-	 * First, read the requested data length, but at least a short page header
-	 * so that we can validate it.
-	 */
-	state->loadPagePtr = pageptr;
-	state->loadLen = Max(reqLen, SizeOfXLogShortPHD);
-	XLR_LEAVE(XLND_STATE_PAGEHEADER, true);
-
-	if (state->readLen < 0)
-		goto err;
-
-	Assert(state->readLen <= XLOG_BLCKSZ);
-
-	/* Do we have enough data to check the header length? */
-	if (state->readLen <= SizeOfXLogShortPHD)
-		goto err;
-
-	Assert(state->readLen >= state->loadLen);
-
-	hdr = (XLogPageHeader) state->readBuf;
-
-	/* still not enough */
-	if (state->readLen < XLogPageHeaderSize(hdr))
+	else
 	{
+		/*
+		 * First, read the requested data length, but at least a short page header
+		 * so that we can validate it.
+		 */
 		state->loadPagePtr = pageptr;
-		state->loadLen = XLogPageHeaderSize(hdr);
-		XLR_LEAVE(XLND_STATE_PAGELONGHEADER, true);
-
-		if (state->readLen < 0)
-			goto err;
+		state->loadLen = Max(reqLen, SizeOfXLogShortPHD);
+		return true;
 	}
-
-	XLR_SWITCH_END();
-
-	/*
-	 * Now that we know we have the full header, validate it.
-	 */
-	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) state->readBuf))
-			goto err;
-
-	/* update read state information */
-	state->readSegNo = targetSegNo;
-	state->readOff = targetPageOff;
-
-	XLR_RETURN(false);
-
-err:
-	XLogReaderInvalReadState(state);
-	state->readLen = -1;
-
-	XLR_RETURN(false);
 }
 
 /*
@@ -752,9 +722,10 @@ err:
 void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
+	state->loadPagePtr = InvalidXLogRecPtr;
+	state->loadLen = 0;
+	state->verifiedPagePtr = InvalidXLogRecPtr;
+	state->verifiedPageLen = 0;
 }
 
 /*
@@ -762,11 +733,12 @@ XLogReaderInvalReadState(XLogReaderState *state)
  *
  * This is just a convenience subroutine to avoid duplicated code in
  * XLogReadRecord.  It's not intended for use from anywhere else.
+ *
+ * If PrevRecPtr is valid, xl_prev is cross-checked with it.
  */
 static bool
 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
-					  XLogRecPtr PrevRecPtr, XLogRecord *record,
-					  bool randAccess)
+					  XLogRecPtr PrevRecPtr, XLogRecord *record)
 {
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
@@ -784,7 +756,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) RecPtr);
 		return false;
 	}
-	if (randAccess)
+	if (PrevRecPtr == InvalidXLogRecPtr)
 	{
 		/*
 		 * We can't exactly verify the prev-link, but surely it should be less
@@ -1051,7 +1023,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 		targetPagePtr = tmpRecPtr - targetRecOff;
 
 		/* Read the page containing the record */
-		while(XLogNeedData(state, targetPagePtr, targetRecOff))
+		while (XLogNeedData(state, targetPagePtr, targetRecOff))
 			read_page(state, private);
 
 		if (state->readLen < 0)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index fd461f16fce..d6c4b6cf730 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -802,8 +802,7 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-	const XLogRecPtr lastReadPage = state->readSegNo *
-	state->wal_segment_size + state->readOff;
+	const XLogRecPtr lastReadPage = state->readPagePtr;
 
 	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
 	Assert(wantLength <= XLOG_BLCKSZ);
@@ -1011,6 +1010,7 @@ read_local_xlog_page(XLogReaderState *state)
 	else if (targetPagePtr + reqLen > read_upto)
 	{
 		/* not enough data there */
+		state->readPagePtr = InvalidXLogRecPtr;
 		state->readLen = -1;
 		return;
 	}
@@ -1029,6 +1029,7 @@ read_local_xlog_page(XLogReaderState *state)
 			 XLOG_BLCKSZ);
 
 	/* number of valid bytes in the buffer */
+	state->readPagePtr = targetPagePtr;
 	state->readLen = count;
 	return;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3724cff0fcc..3afb8d2bcef 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -177,7 +177,7 @@ StartupDecodingContext(List *output_plugin_options,
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory")));
-
+	ctx->reader->readBuf = palloc(XLOG_BLCKSZ);
 	ctx->read_page = read_page;
 
 	ctx->reorder = ReorderBufferAllocate();
@@ -520,6 +520,7 @@ FreeDecodingContext(LogicalDecodingContext *ctx)
 
 	ReorderBufferFree(ctx->reorder);
 	FreeSnapshotBuilder(ctx->snapshot_builder);
+	pfree(ctx->reader->readBuf);
 	XLogReaderFree(ctx->reader);
 	MemoryContextDelete(ctx->context);
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 2a3f6d3cde4..7825ec3d5ab 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -289,7 +289,10 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 			while (XLogReadRecord(ctx->reader, startptr, &record, &errm) ==
 				   XLREAD_NEED_DATA)
+			{
 				ctx->read_page(ctx);
+				startptr = InvalidXLogRecPtr;
+			}
 
 			if (errm)
 				elog(ERROR, "%s", errm);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5bf8646c809..b0a80ebd4c8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -784,6 +784,7 @@ logical_read_xlog_page(LogicalDecodingContext *ctx)
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
 	{
+		state->readPagePtr = InvalidXLogRecPtr;
 		state->readLen = -1;
 		return;
 	}
@@ -796,6 +797,7 @@ logical_read_xlog_page(LogicalDecodingContext *ctx)
 	/* now actually read the data, we know it's there */
 	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
 
+	state->readPagePtr = targetPagePtr;
 	state->readLen = count;
 	return;
 }
@@ -2833,8 +2835,10 @@ XLogSendLogical(void)
 
 	while (XLogReadRecord(logical_decoding_ctx->reader,
 						  logical_startptr, &record, &errm) == XLREAD_NEED_DATA)
+	{
 		logical_decoding_ctx->read_page(logical_decoding_ctx);
-
+		logical_startptr = InvalidXLogRecPtr;
+	}
 	logical_startptr = InvalidXLogRecPtr;
 
 	/* xlog record was invalid */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 65420d0e4a4..ac79ce6373c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -60,18 +60,20 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	xlogreader = XLogReaderAllocate(WalSegSz);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
+	xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ);
 
 	do
 	{
 		while (XLogReadRecord(xlogreader, startpoint, &record, &errormsg) ==
 			   XLREAD_NEED_DATA)
+		{
 			SimpleXLogPageRead(xlogreader, datadir, &tliIndex);
+			startpoint = InvalidXLogRecPtr;
+		}
 
 		if (record == NULL)
 		{
-			XLogRecPtr	errptr;
-
-			errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
+			XLogRecPtr	errptr = xlogreader->EndRecPtr;
 
 			if (errormsg)
 				pg_fatal("could not read WAL record at %X/%X: %s",
@@ -79,8 +81,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 						 errormsg);
 			else
 				pg_fatal("could not read WAL record at %X/%X",
-						 (uint32) (startpoint >> 32),
-						 (uint32) (startpoint));
+						 (uint32) (errptr >> 32),
+						 (uint32) (errptr));
 		}
 
 		extractPageInfo(xlogreader);
@@ -89,6 +91,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	} while (xlogreader->ReadRecPtr != endpoint);
 
+	pg_free(xlogreader->readBuf);
 	XLogReaderFree(xlogreader);
 	if (xlogreadfd != -1)
 	{
@@ -112,10 +115,14 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	xlogreader = XLogReaderAllocate(WalSegSz);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
+	xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ);
 
 	while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) ==
 		   XLREAD_NEED_DATA)
+	{
 		SimpleXLogPageRead(xlogreader, datadir, &tliIndex);
+		ptr = InvalidXLogRecPtr;
+	}
 
 	if (record == NULL)
 	{
@@ -128,6 +135,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	}
 	endptr = xlogreader->EndRecPtr;
 
+	pg_free(xlogreader->readBuf);
 	XLogReaderFree(xlogreader);
 	if (xlogreadfd != -1)
 	{
@@ -169,15 +177,20 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	xlogreader = XLogReaderAllocate(WalSegSz);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
+	xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ);
 
 	searchptr = forkptr;
 	for (;;)
 	{
 		uint8		info;
+		XLogRecPtr	ptr = searchptr;
 
-		while (XLogReadRecord(xlogreader, searchptr, &record, &errormsg) ==
+		while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) ==
 			   XLREAD_NEED_DATA)
+		{
 			SimpleXLogPageRead(xlogreader, datadir, &tliIndex);
+			ptr = InvalidXLogRecPtr;
+		}
 
 		if (record == NULL)
 		{
@@ -214,6 +227,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 		searchptr = record->xl_prev;
 	}
 
+	pg_free(xlogreader->readBuf);
 	XLogReaderFree(xlogreader);
 	if (xlogreadfd != -1)
 	{
@@ -279,6 +293,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader,
 		if (xlogreadfd < 0)
 		{
 			pg_log_error("could not open file \"%s\": %m", xlogfpath);
+			xlogreader->readPagePtr = InvalidXLogRecPtr;
 			xlogreader->readLen = -1;
 			return;
 		}
@@ -293,6 +308,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader,
 	if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
 	{
 		pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
+		xlogreader->readPagePtr = InvalidXLogRecPtr;
 		xlogreader->readLen = -1;
 		return;
 	}
@@ -307,6 +323,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader,
 			pg_log_error("could not read file \"%s\": read %d of %zu",
 						 xlogfpath, r, (Size) XLOG_BLCKSZ);
 
+		xlogreader->readPagePtr = InvalidXLogRecPtr;
 		xlogreader->readLen = -1;
 		return;
 	}
@@ -315,6 +332,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader,
 
 	*pageTLI = targetHistory[*tliIndex].tli;
 
+	xlogreader->readPagePtr = targetPagePtr;
 	xlogreader->readLen = XLOG_BLCKSZ;
 	return;
 }
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 54fff797fcb..183b214c30c 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -439,6 +439,7 @@ XLogDumpReadPage(XLogReaderState *state, void *priv)
 		else
 		{
 			private->endptr_reached = true;
+			state->readPagePtr = InvalidXLogRecPtr;
 			state->readLen = -1;
 			return;
 		}
@@ -447,6 +448,7 @@ XLogDumpReadPage(XLogReaderState *state, void *priv)
 	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
 					 readBuff, count);
 
+	state->readPagePtr = targetPagePtr;
 	state->readLen = count;
 	return;
 }
@@ -1107,6 +1109,7 @@ main(int argc, char **argv)
 	xlogreader_state = XLogReaderAllocate(WalSegSz);
 	if (!xlogreader_state)
 		fatal_error("out of memory");
+	xlogreader_state->readBuf = palloc(XLOG_BLCKSZ);
 
 	/* first find a valid recptr to start from */
 	first_record = XLogFindNextRecord(xlogreader_state, private.startptr,
@@ -1137,7 +1140,10 @@ main(int argc, char **argv)
 		while (XLogReadRecord(xlogreader_state,
 							  first_record, &record, &errormsg) ==
 			   XLREAD_NEED_DATA)
+		{
 			XLogDumpReadPage(xlogreader_state, (void *) &private);
+			first_record = InvalidXLogRecPtr;
+		}
 
 		if (!record)
 		{
@@ -1184,6 +1190,7 @@ main(int argc, char **argv)
 					(uint32) xlogreader_state->ReadRecPtr,
 					errormsg);
 
+	pfree(xlogreader_state->readBuf);
 	XLogReaderFree(xlogreader_state);
 
 	return EXIT_SUCCESS;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index bbf06039b91..e952c4ae204 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -112,8 +112,9 @@ struct XLogReaderState
 	 * Start and end point of last record read.  EndRecPtr is also used as the
 	 * position to read next, if XLogReadRecord receives an invalid recptr.
 	 */
-	XLogRecPtr	ReadRecPtr;		/* start of last record read */
+	XLogRecPtr	ReadRecPtr;		/* start of last record read or being read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
+	XLogRecPtr	PrevRecPtr;		/* start of previous record read */
 
 
 	/* ----------------------------------------
@@ -124,13 +125,13 @@ struct XLogReaderState
 	/* parameters to page reader */
 	XLogRecPtr	loadPagePtr;	/* Pointer to the page  */
 	int			loadLen;		/* wanted length in bytes */
-	char	   *readBuf;		/* buffer to store data */
-	XLogRecPtr	currRecPtr;		/* beginning of the WAL record being read */
 
 	/* return from page reader */
+	XLogRecPtr	readPagePtr;	/* page actually read, or invalid on error */
 	int32		readLen;		/* bytes actually read, must be at least
 								 * loadLen. -1 on error. */
 	TimeLineID	readPageTLI;	/* TLI for data currently in readBuf */
+	char	   *readBuf;		/* buffer to store data */
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
@@ -158,8 +159,9 @@ struct XLogReaderState
 	 */
 
 	/* last read segment and segment offset for data currently in readBuf */
-	XLogSegNo	readSegNo;
-	uint32		readOff;
+	XLogRecPtr	verifiedPagePtr;
+	uint32		verifiedPageLen;
+	XLogSegNo	verifiedSegNo;
 
 	/*
 	 * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -194,6 +196,14 @@ struct XLogReaderState
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
+	/*
+	 * XLogReadRecord() state
+	 */
+	int			readRecordState;		/* enum */
+	int			recordGotLen;			/* amount of current record that has already been read */
+	int			recordRemainLen;		/* length of current record that remains */
+	XLogRecPtr	recordContRecPtr;		/* where the current record continues */
+
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 };
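
For anyone who wants to see the calling convention outside the tree, here is a minimal standalone sketch of the same pattern: the reader keeps its progress in the state struct, returns a "need data" code instead of invoking a callback, and the caller loads the requested page and calls again. Everything in it (ToyReader, toy_read_record, toy_load_page, TOY_PAGE_SIZE) is hypothetical and made up for illustration. It is not PostgreSQL code, and it leaves out page headers, validation and timelines entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_PAGE_SIZE 8			/* hypothetical; far smaller than XLOG_BLCKSZ */

typedef enum { TOY_SUCCESS, TOY_NEED_DATA, TOY_FAIL } ToyResult;
typedef enum { TOY_NEXT_RECORD, TOY_NEED_CONTINUATION } ToyState;

typedef struct ToyReader
{
	size_t		loadPage;		/* request: page the reader wants loaded */
	size_t		readPage;		/* reply: page currently in readBuf */
	int			readLen;		/* reply: valid bytes, 0 = none, -1 = error */
	char		readBuf[TOY_PAGE_SIZE];
	ToyState	state;			/* survives across calls */
	size_t		pos;			/* absolute offset of next byte to copy */
	size_t		gotLen;			/* bytes of the record copied so far */
	size_t		remainLen;		/* bytes of the record still missing */
	char		record[64];
} ToyReader;

static void
toy_reader_init(ToyReader *r)
{
	memset(r, 0, sizeof(*r));
	r->state = TOY_NEXT_RECORD;
}

/* Reset the per-record state so that the reader can be reused. */
static ToyResult
toy_fail(ToyReader *r)
{
	r->state = TOY_NEXT_RECORD;
	r->readLen = 0;
	return TOY_FAIL;
}

/* start/len are only looked at when a new record is begun. */
static ToyResult
toy_read_record(ToyReader *r, size_t start, size_t len)
{
	if (r->state == TOY_NEXT_RECORD)
	{
		if (len > sizeof(r->record))
			return toy_fail(r);
		r->pos = start;
		r->gotLen = 0;
		r->remainLen = len;
		r->state = TOY_NEED_CONTINUATION;
	}

	while (r->remainLen > 0)
	{
		size_t		page = r->pos / TOY_PAGE_SIZE;
		size_t		off = r->pos % TOY_PAGE_SIZE;
		size_t		n = TOY_PAGE_SIZE - off;

		if (r->readLen < 0)
			return toy_fail(r);	/* caller reported a read error */
		if (r->readLen == 0 || r->readPage != page)
		{
			r->loadPage = page;	/* ask the caller, resume here later */
			return TOY_NEED_DATA;
		}
		if (n > r->remainLen)
			n = r->remainLen;
		if (off + n > (size_t) r->readLen)
			return toy_fail(r);	/* page shorter than needed */

		assert(r->gotLen + n <= sizeof(r->record));
		memcpy(r->record + r->gotLen, r->readBuf + off, n);
		r->gotLen += n;
		r->remainLen -= n;
		r->pos += n;
	}
	r->state = TOY_NEXT_RECORD;
	return TOY_SUCCESS;
}

/* Caller side: satisfy the request from an in-memory "WAL". */
static void
toy_load_page(ToyReader *r, const char *wal, size_t walLen)
{
	size_t		start = r->loadPage * TOY_PAGE_SIZE;
	size_t		n;

	if (start >= walLen)
	{
		r->readLen = -1;
		return;
	}
	n = walLen - start < TOY_PAGE_SIZE ? walLen - start : TOY_PAGE_SIZE;
	memcpy(r->readBuf, wal + start, n);
	r->readPage = r->loadPage;
	r->readLen = (int) n;
}
```

A caller would then loop in the same shape as the call sites in the patch, i.e. "while (toy_read_record(&r, start, len) == TOY_NEED_DATA) toy_load_page(&r, wal, walLen);". Since the start position is only consulted when a new record begins, the toy does not need the caller to reset it between iterations; whether that is preferable to passing InvalidXLogRecPtr on the retry calls is a design choice this sketch does not try to settle.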
