So I'm again distracted by something else, so here's what will have to pass for v3 for the time being.
-- Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 24165ab03e..43495b8260 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -731,8 +731,10 @@ typedef struct XLogCtlData */ XLogSegNo lastNotifiedSeg; XLogSegNo earliestSegBoundary; + XLogRecPtr earliestSegBoundaryStartPtr; XLogRecPtr earliestSegBoundaryEndPtr; XLogSegNo latestSegBoundary; + XLogRecPtr latestSegBoundaryStartPtr; XLogRecPtr latestSegBoundaryEndPtr; slock_t segtrack_lck; /* locks shared variables shown above */ @@ -932,7 +934,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo, XLogSegNo *endlogSegNo); static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); -static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos); +static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); static XLogRecord *ReadRecord(XLogReaderState *xlogreader, @@ -1022,6 +1024,8 @@ XLogInsertRecord(XLogRecData *rdata, info == XLOG_SWITCH); XLogRecPtr StartPos; XLogRecPtr EndPos; + XLogSegNo StartSeg; + XLogSegNo EndSeg; bool prevDoPageWrites = doPageWrites; /* we assume that all of the record header is in the first chunk */ @@ -1157,6 +1161,31 @@ XLogInsertRecord(XLogRecData *rdata, */ } + /* + * Before releasing our WAL insertion lock, register that we crossed the + * segment boundary if that occurred. We need to do it with the lock held + * for GetSafeFlushRecPtr's sake: otherwise it could see the WAL flush + * point advance but not see the registration, which would lead it to + * wrongly conclude that our flush point is safe to use. 
+ */ + if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) + { + XLByteToSeg(StartPos, StartSeg, wal_segment_size); + XLByteToSeg(EndPos, EndSeg, wal_segment_size); + + /* + * Register our crossing the segment boundary if that occurred. + * + * Note that we did not use XLByteToPrevSeg() for determining the + * ending segment. This is so that a record that fits perfectly into + * the end of the segment causes said segment to get marked ready for + * archival immediately. + */ + if (StartSeg != EndSeg && + (XLogArchivingActive() || XLogStandbyInfoActive())) + RegisterSegmentBoundary(EndSeg, StartPos, EndPos); + } + /* * Done! Let others know that we're finished. */ @@ -1168,27 +1197,10 @@ XLogInsertRecord(XLogRecData *rdata, /* * If we crossed page boundary, update LogwrtRqst.Write; if we crossed - * segment boundary, register that and wake up walwriter. + * segment boundary, wake up walwriter. */ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { - XLogSegNo StartSeg; - XLogSegNo EndSeg; - - XLByteToSeg(StartPos, StartSeg, wal_segment_size); - XLByteToSeg(EndPos, EndSeg, wal_segment_size); - - /* - * Register our crossing the segment boundary if that occurred. - * - * Note that we did not use XLByteToPrevSeg() for determining the - * ending segment. This is so that a record that fits perfectly into - * the end of the segment causes the latter to get marked ready for - * archival immediately. - */ - if (StartSeg != EndSeg && XLogArchivingActive()) - RegisterSegmentBoundary(EndSeg, EndPos); - /* * Advance LogwrtRqst.Write so that it includes new block(s). 
* @@ -2649,7 +2661,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ - if (XLogArchivingActive()) + if (XLogArchivingActive() || XLogStandbyInfoActive()) NotifySegmentsReadyForArchive(LogwrtResult.Flush); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); @@ -2739,7 +2751,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) SpinLockRelease(&XLogCtl->info_lck); } - if (XLogArchivingActive()) + if (XLogArchivingActive() || XLogStandbyInfoActive()) NotifySegmentsReadyForArchive(LogwrtResult.Flush); } @@ -4398,7 +4410,7 @@ ValidateXLOGDirectoryStructure(void) * to delay until the end segment is known flushed. */ static void -RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos) +RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos) { XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY; @@ -4415,6 +4427,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos) if (XLogCtl->earliestSegBoundary == MaxXLogSegNo) { XLogCtl->earliestSegBoundary = seg; + XLogCtl->earliestSegBoundaryStartPtr = startpos; XLogCtl->earliestSegBoundaryEndPtr = endpos; } else if (seg > XLogCtl->earliestSegBoundary && @@ -4422,6 +4435,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos) seg > XLogCtl->latestSegBoundary)) { XLogCtl->latestSegBoundary = seg; + XLogCtl->latestSegBoundaryStartPtr = startpos; XLogCtl->latestSegBoundaryEndPtr = endpos; } @@ -4438,10 +4452,8 @@ void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr) { XLogSegNo latest_boundary_seg; - XLogSegNo last_notified; XLogSegNo flushed_seg; XLogSegNo seg; - bool keep_latest; XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size); @@ -4451,13 +4463,17 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr) XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr) { latest_boundary_seg = XLogCtl->latestSegBoundary; - keep_latest = false; + XLogCtl->earliestSegBoundary = MaxXLogSegNo; + XLogCtl->earliestSegBoundaryStartPtr = InvalidXLogRecPtr; 
+ XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr; } else if (XLogCtl->earliestSegBoundary <= flushed_seg && XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr) { latest_boundary_seg = XLogCtl->earliestSegBoundary; - keep_latest = true; + XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary; + XLogCtl->earliestSegBoundaryStartPtr = XLogCtl->latestSegBoundaryStartPtr; + XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr; } else { @@ -4465,41 +4481,38 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr) return; } - last_notified = XLogCtl->lastNotifiedSeg; - - /* - * Update shared memory and discard segment boundaries that are no longer - * needed. - * - * It is safe to update shared memory before we attempt to create the - * .ready files. If our calls to XLogArchiveNotifySeg() fail, - * RemoveOldXlogFiles() will retry it as needed. - */ - if (last_notified < latest_boundary_seg - 1) - XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1; - - if (keep_latest) - { - XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary; - XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr; - } - else - { - XLogCtl->earliestSegBoundary = MaxXLogSegNo; - XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr; - } - XLogCtl->latestSegBoundary = MaxXLogSegNo; + XLogCtl->latestSegBoundaryStartPtr = InvalidXLogRecPtr; XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr; - SpinLockRelease(&XLogCtl->segtrack_lck); + if (XLogArchivingActive()) + { + XLogSegNo last_notified; - /* - * Notify archiver about segments that are ready for archival (by creating - * the corresponding .ready files). - */ - for (seg = last_notified + 1; seg < latest_boundary_seg; seg++) - XLogArchiveNotifySeg(seg, false); + last_notified = XLogCtl->lastNotifiedSeg; + + /* + * Update shared memory and discard segment boundaries that are no + * longer needed. + * + * It is safe to have updated shared memory before we attempt to + * create the .ready files. 
If our calls to XLogArchiveNotifySeg() + * fail, RemoveOldXlogFiles() will retry it as needed. + */ + if (last_notified < latest_boundary_seg - 1) + XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1; + + SpinLockRelease(&XLogCtl->segtrack_lck); + + /* + * Notify archiver about segments that are ready for archival (by + * creating the corresponding .ready files). + */ + for (seg = last_notified + 1; seg < latest_boundary_seg; seg++) + XLogArchiveNotifySeg(seg, false); + } + else + SpinLockRelease(&XLogCtl->segtrack_lck); PgArchWakeup(); } @@ -8776,6 +8789,30 @@ GetFlushRecPtr(void) return LogwrtResult.Flush; } +/* + * GetSafeFlushRecPtr -- Returns a "safe" flush position. + * + * Similar to the above, except that it avoids reporting a location that might + * be overwritten if there's a crash before syncing the next segment. + */ +XLogRecPtr +GetSafeFlushRecPtr(void) +{ + XLogRecPtr flush; + + SpinLockAcquire(&XLogCtl->info_lck); + flush = XLogCtl->LogwrtResult.Flush; + SpinLockRelease(&XLogCtl->info_lck); + + SpinLockAcquire(&XLogCtl->segtrack_lck); + if (XLogCtl->earliestSegBoundary != MaxXLogSegNo && + XLogCtl->earliestSegBoundaryStartPtr < flush) + flush = XLogCtl->earliestSegBoundaryStartPtr; + SpinLockRelease(&XLogCtl->segtrack_lck); + + return flush; +} + /* * GetLastImportantRecPtr -- Returns the LSN of the last important record * inserted. 
All records not explicitly marked as unimportant are considered diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ca2a11389..aafb3e91de 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -621,7 +621,7 @@ StartReplication(StartReplicationCmd *cmd) FlushPtr = GetStandbyFlushRecPtr(); } else - FlushPtr = GetFlushRecPtr(); + FlushPtr = GetSafeFlushRecPtr(); if (cmd->timeline != 0) { @@ -2650,7 +2650,7 @@ XLogSendPhysical(void) * primary: if the primary subsequently crashes and restarts, standbys * must not have applied any WAL that got lost on the primary. */ - SendRqstPtr = GetFlushRecPtr(); + SendRqstPtr = GetSafeFlushRecPtr(); } /* diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 6b6ae81c2d..1af59c36d4 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -313,6 +313,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p) extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); extern XLogRecPtr GetFlushRecPtr(void); +extern XLogRecPtr GetSafeFlushRecPtr(void); extern XLogRecPtr GetLastImportantRecPtr(void); extern void RemovePromoteSignalFiles(void); extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);