So I'm again distracted by something else, so here's what will have to
pass for v3 for the time being.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..43495b8260 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -731,8 +731,10 @@ typedef struct XLogCtlData
 	 */
 	XLogSegNo	lastNotifiedSeg;
 	XLogSegNo	earliestSegBoundary;
+	XLogRecPtr	earliestSegBoundaryStartPtr;
 	XLogRecPtr	earliestSegBoundaryEndPtr;
 	XLogSegNo	latestSegBoundary;
+	XLogRecPtr	latestSegBoundaryStartPtr;
 	XLogRecPtr	latestSegBoundaryEndPtr;
 
 	slock_t		segtrack_lck;	/* locks shared variables shown above */
@@ -932,7 +934,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
 						   XLogSegNo *endlogSegNo);
 static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
-static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1022,6 +1024,8 @@ XLogInsertRecord(XLogRecData *rdata,
 							   info == XLOG_SWITCH);
 	XLogRecPtr	StartPos;
 	XLogRecPtr	EndPos;
+	XLogSegNo	StartSeg;
+	XLogSegNo	EndSeg;
 	bool		prevDoPageWrites = doPageWrites;
 
 	/* we assume that all of the record header is in the first chunk */
@@ -1157,6 +1161,31 @@ XLogInsertRecord(XLogRecData *rdata,
 		 */
 	}
 
+	/*
+	 * Before releasing our WAL insertion lock, register that we crossed the
+	 * segment boundary if that occurred.  We need to do it with the lock held
+	 * for GetSafeFlushRecPtr's sake: otherwise it could see the WAL flush
+	 * point advance but not see the registration, which would lead it to
+	 * wrongly conclude that our flush point is safe to use.
+	 */
+	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+	{
+		XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+		XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+
+		/*
+		 * Register our crossing the segment boundary if that occurred.
+		 *
+		 * Note that we did not use XLByteToPrevSeg() for determining the
+		 * ending segment.  This is so that a record that fits perfectly into
+		 * the end of the segment causes said segment to get marked ready for
+		 * archival immediately.
+		 */
+		if (StartSeg != EndSeg &&
+			(XLogArchivingActive() || XLogStandbyInfoActive()))
+			RegisterSegmentBoundary(EndSeg, StartPos, EndPos);
+	}
+
 	/*
 	 * Done! Let others know that we're finished.
 	 */
@@ -1168,27 +1197,10 @@ XLogInsertRecord(XLogRecData *rdata,
 
 	/*
 	 * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
-	 * segment boundary, register that and wake up walwriter.
+	 * segment boundary, wake up walwriter.
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		XLogSegNo	StartSeg;
-		XLogSegNo	EndSeg;
-
-		XLByteToSeg(StartPos, StartSeg, wal_segment_size);
-		XLByteToSeg(EndPos, EndSeg, wal_segment_size);
-
-		/*
-		 * Register our crossing the segment boundary if that occurred.
-		 *
-		 * Note that we did not use XLByteToPrevSeg() for determining the
-		 * ending segment.  This is so that a record that fits perfectly into
-		 * the end of the segment causes the latter to get marked ready for
-		 * archival immediately.
-		 */
-		if (StartSeg != EndSeg && XLogArchivingActive())
-			RegisterSegmentBoundary(EndSeg, EndPos);
-
 		/*
 		 * Advance LogwrtRqst.Write so that it includes new block(s).
 		 *
@@ -2649,7 +2661,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 
 				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
 
-				if (XLogArchivingActive())
+				if (XLogArchivingActive() || XLogStandbyInfoActive())
 					NotifySegmentsReadyForArchive(LogwrtResult.Flush);
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
@@ -2739,7 +2751,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
-	if (XLogArchivingActive())
+	if (XLogArchivingActive() || XLogStandbyInfoActive())
 		NotifySegmentsReadyForArchive(LogwrtResult.Flush);
 }
 
@@ -4398,7 +4410,7 @@ ValidateXLOGDirectoryStructure(void)
  * to delay until the end segment is known flushed.
  */
 static void
-RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos)
 {
 	XLogSegNo	segno PG_USED_FOR_ASSERTS_ONLY;
 
@@ -4415,6 +4427,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 	if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
 	{
 		XLogCtl->earliestSegBoundary = seg;
+		XLogCtl->earliestSegBoundaryStartPtr = startpos;
 		XLogCtl->earliestSegBoundaryEndPtr = endpos;
 	}
 	else if (seg > XLogCtl->earliestSegBoundary &&
@@ -4422,6 +4435,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 			  seg > XLogCtl->latestSegBoundary))
 	{
 		XLogCtl->latestSegBoundary = seg;
+		XLogCtl->latestSegBoundaryStartPtr = startpos;
 		XLogCtl->latestSegBoundaryEndPtr = endpos;
 	}
 
@@ -4438,10 +4452,8 @@ void
 NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
 {
 	XLogSegNo	latest_boundary_seg;
-	XLogSegNo	last_notified;
 	XLogSegNo	flushed_seg;
 	XLogSegNo	seg;
-	bool		keep_latest;
 
 	XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size);
 
@@ -4451,13 +4463,17 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
 		XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr)
 	{
 		latest_boundary_seg = XLogCtl->latestSegBoundary;
-		keep_latest = false;
+		XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+		XLogCtl->earliestSegBoundaryStartPtr = InvalidXLogRecPtr;
+		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
 	}
 	else if (XLogCtl->earliestSegBoundary <= flushed_seg &&
 			 XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr)
 	{
 		latest_boundary_seg = XLogCtl->earliestSegBoundary;
-		keep_latest = true;
+		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+		XLogCtl->earliestSegBoundaryStartPtr = XLogCtl->latestSegBoundaryStartPtr;
+		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
 	}
 	else
 	{
@@ -4465,41 +4481,38 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
 		return;
 	}
 
-	last_notified = XLogCtl->lastNotifiedSeg;
-
-	/*
-	 * Update shared memory and discard segment boundaries that are no longer
-	 * needed.
-	 *
-	 * It is safe to update shared memory before we attempt to create the
-	 * .ready files.  If our calls to XLogArchiveNotifySeg() fail,
-	 * RemoveOldXlogFiles() will retry it as needed.
-	 */
-	if (last_notified < latest_boundary_seg - 1)
-		XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
-
-	if (keep_latest)
-	{
-		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
-		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
-	}
-	else
-	{
-		XLogCtl->earliestSegBoundary = MaxXLogSegNo;
-		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
-	}
-
 	XLogCtl->latestSegBoundary = MaxXLogSegNo;
+	XLogCtl->latestSegBoundaryStartPtr = InvalidXLogRecPtr;
 	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
 
-	SpinLockRelease(&XLogCtl->segtrack_lck);
+	if (XLogArchivingActive())
+	{
+		XLogSegNo	last_notified;
 
-	/*
-	 * Notify archiver about segments that are ready for archival (by creating
-	 * the corresponding .ready files).
-	 */
-	for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
-		XLogArchiveNotifySeg(seg, false);
+		last_notified = XLogCtl->lastNotifiedSeg;
+
+		/*
+		 * Update shared memory and discard segment boundaries that are no
+		 * longer needed.
+		 *
+		 * It is safe to have updated shared memory before we attempt to
+		 * create the .ready files.  If our calls to XLogArchiveNotifySeg()
+		 * fail, RemoveOldXlogFiles() will retry it as needed.
+		 */
+		if (last_notified < latest_boundary_seg - 1)
+			XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
+
+		SpinLockRelease(&XLogCtl->segtrack_lck);
+
+		/*
+		 * Notify archiver about segments that are ready for archival (by
+		 * creating the corresponding .ready files).
+		 */
+		for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
+			XLogArchiveNotifySeg(seg, false);
+	}
+	else
+		SpinLockRelease(&XLogCtl->segtrack_lck);
 
 	PgArchWakeup();
 }
@@ -8776,6 +8789,30 @@ GetFlushRecPtr(void)
 	return LogwrtResult.Flush;
 }
 
+/*
+ * GetSafeFlushRecPtr -- Returns a "safe" flush position.
+ *
+ * Similar to the above, except that it avoids reporting a location that might
+ * be overwritten if there's a crash before syncing the next segment.
+ */
+XLogRecPtr
+GetSafeFlushRecPtr(void)
+{
+	XLogRecPtr		flush;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	flush = XLogCtl->LogwrtResult.Flush;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	SpinLockAcquire(&XLogCtl->segtrack_lck);
+	if (XLogCtl->earliestSegBoundary != MaxXLogSegNo &&
+		XLogCtl->earliestSegBoundaryStartPtr < flush)
+		flush = XLogCtl->earliestSegBoundaryStartPtr;
+	SpinLockRelease(&XLogCtl->segtrack_lck);
+
+	return flush;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..aafb3e91de 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -621,7 +621,7 @@ StartReplication(StartReplicationCmd *cmd)
 		FlushPtr = GetStandbyFlushRecPtr();
 	}
 	else
-		FlushPtr = GetFlushRecPtr();
+		FlushPtr = GetSafeFlushRecPtr();
 
 	if (cmd->timeline != 0)
 	{
@@ -2650,7 +2650,7 @@ XLogSendPhysical(void)
 		 * primary: if the primary subsequently crashes and restarts, standbys
 		 * must not have applied any WAL that got lost on the primary.
 		 */
-		SendRqstPtr = GetFlushRecPtr();
+		SendRqstPtr = GetSafeFlushRecPtr();
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 6b6ae81c2d..1af59c36d4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -313,6 +313,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetSafeFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);

Reply via email to