On 2021-Sep-14, Alvaro Herrera wrote:

> On 2021-Sep-08, Kyotaro Horiguchi wrote:
> 
> > Thanks!  As I understand it, the new record adds the ability to
> > cross-check between a torn-off contrecord and the new record inserted
> > after the torn-off record.  I didn't test this version myself, but
> > the previous version implemented the essential machinery and that
> > won't change fundamentally with the new record.
> > 
> > So I think the current patch deserves a test to see whether the
> > algorithm actually works against the problem.
> 
> Here's a version with the new record type.  It passes check-world, and
> it seems to work correctly to prevent overwrite of the tail end of a
> segment containing a broken record.  This is very much WIP still;
> comments are missing and I haven't tried to implement any sort of
> verification that the record being aborted is the right one.

Here's the attachment I forgot earlier.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
"[PostgreSQL] is a great group; in my opinion it is THE best open source
development communities in existence anywhere."                (Lamar Owen)
From 3de9660b2e570604412d435023dcbd13355fc123 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 17:21:46 -0400
Subject: [PATCH v5] Implement FIRST_IS_ABORTED_CONTRECORD

---
 src/backend/access/rmgrdesc/xlogdesc.c  | 11 +++
 src/backend/access/transam/xlog.c       | 91 +++++++++++++++++++++++--
 src/backend/access/transam/xloginsert.c |  3 +
 src/backend/access/transam/xlogreader.c | 39 ++++++++++-
 src/include/access/xlog.h               |  1 +
 src/include/access/xlog_internal.h      | 20 +++++-
 src/include/access/xlogreader.h         |  3 +
 src/include/catalog/pg_control.h        |  1 +
 8 files changed, 159 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e6090a9dad..0be382f8a5 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -139,6 +139,14 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
 						 timestamptz_to_str(xlrec.end_time));
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		xl_overwrite_contrecord xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
+		appendStringInfo(buf, "lsn %X/%X",
+						 LSN_FORMAT_ARGS(xlrec.overwritten_lsn));
+	}
 }
 
 const char *
@@ -178,6 +186,9 @@ xlog_identify(uint8 info)
 		case XLOG_END_OF_RECOVERY:
 			id = "END_OF_RECOVERY";
 			break;
+		case XLOG_OVERWRITE_CONTRECORD:
+			id = "OVERWRITE_CONTRECORD";
+			break;
 		case XLOG_FPI:
 			id = "FPI";
 			break;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..73cfd64125 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -586,6 +586,8 @@ typedef struct XLogCtlData
 	XLogRecPtr	replicationSlotMinLSN;	/* oldest LSN needed by any slot */
 
 	XLogSegNo	lastRemovedSegNo;	/* latest removed/recycled XLOG segment */
+	XLogRecPtr	abortedContrecordPtr; /* LSN of incomplete record at end of
+									   * WAL */
 
 	/* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
 	XLogRecPtr	unloggedLSN;
@@ -894,6 +896,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 								TimeLineID prevTLI);
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
+static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -950,6 +953,7 @@ static void rm_redo_error_callback(void *arg);
 static int	get_sync_bit(int method);
 
 static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+								bool set_aborted_partial,
 								XLogRecData *rdata,
 								XLogRecPtr StartPos, XLogRecPtr EndPos);
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
@@ -1120,8 +1124,9 @@ XLogInsertRecord(XLogRecData *rdata,
 		 * All the record data, including the header, is now ready to be
 		 * inserted. Copy the record in the space reserved.
 		 */
-		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
-							StartPos, EndPos);
+		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch,
+							flags & XLOG_SET_ABORTED_PARTIAL,
+							rdata, StartPos, EndPos);
 
 		/*
 		 * Unless record is flagged as not important, update LSN of last
@@ -1504,7 +1509,8 @@ checkXLogConsistency(XLogReaderState *record)
  * area in the WAL.
  */
 static void
-CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+					bool set_aborted_partial, XLogRecData *rdata,
 					XLogRecPtr StartPos, XLogRecPtr EndPos)
 {
 	char	   *currpos;
@@ -1560,6 +1566,10 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 			pagehdr->xlp_rem_len = write_len - written;
 			pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
 
+			/* XXX can only happen once in the loop. Verify? */
+			if (set_aborted_partial)
+				pagehdr->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL;
+
 			/* skip over the page header */
 			if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0)
 			{
@@ -4394,6 +4404,19 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
 		{
+			/*
+			 * When WAL ends in an incomplete record, keep track of it.  After
+			 * recovery is done, we'll write a record to tell downstream WAL
+			 * readers that that portion is to be ignored.
+			 */
+			if (InRecovery &&
+				!XLogRecPtrIsInvalid(xlogreader->abortedContrecordPtr))
+			{
+				SpinLockAcquire(&XLogCtl->info_lck);
+				XLogCtl->abortedContrecordPtr = xlogreader->abortedContrecordPtr;
+				SpinLockRelease(&XLogCtl->info_lck);
+			}
+
 			if (readFile >= 0)
 			{
 				close(readFile);
@@ -6502,6 +6525,7 @@ StartupXLOG(void)
 	bool		haveTblspcMap = false;
 	XLogRecPtr	RecPtr,
 				checkPointLoc,
+				aborted_lsn,
 				EndOfLog;
 	TimeLineID	EndOfLogTLI;
 	TimeLineID	PrevTimeLineID;
@@ -7655,8 +7679,9 @@ StartupXLOG(void)
 
 	/*
 	 * Kill WAL receiver, if it's still running, before we continue to write
-	 * the startup checkpoint record. It will trump over the checkpoint and
-	 * subsequent records if it's still alive when we start writing WAL.
+	 * the startup checkpoint and aborted-contrecord records. It will trump
+	 * over these records and subsequent ones if it's still alive when we
+	 * start writing WAL.
 	 */
 	XLogShutdownWalRcv();
 
@@ -7689,8 +7714,12 @@ StartupXLOG(void)
 	StandbyMode = false;
 
 	/*
-	 * Re-fetch the last valid or last applied record, so we can identify the
-	 * exact endpoint of what we consider the valid portion of WAL.
+	 * Determine where to start writing WAL next.
+	 *
+	 * When recovery ended in an incomplete record, write a WAL record about
+	 * that and continue after it.  In all other cases, re-fetch the last
+	 * valid or last applied record, so we can identify the exact endpoint of
+	 * what we consider the valid portion of WAL.
 	 */
 	XLogBeginRead(xlogreader, LastRec);
 	record = ReadRecord(xlogreader, PANIC, false);
@@ -7821,6 +7850,17 @@ StartupXLOG(void)
 	XLogCtl->ThisTimeLineID = ThisTimeLineID;
 	XLogCtl->PrevTimeLineID = PrevTimeLineID;
 
+	/* If WAL ended in an incomplete record, start new WAL at its missing piece */
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (!XLogRecPtrIsInvalid(XLogCtl->abortedContrecordPtr))
+	{
+		aborted_lsn = EndOfLog = XLogCtl->abortedContrecordPtr;
+		XLogCtl->abortedContrecordPtr = InvalidXLogRecPtr;
+	}
+	else
+		aborted_lsn = InvalidXLogRecPtr;
+	SpinLockRelease(&XLogCtl->info_lck);
+
 	/*
 	 * Prepare to write WAL starting at EndOfLog location, and init xlog
 	 * buffer cache using the block containing the last record from the
@@ -7880,6 +7920,8 @@ StartupXLOG(void)
 	 */
 	Insert->fullPageWrites = lastFullPageWrites;
 	LocalSetXLogInsertAllowed();
+	if (!XLogRecPtrIsInvalid(aborted_lsn))
+		CreateOverwriteContrecordRecord(aborted_lsn);
 	UpdateFullPageWrites();
 	LocalXLogInsertAllowed = -1;
 
@@ -9365,6 +9407,37 @@ CreateEndOfRecoveryRecord(void)
 	LocalXLogInsertAllowed = -1;	/* return to "check" state */
 }
 
+/* Log and flush an XLOG_OVERWRITE_CONTRECORD record for aborted_lsn. */
+static XLogRecPtr
+CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
+{
+	xl_overwrite_contrecord xlrec;
+	XLogRecPtr	recptr;
+
+	/* sanity check */
+	if (!RecoveryInProgress())
+		elog(ERROR, "can only be used at end of recovery");
+
+	xlrec.overwritten_lsn = aborted_lsn;
+	/* XXX include the CRC-so-far? */
+
+	START_CRIT_SECTION();
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
+
+	/* this record overwrites the tail of the incomplete record */
+	XLogSetRecordFlags(XLOG_SET_ABORTED_PARTIAL);
+
+	recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
+
+	XLogFlush(recptr);
+
+	END_CRIT_SECTION();
+
+	return recptr;
+}
+
 /*
  * Flush all data in shared memory to disk, and fsync
  *
@@ -10295,6 +10368,10 @@ xlog_redo(XLogReaderState *record)
 
 		RecoveryRestartPoint(&checkPoint);
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		/* XXX verify the LSN of the record we're "aborting" */
+	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
 		xl_end_of_recovery xlrec;
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index e596a0470a..23c46ba21f 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -409,6 +409,9 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
  *	 durability, which allows to avoid triggering WAL archiving and other
  *	 background activity.
+ * - XLOG_INCLUDE_XID, to include the XID of the top-level transaction.
+ * - XLOG_SET_ABORTED_PARTIAL, to signal that XLP_FIRST_IS_ABORTED_PARTIAL
+ *   is to be set.
  */
 void
 XLogSetRecordFlags(uint8 flags)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e181a..dbfa6d3562 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 				total_len;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
+	bool		assembled;
 	bool		gotheader;
 	int			readOff;
 
@@ -293,6 +294,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	state->errormsg_buf[0] = '\0';
 
 	ResetDecoder(state);
+	state->abortedContrecordPtr = InvalidXLogRecPtr;
 
 	RecPtr = state->EndRecPtr;
 
@@ -319,7 +321,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		randAccess = true;
 	}
 
+restart:
 	state->currRecPtr = RecPtr;
+	assembled = false;
 
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -415,6 +419,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		char	   *buffer;
 		uint32		gotlen;
 
+		assembled = true;
+
 		/*
 		 * Enlarge readRecordBuf as needed.
 		 */
@@ -442,14 +448,28 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 			readOff = ReadPageInternal(state, targetPagePtr,
 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
-
 			if (readOff < 0)
 				goto err;
 
 			Assert(SizeOfXLogShortPHD <= readOff);
 
-			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
+
+			/*
+			 * If we were expecting a continuation record and got an "aborted
+			 * partial" flag, that means the continuation record was lost.
+			 * Ignore the record we were reading, since we now know it's broken
+			 * and lost forever, and restart the read by assuming the address
+			 * to read is the location where we found this flag.
+			 */
+			if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL)
+			{
+				ResetDecoder(state);
+				RecPtr = targetPagePtr;
+				goto restart;
+			}
+
+			/* Check that the continuation on next page looks valid */
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
@@ -551,6 +571,21 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		return NULL;
 
 err:
+	if (assembled)
+	{
+		/*
+		 * We get here when a record that spans multiple pages needs to be
+		 * assembled, but something went wrong -- perhaps a contrecord piece
+		 * was lost.  We deal with this by setting abortedContrecordPtr to the
+		 * location of the piece we failed to read, or the start of the page
+		 * we read where validation failed.  If caller is WAL replay, it will
+		 * know that recovery ended and that this is where to start writing
+		 * future WAL marking the next piece with XLP_FIRST_IS_ABORTED_PARTIAL,
+		 * which will in turn signal downstream WAL consumers that the broken
+		 * WAL record here is to be ignored.
+		 */
+		state->abortedContrecordPtr = targetPagePtr;
+	}
 
 	/*
 	 * Invalidate the read state. We might read from a different source after
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0a8ede700d..caed9294a8 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -213,6 +213,7 @@ extern bool XLOG_DEBUG;
 #define XLOG_INCLUDE_ORIGIN		0x01	/* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT	0x02	/* record not important for durability */
 #define XLOG_INCLUDE_XID		0x04	/* include XID of top-level xact */
+#define XLOG_SET_ABORTED_PARTIAL 0x08	/* set the aborted-partial page flag */
 
 
 /* Checkpoint statistics */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3b5eceff65..0fb44ba37d 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,20 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLP_LONG_HEADER				0x0002
 /* This flag indicates backup blocks starting in this page are optional */
 #define XLP_BKP_REMOVABLE			0x0004
+/*
+ * This flag marks a record that replaces a missing contrecord.
+ * When on WAL replay we expect a continuation record at the start of
+ * a page and it is not there, recovery ends, and the overwrite-contrecord
+ * record written next is marked with this flag, which indicates to WAL
+ * readers that the incomplete record is to be skipped, and that WAL
+ * reading is to be resumed here.  This is useful for downstream consumers
+ * of WAL which have already received (the first half of) the original
+ * broken WAL record, such as via archive_command or physical streaming
+ * replication, which we cannot "rewind".
+ */
+#define XLP_FIRST_IS_ABORTED_PARTIAL 0x0008
 /* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS				0x0007
+#define XLP_ALL_FLAGS				0x000F
 
 #define XLogPageHeaderSize(hdr)		\
 	(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
@@ -249,6 +261,12 @@ typedef struct xl_restore_point
 	char		rp_name[MAXFNAMELEN];
 } xl_restore_point;
 
+/* Overwrite of prior contrecord */
+typedef struct xl_overwrite_contrecord
+{
+	XLogRecPtr	overwritten_lsn;
+} xl_overwrite_contrecord;
+
 /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
 typedef struct xl_end_of_recovery
 {
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..96e5eab1c9 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -175,6 +175,9 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/* page where an incomplete record's continuation failed to be read */
+	XLogRecPtr	abortedContrecordPtr;
+
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index e3f48158ce..179efa9f4a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_OVERWRITE_CONTRECORD		0xC0
 
 
 /*
-- 
2.30.2

Reply via email to