Michael Paquier <mich...@paquier.xyz> wrote:

> On Mon, Nov 11, 2019 at 04:25:56PM +0100, Antonin Houska wrote:
> >> On Fri, Oct 04, 2019 at 12:11:11PM +0200, Antonin Houska wrote:
> >> Your patch removes all the three optional lseek() calls which can
> >> happen in a segment.  Am I missing something but isn't that plain
> >> wrong?  You could reuse the error context for that as well if an error
> >> happens as what's needed is basically the segment name and the LSN
> >> offset.
> > 
> > Explicit call of lseek() is not used because XLogRead() uses pg_pread()
> > now. Nevertheless I found out that in the last version of the patch I set
> > ws_off to 0 for a newly opened segment. This was wrong, fixed now.
> 
> Missed that part, thanks.  This was actually not obvious after an
> initial lookup of the patch.  Wouldn't it make sense to split that
> part in a separate patch that we could review and get committed first
> then?  It would have the advantage to make the rest easier to review
> and follow.  And using pread is actually better for performance
> compared to read+lseek.  Now there is also the argument that we don't
> always seek into an opened WAL segment, and that a plain read() is
> actually better than pread() in some cases.

OK, the next version uses an explicit lseek(). Maybe the fact that XLOG is
mostly read sequentially (i.e. without frequent seeks) is the reason pread()
hasn't been adopted so far.

The new version reflects your other suggestions too, except the one about not
renaming "XLOG" -> "WAL" (actually you mentioned that earlier in the
thread). I recall that when working on the preliminary patch (709d003fbd),
Alvaro suggested "WAL" for some structures because these are new. The rule
seemed to be that "XLOG..." should be left for the existing symbols, while the
new ones should be "WAL...":

https://www.postgresql.org/message-id/20190917221521.GA15733%40alvherre.pgsql

So I decided to rename the new symbols and to remove the related comment.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

>From 34c0ae891de7dae3b84de33795c5d0521ccc0a88 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Tue, 12 Nov 2019 11:51:14 +0100
Subject: [PATCH 1/2] Use only xlogreader.c:XLogRead()

The old implementations in xlogutils.c and walsender.c are only renamed here;
they are removed, together with pg_waldump's XLogDumpXLogRead(), by the
following patch.
---
 src/backend/access/transam/xlogreader.c | 121 ++++++++++++++++++++
 src/backend/access/transam/xlogutils.c  |  94 ++++++++++++++--
 src/backend/replication/walsender.c     | 143 +++++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c         |  63 ++++++++++-
 src/include/access/xlogreader.h         |  37 ++++++
 src/include/access/xlogutils.h          |   1 +
 6 files changed, 441 insertions(+), 18 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7f24f0cb95..006c6298c9 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -1015,6 +1018,124 @@ out:
 
 #endif							/* FRONTEND */
 
+/*
+ * Read 'count' bytes from WAL fetched from timeline 'tli' into 'buf',
+ * starting at location 'startptr'. 'seg' is the last segment used,
+ * 'openSegment' is a callback to open the next segment and 'segcxt' is
+ * additional segment info that does not fit into 'seg'.
+ *
+ * 'errinfo' should point to WALReadError structure which will receive error
+ * details in case the read fails.
+ *
+ * Returns true if succeeded, false if failed.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+		 WALOpenSegment *seg, WALSegmentContext *segcxt,
+		 WALSegmentOpen openSegment, WALReadError *errinfo)
+{
+	char	   *p = buf;
+	XLogRecPtr	recptr = startptr;
+	Size		nbytes = count;
+
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+
+		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+		if (seg->ws_file < 0 ||
+			!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+			tli != seg->ws_tli)
+		{
+			XLogSegNo	nextSegNo;
+
+			/* Switch to another logfile segment */
+			if (seg->ws_file >= 0)
+				close(seg->ws_file);
+			seg->ws_file = -1;	/* no stale fd if openSegment() fails */
+
+			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+
+			/* Open the next segment in the caller's way. */
+			openSegment(nextSegNo, segcxt, &tli, &seg->ws_file);
+
+			/* Update the current segment info. */
+			seg->ws_tli = tli;
+			seg->ws_segno = nextSegNo;
+			seg->ws_off = 0;
+		}
+
+		/* Need to seek in the file? */
+		if (seg->ws_off != startoff)
+		{
+			/*
+			 * Update ws_off unconditionally, it will be useful for error
+			 * message too.
+			 */
+			seg->ws_off = startoff;
+
+			if (lseek(seg->ws_file, (off_t) startoff, SEEK_SET) < 0)
+			{
+				errinfo->xlr_seek = true;
+				errinfo->xlr_errno = errno;
+				errinfo->xlr_req = 0;
+				errinfo->xlr_read = 0;
+				errinfo->xlr_seg = seg;
+				return false;
+			}
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (segcxt->ws_segsize - seg->ws_off))
+			segbytes = segcxt->ws_segsize - seg->ws_off;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		/*
+		 * Failure to read the data does not necessarily imply non-zero errno,
+		 * so reset it to let the caller recognize such a failure.
+		 */
+		errno = 0;
+
+		readbytes = read(seg->ws_file, p, segbytes);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes <= 0)
+		{
+			errinfo->xlr_seek = false;
+			errinfo->xlr_errno = errno;
+			errinfo->xlr_req = segbytes;
+			errinfo->xlr_read = readbytes;
+			errinfo->xlr_seg = seg;
+			return false;
+		}
+
+		/*
+		 * Update state for read.  ws_off must follow the file position, or a
+		 * later read at the same offset would wrongly skip the lseek() above.
+		 */
+		seg->ws_off += readbytes;
+		recptr += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5f1e5ba75d..18e436f4fd 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  * frontend).  Probably these should be merged at some point.
  */
 static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-		 Size count)
+XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
+			Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -896,6 +896,37 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/*
+ * XLogRead() callback used by read_local_xlog_page(): open segment
+ * 'nextSegNo' of timeline *tli_p (left unchanged) and store the descriptor
+ * in *file_p.  Raises ERROR if the file cannot be opened.
+ */
+static void
+wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+				 TimeLineID *tli_p, int *file_p)
+{
+	char		segpath[MAXPGPATH];
+	int			fd;
+
+	XLogFilePath(segpath, *tli_p, nextSegNo, segcxt->ws_segsize);
+	fd = BasicOpenFile(segpath, O_RDONLY | PG_BINARY);
+	if (fd >= 0)
+	{
+		*file_p = fd;
+		return;
+	}
+
+	if (errno == ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("requested WAL segment %s has already been removed",
+						segpath)));
+	ereport(ERROR,
+			(errcode_for_file_access(),
+			 errmsg("could not open file \"%s\": %m",
+					segpath)));
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -913,7 +944,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 {
 	XLogRecPtr	read_upto,
 				loc;
+	TimeLineID	tli;
 	int			count;
+	WALReadError errinfo;
 
 	loc = targetPagePtr + reqLen;
 
@@ -932,7 +965,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			read_upto = GetFlushRecPtr();
 		else
 			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-		state->seg.ws_tli = ThisTimeLineID;
+		tli = ThisTimeLineID;
 
 		/*
 		 * Check which timeline to get the record from.
@@ -982,14 +1015,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			read_upto = state->currTLIValidUntil;
 
 			/*
-			 * Setting ws_tli to our wanted record's TLI is slightly wrong;
-			 * the page might begin on an older timeline if it contains a
-			 * timeline switch, since its xlog segment will have been copied
-			 * from the prior timeline. This is pretty harmless though, as
-			 * nothing cares so long as the timeline doesn't go backwards.  We
-			 * should read the page header instead; FIXME someday.
+			 * Setting tli to our wanted record's TLI is slightly wrong; the
+			 * page might begin on an older timeline if it contains a timeline
+			 * switch, since its xlog segment will have been copied from the
+			 * prior timeline. This is pretty harmless though, as nothing
+			 * cares so long as the timeline doesn't go backwards.  We should
+			 * read the page header instead; FIXME someday.
 			 */
-			state->seg.ws_tli = state->currTLI;
+			tli = state->currTLI;
 
 			/* No need to wait on a historical timeline */
 			break;
@@ -1020,9 +1053,46 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
-			 XLOG_BLCKSZ);
+	if (!XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
+				  &state->segcxt, wal_segment_open, &errinfo))
+		WALReadRaiseError(&errinfo);
 
 	/* number of valid bytes in the buffer */
 	return count;
 }
+
+/*
+ * Backend-specific convenience code to report a failure of XLogRead() as
+ * ERROR.  Never returns: callers proceed only if XLogRead() succeeded.
+ */
+void
+WALReadRaiseError(WALReadError *errinfo)
+{
+	WALOpenSegment *seg = errinfo->xlr_seg;
+	char	   *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno);
+
+	if (errinfo->xlr_seek)
+	{
+		errno = errinfo->xlr_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not seek in log segment %s to offset %u: %m",
+						fname, seg->ws_off)));
+	}
+	else if (errinfo->xlr_read < 0)
+	{
+		errno = errinfo->xlr_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+						fname, seg->ws_off, (Size) errinfo->xlr_req)));
+	}
+	else						/* read() hit EOF; never fall through */
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+						fname, seg->ws_off, errinfo->xlr_read,
+						(Size) errinfo->xlr_req)));
+	}
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7f5671504f..92a539aa5c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,9 +248,13 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
+static void WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+							  TimeLineID *tli_p, int *file_p);
 
 
+static void XLogReadOld(WALSegmentContext *segcxt, char *buf,
+						XLogRecPtr startptr, Size count);
+
 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -766,6 +770,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 {
 	XLogRecPtr	flushptr;
 	int			count;
+	WALReadError errinfo;
+	XLogSegNo	segno;
 
 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -786,7 +792,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+	if (!XLogRead(cur_page,
+				  targetPagePtr,
+				  XLOG_BLCKSZ,
+				  sendSeg->ws_tli,	/* Pass the current TLI because only
+									 * WalSndSegmentOpen controls whether new
+									 * TLI is needed. */
+				  sendSeg,
+				  sendCxt,
+				  WalSndSegmentOpen,
+				  &errinfo))
+		WALReadRaiseError(&errinfo);
+
+	/*
+	 * After reading into the buffer, check that what we read was valid. We do
+	 * this after reading, because even though the segment was present when we
+	 * opened it, it might get recycled or removed while we read it. The
+	 * read() succeeds in that case, but the data we tried to read might
+	 * already have been overwritten with new WAL records.
+	 */
+	XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+	CheckXLogRemoved(segno, sendSeg->ws_tli);
 
 	return count;
 }
@@ -2362,7 +2388,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2535,6 +2561,71 @@ retry:
 	}
 }
 
+/*
+ * Callback for XLogRead() to open segment 'nextSegNo'.  The timeline actually
+ * used is returned in *tli_p; failure to open the file raises ERROR.
+ */
+static void
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+				  TimeLineID *tli_p, int *file_p)
+{
+	char		path[MAXPGPATH];
+
+	/*-------
+	 * When reading from a historic timeline, and there is a timeline switch
+	 * within this segment, read from the WAL segment belonging to the new
+	 * timeline.
+	 *
+	 * For example, imagine that this server is currently on timeline 5, and
+	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+	 * 0/13002088. In pg_wal, we have these files:
+	 *
+	 * ...
+	 * 000000040000000000000012
+	 * 000000040000000000000013
+	 * 000000050000000000000013
+	 * 000000050000000000000014
+	 * ...
+	 *
+	 * In this situation, when requested to send the WAL from segment 0x13, on
+	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+	 * recovery prefers files from newer timelines, so if the segment was
+	 * restored from the archive on this server, the file belonging to the old
+	 * timeline, 000000040000000000000013, might not exist. Their contents are
+	 * equal up to the switchpoint, because at a timeline switch, the used
+	 * portion of the old segment is copied to the new file.
+	 */
+	*tli_p = sendTimeLine;
+	if (sendTimeLineIsHistoric)
+	{
+		XLogSegNo	endSegNo;
+
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+		if (nextSegNo == endSegNo)
+			*tli_p = sendTimeLineNextTLI;
+	}
+
+	XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+	*file_p = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (*file_p < 0)
+	{
+		/*
+		 * If the file is not found, assume it's because the standby asked for
+		 * a too old WAL segment that has already been removed or recycled.
+		 */
+		if (errno == ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("requested WAL segment %s has already been removed",
+							XLogFileNameP(*tli_p, nextSegNo))));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m",
+							path)));
+	}
+}
+
 /*
  * Send out the WAL in its normal physical/stored form.
  *
@@ -2552,6 +2643,8 @@ XLogSendPhysical(void)
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
 	Size		nbytes;
+	XLogSegNo	segno;
+	WALReadError errinfo;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2767,7 +2860,49 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+	if (!XLogRead(&output_message.data[output_message.len],
+				  startptr,
+				  nbytes,
+				  sendSeg->ws_tli,	/* Pass the current TLI because only
+									 * WalSndSegmentOpen controls whether new
+									 * TLI is needed. */
+				  sendSeg,
+				  sendCxt,
+				  WalSndSegmentOpen,
+				  &errinfo))
+		WALReadRaiseError(&errinfo);
+
+	/* See logical_read_xlog_page(). */
+	XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+	CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+	/*
+	 * During recovery, the currently-open WAL file might be replaced with the
+	 * file of the same name retrieved from archive. So we always need to
+	 * check what we read was valid after reading into the buffer. If it's
+	 * invalid, we try to open and read the file again.
+	 */
+	if (am_cascading_walsender)
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+		bool		reload;
+
+		SpinLockAcquire(&walsnd->mutex);
+		reload = walsnd->needreload;
+		walsnd->needreload = false;
+		SpinLockRelease(&walsnd->mutex);
+
+		if (reload && sendSeg->ws_file >= 0)
+		{
+			close(sendSeg->ws_file);
+			sendSeg->ws_file = -1;
+
+			goto retry;
+		}
+	}
+
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 1524e5eb1e..c0c2590d56 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,6 +280,46 @@ identify_target_directory(char *directory, char *fname)
 	return NULL;				/* not reached */
 }
 
+/*
+ * XLogRead() callback for pg_waldump: open segment 'nextSegNo' of timeline
+ * *tli_p (which is left unchanged) from directory segcxt->ws_dir and store
+ * the descriptor in *file_p.  If the file cannot be opened, fails via
+ * fatal_error().
+ */
+static void
+WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+				   TimeLineID *tli_p, int *file_p)
+{
+	char		segname[MAXPGPATH];
+	int			attempt = 0;
+
+	XLogFileName(segname, *tli_p, nextSegNo, segcxt->ws_segsize);
+
+	/*
+	 * In follow mode the server may have finished the previous file slightly
+	 * before the new one shows up, so keep retrying for up to 5 seconds (10
+	 * attempts, 500 ms apart) while the file does not exist yet.
+	 */
+	do
+	{
+		int			save_errno;
+
+		*file_p = open_file_in_directory(segcxt->ws_dir, segname);
+		if (*file_p >= 0)
+			break;				/* opened */
+		if (errno != ENOENT)
+			break;				/* any other error: give up right away */
+
+		save_errno = errno;
+		pg_usleep(500 * 1000);
+		errno = save_errno;
+	} while (++attempt < 10);
+
+	if (*file_p < 0)
+		fatal_error("could not find file \"%s\": %s",
+					segname, strerror(errno));
+}
+
 /*
  * Read count bytes from a segment file in the specified directory, for the
  * given timeline, containing the specified record pointer; store the data in
@@ -411,6 +451,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 {
 	XLogDumpPrivate *private = state->private_data;
 	int			count = XLOG_BLCKSZ;
+	WALReadError errinfo;
 
 	if (private->endptr != InvalidXLogRecPtr)
 	{
@@ -425,8 +466,26 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
-					 readBuff, count);
+	if (!XLogRead(readBuff, targetPagePtr, count, private->timeline,
+				  &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+	{
+		WALOpenSegment *seg = errinfo.xlr_seg;
+		char		fname[MAXPGPATH];
+
+		XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+					 state->segcxt.ws_segsize);
+
+		if (errinfo.xlr_seek)
+			fatal_error("could not seek in file %s to offset %u: %s",
+						fname, seg->ws_off, strerror(errinfo.xlr_errno));
+		else if (errinfo.xlr_errno != 0)
+			fatal_error("could not read in file %s, offset %u, length %zu: %s",
+						fname, seg->ws_off, (Size) errinfo.xlr_req,
+						strerror(errinfo.xlr_errno));
+		else
+			fatal_error("could not read in file %s, offset %u: length: %zu",
+						fname, seg->ws_off, (Size) errinfo.xlr_req);
+	}
 
 	return count;
 }
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 1bbee386e8..02d1514429 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -218,6 +218,26 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 extern void XLogReaderFree(XLogReaderState *state);
 
 /* Initialize supporting structures */
+/*
+ * Callback to open the specified WAL segment for reading.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can use
+ * it to return the TLI that it actually opened.
+ *
+ * "file_p" points to an address the segment file descriptor should be stored
+ * at.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef void (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+								TimeLineID *tli_p, int *file_p);
+
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
 
@@ -232,6 +252,23 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif							/* FRONTEND */
+/*
+ * Details of an XLogRead() failure, for backend and frontend callers alike.
+ */
+typedef struct WALReadError
+{
+	bool		xlr_seek;		/* Did lseek() (not read()) fail? */
+	int			xlr_errno;		/* errno set by the last read() / lseek() */
+	int			xlr_read;		/* read() result (<= 0); 0 if lseek() failed */
+	int			xlr_req;		/* Bytes requested; 0 if lseek() failed */
+	WALOpenSegment *xlr_seg;	/* Caller's segment we tried to read from */
+} WALReadError;
+
+extern bool XLogRead(char *buf, XLogRecPtr startptr, Size count,
+					 TimeLineID tli, WALOpenSegment *seg,
+					 WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+					 WALReadError *errinfo);
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 2df98e45b2..1ffc765c6a 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -54,4 +54,5 @@ extern int	read_local_xlog_page(XLogReaderState *state,
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
 
+extern void WALReadRaiseError(WALReadError *errinfo);
 #endif
-- 
2.20.1

>From 323f34e6193b8682c3ff5ffa5a1f1bf95cb36aa0 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Tue, 12 Nov 2019 11:51:14 +0100
Subject: [PATCH 2/2] Remove the old implementations of XLogRead().

Done in a separate patch because the diff looks harder to read if one function
(XLogRead) is removed and another one (the WALSegmentOpen callback) is added
nearby at the same time (the addition and removal of code can get mixed in the
diff).
---
 src/backend/access/transam/xlogutils.c | 122 ----------------
 src/backend/replication/walsender.c    | 188 -------------------------
 src/bin/pg_waldump/pg_waldump.c        | 122 ----------------
 3 files changed, 432 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 18e436f4fd..98118f484e 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 	forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-			Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	/* state maintained across calls */
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static TimeLineID sendTLI = 0;
-	static uint32 sendOff = 0;
-
-	Assert(segsize == wal_segment_size);
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segsize);
-
-		/* Do we need to switch to a different xlog segment? */
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-			sendTLI != tli)
-		{
-			char		path[MAXPGPATH];
-
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, segsize);
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-
-			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-			if (sendFile < 0)
-			{
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									path)));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendOff = 0;
-			sendTLI = tli;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				char		path[MAXPGPATH];
-				int			save_errno = errno;
-
-				XLogFilePath(path, tli, sendSegNo, segsize);
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								path, startoff)));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segsize - startoff))
-			segbytes = segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendFile, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes <= 0)
-		{
-			char		path[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFilePath(path, tli, sendSegNo, segsize);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-							path, sendOff, (unsigned long) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 92a539aa5c..80ef5eb909 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -252,9 +252,6 @@ static void WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 							  TimeLineID *tli_p, int *file_p);
 
 
-static void XLogReadOld(WALSegmentContext *segcxt, char *buf,
-						XLogRecPtr startptr, Size count);
-
 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -2376,191 +2373,6 @@ WalSndKill(int code, Datum arg)
 	SpinLockRelease(&walsnd->mutex);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-	XLogSegNo	segno;
-
-retry:
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
-		if (sendSeg->ws_file < 0 ||
-			!XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
-		{
-			char		path[MAXPGPATH];
-
-			/* Switch to another logfile segment */
-			if (sendSeg->ws_file >= 0)
-				close(sendSeg->ws_file);
-
-			XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
-			/*-------
-			 * When reading from a historic timeline, and there is a timeline
-			 * switch within this segment, read from the WAL segment belonging
-			 * to the new timeline.
-			 *
-			 * For example, imagine that this server is currently on timeline
-			 * 5, and we're streaming timeline 4. The switch from timeline 4
-			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
-			 *
-			 * ...
-			 * 000000040000000000000012
-			 * 000000040000000000000013
-			 * 000000050000000000000013
-			 * 000000050000000000000014
-			 * ...
-			 *
-			 * In this situation, when requested to send the WAL from
-			 * segment 0x13, on timeline 4, we read the WAL from file
-			 * 000000050000000000000013. Archive recovery prefers files from
-			 * newer timelines, so if the segment was restored from the
-			 * archive on this server, the file belonging to the old timeline,
-			 * 000000040000000000000013, might not exist. Their contents are
-			 * equal up to the switchpoint, because at a timeline switch, the
-			 * used portion of the old segment is copied to the new file.
-			 *-------
-			 */
-			sendSeg->ws_tli = sendTimeLine;
-			if (sendTimeLineIsHistoric)
-			{
-				XLogSegNo	endSegNo;
-
-				XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-				if (sendSeg->ws_segno == endSegNo)
-					sendSeg->ws_tli = sendTimeLineNextTLI;
-			}
-
-			XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
-			sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-			if (sendSeg->ws_file < 0)
-			{
-				/*
-				 * If the file is not found, assume it's because the standby
-				 * asked for a too old WAL segment that has already been
-				 * removed or recycled.
-				 */
-				if (errno == ENOENT)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("requested WAL segment %s has already been removed",
-									XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
-				else
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not open file \"%s\": %m",
-									path)));
-			}
-			sendSeg->ws_off = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendSeg->ws_off != startoff)
-		{
-			if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not seek in log segment %s to offset %u: %m",
-								XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-								startoff)));
-			sendSeg->ws_off = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (segcxt->ws_segsize - startoff))
-			segbytes = segcxt->ws_segsize - startoff;
-		else
-			segbytes = nbytes;
-
-		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-		readbytes = read(sendSeg->ws_file, p, segbytes);
-		pgstat_report_wait_end();
-		if (readbytes < 0)
-		{
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-							sendSeg->ws_off, (Size) segbytes)));
-		}
-		else if (readbytes == 0)
-		{
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-							sendSeg->ws_off, readbytes, (Size) segbytes)));
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendSeg->ws_off += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-
-	/*
-	 * After reading into the buffer, check that what we read was valid. We do
-	 * this after reading, because even though the segment was present when we
-	 * opened it, it might get recycled or removed while we read it. The
-	 * read() succeeds in that case, but the data we tried to read might
-	 * already have been overwritten with new WAL records.
-	 */
-	XLByteToSeg(startptr, segno, segcxt->ws_segsize);
-	CheckXLogRemoved(segno, ThisTimeLineID);
-
-	/*
-	 * During recovery, the currently-open WAL file might be replaced with the
-	 * file of the same name retrieved from archive. So we always need to
-	 * check what we read was valid after reading into the buffer. If it's
-	 * invalid, we try to open and read the file again.
-	 */
-	if (am_cascading_walsender)
-	{
-		WalSnd	   *walsnd = MyWalSnd;
-		bool		reload;
-
-		SpinLockAcquire(&walsnd->mutex);
-		reload = walsnd->needreload;
-		walsnd->needreload = false;
-		SpinLockRelease(&walsnd->mutex);
-
-		if (reload && sendSeg->ws_file >= 0)
-		{
-			close(sendSeg->ws_file);
-			sendSeg->ws_file = -1;
-
-			goto retry;
-		}
-	}
-}
-
 /*
  * Callback for XLogRead() to open the next segment.
  */
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index c0c2590d56..f1908a8868 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -320,128 +320,6 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 					fname, strerror(errno));
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-				 XLogRecPtr startptr, char *buf, Size count)
-{
-	char	   *p;
-	XLogRecPtr	recptr;
-	Size		nbytes;
-
-	static int	sendFile = -1;
-	static XLogSegNo sendSegNo = 0;
-	static uint32 sendOff = 0;
-
-	p = buf;
-	recptr = startptr;
-	nbytes = count;
-
-	while (nbytes > 0)
-	{
-		uint32		startoff;
-		int			segbytes;
-		int			readbytes;
-
-		startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-		{
-			char		fname[MAXFNAMELEN];
-			int			tries;
-
-			/* Switch to another logfile segment */
-			if (sendFile >= 0)
-				close(sendFile);
-
-			XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-			/*
-			 * In follow mode there is a short period of time after the server
-			 * has written the end of the previous file before the new file is
-			 * available. So we loop for 5 seconds looking for the file to
-			 * appear before giving up.
-			 */
-			for (tries = 0; tries < 10; tries++)
-			{
-				sendFile = open_file_in_directory(directory, fname);
-				if (sendFile >= 0)
-					break;
-				if (errno == ENOENT)
-				{
-					int			save_errno = errno;
-
-					/* File not there yet, try again */
-					pg_usleep(500 * 1000);
-
-					errno = save_errno;
-					continue;
-				}
-				/* Any other error, fall through and fail */
-				break;
-			}
-
-			if (sendFile < 0)
-				fatal_error("could not find file \"%s\": %s",
-							fname, strerror(errno));
-			sendOff = 0;
-		}
-
-		/* Need to seek in the file? */
-		if (sendOff != startoff)
-		{
-			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-			{
-				int			err = errno;
-				char		fname[MAXPGPATH];
-
-				XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-				fatal_error("could not seek in log file %s to offset %u: %s",
-							fname, startoff, strerror(err));
-			}
-			sendOff = startoff;
-		}
-
-		/* How many bytes are within this segment? */
-		if (nbytes > (WalSegSz - startoff))
-			segbytes = WalSegSz - startoff;
-		else
-			segbytes = nbytes;
-
-		readbytes = read(sendFile, p, segbytes);
-		if (readbytes <= 0)
-		{
-			int			err = errno;
-			char		fname[MAXPGPATH];
-			int			save_errno = errno;
-
-			XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-			errno = save_errno;
-
-			if (readbytes < 0)
-				fatal_error("could not read from log file %s, offset %u, length %d: %s",
-							fname, sendOff, segbytes, strerror(err));
-			else if (readbytes == 0)
-				fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-							fname, sendOff, readbytes, (Size) segbytes);
-		}
-
-		/* Update state for read */
-		recptr += readbytes;
-
-		sendOff += readbytes;
-		nbytes -= readbytes;
-		p += readbytes;
-	}
-}
-
 /*
  * XLogReader read_page callback
  */
-- 
2.20.1

Reply via email to