I thought that the way to have debug output for this new WAL code is to
use WAL_DEBUG; that way it won't bother anyone and we can remove them
later if necessary.

Also, I realized that the way to handle any error in the path that
assembles a record from contrecords is to set a flag that we can test
after the standard "err:" label; no need to create a new label.

I also wrote a lot more comments to try and explain what is going on and
why.

I'm still unsure about the two-flags reporting in xlogreader, so I put
that in a separate commit.  Opinions on that one?

The last commit is something I noticed in pg_rewind ...

-- 
Álvaro Herrera              Valdivia, Chile  —  https://www.EnterpriseDB.com/
"No hay ausente sin culpa ni presente sin disculpa" (Prov. francés)
>From 6abc5026f92b99d704bff527d1306eb8588635e9 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 31 Aug 2021 20:55:10 -0400
Subject: [PATCH v3 1/5] Revert "Avoid creating archive status ".ready" files
 too early"

This reverts commit 515e3d84a0b58b58eb30194209d2bc47ed349f5b.
---
 src/backend/access/transam/timeline.c    |   2 +-
 src/backend/access/transam/xlog.c        | 220 ++---------------------
 src/backend/access/transam/xlogarchive.c |  17 +-
 src/backend/postmaster/walwriter.c       |   7 -
 src/backend/replication/walreceiver.c    |   6 +-
 src/include/access/xlog.h                |   1 -
 src/include/access/xlogarchive.h         |   4 +-
 src/include/access/xlogdefs.h            |   1 -
 8 files changed, 24 insertions(+), 234 deletions(-)

diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c
index acd5c2431d..8d0903c175 100644
--- a/src/backend/access/transam/timeline.c
+++ b/src/backend/access/transam/timeline.c
@@ -452,7 +452,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 	if (XLogArchivingActive())
 	{
 		TLHistoryFileName(histfname, newTLI);
-		XLogArchiveNotify(histfname, true);
+		XLogArchiveNotify(histfname);
 	}
 }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..e51a7a749d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -724,18 +724,6 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastFpwDisableRecPtr;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
-
-	/*
-	 * Variables used to track segment-boundary-crossing WAL records.  See
-	 * RegisterSegmentBoundary.  Protected by segtrack_lck.
-	 */
-	XLogSegNo	lastNotifiedSeg;
-	XLogSegNo	earliestSegBoundary;
-	XLogRecPtr	earliestSegBoundaryEndPtr;
-	XLogSegNo	latestSegBoundary;
-	XLogRecPtr	latestSegBoundaryEndPtr;
-
-	slock_t		segtrack_lck;	/* locks shared variables shown above */
 } XLogCtlData;
 
 static XLogCtlData *XLogCtl = NULL;
@@ -932,7 +920,6 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
 						   XLogSegNo *endlogSegNo);
 static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
-static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1167,56 +1154,23 @@ XLogInsertRecord(XLogRecData *rdata,
 	END_CRIT_SECTION();
 
 	/*
-	 * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
-	 * segment boundary, register that and wake up walwriter.
+	 * Update shared LogwrtRqst.Write, if we crossed page boundary.
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		XLogSegNo	StartSeg;
-		XLogSegNo	EndSeg;
-
-		XLByteToSeg(StartPos, StartSeg, wal_segment_size);
-		XLByteToSeg(EndPos, EndSeg, wal_segment_size);
-
-		/*
-		 * Register our crossing the segment boundary if that occurred.
-		 *
-		 * Note that we did not use XLByteToPrevSeg() for determining the
-		 * ending segment.  This is so that a record that fits perfectly into
-		 * the end of the segment causes the latter to get marked ready for
-		 * archival immediately.
-		 */
-		if (StartSeg != EndSeg && XLogArchivingActive())
-			RegisterSegmentBoundary(EndSeg, EndPos);
-
-		/*
-		 * Advance LogwrtRqst.Write so that it includes new block(s).
-		 *
-		 * We do this after registering the segment boundary so that the
-		 * comparison with the flushed pointer below can use the latest value
-		 * known globally.
-		 */
 		SpinLockAcquire(&XLogCtl->info_lck);
+		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
 		/* update local result copy while I have the chance */
 		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
-
-		/*
-		 * There's a chance that the record was already flushed to disk and we
-		 * missed marking segments as ready for archive.  If this happens, we
-		 * nudge the WALWriter, which will take care of notifying segments as
-		 * needed.
-		 */
-		if (StartSeg != EndSeg && XLogArchivingActive() &&
-			LogwrtResult.Flush >= EndPos && ProcGlobal->walwriterLatch)
-			SetLatch(ProcGlobal->walwriterLatch);
 	}
 
 	/*
 	 * If this was an XLOG_SWITCH record, flush the record and the empty
-	 * padding space that fills the rest of the segment.
+	 * padding space that fills the rest of the segment, and perform
+	 * end-of-segment actions (eg, notifying archiver).
 	 */
 	if (isLogSwitch)
 	{
@@ -2467,7 +2421,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 
 	/* We should always be inside a critical section here */
 	Assert(CritSectionCount > 0);
-	Assert(LWLockHeldByMe(WALWriteLock));
 
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
@@ -2633,12 +2586,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			 * later. Doing it here ensures that one and only one backend will
 			 * perform this fsync.
 			 *
-			 * If WAL archiving is active, we attempt to notify the archiver
-			 * of any segments that are now ready for archival.
-			 *
-			 * This is also the right place to update the timer for
-			 * archive_timeout and to signal for a checkpoint if too many
-			 * logfile segments have been used since the last checkpoint.
+			 * This is also the right place to notify the Archiver that the
+			 * segment is ready to copy to archival storage, and to update the
+			 * timer for archive_timeout, and to signal for a checkpoint if
+			 * too many logfile segments have been used since the last
+			 * checkpoint.
 			 */
 			if (finishing_seg)
 			{
@@ -2650,7 +2602,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
 
 				if (XLogArchivingActive())
-					NotifySegmentsReadyForArchive(LogwrtResult.Flush);
+					XLogArchiveNotifySeg(openLogSegNo);
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2738,9 +2690,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
-
-	if (XLogArchivingActive())
-		NotifySegmentsReadyForArchive(LogwrtResult.Flush);
 }
 
 /*
@@ -4379,131 +4328,6 @@ ValidateXLOGDirectoryStructure(void)
 	}
 }
 
-/*
- * RegisterSegmentBoundary
- *
- * WAL records that are split across a segment boundary require special
- * treatment for archiving: the initial segment must not be archived until
- * the end segment has been flushed, in case we crash before we have
- * the chance to flush the end segment (because after recovery we would
- * overwrite that WAL record with a different one, and so the file we
- * archived no longer represents truth.)  This also applies to streaming
- * physical replication.
- *
- * To handle this, we keep track of the LSN of WAL records that cross
- * segment boundaries.  Two such are sufficient: the ones with the
- * earliest and the latest end pointers we know about, since the flush
- * position advances monotonically.  WAL record writers register
- * boundary-crossing records here, which is used by .ready file creation
- * to delay until the end segment is known flushed.
- */
-static void
-RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
-{
-	XLogSegNo	segno PG_USED_FOR_ASSERTS_ONLY;
-
-	/* verify caller computed segment number correctly */
-	AssertArg((XLByteToSeg(endpos, segno, wal_segment_size), segno == seg));
-
-	SpinLockAcquire(&XLogCtl->segtrack_lck);
-
-	/*
-	 * If no segment boundaries are registered, store the new segment boundary
-	 * in earliestSegBoundary.  Otherwise, store the greater segment
-	 * boundaries in latestSegBoundary.
-	 */
-	if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
-	{
-		XLogCtl->earliestSegBoundary = seg;
-		XLogCtl->earliestSegBoundaryEndPtr = endpos;
-	}
-	else if (seg > XLogCtl->earliestSegBoundary &&
-			 (XLogCtl->latestSegBoundary == MaxXLogSegNo ||
-			  seg > XLogCtl->latestSegBoundary))
-	{
-		XLogCtl->latestSegBoundary = seg;
-		XLogCtl->latestSegBoundaryEndPtr = endpos;
-	}
-
-	SpinLockRelease(&XLogCtl->segtrack_lck);
-}
-
-/*
- * NotifySegmentsReadyForArchive
- *
- * Mark segments as ready for archival, given that it is safe to do so.
- * This function is idempotent.
- */
-void
-NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
-{
-	XLogSegNo	latest_boundary_seg;
-	XLogSegNo	last_notified;
-	XLogSegNo	flushed_seg;
-	XLogSegNo	seg;
-	bool		keep_latest;
-
-	XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size);
-
-	SpinLockAcquire(&XLogCtl->segtrack_lck);
-
-	if (XLogCtl->latestSegBoundary <= flushed_seg &&
-		XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr)
-	{
-		latest_boundary_seg = XLogCtl->latestSegBoundary;
-		keep_latest = false;
-	}
-	else if (XLogCtl->earliestSegBoundary <= flushed_seg &&
-			 XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr)
-	{
-		latest_boundary_seg = XLogCtl->earliestSegBoundary;
-		keep_latest = true;
-	}
-	else
-	{
-		SpinLockRelease(&XLogCtl->segtrack_lck);
-		return;
-	}
-
-	last_notified = XLogCtl->lastNotifiedSeg;
-
-	/*
-	 * Update shared memory and discard segment boundaries that are no longer
-	 * needed.
-	 *
-	 * It is safe to update shared memory before we attempt to create the
-	 * .ready files.  If our calls to XLogArchiveNotifySeg() fail,
-	 * RemoveOldXlogFiles() will retry it as needed.
-	 */
-	if (last_notified < latest_boundary_seg - 1)
-		XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
-
-	if (keep_latest)
-	{
-		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
-		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
-	}
-	else
-	{
-		XLogCtl->earliestSegBoundary = MaxXLogSegNo;
-		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
-	}
-
-	XLogCtl->latestSegBoundary = MaxXLogSegNo;
-	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
-
-	SpinLockRelease(&XLogCtl->segtrack_lck);
-
-	/*
-	 * Notify archiver about segments that are ready for archival (by creating
-	 * the corresponding .ready files).
-	 */
-	for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
-		XLogArchiveNotifySeg(seg, false);
-
-	PgArchWakeup();
-}
-
 /*
  * Remove previous backup history files.  This also retries creation of
  * .ready files for any backup history files for which XLogArchiveNotify
@@ -5406,17 +5230,9 @@ XLOGShmemInit(void)
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
-	SpinLockInit(&XLogCtl->segtrack_lck);
 	SpinLockInit(&XLogCtl->ulsn_lck);
 	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
-
-	/* Initialize stuff for marking segments as ready for archival. */
-	XLogCtl->lastNotifiedSeg = MaxXLogSegNo;
-	XLogCtl->earliestSegBoundary = MaxXLogSegNo;
-	XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
-	XLogCtl->latestSegBoundary = MaxXLogSegNo;
-	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
 }
 
 /*
@@ -8057,20 +7873,6 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
-	/*
-	 * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file.
-	 */
-	if (XLogArchivingActive())
-	{
-		XLogSegNo	EndOfLogSeg;
-
-		XLByteToSeg(EndOfLog, EndOfLogSeg, wal_segment_size);
-
-		SpinLockAcquire(&XLogCtl->segtrack_lck);
-		XLogCtl->lastNotifiedSeg = EndOfLogSeg - 1;
-		SpinLockRelease(&XLogCtl->segtrack_lck);
-	}
-
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
@@ -8198,7 +8000,7 @@ StartupXLOG(void)
 				XLogArchiveCleanup(partialfname);
 
 				durable_rename(origpath, partialpath, ERROR);
-				XLogArchiveNotify(partialfname, true);
+				XLogArchiveNotify(partialfname);
 			}
 		}
 	}
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index b9c19b2085..26b023e754 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -433,7 +433,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
 	if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 		XLogArchiveForceDone(xlogfname);
 	else
-		XLogArchiveNotify(xlogfname, true);
+		XLogArchiveNotify(xlogfname);
 
 	/*
 	 * If the existing file was replaced, since walsenders might have it open,
@@ -462,12 +462,9 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
  * by the archiver, e.g. we write 0000000100000001000000C6.ready
  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
  * then when complete, rename it to 0000000100000001000000C6.done
- *
- * Optionally, nudge the archiver process so that it'll notice the file we
- * create.
  */
 void
-XLogArchiveNotify(const char *xlog, bool nudge)
+XLogArchiveNotify(const char *xlog)
 {
 	char		archiveStatusPath[MAXPGPATH];
 	FILE	   *fd;
@@ -492,8 +489,8 @@ XLogArchiveNotify(const char *xlog, bool nudge)
 		return;
 	}
 
-	/* If caller requested, let archiver know it's got work to do */
-	if (nudge)
+	/* Notify archiver that it's got something to do */
+	if (IsUnderPostmaster)
 		PgArchWakeup();
 }
 
@@ -501,12 +498,12 @@ XLogArchiveNotify(const char *xlog, bool nudge)
  * Convenience routine to notify using segment number representation of filename
  */
 void
-XLogArchiveNotifySeg(XLogSegNo segno, bool nudge)
+XLogArchiveNotifySeg(XLogSegNo segno)
 {
 	char		xlog[MAXFNAMELEN];
 
 	XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size);
-	XLogArchiveNotify(xlog, nudge);
+	XLogArchiveNotify(xlog);
 }
 
 /*
@@ -611,7 +608,7 @@ XLogArchiveCheckDone(const char *xlog)
 		return true;
 
 	/* Retry creation of the .ready file */
-	XLogArchiveNotify(xlog, true);
+	XLogArchiveNotify(xlog);
 	return false;
 }
 
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 6a1e16edc2..626fae8454 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -248,13 +248,6 @@ WalWriterMain(void)
 		/* Process any signals received recently */
 		HandleWalWriterInterrupts();
 
-		/*
-		 * Notify the archiver of any WAL segments that are ready.  We do this
-		 * here to handle a race condition where WAL is flushed to disk prior
-		 * to registering the segment boundary.
-		 */
-		NotifySegmentsReadyForArchive(GetFlushRecPtr());
-
 		/*
 		 * Do what we're here for; then, if XLogBackgroundFlush() found useful
 		 * work to do, reset hibernation counter.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 60de3be92c..9a2bc37fd7 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -622,7 +622,7 @@ WalReceiverMain(void)
 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 				XLogArchiveForceDone(xlogfname);
 			else
-				XLogArchiveNotify(xlogfname, true);
+				XLogArchiveNotify(xlogfname);
 		}
 		recvFile = -1;
 
@@ -760,7 +760,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 				XLogArchiveForceDone(fname);
 			else
-				XLogArchiveNotify(fname, true);
+				XLogArchiveNotify(fname);
 
 			pfree(fname);
 			pfree(content);
@@ -915,7 +915,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 				if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 					XLogArchiveForceDone(xlogfname);
 				else
-					XLogArchiveNotify(xlogfname, true);
+					XLogArchiveNotify(xlogfname);
 			}
 			recvFile = -1;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 6b6ae81c2d..0a8ede700d 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -315,7 +315,6 @@ extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
-extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);
 
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
diff --git a/src/include/access/xlogarchive.h b/src/include/access/xlogarchive.h
index 935b4cb02d..3edd1a976c 100644
--- a/src/include/access/xlogarchive.h
+++ b/src/include/access/xlogarchive.h
@@ -23,8 +23,8 @@ extern bool RestoreArchivedFile(char *path, const char *xlogfname,
 extern void ExecuteRecoveryCommand(const char *command, const char *commandName,
 								   bool failOnSignal);
 extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname);
-extern void XLogArchiveNotify(const char *xlog, bool nudge);
-extern void XLogArchiveNotifySeg(XLogSegNo segno, bool nudge);
+extern void XLogArchiveNotify(const char *xlog);
+extern void XLogArchiveNotifySeg(XLogSegNo segno);
 extern void XLogArchiveForceDone(const char *xlog);
 extern bool XLogArchiveCheckDone(const char *xlog);
 extern bool XLogArchiveIsBusy(const char *xlog);
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 9b455e88e3..60348d1850 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -46,7 +46,6 @@ typedef uint64 XLogRecPtr;
  * XLogSegNo - physical log file sequence number.
  */
 typedef uint64 XLogSegNo;
-#define MaxXLogSegNo	((XLogSegNo) 0xFFFFFFFFFFFFFFFF)
 
 /*
  * TimeLineID (TLI) - identifies different database histories to prevent
-- 
2.30.2

>From f767cdddb3120f1f6c079c8eb00eaff38ea98c79 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 17:21:46 -0400
Subject: [PATCH v3 2/5] Implement FIRST_IS_ABORTED_CONTRECORD

---
 src/backend/access/transam/xlog.c       | 53 +++++++++++++++++++++++--
 src/backend/access/transam/xlogreader.c | 39 +++++++++++++++++-
 src/include/access/xlog_internal.h      | 14 ++++++-
 src/include/access/xlogreader.h         |  3 ++
 4 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..411f1618df 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -586,6 +586,8 @@ typedef struct XLogCtlData
 	XLogRecPtr	replicationSlotMinLSN;	/* oldest LSN needed by any slot */
 
 	XLogSegNo	lastRemovedSegNo;	/* latest removed/recycled XLOG segment */
+	XLogRecPtr	abortedContrecordPtr; /* LSN of incomplete record at end of
+									   * WAL */
 
 	/* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
 	XLogRecPtr	unloggedLSN;
@@ -848,6 +850,7 @@ static XLogSource XLogReceiptSource = XLOG_FROM_ANY;
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;	/* start of last record read */
 static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
+static XLogRecPtr abortedContrecordPtr;	/* page where contrecord was lost */
 
 /*
  * Local copies of equivalent fields in the control file.  When running
@@ -2246,6 +2249,30 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		if (!Insert->forcePageWrites)
 			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
 
+		/*
+		 * If the last page ended with an aborted partial continuation record,
+		 * mark the new page to indicate that the partial record can be
+		 * omitted.
+		 *
+		 * This happens only once at the end of recovery, so there's no race
+		 * condition here.
+		 */
+		if (XLogCtl->abortedContrecordPtr >= NewPageBeginPtr)
+		{
+#ifdef WAL_DEBUG
+			if (XLogCtl->abortedContrecordPtr != NewPageBeginPtr)
+				elog(PANIC, "inconsistent aborted contrecord location %X/%X, expected %X/%X",
+					 LSN_FORMAT_ARGS(XLogCtl->abortedContrecordPtr),
+					 LSN_FORMAT_ARGS(NewPageBeginPtr));
+			ereport(LOG,
+					(errmsg_internal("setting XLP_FIRST_IS_ABORTED_PARTIAL flag at %X/%X",
+									 LSN_FORMAT_ARGS(NewPageBeginPtr))));
+#endif
+			NewPage->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL;
+
+			XLogCtl->abortedContrecordPtr = InvalidXLogRecPtr;
+		}
+
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
@@ -4392,6 +4419,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 		record = XLogReadRecord(xlogreader, &errormsg);
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
+		abortedContrecordPtr = xlogreader->abortedContrecordPtr;
 		if (record == NULL)
 		{
 			if (readFile >= 0)
@@ -7691,10 +7719,29 @@ StartupXLOG(void)
 	/*
 	 * Re-fetch the last valid or last applied record, so we can identify the
 	 * exact endpoint of what we consider the valid portion of WAL.
+	 *
+	 * When recovery ended in an incomplete record, continue writing from the
+	 * point where it went missing.  This leaves behind the initial part of
+	 * the broken record, which rescues downstream consumers that have already
+	 * received that first part.
 	 */
-	XLogBeginRead(xlogreader, LastRec);
-	record = ReadRecord(xlogreader, PANIC, false);
-	EndOfLog = EndRecPtr;
+	if (XLogRecPtrIsInvalid(abortedContrecordPtr))
+	{
+		XLogBeginRead(xlogreader, LastRec);
+		record = ReadRecord(xlogreader, PANIC, false);
+		EndOfLog = EndRecPtr;
+	}
+	else
+	{
+#ifdef WAL_DEBUG
+		ereport(LOG,
+				(errmsg_internal("recovery overwriting broken contrecord at %X/%X (EndRecPtr: %X/%X)",
+								 LSN_FORMAT_ARGS(abortedContrecordPtr),
+								 LSN_FORMAT_ARGS(EndRecPtr))));
+#endif
+		EndOfLog = abortedContrecordPtr;
+		XLogCtl->abortedContrecordPtr = abortedContrecordPtr;
+	}
 
 	/*
 	 * EndOfLogTLI is the TLI in the filename of the XLOG segment containing
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e181a..dbfa6d3562 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 				total_len;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
+	bool		assembled;
 	bool		gotheader;
 	int			readOff;
 
@@ -293,6 +294,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	state->errormsg_buf[0] = '\0';
 
 	ResetDecoder(state);
+	state->abortedContrecordPtr = InvalidXLogRecPtr;
 
 	RecPtr = state->EndRecPtr;
 
@@ -319,7 +321,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		randAccess = true;
 	}
 
+restart:
 	state->currRecPtr = RecPtr;
+	assembled = false;
 
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -415,6 +419,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		char	   *buffer;
 		uint32		gotlen;
 
+		assembled = true;
+
 		/*
 		 * Enlarge readRecordBuf as needed.
 		 */
@@ -442,14 +448,28 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 			readOff = ReadPageInternal(state, targetPagePtr,
 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
-
 			if (readOff < 0)
 				goto err;
 
 			Assert(SizeOfXLogShortPHD <= readOff);
 
-			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
+
+			/*
+			 * If we were expecting a continuation record and got an "aborted
+			 * partial" flag, that means the continuation record was lost.
+			 * Ignore the record we were reading, since we now know it's broken
+			 * and lost forever, and restart the read by assuming the address
+			 * to read is the location where we found this flag.
+			 */
+			if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL)
+			{
+				ResetDecoder(state);
+				RecPtr = targetPagePtr;
+				goto restart;
+			}
+
+			/* Check that the continuation on next page looks valid */
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
@@ -551,6 +571,21 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		return NULL;
 
 err:
+	if (assembled)
+	{
+		/*
+		 * We get here when a record that spans multiple pages needs to be
+		 * assembled, but something went wrong -- perhaps a contrecord piece
+		 * was lost.  We deal with this by setting abortedContrecordPtr to the
+		 * location of the piece we failed to read, or the start of the page
+		 * we read where validation failed.  If caller is WAL replay, it will
+		 * know that recovery ended and that this is where to start writing
+		 * future WAL marking the next piece with XLP_FIRST_IS_ABORTED_PARTIAL,
+		 * which will in turn signal downstream WAL consumers that the broken
+		 * WAL record here is to be ignored.
+		 */
+		state->abortedContrecordPtr = targetPagePtr;
+	}
 
 	/*
 	 * Invalidate the read state. We might read from a different source after
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3b5eceff65..9bc72b4c95 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,20 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLP_LONG_HEADER				0x0002
 /* This flag indicates backup blocks starting in this page are optional */
 #define XLP_BKP_REMOVABLE			0x0004
+/*
+ * This flag marks a page whose contents replace a missing contrecord.
+ * When on WAL replay we expect a continuation record at the start of
+ * a page but it is not there, recovery ends, and the first page written
+ * afterwards is marked with this flag.  It tells WAL readers that the
+ * incomplete record is to be skipped, and that WAL reading is to be
+ * resumed here.  This is useful for downstream consumers of WAL which
+ * have already received (the first half of) the original broken WAL
+ * record, such as via archive_command or physical streaming
+ * replication, which we cannot "rewind".
+ */
+#define XLP_FIRST_IS_ABORTED_PARTIAL 0x0008
 /* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS				0x0007
+#define XLP_ALL_FLAGS				0x000F
 
 #define XLogPageHeaderSize(hdr)		\
 	(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..96e5eab1c9 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -175,6 +175,9 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/* page at which assembling a multi-page record failed, if any */
+	XLogRecPtr	abortedContrecordPtr;
+
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
-- 
2.30.2

>From 725d099ed1d917c68adca39a2ab989e326271cc0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 3 Sep 2021 17:56:11 -0400
Subject: [PATCH v3 3/5] crosscheck that FIRST_IS_CONTRECORD is not together
 with FIRST_IS_ABORTED_PARTIAL

---
 src/backend/access/transam/xlogreader.c | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index dbfa6d3562..0098c42d2b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -464,6 +464,26 @@ restart:
 			 */
 			if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL)
 			{
+				/*
+				 * If we see XLP_FIRST_IS_CONTRECORD together with
+				 * XLP_FIRST_IS_ABORTED_PARTIAL, something has gone wrong
+				 * while writing this record.  However, we don't get too angry
+				 * about it for now.  (If the record is really corrupt, the
+				 * header will fail the CRC check later.)
+				 */
+				if (pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)
+				{
+#ifndef FRONTEND
+					ereport(WARNING,
+							(errmsg_internal("unexpected flag XLP_FIRST_IS_CONTRECORD found together with XLP_FIRST_IS_ABORTED_PARTIAL in %X/%X",
+											 LSN_FORMAT_ARGS(targetPagePtr))));
+#else
+					fprintf(stderr,
+							"unexpected flag XLP_FIRST_IS_CONTRECORD found together with XLP_FIRST_IS_ABORTED_PARTIAL in %X/%X\n",
+							LSN_FORMAT_ARGS(targetPagePtr));
+#endif
+				}
+
 				ResetDecoder(state);
 				RecPtr = targetPagePtr;
 				goto restart;
-- 
2.30.2

>From 2a2c4fc06c71568e045bb2002051b9be81f5edd2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 18:03:35 -0400
Subject: [PATCH v3 4/5] debugging changes

---
 src/backend/access/transam/xlog.c | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 411f1618df..66eb5c7a68 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -726,6 +726,8 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastFpwDisableRecPtr;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
+
+	bool		crossSeg;
 } XLogCtlData;
 
 static XLogCtlData *XLogCtl = NULL;
@@ -1161,6 +1163,13 @@ XLogInsertRecord(XLogRecData *rdata,
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
+		XLogSegNo	StartSeg, EndSeg;
+
+		XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+		XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+		if (StartSeg != EndSeg)
+			XLogCtl->crossSeg = true;
+
 		SpinLockAcquire(&XLogCtl->info_lck);
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
@@ -2623,11 +2632,21 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			{
 				issue_xlog_fsync(openLogFile, openLogSegNo);
 
+				if (XLogCtl->crossSeg)
+				{
+					static int c = 0;
+					struct stat b;
+
+					if (stat("/tmp/hoge", &b) == 0)
+						Assert(c++ < 1);
+				}
+
 				/* signal that we need to wakeup walsenders later */
 				WalSndWakeupRequest();
 
 				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
 
+				/* XXX can we give this responsibility to WAL writer? */
 				if (XLogArchivingActive())
 					XLogArchiveNotifySeg(openLogSegNo);
 
-- 
2.30.2

>From 23c7e4e31fea629eabb00e3e4939168d504c15c8 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 3 Sep 2021 17:55:21 -0400
Subject: [PATCH v3 5/5] upgrade assert to if, in pg_rewind

---
 src/bin/pg_rewind/parsexlog.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7d6a..97cc12bd92 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -102,7 +102,9 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	 * If 'endpoint' didn't point exactly at a record boundary, the caller
 	 * messed up.
 	 */
-	Assert(xlogreader->EndRecPtr == endpoint);
+	if (xlogreader->EndRecPtr != endpoint)
+		pg_fatal("end pointer %X/%X is not a valid end point; expected %X/%X",
+				 LSN_FORMAT_ARGS(endpoint), LSN_FORMAT_ARGS(xlogreader->EndRecPtr));
 
 	XLogReaderFree(xlogreader);
 	if (xlogreadfd != -1)
-- 
2.30.2

Reply via email to