While working on the instance encryption I found it annoying to apply
decryption of XLOG pages in three different functions. Attached is a patch that
tries to merge them all into one function, XLogRead(). The existing
implementations differ in the way a new segment is opened, so I added a pointer
to a callback function as a new argument. This callback handles the specific
ways to determine the segment file name and to open the file.

I can split the patch into multiple diffs to make detailed review easier, but
first I'd like to hear if anything is seriously wrong with this
design. Thanks.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index f9a4960f8a..444b5bf910 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1369,7 +1369,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
 	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 }
 
-
+static	XLogReadPos	*readPos = NULL;
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1386,8 +1386,17 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
 
-	xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
-									NULL);
+
+	/* First time through? */
+	if (readPos == NULL)
+		readPos = XLogReadInitPos();
+
+	/*
+	 * read_local_xlog_page() eventually calls XLogRead(), so pass the initial
+	 * position.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, read_local_xlog_page,
+									readPos);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 9196aa3aae..7d0fdfba87 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlogrecord.h"
 #include "access/xlog_internal.h"
@@ -26,6 +28,7 @@
 #include "replication/origin.h"
 
 #ifndef FRONTEND
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -1005,6 +1008,191 @@ out:
 
 #endif							/* FRONTEND */
 
+/*
+ * Allocate an XLogReadPos for use with XLogRead(); no segment is open yet.
+ */
+XLogReadPos *
+XLogReadInitPos(void)
+{
+	XLogReadPos *result = (XLogReadPos *) palloc(sizeof(XLogReadPos));
+
+	result->dir = NULL;
+	result->tli = 0;
+	result->segOff = 0;
+	result->segNo = 0;
+	result->segFile = -1;
+
+	return result;
+}
+
+#ifdef FRONTEND
+/*
+ * Program name and segment size; for now only pg_waldump.c should set them.
+ */
+const char *progname;
+int	WalSegSz;
+
+/*
+ * Frontend stand-in for XLogFileNameP: build the name in palloc'd memory.
+ */
+static char *
+XLogFileNameFE(TimeLineID tli, XLogSegNo segno)
+{
+	char	   *fname = palloc(MAXFNAMELEN);
+
+	XLogFileName(fname, tli, segno, WalSegSz);
+	return fname;
+}
+
+/*
+ * Print a "FATAL" message to stderr and exit with failure.  XXX pg_waldump.c
+ * needs this function too. Is there a smart way to put it into src/common?
+ */
+static void fatal_error(const char *fmt,...) pg_attribute_printf(1, 2);
+
+static void
+fatal_error(const char *fmt,...)
+{
+	va_list		ap;
+
+	fflush(stdout);
+
+	va_start(ap, fmt);
+	fprintf(stderr, _("%s: FATAL:  "), progname);
+	vfprintf(stderr, _(fmt), ap);
+	va_end(ap);
+	fputc('\n', stderr);
+
+	exit(EXIT_FAILURE);
+}
+#endif	/* FRONTEND */
+
+/*
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'. If
+ * tli is passed, get the data from timeline *tli; if it is NULL, choosing the
+ * timeline is left to openSegment. 'pos' is the current position in the XLOG
+ * file and openSegment is a callback that opens the next segment for reading.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ *
+ * Will open, and keep open, one WAL segment whose descriptor is stored in
+ * pos->segFile. The descriptor is closed here only when switching to another
+ * segment or timeline, so it is still open when this function returns; there
+ * is never more than one descriptor open per 'pos'.
+ */
+void
+XLogRead(char *buf, XLogRecPtr startptr, Size count,
+		 TimeLineID *tli, XLogReadPos *pos, XLogOpenSegment openSegment)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+		int	segsize;
+
+#ifndef FRONTEND
+		segsize = wal_segment_size;
+#else
+		segsize = WalSegSz;
+#endif
+
+		startoff = XLogSegmentOffset(recptr, segsize);
+
+		if (pos->segFile < 0 ||
+			!XLByteInSeg(recptr, pos->segNo, segsize) ||
+			(tli != NULL && *tli != pos->tli))
+		{
+			XLogSegNo	nextSegNo;
+
+			/* No usable segment is open: close the old one, if any */
+			if (pos->segFile >= 0)
+				close(pos->segFile);
+
+			XLByteToSeg(recptr, nextSegNo, segsize);
+
+			/* The callback opens the file and updates *pos accordingly. */
+			openSegment(nextSegNo, tli, pos);
+		}
+
+		/* Need to seek in the file? */
+		if (pos->segOff != startoff)
+		{
+			if (lseek(pos->segFile, (off_t) startoff, SEEK_SET) < 0)
+#ifndef FRONTEND
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not seek in log segment %s to offset %u: %m",
+								XLogFileNameP(pos->tli, pos->segNo),
+								startoff)));
+#else
+			fatal_error("could not seek in log segment %s to offset %u",
+						XLogFileNameFE(pos->tli, pos->segNo), startoff);
+#endif
+			pos->segOff = startoff;
+		}
+
+		/* Do not read past the end of the current segment */
+		if (nbytes > (segsize - startoff))
+			segbytes = segsize - startoff;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		readbytes = read(pos->segFile, p, segbytes);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+		if (readbytes < 0)
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+							XLogFileNameP(pos->tli, pos->segNo), pos->segOff,
+							(Size) segbytes)));
+#else
+			fatal_error("could not read from log segment %s, offset %u, length %zu",
+						XLogFileNameFE(pos->tli, pos->segNo), pos->segOff,
+						(Size) segbytes);
+#endif
+		}
+		else if (readbytes == 0)
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+							XLogFileNameP(pos->tli, pos->segNo), pos->segOff,
+							readbytes, (Size) segbytes)));
+#else
+			fatal_error("could not read from log segment %s, offset %u: read %d of %zu",
+						XLogFileNameFE(pos->tli, pos->segNo), pos->segOff,
+						readbytes, (Size) segbytes);
+#endif
+		}
+
+		/* Advance by what was actually read; a short read just loops again */
+		recptr += readbytes;
+
+		pos->segOff += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+}
 
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 10a663bae6..4f29c89c06 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-		 Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	/* state maintained across calls */
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static TimeLineID sendTLI = 0;
-	static uint32 sendOff = 0;
-
-	Assert(segsize == wal_segment_size);
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segsize);
-
-		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-			sendTLI != tli)
-		{
-			char		path[MAXPGPATH];
-
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, segsize);
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-			if (sendFile < 0)
-			{
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									path)));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-			sendTLI = tli;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-				int			save_errno = errno;
-
-				XLogFilePath(path, tli, sendSegNo, segsize);
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								path, startoff)));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segsize - startoff))
-			segbytes = segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
@@ -896,6 +774,37 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/*
+ * Callback for XLogRead(): open WAL segment segNo on timeline *tli.
+ */
+static void
+read_local_xlog_page_open_segment(XLogSegNo segNo, TimeLineID *tli,
+								  XLogReadPos *pos)
+{
+	char	fname[MAXPGPATH];
+
+	XLogFilePath(fname, *tli, segNo, wal_segment_size);
+	pos->segFile = BasicOpenFile(fname, O_RDONLY | PG_BINARY);
+
+	if (pos->segFile >= 0)
+	{
+		pos->segNo = segNo;
+		pos->segOff = 0;
+		pos->tli = *tli;
+		return;
+	}
+
+	if (errno != ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", fname)));
+	else
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("requested WAL segment %s has already been removed",
+						fname)));
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -1017,14 +926,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		count = read_upto - targetPagePtr;
 	}
 
+	Assert(state->wal_segment_size == wal_segment_size);
+
 	/*
 	 * Even though we just determined how much of the page can be validly read
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
-			 XLOG_BLCKSZ);
-
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, pageTLI,
+			 (XLogReadPos *) state->private_data,
+			 read_local_xlog_page_open_segment);
 	/* number of valid bytes in the buffer */
 	return count;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 424fe86a1b..20c1ad4a35 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,6 +124,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   bool need_full_snapshot,
 					   bool fast_forward,
 					   XLogPageReadCB read_page,
+					   void *read_page_arg,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
 					   LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -172,14 +173,13 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page,
+									 read_page_arg);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory")));
 
-	ctx->reader->private_data = ctx;
-
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
@@ -234,6 +234,7 @@ CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
+						  void *read_page_arg,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -330,8 +331,8 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
-								 read_page, prepare_write, do_write,
-								 update_progress);
+								 read_page, read_page_arg, prepare_write,
+								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -376,6 +377,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
 					  XLogPageReadCB read_page,
+					  void *read_page_arg,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -428,8 +430,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 fast_forward, read_page, prepare_write,
-								 do_write, update_progress);
+								 fast_forward, read_page, read_page_arg,
+								 prepare_write, do_write, update_progress);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..b2f30d53f5 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -248,13 +248,20 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 	PG_TRY();
 	{
-		/* restart at slot's confirmed_flush */
+		/*
+		 * Restart at slot's confirmed_flush.
+		 *
+		 * logical_read_local_xlog_page() eventually calls XLogRead(), so set
+		 * the initial position.
+		 */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
 									false,
 									logical_read_local_xlog_page,
+									XLogReadInitPos(),
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite,
+									NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 182fe5bc82..dbcaa9c1d8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -14,6 +14,7 @@
 
 #include "access/htup_details.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "replication/decode.h"
@@ -144,8 +145,9 @@ create_logical_replication_slot(char *name, char *plugin,
 	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* do not build snapshot */
 									restart_lsn,
-									logical_read_local_xlog_page, NULL, NULL,
-									NULL);
+									logical_read_local_xlog_page,
+									XLogReadInitPos(),
+									NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
@@ -401,11 +403,15 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		 * Create our decoding context in fast_forward mode, passing start_lsn
 		 * as InvalidXLogRecPtr, so that we start processing from my slot's
 		 * confirmed_flush.
+		 *
+		 * logical_read_local_xlog_page() eventually calls XLogRead(), so set
+		 * the initial position.
 		 */
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									NIL,
 									true,	/* fast_forward */
 									logical_read_local_xlog_page,
+									XLogReadInitPos(),
 									NULL, NULL, NULL);
 
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aae6adc15c..56f9ae88b1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -128,16 +128,7 @@ bool		log_replication_commands = false;
  */
 bool		wake_wal_senders = false;
 
-/*
- * These variables are used similarly to openLogFile/SegNo/Off,
- * but for walsender to read the XLOG.
- */
-static int	sendFile = -1;
-static XLogSegNo sendSegNo = 0;
-static uint32 sendOff = 0;
-
-/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static XLogReadPos	*sendPos= NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -256,7 +247,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void WalSndOpenSegment(XLogSegNo segNo, TimeLineID *tli,
+							  XLogReadPos *pos);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -285,6 +277,9 @@ InitWalSender(void)
 
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
+
+	/* Make sure we can remember the current read position in XLOG. */
+	sendPos = XLogReadInitPos();
 }
 
 /*
@@ -301,10 +296,10 @@ WalSndErrorCleanup(void)
 	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
-	if (sendFile >= 0)
+	if (sendPos && sendPos->segFile >= 0)
 	{
-		close(sendFile);
-		sendFile = -1;
+		close(sendPos->segFile);
+		sendPos->segFile = -1;
 	}
 
 	if (MyReplicationSlot != NULL)
@@ -787,7 +782,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, NULL, sendPos,
+			 WalSndOpenSegment);
 
 	return count;
 }
@@ -933,9 +929,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 			need_full_snapshot = true;
 		}
 
+		/*
+		 * logical_read_xlog_page() eventually calls XLogRead(), so pass the
+		 * initial position.
+		 */
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										InvalidXLogRecPtr,
-										logical_read_xlog_page,
+										logical_read_xlog_page, sendPos,
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
 
@@ -1083,10 +1083,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * position.
 	 *
 	 * Do this before sending CopyBoth, so that any errors are reported early.
+	 *
+	 * logical_read_xlog_page() eventually calls XLogRead(), so pass the
+	 * initial position.
 	 */
 	logical_decoding_ctx =
 		CreateDecodingContext(cmd->startpoint, cmd->options, false,
-							  logical_read_xlog_page,
+							  logical_read_xlog_page, sendPos,
 							  WalSndPrepareWrite, WalSndWriteData,
 							  WalSndUpdateProgress);
 
@@ -2344,187 +2347,76 @@ WalSndKill(int code, Datum arg)
 }
 
 /*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
+ * Callback for XLogRead(): open segment segNo on the timeline being sent.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static void
+WalSndOpenSegment(XLogSegNo segNo, TimeLineID *tli, XLogReadPos *pos)
 {
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-	XLogSegNo	segno;
+	char		path[MAXPGPATH];
 
-retry:
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
+	/*
+	 * The timeline is determined below, caller should not do anything about
+	 * it.
+	 */
+	Assert(tli == NULL);
 
-	while (nbytes > 0)
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.
+	 */
+	pos->tli = sendTimeLine;
+	if (sendTimeLineIsHistoric)
 	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, wal_segment_size);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
-		{
-			char		path[MAXPGPATH];
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
+		XLogSegNo	endSegNo;
 
-			XLByteToSeg(recptr, sendSegNo, wal_segment_size);
-
-			/*-------
-			 * When reading from a historic timeline, and there is a timeline
-			 * switch within this segment, read from the WAL segment belonging
-			 * to the new timeline.
-			 *
-			 * For example, imagine that this server is currently on timeline
-			 * 5, and we're streaming timeline 4. The switch from timeline 4
-			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
-			 *
-			 * ...
-			 * 000000040000000000000012
-			 * 000000040000000000000013
-			 * 000000050000000000000013
-			 * 000000050000000000000014
-			 * ...
-			 *
-			 * In this situation, when requested to send the WAL from
-			 * segment 0x13, on timeline 4, we read the WAL from file
-			 * 000000050000000000000013. Archive recovery prefers files from
-			 * newer timelines, so if the segment was restored from the
-			 * archive on this server, the file belonging to the old timeline,
-			 * 000000040000000000000013, might not exist. Their contents are
-			 * equal up to the switchpoint, because at a timeline switch, the
-			 * used portion of the old segment is copied to the new file.
-			 *-------
-			 */
-			curFileTimeLine = sendTimeLine;
-			if (sendTimeLineIsHistoric)
-			{
-				XLogSegNo	endSegNo;
-
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
-				if (sendSegNo == endSegNo)
-					curFileTimeLine = sendTimeLineNextTLI;
-			}
-
-			XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendFile < 0)
-			{
-				/*
-				 * If the file is not found, assume it's because the standby
-				 * asked for a too old WAL segment that has already been
-				 * removed or recycled.
-				 */
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(curFileTimeLine, sendSegNo))));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(curFileTimeLine, sendSegNo),
-								startoff)));
-			sendOff = startoff;
-		}
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
+		if (segNo == endSegNo)
+			pos->tli = sendTimeLineNextTLI;
+	}
 
-		/* How many bytes are within this segment? */
-		if (nbytes > (wal_segment_size - startoff))
-			segbytes = wal_segment_size - startoff;
-		else
-			segbytes = nbytes;
+	XLogFilePath(path, pos->tli, segNo, wal_segment_size);
+	pos->segFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
 
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes < 0)
-		{
+	if (pos->segFile < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
 			ereport(ERROR,
 					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, (Size) segbytes)));
-		}
-		else if (readbytes == 0)
-		{
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(pos->tli, segNo))));
+		else
 			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(curFileTimeLine, sendSegNo),
-							sendOff, readbytes, (Size) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
 	}
 
-	/*
-	 * After reading into the buffer, check that what we read was valid. We do
-	 * this after reading, because even though the segment was present when we
-	 * opened it, it might get recycled or removed while we read it. The
-	 * read() succeeds in that case, but the data we tried to read might
-	 * already have been overwritten with new WAL records.
-	 */
-	XLByteToSeg(startptr, segno, wal_segment_size);
-	CheckXLogRemoved(segno, ThisTimeLineID);
-
-	/*
-	 * During recovery, the currently-open WAL file might be replaced with the
-	 * file of the same name retrieved from archive. So we always need to
-	 * check what we read was valid after reading into the buffer. If it's
-	 * invalid, we try to open and read the file again.
-	 */
-	if (am_cascading_walsender)
-	{
-		WalSnd	   *walsnd = MyWalSnd;
-		bool		reload;
-
-		SpinLockAcquire(&walsnd->mutex);
-		reload = walsnd->needreload;
-		walsnd->needreload = false;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (reload && sendFile >= 0)
-		{
-			close(sendFile);
-			sendFile = -1;
-
-			goto retry;
-		}
-	}
+	pos->segNo = segNo;
+	pos->segOff = 0;
 }
 
 /*
@@ -2544,6 +2436,7 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2686,9 +2579,9 @@ XLogSendPhysical(void)
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
 		/* close the current file. */
-		if (sendFile >= 0)
-			close(sendFile);
-		sendFile = -1;
+		if (sendPos->segFile >= 0)
+			close(sendPos->segFile);
+		sendPos->segFile = -1;
 
 		/* Send CopyDone */
 		pq_putmessage_noblock('c', NULL, 0);
@@ -2759,7 +2652,48 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes,
+			 NULL,				/* WalSndOpenSegment will determine TLI */
+			 sendPos,
+			 WalSndOpenSegment);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+	 */
+	XLByteToSeg(startptr, segno, wal_segment_size);
+	CheckXLogRemoved(segno, ThisTimeLineID);
+
+	/*
+	 * During recovery, the currently-open WAL file might be replaced with the
+	 * file of the same name retrieved from archive. So we always need to
+	 * check what we read was valid after reading into the buffer. If it's
+	 * invalid, we try to open and read the file again.
+	 */
+	if (am_cascading_walsender)
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+		bool		reload;
+
+		SpinLockAcquire(&walsnd->mutex);
+		reload = walsnd->needreload;
+		walsnd->needreload = false;
+		SpinLockRelease(&walsnd->mutex);
+
+		if (reload && sendPos->segFile >= 0)
+		{
+			close(sendPos->segFile);
+			sendPos->segFile = -1;
+
+			goto retry;
+		}
+	}
+
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e106fb2ed1..7dd63dd735 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -26,9 +26,9 @@
 #include "rmgrdesc.h"
 
 
-static const char *progname;
+const char *progname;
 
-static int	WalSegSz;
+int	WalSegSz;
 
 typedef struct XLogDumpPrivate
 {
@@ -37,6 +37,7 @@ typedef struct XLogDumpPrivate
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	bool		endptr_reached;
+	XLogReadPos	*pos;
 } XLogDumpPrivate;
 
 typedef struct XLogDumpConfig
@@ -296,126 +297,45 @@ identify_target_directory(XLogDumpPrivate *private, char *directory,
 		fatal_error("could not find any WAL file");
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
 static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-				 XLogRecPtr startptr, char *buf, Size count)
+XLogDumpOpenSegment(XLogSegNo segNo, TimeLineID *tli, XLogReadPos *pos)
 {
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static uint32 sendOff = 0;
+	char		fname[MAXPGPATH];
+	int	tries;
 
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
+	XLogFileName(fname, *tli, segNo, WalSegSz);
 
-	while (nbytes > 0)
+	/*
+	 * In follow mode there is a short period of time after the server has
+	 * written the end of the previous file before the new file is
+	 * available. So we loop for 5 seconds looking for the file to appear
+	 * before giving up.
+	 */
+	for (tries = 0; tries < 10; tries++)
 	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-		{
-			char		fname[MAXFNAMELEN];
-			int			tries;
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-			/*
-			 * In follow mode there is a short period of time after the server
-			 * has written the end of the previous file before the new file is
-			 * available. So we loop for 5 seconds looking for the file to
-			 * appear before giving up.
-			 */
-			for (tries = 0; tries < 10; tries++)
-			{
-				sendFile = open_file_in_directory(directory, fname);
-				if (sendFile >= 0)
-					break;
-				if (errno == ENOENT)
-				{
-					int			save_errno = errno;
-
-					/* File not there yet, try again */
-					pg_usleep(500 * 1000);
-
-					errno = save_errno;
-					continue;
-				}
-				/* Any other error, fall through and fail */
-				break;
-			}
-
-			if (sendFile < 0)
-				fatal_error("could not find file \"%s\": %s",
-							fname, strerror(errno));
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				int			err = errno;
-				char		fname[MAXPGPATH];
-
-				XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-				fatal_error("could not seek in log file %s to offset %u: %s",
-							fname, startoff, strerror(err));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (WalSegSz - startoff))
-			segbytes = WalSegSz - startoff;
-		else
-			segbytes = nbytes;
-
-		readbytes = read(sendFile, p, segbytes);
-		if (readbytes <= 0)
+		pos->segFile = open_file_in_directory(pos->dir, fname);
+		if (pos->segFile >= 0)
+			break;
+		if (errno == ENOENT)
 		{
-			int			err = errno;
-			char		fname[MAXPGPATH];
 			int			save_errno = errno;
 
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-			errno = save_errno;
+			/* File not there yet, try again */
+			pg_usleep(500 * 1000);
 
-			if (readbytes < 0)
-				fatal_error("could not read from log file %s, offset %u, length %d: %s",
-							fname, sendOff, segbytes, strerror(err));
-			else if (readbytes == 0)
-				fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-							fname, sendOff, readbytes, (Size) segbytes);
+			errno = save_errno;
+			continue;
 		}
+		/* Any other error, fall through and fail */
+		break;
+	}
 
-		/* Update state for read */
-		recptr += readbytes;
+	if (pos->segFile < 0)
+		fatal_error("could not find file \"%s\": %s",
+					fname, strerror(errno));
 
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
+	pos->segNo = segNo;
+	pos->segOff = 0;
 }
 
 /*
@@ -441,8 +361,8 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
-					 readBuff, count);
+	XLogRead(readBuff, targetPagePtr, count, &private->timeline,
+			 private->pos, XLogDumpOpenSegment);
 
 	return count;
 }
@@ -852,6 +772,7 @@ main(int argc, char **argv)
 	private.startptr = InvalidXLogRecPtr;
 	private.endptr = InvalidXLogRecPtr;
 	private.endptr_reached = false;
+	private.pos = XLogReadInitPos();
 
 	config.bkp_details = false;
 	config.stop_after_records = -1;
@@ -1083,6 +1004,9 @@ main(int argc, char **argv)
 	else
 		identify_target_directory(&private, private.inpath, NULL);
 
+	/* The XLOG position can be used separately from "private". */
+	private.pos->dir = private.inpath;
+
 	/* we don't know what to print */
 	if (XLogRecPtrIsInvalid(private.startptr))
 	{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f3bae0bf49..9bddbd3042 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -219,6 +219,31 @@ extern void XLogReaderInvalReadState(XLogReaderState *state);
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
 
+/*
+ * Position in XLOG file while reading it.
+ */
+typedef struct XLogReadPos
+{
+	int	segFile;				/* segment file descriptor */
+	XLogSegNo	segNo;			/* segment number */
+	uint32 segOff;				/* offset in the segment */
+	TimeLineID tli;		/* timeline ID of the currently open file */
+
+	char	*dir;				/* directory (only needed by frontends) */
+} XLogReadPos;
+
+/*
+ * Callback to open the specified XLOG segment 'segNo' in timeline 'tli' for
+ * reading and update the position accordingly.
+ */
+typedef void (*XLogOpenSegment) (XLogSegNo segNo,  TimeLineID *tli,
+								 XLogReadPos *pos);
+
+extern XLogReadPos *XLogReadInitPos(void);
+extern void XLogRead(char *buf, XLogRecPtr startptr, Size count,
+					 TimeLineID *tli, XLogReadPos *pos,
+					 XLogOpenSegment openSegment);
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 0a2a63a48c..59b29433eb 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
 						  XLogPageReadCB read_page,
+														 void *read_page_arg,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress);
@@ -107,6 +108,7 @@ extern LogicalDecodingContext *CreateDecodingContext(
 					  List *output_plugin_options,
 					  bool fast_forward,
 					  XLogPageReadCB read_page,
+					  void *read_page_arg,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress);

Reply via email to