On 2019-Sep-23, Alvaro Herrera wrote:

> I spent a couple of hours on this patchset today.  I merged 0001 and
> 0002, and decided the result was still messier than I would have liked,
> so I played with it a bit more -- see attached.


-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477709bbc2..0635a17eae 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1386,8 +1386,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
 
-	xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
-									NULL);
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									&read_local_xlog_page, NULL);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 501f46fd52..6c69eb6dd7 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -885,8 +885,7 @@ static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 int source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
 static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-						 TimeLineID *readTLI);
+						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 										bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -1195,7 +1194,8 @@ XLogInsertRecord(XLogRecData *rdata,
 			appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
 
 		if (!debug_reader)
-			debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
+											  NULL, NULL);
 
 		if (!debug_reader)
 		{
@@ -4296,7 +4296,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 			XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
 			offset = XLogSegmentOffset(xlogreader->latestPagePtr,
 									   wal_segment_size);
-			XLogFileName(fname, xlogreader->readPageTLI, segno,
+			XLogFileName(fname, xlogreader->seg.ws_tli, segno,
 						 wal_segment_size);
 			ereport(emode_for_corrupt_record(emode,
 											 RecPtr ? RecPtr : EndRecPtr),
@@ -6353,7 +6353,8 @@ StartupXLOG(void)
 
 	/* Set up XLOG reader facility */
 	MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-	xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									&XLogPageRead, &private);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -7355,7 +7356,7 @@ StartupXLOG(void)
 	 * and we were reading the old WAL from a segment belonging to a higher
 	 * timeline.
 	 */
-	EndOfLogTLI = xlogreader->readPageTLI;
+	EndOfLogTLI = xlogreader->seg.ws_tli;
 
 	/*
 	 * Complain if we did not roll forward far enough to render the backup
@@ -11523,7 +11524,7 @@ CancelBackup(void)
  */
 static int
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
-			 XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
+			 XLogRecPtr targetRecPtr, char *readBuf)
 {
 	XLogPageReadPrivate *private =
 	(XLogPageReadPrivate *) xlogreader->private_data;
@@ -11640,7 +11641,7 @@ retry:
 	Assert(targetPageOff == readOff);
 	Assert(reqLen <= readLen);
 
-	*readTLI = curFileTLI;
+	xlogreader->seg.ws_tli = curFileTLI;
 
 	/*
 	 * Check the page header immediately, so that we can retry immediately if
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a66e3324b1..27c27303d6 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -68,8 +68,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  * Returns NULL if the xlogreader couldn't be allocated.
  */
 XLogReaderState *
-XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
-				   void *private_data)
+XLogReaderAllocate(int wal_segment_size, const char *waldir,
+				   XLogPageReadCB pagereadfunc, void *private_data)
 {
 	XLogReaderState *state;
 
@@ -96,7 +96,10 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
 		return NULL;
 	}
 
-	state->wal_segment_size = wal_segment_size;
+	/* Initialize segment info. */
+	WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
+					   waldir);
+
 	state->read_page = pagereadfunc;
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
@@ -198,6 +201,23 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 	return true;
 }
 
+/*
+ * Initialize the passed segment structs.  ws_dir is always initialized: it
+ * receives a copy of waldir, or the empty string when waldir is NULL.
+ */
+void
+WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+				   int segsize, const char *waldir)
+{
+	seg->ws_file = -1;
+	seg->ws_segno = 0;
+	seg->ws_off = 0;
+	seg->ws_tli = 0;
+
+	segcxt->ws_segsize = segsize;
+	snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir ? waldir : "");
+}
+
 /*
  * Attempt to read an XLOG record.
  *
@@ -490,8 +510,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
 	{
 		/* Pretend it extends to end of segment */
-		state->EndRecPtr += state->wal_segment_size - 1;
-		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
+		state->EndRecPtr += state->segcxt.ws_segsize - 1;
+		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
 	}
 
 	if (DecodeXLogRecord(state, record, errormsg))
@@ -533,12 +553,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
 	Assert((pageptr % XLOG_BLCKSZ) == 0);
 
-	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
-	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+	XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
+	targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
 
 	/* check whether we have all the requested data already */
-	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
-		reqLen <= state->readLen)
+	if (targetSegNo == state->seg.ws_segno &&
+		targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
 		return state->readLen;
 
 	/*
@@ -553,13 +573,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
 	 */
-	if (targetSegNo != state->readSegNo && targetPageOff != 0)
+	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
 		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
 								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
+								   state->readBuf);
 		if (readLen < 0)
 			goto err;
 
@@ -577,7 +597,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 */
 	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
 							   state->currRecPtr,
-							   state->readBuf, &state->readPageTLI);
+							   state->readBuf);
 	if (readLen < 0)
 		goto err;
 
@@ -596,7 +616,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	{
 		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
 								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
+								   state->readBuf);
 		if (readLen < 0)
 			goto err;
 	}
@@ -608,8 +628,8 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 		goto err;
 
 	/* update read state information */
-	state->readSegNo = targetSegNo;
-	state->readOff = targetPageOff;
+	state->seg.ws_segno = targetSegNo;
+	state->seg.ws_off = targetPageOff;
 	state->readLen = readLen;
 
 	return readLen;
@@ -625,8 +645,8 @@ err:
 static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
-	state->readSegNo = 0;
-	state->readOff = 0;
+	state->seg.ws_segno = 0;
+	state->seg.ws_off = 0;
 	state->readLen = 0;
 }
 
@@ -745,16 +765,16 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 
 	Assert((recptr % XLOG_BLCKSZ) == 0);
 
-	XLByteToSeg(recptr, segno, state->wal_segment_size);
-	offset = XLogSegmentOffset(recptr, state->wal_segment_size);
+	XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
+	offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
-	XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
+	XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
 
 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
 							  "invalid magic number %04X in log segment %s, offset %u",
@@ -768,7 +788,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
 							  "invalid info bits %04X in log segment %s, offset %u",
@@ -791,7 +811,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 								  (unsigned long long) state->system_identifier);
 			return false;
 		}
-		else if (longhdr->xlp_seg_size != state->wal_segment_size)
+		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
 		{
 			report_invalid_record(state,
 								  "WAL file is from different database system: incorrect segment size in page header");
@@ -808,7 +828,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		/* hmm, first page of file doesn't have a long header? */
 		report_invalid_record(state,
@@ -828,7 +848,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 	{
 		char		fname[MAXFNAMELEN];
 
-		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 		report_invalid_record(state,
 							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
@@ -853,7 +873,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
 		{
 			char		fname[MAXFNAMELEN];
 
-			XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
+			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
 
 			report_invalid_record(state,
 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
@@ -997,7 +1017,6 @@ out:
 
 #endif							/* FRONTEND */
 
-
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 1fc39333f1..d3967c64b2 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -802,8 +802,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-	const XLogRecPtr lastReadPage = state->readSegNo *
-	state->wal_segment_size + state->readOff;
+	const XLogRecPtr lastReadPage = state->seg.ws_segno *
+	state->segcxt.ws_segsize + state->seg.ws_off;
 
 	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
 	Assert(wantLength <= XLOG_BLCKSZ);
@@ -847,8 +847,8 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	if (state->currTLIValidUntil != InvalidXLogRecPtr &&
 		state->currTLI != ThisTimeLineID &&
 		state->currTLI != 0 &&
-		((wantPage + wantLength) / state->wal_segment_size) <
-		(state->currTLIValidUntil / state->wal_segment_size))
+		((wantPage + wantLength) / state->segcxt.ws_segsize) <
+		(state->currTLIValidUntil / state->segcxt.ws_segsize))
 		return;
 
 	/*
@@ -869,12 +869,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 		 * by a promotion or replay from a cascaded replica.
 		 */
 		List	   *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+		XLogRecPtr	endOfSegment;
 
-		XLogRecPtr	endOfSegment = (((wantPage / state->wal_segment_size) + 1)
-									* state->wal_segment_size) - 1;
-
-		Assert(wantPage / state->wal_segment_size ==
-			   endOfSegment / state->wal_segment_size);
+		endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
+			state->segcxt.ws_segsize - 1;
+		Assert(wantPage / state->segcxt.ws_segsize ==
+			   endOfSegment / state->segcxt.ws_segsize);
 
 		/*
 		 * Find the timeline of the last LSN on the segment containing
@@ -909,12 +909,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
  */
 int
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
-					 TimeLineID *pageTLI)
+					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
 	XLogRecPtr	read_upto,
 				loc;
 	int			count;
+	TimeLineID	pageTLI;
 
 	loc = targetPagePtr + reqLen;
 
@@ -934,7 +934,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		else
 			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-		*pageTLI = ThisTimeLineID;
+		pageTLI = ThisTimeLineID;
 
 		/*
 		 * Check which timeline to get the record from.
@@ -991,7 +991,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			 * nothing cares so long as the timeline doesn't go backwards.  We
 			 * should read the page header instead; FIXME someday.
 			 */
-			*pageTLI = state->currTLI;
+			pageTLI = state->currTLI;
 
 			/* No need to wait on a historical timeline */
 			break;
@@ -1022,7 +1022,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
+	state->seg.ws_tli = pageTLI;
+	XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
 			 XLOG_BLCKSZ);
 
 	/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f8b9020081..da265f5294 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -173,7 +173,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..d1cf80d441 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -116,10 +116,10 @@ check_permissions(void)
 
 int
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 {
 	return read_local_xlog_page(state, targetPagePtr, reqLen,
-								targetRecPtr, cur_page, pageTLI);
+								targetRecPtr, cur_page);
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 23870a25a5..eb4a98cc91 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -128,16 +128,8 @@ bool		log_replication_commands = false;
  */
 bool		wake_wal_senders = false;
 
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int	sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static WALOpenSegment *sendSeg = NULL;
+static WALSegmentContext *sendCxt = NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -285,6 +277,13 @@ InitWalSender(void)
 
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+	/* Make sure we can remember the current read position in XLOG. */
+	sendSeg = (WALOpenSegment *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
+	sendCxt = (WALSegmentContext *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
+	WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
 }
 
 /*
@@ -301,10 +300,10 @@ WalSndErrorCleanup(void)
 	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
-	if (sendFile >= 0)
+	if (sendSeg->ws_file >= 0)
 	{
-		close(sendFile);
-		sendFile = -1;
+		close(sendSeg->ws_file);
+		sendSeg->ws_file = -1;
 	}
 
 	if (MyReplicationSlot != NULL)
@@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd)
  */
 static int
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+					   XLogRecPtr targetRecPtr, char *cur_page)
 {
 	XLogRecPtr	flushptr;
 	int			count;
@@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
 
 	return count;
 }
@@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2382,17 +2381,18 @@ retry:
 		int			segbytes;
 		int			readbytes;
 
-		startoff = XLogSegmentOffset(recptr, wal_segment_size);
+		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
 
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
+		if (sendSeg->ws_file < 0 ||
+			!XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
 		{
 			char		path[MAXPGPATH];
 
 			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
+			if (sendSeg->ws_file >= 0)
+				close(sendSeg->ws_file);
 
-			XLByteToSeg(recptr, sendSegNo, wal_segment_size);
+			XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
 
 			/*-------
 			 * When reading from a historic timeline, and there is a timeline
@@ -2420,20 +2420,20 @@ retry:
 			 * used portion of the old segment is copied to the new file.
 			 *-------
 			 */
-			curFileTimeLine = sendTimeLine;
+			sendSeg->ws_tli = sendTimeLine;
 			if (sendTimeLineIsHistoric)
 			{
 				XLogSegNo	endSegNo;
 
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
-				if (sendSegNo == endSegNo)
-					curFileTimeLine = sendTimeLineNextTLI;
+				XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+				if (sendSeg->ws_segno == endSegNo)
+					sendSeg->ws_tli = sendTimeLineNextTLI;
 			}
 
-			XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
+			XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
 
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendFile < 0)
+			sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+			if (sendSeg->ws_file < 0)
 			{
 				/*
 				 * If the file is not found, assume it's because the standby
@@ -2444,58 +2444,58 @@ retry:
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(curFileTimeLine, sendSegNo))));
+									XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("could not open file \"%s\": %m",
 									path)));
 			}
-			sendOff = 0;
+			sendSeg->ws_off = 0;
 		}
 
 		/* Need to seek in the file? */
-		if (sendOff != startoff)
+		if (sendSeg->ws_off != startoff)
 		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+			if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
 				ereport(ERROR,
 						(errcode_for_file_access(),
 						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(curFileTimeLine, sendSegNo),
+								XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
 								startoff)));
-			sendOff = startoff;
+			sendSeg->ws_off = startoff;
 		}
 
 		/* How many bytes are within this segment? */
-		if (nbytes > (wal_segment_size - startoff))
-			segbytes = wal_segment_size - startoff;
+		if (nbytes > (segcxt->ws_segsize - startoff))
+			segbytes = segcxt->ws_segsize - startoff;
 		else
 			segbytes = nbytes;
 
 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
+		readbytes = read(sendSeg->ws_file, p, segbytes);
 		pgstat_report_wait_end();
 		if (readbytes < 0)
 		{
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, (Size) segbytes)));
+							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+							sendSeg->ws_off, (Size) segbytes)));
 		}
 		else if (readbytes == 0)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_DATA_CORRUPTED),
 					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, readbytes, (Size) segbytes)));
+							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
+							sendSeg->ws_off, readbytes, (Size) segbytes)));
 		}
 
 		/* Update state for read */
 		recptr += readbytes;
 
-		sendOff += readbytes;
+		sendSeg->ws_off += readbytes;
 		nbytes -= readbytes;
 		p += readbytes;
 	}
@@ -2507,7 +2507,7 @@ retry:
 	 * read() succeeds in that case, but the data we tried to read might
 	 * already have been overwritten with new WAL records.
 	 */
-	XLByteToSeg(startptr, segno, wal_segment_size);
+	XLByteToSeg(startptr, segno, segcxt->ws_segsize);
 	CheckXLogRemoved(segno, ThisTimeLineID);
 
 	/*
@@ -2526,10 +2526,10 @@ retry:
 		walsnd->needreload = false;
 		SpinLockRelease(&walsnd->mutex);
 
-		if (reload && sendFile >= 0)
+		if (reload && sendSeg->ws_file >= 0)
 		{
-			close(sendFile);
-			sendFile = -1;
+			close(sendSeg->ws_file);
+			sendSeg->ws_file = -1;
 
 			goto retry;
 		}
@@ -2695,9 +2695,9 @@ XLogSendPhysical(void)
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
 		/* close the current file. */
-		if (sendFile >= 0)
-			close(sendFile);
-		sendFile = -1;
+		if (sendSeg->ws_file >= 0)
+			close(sendSeg->ws_file);
+		sendSeg->ws_file = -1;
 
 		/* Send CopyDone */
 		pq_putmessage_noblock('c', NULL, 0);
@@ -2768,7 +2768,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 63c3879ead..264a8f4db5 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -43,14 +43,12 @@ static char xlogfpath[MAXPGPATH];
 
 typedef struct XLogPageReadPrivate
 {
-	const char *datadir;
 	int			tliIndex;
 } XLogPageReadPrivate;
 
 static int	SimpleXLogPageRead(XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
-							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-							   TimeLineID *pageTLI);
+							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 
 /*
  * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -66,9 +64,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	char	   *errormsg;
 	XLogPageReadPrivate private;
 
-	private.datadir = datadir;
 	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -119,9 +116,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	XLogPageReadPrivate private;
 	XLogRecPtr	endptr;
 
-	private.datadir = datadir;
 	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -177,9 +173,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 			forkptr += SizeOfXLogShortPHD;
 	}
 
-	private.datadir = datadir;
 	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -237,8 +232,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 /* XLogReader callback function, to read a WAL page */
 static int
 SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-				   TimeLineID *pageTLI)
+				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf)
 {
 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 	uint32		targetPageOff;
@@ -283,7 +277,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 		XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
 					 xlogreadsegno, WalSegSz);
 
-		snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+		snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s",
+				 xlogreader->segcxt.ws_dir, xlogfname);
 
 		xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
 
@@ -321,7 +316,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 
 	Assert(targetSegNo == xlogreadsegno);
 
-	*pageTLI = targetHistory[private->tliIndex].tli;
+	xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli;
 	return XLOG_BLCKSZ;
 }
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index b95d467805..b79208cd73 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -33,7 +33,6 @@ static int	WalSegSz;
 typedef struct XLogDumpPrivate
 {
 	TimeLineID	timeline;
-	char	   *inpath;
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	bool		endptr_reached;
@@ -224,7 +223,7 @@ search_directory(const char *directory, const char *fname)
 }
 
 /*
- * Identify the target directory and set WalSegSz.
+ * Identify the target directory.
  *
  * Try to find the file in several places:
  * if directory != NULL:
@@ -235,29 +234,22 @@ search_directory(const char *directory, const char *fname)
  *	 XLOGDIR /
  *	 $PGDATA / XLOGDIR /
  *
- * Set the valid target directory in private->inpath.
+ * The valid target directory is returned.
  */
-static void
-identify_target_directory(XLogDumpPrivate *private, char *directory,
-						  char *fname)
+static char *
+identify_target_directory(char *directory, char *fname)
 {
 	char		fpath[MAXPGPATH];
 
 	if (directory != NULL)
 	{
 		if (search_directory(directory, fname))
-		{
-			private->inpath = pg_strdup(directory);
-			return;
-		}
+			return pg_strdup(directory);
 
 		/* directory / XLOGDIR */
 		snprintf(fpath, MAXPGPATH, "%s/%s", directory, XLOGDIR);
 		if (search_directory(fpath, fname))
-		{
-			private->inpath = pg_strdup(fpath);
-			return;
-		}
+			return pg_strdup(fpath);
 	}
 	else
 	{
@@ -265,16 +257,10 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 
 		/* current directory */
 		if (search_directory(".", fname))
-		{
-			private->inpath = pg_strdup(".");
-			return;
-		}
+			return pg_strdup(".");
 		/* XLOGDIR */
 		if (search_directory(XLOGDIR, fname))
-		{
-			private->inpath = pg_strdup(XLOGDIR);
-			return;
-		}
+			return pg_strdup(XLOGDIR);
 
 		datadir = getenv("PGDATA");
 		/* $PGDATA / XLOGDIR */
@@ -282,10 +268,7 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 		{
 			snprintf(fpath, MAXPGPATH, "%s/%s", datadir, XLOGDIR);
 			if (search_directory(fpath, fname))
-			{
-				private->inpath = pg_strdup(fpath);
-				return;
-			}
+				return pg_strdup(fpath);
 		}
 	}
 
@@ -294,6 +277,8 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 		fatal_error("could not locate WAL file \"%s\"", fname);
 	else
 		fatal_error("could not find any WAL file");
+
+	return NULL;				/* not reached */
 }
 
 /*
@@ -423,7 +408,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
  */
 static int
 XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-				 XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+				 XLogRecPtr targetPtr, char *readBuff)
 {
 	XLogDumpPrivate *private = state->private_data;
 	int			count = XLOG_BLCKSZ;
@@ -441,7 +426,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
+	XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
 					 readBuff, count);
 
 	return count;
@@ -820,6 +805,7 @@ main(int argc, char **argv)
 	XLogDumpStats stats;
 	XLogRecord *record;
 	XLogRecPtr	first_record;
+	char	   *waldir = NULL;
 	char	   *errormsg;
 
 	static struct option long_options[] = {
@@ -912,7 +898,7 @@ main(int argc, char **argv)
 				}
 				break;
 			case 'p':
-				private.inpath = pg_strdup(optarg);
+				waldir = pg_strdup(optarg);
 				break;
 			case 'r':
 				{
@@ -994,13 +980,13 @@ main(int argc, char **argv)
 		goto bad_argument;
 	}
 
-	if (private.inpath != NULL)
+	if (waldir != NULL)
 	{
 		/* validate path points to directory */
-		if (!verify_directory(private.inpath))
+		if (!verify_directory(waldir))
 		{
 			pg_log_error("path \"%s\" could not be opened: %s",
-						 private.inpath, strerror(errno));
+						 waldir, strerror(errno));
 			goto bad_argument;
 		}
 	}
@@ -1015,17 +1001,17 @@ main(int argc, char **argv)
 
 		split_path(argv[optind], &directory, &fname);
 
-		if (private.inpath == NULL && directory != NULL)
+		if (waldir == NULL && directory != NULL)
 		{
-			private.inpath = directory;
+			waldir = directory;
 
-			if (!verify_directory(private.inpath))
+			if (!verify_directory(waldir))
 				fatal_error("could not open directory \"%s\": %s",
-							private.inpath, strerror(errno));
+							waldir, strerror(errno));
 		}
 
-		identify_target_directory(&private, private.inpath, fname);
-		fd = open_file_in_directory(private.inpath, fname);
+		waldir = identify_target_directory(waldir, fname);
+		fd = open_file_in_directory(waldir, fname);
 		if (fd < 0)
 			fatal_error("could not open file \"%s\"", fname);
 		close(fd);
@@ -1056,7 +1042,7 @@ main(int argc, char **argv)
 			/* ignore directory, already have that */
 			split_path(argv[optind + 1], &directory, &fname);
 
-			fd = open_file_in_directory(private.inpath, fname);
+			fd = open_file_in_directory(waldir, fname);
 			if (fd < 0)
 				fatal_error("could not open file \"%s\"", fname);
 			close(fd);
@@ -1088,7 +1074,7 @@ main(int argc, char **argv)
 		}
 	}
 	else
-		identify_target_directory(&private, private.inpath, NULL);
+		waldir = identify_target_directory(waldir, NULL);
 
 	/* we don't know what to print */
 	if (XLogRecPtrIsInvalid(private.startptr))
@@ -1100,7 +1086,7 @@ main(int argc, char **argv)
 	/* done with argument parsing, do the actual work */
 
 	/* we have everything we need, start reading */
-	xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage,
+	xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
 										  &private);
 	if (!xlogreader_state)
 		fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 735b1bd2fd..c5604f6841 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -31,6 +31,22 @@
 
 #include "access/xlogrecord.h"
 
+/* WALOpenSegment represents a WAL segment being read. */
+typedef struct WALOpenSegment
+{
+	int			ws_file;		/* segment file descriptor */
+	XLogSegNo	ws_segno;		/* segment number */
+	uint32		ws_off;			/* offset in the segment */
+	TimeLineID	ws_tli;			/* timeline ID of the currently open file */
+} WALOpenSegment;
+
+/* WALSegmentContext carries context information about WAL segments to read */
+typedef struct WALSegmentContext
+{
+	char		ws_dir[MAXPGPATH];	/* directory of WAL segment files (pg_waldump reads from it) */
+	int			ws_segsize;		/* WAL segment size; presumably replaces wal_segment_size */
+} WALSegmentContext;
+
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
@@ -38,8 +54,7 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
-							   char *readBuf,
-							   TimeLineID *pageTLI);
+							   char *readBuf);
 
 typedef struct
 {
@@ -77,11 +92,6 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/*
-	 * Segment size of the to-be-parsed data (mandatory).
-	 */
-	int			wal_segment_size;
-
 	/*
 	 * Data input callback (mandatory).
 	 *
@@ -99,9 +109,8 @@ struct XLogReaderState
 	 * actual WAL record it's interested in.  In that case, targetRecPtr can
 	 * be used to determine which timeline to read the page from.
 	 *
-	 * The callback shall set *pageTLI to the TLI of the file the page was
-	 * read from.  It is currently used only for error reporting purposes, to
-	 * reconstruct the name of the WAL file where an error occurred.
+	 * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+	 * read from.
 	 */
 	XLogPageReadCB read_page;
 
@@ -156,10 +165,9 @@ struct XLogReaderState
 	char	   *readBuf;
 	uint32		readLen;
 
-	/* last read segment, segment offset, TLI for data currently in readBuf */
-	XLogSegNo	readSegNo;
-	uint32		readOff;
-	TimeLineID	readPageTLI;
+	/* last read XLOG position for data currently in readBuf */
+	WALSegmentContext segcxt;
+	WALOpenSegment seg;
 
 	/*
 	 * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -202,12 +210,17 @@ struct XLogReaderState
 
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
+										   const char *waldir,
 										   XLogPageReadCB pagereadfunc,
 										   void *private_data);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
+/* Initialize supporting structures */
+extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+							   int segsize, const char *waldir);
+
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
 extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
 										 XLogRecPtr recptr, char **errormsg);
@@ -219,6 +232,7 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 4105b59904..e85aa0bdc8 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -49,10 +49,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
-								 XLogRecPtr targetRecPtr, char *cur_page,
-								 TimeLineID *pageTLI);
+								 XLogRecPtr targetRecPtr, char *cur_page);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
-
 #endif
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index a9c178a9e6..012096f183 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -14,6 +14,6 @@
 extern int	logical_read_local_xlog_page(XLogReaderState *state,
 										 XLogRecPtr targetPagePtr,
 										 int reqLen, XLogRecPtr targetRecPtr,
-										 char *cur_page, TimeLineID *pageTLI);
+										 char *cur_page);
 
 #endif

Reply via email to