On 2021-Jul-30, Bossart, Nathan wrote:

> Yes, that was what I was worried about. However, I just performed a
> variety of tests with just 0001 applied, and I am beginning to suspect
> my concerns were unfounded. With wal_buffers set very high,
> synchronous_commit set to off, and a long sleep at the end of
> XLogWrite(), I can reliably cause the archive status files to lag far
> behind the current open WAL segment. However, even if I crash at this
> time, the .ready files are created when the server restarts (albeit
> out of order).

I think that creating files out of order might be problematic. On the archiver side, pgarch_readyXlog() expects to return the oldest archivable file. If we create a newer segment's .ready file first, a directory scan could return that newer file before the older segment's .ready file appears. However, the comments in pgarch_readyXlog() aren't super convincing that processing the files in order is actually a correctness requirement, so perhaps it doesn't matter all that much.

I noticed that XLogCtl->lastNotifiedSeg is protected by both the info_lck and ArchNotifyLock. I think if it's going to be protected by the lwlock, then we should drop the use of the spinlock.

We set archiver's latch on each XLogArchiveNotify(), but if we're doing it in a loop such as in NotifySegmentsReadyForArchive() perhaps it is better to create all the .ready files first and do PgArchWakeup() at the end. I'm not convinced that this is useful, but let's at least discard the idea explicitly if not.

-- 
Álvaro Herrera              Valdivia, Chile — https://www.EnterpriseDB.com/
"Always assume the user will do much worse than the stupidest thing
you can imagine."                                (Julien PUYDT)
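The ordering concern and the batched-wakeup idea in the reply above can be made concrete with a small, hypothetical C model. This is not PostgreSQL code. An in-memory list stands in for the archive_status directory, and oldest_ready() approximates the archiver's choice of the lowest-named .ready file. It ignores any special cases the real pgarch_readyXlog() may have. notify_range() creates entries in ascending segment order and counts a single wakeup at the end. All names here are made up for illustration, and the file-name arithmetic assumes 16 MB segments, which gives 256 segments per "xlogid".

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_READY 64
#define WAL_NAME_LEN 25            /* 24 hex digits + terminating NUL */
#define SEGS_PER_XLOGID 256ULL     /* assumes 16 MB WAL segments */

/* Hypothetical in-memory stand-in for the archive_status directory. */
static char ready_files[MAX_READY][WAL_NAME_LEN];
static int  n_ready = 0;
static int  n_wakeups = 0;

static void
reset_status_dir(void)
{
    n_ready = 0;
    n_wakeups = 0;
}

/* WAL file name as TLI + high and low parts of the segment number (16 MB segments). */
static void
wal_file_name(char *buf, unsigned tli, unsigned long long segno)
{
    snprintf(buf, WAL_NAME_LEN, "%08X%08X%08X", tli,
             (unsigned) (segno / SEGS_PER_XLOGID),
             (unsigned) (segno % SEGS_PER_XLOGID));
}

/* Model of creating "<name>.ready": just remember the name. */
static void
make_ready(const char *name)
{
    assert(n_ready < MAX_READY);
    snprintf(ready_files[n_ready], WAL_NAME_LEN, "%s", name);
    n_ready++;
}

/* Model of the archiver's scan: return the lowest-named entry, or NULL if none. */
static const char *
oldest_ready(void)
{
    const char *best = NULL;

    for (int i = 0; i < n_ready; i++)
        if (best == NULL || strcmp(ready_files[i], best) < 0)
            best = ready_files[i];
    return best;
}

/* Create entries for segments first..last in ascending order, then wake once. */
static void
notify_range(unsigned tli, unsigned long long first, unsigned long long last)
{
    char name[WAL_NAME_LEN];

    for (unsigned long long seg = first; seg <= last; seg++)
    {
        wal_file_name(name, tli, seg);
        make_ready(name);
    }
    n_wakeups++;                   /* stands in for a single PgArchWakeup() */
}
```

Suppose segment 5's entry is created before segment 4's and a scan runs in between. That scan returns segment 5 first, which is the out-of-order case described in the reply. Creating entries in ascending order, as notify_range() does, prevents this within a single batch. The model says nothing about what happens across a crash and restart.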
From f9d94c614d6cf640fdaa09785d0817a1d86b9658 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 30 Jul 2021 18:35:52 -0400
Subject: [PATCH v2] Avoid creating archive status ".ready" files too early.

WAL records may span multiple segments, but XLogWrite() does not wait for the entire record to be written out to disk before creating archive status files. Instead, as soon as the last WAL page of the segment is written, the archive status file will be created. If PostgreSQL crashes before it is able to write the rest of the record, it will end up reusing segments that have already been marked as ready-for-archival. However, the archiver process may have already processed the old version of the segment, so the wrong version of the segment may be backed up. This backed-up segment will cause operations such as point-in-time restores to fail.

To fix this, we keep track of records that span across segments and ensure that segments are only marked ready-for-archival once such records have been completely written to disk.
---
 src/backend/access/transam/timeline.c    |   2 +-
 src/backend/access/transam/xlog.c        | 279 ++++++++++++++++++++++-
 src/backend/access/transam/xlogarchive.c |  14 +-
 src/backend/replication/walreceiver.c    |   6 +-
 src/backend/storage/lmgr/lwlocknames.txt |   1 +
 src/include/access/xlogarchive.h         |   4 +-
 src/include/access/xlogdefs.h            |   3 +
 7 files changed, 289 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c
index 8d0903c175..acd5c2431d 100644
--- a/src/backend/access/transam/timeline.c
+++ b/src/backend/access/transam/timeline.c
@@ -452,7 +452,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 	if (XLogArchivingActive())
 	{
 		TLHistoryFileName(histfname, newTLI);
-		XLogArchiveNotify(histfname);
+		XLogArchiveNotify(histfname, true);
 	}
 }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 26fa2b6c8f..700bbccff8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -531,6 +531,13 @@ typedef enum ExclusiveBackupState
  */
 static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
 
+/* entries for RecordBoundaryMap, used to mark segments ready for archival */
+typedef struct RecordBoundaryEntry
+{
+	XLogSegNo	seg;			/* must be first */
+	XLogRecPtr	pos;
+} RecordBoundaryEntry;
+
 /*
  * Shared state data for WAL insertion.
  */
@@ -742,6 +749,12 @@ typedef struct XLogCtlData
 	 */
 	XLogRecPtr	lastFpwDisableRecPtr;
 
+	/*
+	 * The last segment we've marked ready for archival.  Protected by
+	 * ArchNotifyLock.
+	 */
+	XLogSegNo	lastNotifiedSeg;
+
 	slock_t		info_lck;		/* locks shared variables shown above */
 } XLogCtlData;
 
@@ -755,6 +768,12 @@ static WALInsertLockPadded *WALInsertLocks = NULL;
  */
 static ControlFileData *ControlFile = NULL;
 
+/*
+ * Record boundary map, used for marking segments as ready for archival.
+ * Protected by ArchNotifyLock.
+ */
+static HTAB *RecordBoundaryMap = NULL;
+
 /*
  * Calculate the amount of space left on the page after 'endptr'.  Beware
  * multiple evaluation!
@@ -983,6 +1002,12 @@ static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
 static void checkXLogConsistency(XLogReaderState *record);
+static void RegisterRecordBoundaryEntry(XLogSegNo seg, XLogRecPtr pos);
+static void NotifySegmentsReadyForArchive(void);
+static XLogSegNo GetLastNotifiedSegment(void);
+static void SetLastNotifiedSegment(XLogSegNo seg);
+static XLogSegNo GetLatestRecordBoundarySegment(void);
+static void RemoveRecordBoundariesUpTo(XLogSegNo seg);
 
 static void WALInsertLockAcquire(void);
 static void WALInsertLockAcquireExclusive(void);
@@ -1030,6 +1055,8 @@ XLogInsertRecord(XLogRecData *rdata,
 							   info == XLOG_SWITCH);
 	XLogRecPtr	StartPos;
 	XLogRecPtr	EndPos;
+	XLogSegNo	StartSeg;
+	XLogSegNo	EndSeg;
 	bool		prevDoPageWrites = doPageWrites;
 
 	/* we assume that all of the record header is in the first chunk */
@@ -1188,6 +1215,31 @@ XLogInsertRecord(XLogRecData *rdata,
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
+	/*
+	 * Record the record boundary if we crossed the segment boundary.  This is
+	 * used to ensure that segments are not marked ready for archival before the
+	 * entire record has been flushed to disk.
+	 *
+	 * Note that we do not use XLByteToPrevSeg() for determining the ending
+	 * segment.  This is done so that a record that fits perfectly into the end
+	 * of the segment is marked ready for archival as soon as the flushed
+	 * pointer jumps to the next segment.
+	 */
+	XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+	XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+
+	if (StartSeg != EndSeg &&
+		XLogArchivingActive())
+	{
+		RegisterRecordBoundaryEntry(EndSeg, EndPos);
+
+		/*
+		 * There's a chance that the record was already flushed to disk and we
+		 * missed marking segments as ready for archive, so try to do that now.
+		 */
+		NotifySegmentsReadyForArchive();
+	}
+
 	/*
 	 * If this was an XLOG_SWITCH record, flush the record and the empty
 	 * padding space that fills the rest of the segment, and perform
@@ -1285,6 +1337,197 @@ XLogInsertRecord(XLogRecData *rdata,
 	return EndPos;
 }
 
+/*
+ * RegisterRecordBoundaryEntry
+ *
+ * This enters a new entry into the record boundary map, which is used for
+ * determining when it is safe to mark a segment as ready for archival.  An
+ * entry with the given key (the segment number) must not already exist in the
+ * map.  Also, the caller is responsible for ensuring that XLByteToSeg() would
+ * return the same segment number for the given record pointer.
+ */
+static void
+RegisterRecordBoundaryEntry(XLogSegNo seg, XLogRecPtr pos)
+{
+	RecordBoundaryEntry *entry;
+	bool		found;
+
+	LWLockAcquire(ArchNotifyLock, LW_EXCLUSIVE);
+
+	entry = (RecordBoundaryEntry *) hash_search(RecordBoundaryMap,
+												(void *) &seg, HASH_ENTER,
+												&found);
+	if (found)
+		elog(ERROR, "record boundary entry for segment already exists");
+
+	entry->pos = pos;
+
+	LWLockRelease(ArchNotifyLock);
+}
+
+/*
+ * NotifySegmentsReadyForArchive
+ *
+ * This function marks segments as ready for archival, given that it is safe to
+ * do so.  It is safe to call this function repeatedly, even if nothing has
+ * changed since the last time it was called.
+ */
+static void
+NotifySegmentsReadyForArchive(void)
+{
+	XLogRecPtr	flushed;
+	XLogSegNo	flushed_seg;
+	XLogSegNo	latest_boundary_seg;
+	XLogSegNo	last_notified;
+
+	/*
+	 * We first do a quick sanity check to see if we can bail out without taking
+	 * ArchNotifyLock exclusively.  It is expected that this function will run
+	 * frequently and that it will need to do nothing the vast majority of the
+	 * time.
+	 *
+	 * Specifically, we bail out if the shared memory value for the last
+	 * notified segment has not yet been initialized or if we've already marked
+	 * the segment prior to the segment that contains "flushed" as ready for
+	 * archival.  We intentionally use XLByteToSeg() instead of
+	 * XLByteToPrevSeg() so that we don't skip notifying when a record fits
+	 * perfectly into the end of a segment.  ("flushed" should point to the
+	 * first byte of the record _after_ the one that is known to be flushed to
+	 * disk.)
+	 */
+	LWLockAcquire(ArchNotifyLock, LW_SHARED);
+	last_notified = GetLastNotifiedSegment();
+	LWLockRelease(ArchNotifyLock);
+
+	if (XLogSegNoIsInvalid(last_notified))
+		return;
+
+	flushed = GetFlushRecPtr();
+	XLByteToSeg(flushed, flushed_seg, wal_segment_size);
+	if (last_notified >= flushed_seg - 1)
+		return;
+
+	/*
+	 * At this point, we must acquire ArchNotifyLock before proceeding.  In this
+	 * section, we look for the latest record boundary in RecordBoundaryMap that
+	 * is less than or equal to the current "flushed" pointer, and we notify the
+	 * archiver that all segments up to (but not including) that boundary's
+	 * associated segment are ready for archival.
+	 */
+	LWLockAcquire(ArchNotifyLock, LW_EXCLUSIVE);
+
+	latest_boundary_seg = GetLatestRecordBoundarySegment();
+	if (!XLogSegNoIsInvalid(latest_boundary_seg))
+	{
+		XLogSegNo	i;
+
+		/* create the archive status files */
+		for (i = GetLastNotifiedSegment() + 1; i < latest_boundary_seg; i++)
+			XLogArchiveNotifySeg(i, false);
+
+		/* update shared memory */
+		SetLastNotifiedSegment(latest_boundary_seg - 1);
+
+		/* remove old boundaries from the map */
+		RemoveRecordBoundariesUpTo(latest_boundary_seg);
+
+		PgArchWakeup();
+	}
+
+	LWLockRelease(ArchNotifyLock);
+}
+
+/*
+ * GetLatestRecordBoundarySegment
+ *
+ * This function finds the latest record boundary in RecordBoundaryMap that is
+ * less than or equal to the current "flushed" pointer and returns its
+ * associated segment number, given that it is greater than the last notified
+ * segment.  Otherwise, InvalidXLogSegNo is returned.
+ *
+ * Caller is expected to be holding ArchNotifyLock.
+ */
+static XLogSegNo
+GetLatestRecordBoundarySegment(void)
+{
+	XLogRecPtr	flushed;
+	XLogSegNo	flushed_seg;
+	XLogSegNo	last_notified;
+
+	flushed = GetFlushRecPtr();
+	XLByteToSeg(flushed, flushed_seg, wal_segment_size);
+	last_notified = GetLastNotifiedSegment();
+
+	for (XLogSegNo i = flushed_seg; i > last_notified; i--)
+	{
+		RecordBoundaryEntry *entry;
+
+		entry = (RecordBoundaryEntry *) hash_search(RecordBoundaryMap,
+													(void *) &i, HASH_FIND,
+													NULL);
+
+		if (entry != NULL && flushed >= entry->pos)
+			return entry->seg;
+	}
+
+	return InvalidXLogSegNo;
+}
+
+/*
+ * RemoveRecordBoundariesUpTo
+ *
+ * This function removes all entries in the RecordBoundaryMap with segment
+ * numbers up to and including seg.
+ *
+ * Caller is expected to be holding ArchNotifyLock.
+ */
+static void
+RemoveRecordBoundariesUpTo(XLogSegNo seg)
+{
+	RecordBoundaryEntry *entry;
+	HASH_SEQ_STATUS status;
+
+	hash_seq_init(&status, RecordBoundaryMap);
+
+	while ((entry = (RecordBoundaryEntry *) hash_seq_search(&status)) != NULL)
+	{
+		if (entry->seg <= seg)
+			(void) hash_search(RecordBoundaryMap, (void *) &entry->seg,
+							   HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * GetLastNotifiedSegment
+ *
+ * Retrieves last notified segment from shared memory.
+ */
+static XLogSegNo
+GetLastNotifiedSegment(void)
+{
+	XLogSegNo	seg;
+
+	Assert(LWLockHeldByMe(ArchNotifyLock));
+
+	seg = XLogCtl->lastNotifiedSeg;
+
+	return seg;
+}
+
+/*
+ * SetLastNotifiedSegment
+ *
+ * Sets last notified segment in shared memory.  Callers should hold
+ * ArchNotifyLock exclusively when calling this function.
+ */
+static void
+SetLastNotifiedSegment(XLogSegNo seg)
+{
+	Assert(LWLockHeldByMeInMode(ArchNotifyLock, LW_EXCLUSIVE));
+
+	XLogCtl->lastNotifiedSeg = seg;
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -2607,11 +2850,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			 * later. Doing it here ensures that one and only one backend will
 			 * perform this fsync.
 			 *
-			 * This is also the right place to notify the Archiver that the
-			 * segment is ready to copy to archival storage, and to update the
-			 * timer for archive_timeout, and to signal for a checkpoint if
-			 * too many logfile segments have been used since the last
-			 * checkpoint.
+			 * This is also the right place to update the timer for
+			 * archive_timeout and to signal for a checkpoint if too many
+			 * logfile segments have been used since the last checkpoint.  If
+			 * lastNotifiedSeg hasn't been initialized yet, we need to do that,
+			 * too.
 			 */
 			if (finishing_seg)
 			{
@@ -2623,7 +2866,14 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
 
 				if (XLogArchivingActive())
-					XLogArchiveNotifySeg(openLogSegNo);
+				{
+					LWLockAcquire(ArchNotifyLock, LW_EXCLUSIVE);
+
+					if (XLogSegNoIsInvalid(GetLastNotifiedSegment()))
+						SetLastNotifiedSegment(openLogSegNo - 1);
+
+					LWLockRelease(ArchNotifyLock);
+				}
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2711,6 +2961,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
+
+	if (XLogArchivingActive())
+		NotifySegmentsReadyForArchive();
 }
 
 /*
@@ -5140,6 +5393,9 @@ XLOGShmemSize(void)
 	/* and the buffers themselves */
 	size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
 
+	/* stuff for marking segments as ready for archival */
+	size = add_size(size, hash_estimate_size(16, sizeof(RecordBoundaryEntry)));
+
 	/*
 	 * Note: we don't count ControlFileData, it comes out of the "slop factor"
 	 * added by CreateSharedMemoryAndSemaphores.  This lets us use this
@@ -5157,6 +5413,7 @@ XLOGShmemInit(void)
 	char	   *allocptr;
 	int			i;
 	ControlFileData *localControlFile;
+	HASHCTL		info;
 
 #ifdef WAL_DEBUG
 
@@ -5250,12 +5507,20 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->SharedPromoteIsTriggered = false;
 	XLogCtl->WalWriterSleeping = false;
+	XLogCtl->lastNotifiedSeg = InvalidXLogSegNo;
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	SpinLockInit(&XLogCtl->ulsn_lck);
 	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
+
+	/* Initialize stuff for marking segments as ready for archival. */
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(XLogSegNo);
+	info.entrysize = sizeof(RecordBoundaryEntry);
+	RecordBoundaryMap = ShmemInitHash("Record Boundary Table", 16, 16, &info,
+									  HASH_ELEM | HASH_BLOBS);
 }
 
 /*
@@ -8016,7 +8281,7 @@ StartupXLOG(void)
 				XLogArchiveCleanup(partialfname);
 
 				durable_rename(origpath, partialpath, ERROR);
-				XLogArchiveNotify(partialfname);
+				XLogArchiveNotify(partialfname, true);
 			}
 		}
 	}
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index 26b023e754..e9cac90b4a 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -433,7 +433,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
 	if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 		XLogArchiveForceDone(xlogfname);
 	else
-		XLogArchiveNotify(xlogfname);
+		XLogArchiveNotify(xlogfname, true);
 
 	/*
 	 * If the existing file was replaced, since walsenders might have it open,
@@ -464,7 +464,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
  * then when complete, rename it to 0000000100000001000000C6.done
  */
 void
-XLogArchiveNotify(const char *xlog)
+XLogArchiveNotify(const char *xlog, bool notify)
 {
 	char		archiveStatusPath[MAXPGPATH];
 	FILE	   *fd;
@@ -489,8 +489,8 @@ XLogArchiveNotify(const char *xlog)
 		return;
 	}
 
-	/* Notify archiver that it's got something to do */
-	if (IsUnderPostmaster)
+	/* If caller requested, notify archiver that it's got something to do */
+	if (notify)
 		PgArchWakeup();
 }
 
@@ -498,12 +498,12 @@ XLogArchiveNotify(const char *xlog)
  * Convenience routine to notify using segment number representation of filename
  */
 void
-XLogArchiveNotifySeg(XLogSegNo segno)
+XLogArchiveNotifySeg(XLogSegNo segno, bool notify)
 {
 	char		xlog[MAXFNAMELEN];
 
 	XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size);
-	XLogArchiveNotify(xlog);
+	XLogArchiveNotify(xlog, notify);
 }
 
 /*
@@ -608,7 +608,7 @@ XLogArchiveCheckDone(const char *xlog)
 		return true;
 
 	/* Retry creation of the .ready file */
-	XLogArchiveNotify(xlog);
+	XLogArchiveNotify(xlog, true);
 	return false;
 }
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 9a2bc37fd7..60de3be92c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -622,7 +622,7 @@ WalReceiverMain(void)
 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 				XLogArchiveForceDone(xlogfname);
 			else
-				XLogArchiveNotify(xlogfname);
+				XLogArchiveNotify(xlogfname, true);
 		}
 		recvFile = -1;
 
@@ -760,7 +760,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 				XLogArchiveForceDone(fname);
 			else
-				XLogArchiveNotify(fname);
+				XLogArchiveNotify(fname, true);
 
 			pfree(fname);
 			pfree(content);
@@ -915,7 +915,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 				if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
 					XLogArchiveForceDone(xlogfname);
 				else
-					XLogArchiveNotify(xlogfname);
+					XLogArchiveNotify(xlogfname, true);
 			}
 			recvFile = -1;
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 6c7cf6c295..d39225bf94 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,4 @@ XactTruncationLock					44
 # 45 was XactTruncationLock until removal of BackendRandomLock
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
+ArchNotifyLock						48
diff --git a/src/include/access/xlogarchive.h b/src/include/access/xlogarchive.h
index 3edd1a976c..e25c3d8117 100644
--- a/src/include/access/xlogarchive.h
+++ b/src/include/access/xlogarchive.h
@@ -23,8 +23,8 @@ extern bool RestoreArchivedFile(char *path, const char *xlogfname,
 extern void ExecuteRecoveryCommand(const char *command, const char *commandName,
 								   bool failOnSignal);
 extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname);
-extern void XLogArchiveNotify(const char *xlog);
-extern void XLogArchiveNotifySeg(XLogSegNo segno);
+extern void XLogArchiveNotify(const char *xlog, bool notify);
+extern void XLogArchiveNotifySeg(XLogSegNo segno, bool notify);
 extern void XLogArchiveForceDone(const char *xlog);
 extern bool XLogArchiveCheckDone(const char *xlog);
 extern bool XLogArchiveIsBusy(const char *xlog);
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 60348d1850..8d172f887a 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -47,6 +47,9 @@ typedef uint64 XLogRecPtr;
  */
 typedef uint64 XLogSegNo;
 
+#define InvalidXLogSegNo 0xFFFFFFFFFFFFFFFF
+#define XLogSegNoIsInvalid(s) ((s) == InvalidXLogSegNo)
+
 /*
  * TimeLineID (TLI) - identifies different database histories to prevent
  * confusion after restoring a prior state of a database installation.
-- 
2.20.1
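The following single-process C model is a reading aid for the record-boundary logic in this patch. It is not PostgreSQL code, and it makes several simplifying assumptions:

- A fixed-size array replaces the shared RecordBoundaryMap hash table.
- There is no ArchNotifyLock.
- The flush pointer is passed in as an argument rather than read with GetFlushRecPtr().
- The segment size is assumed to be 16 MB.

The names register_record(), notify_ready() and model_reset() are hypothetical. The model follows the same rule as RegisterRecordBoundaryEntry() and NotifySegmentsReadyForArchive(): a record that crosses into segment N is remembered under N. Segments before N are reported only once the flush pointer has reached that record's end.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

#define SEG_SIZE        ((uint64_t) 16 * 1024 * 1024)   /* assumed wal_segment_size */
#define MAX_BOUNDARIES  16
#define MAX_NOTIFIED    64

/* Hypothetical stand-in for RecordBoundaryEntry, plus an "in use" flag. */
typedef struct
{
    XLogSegNo   seg;    /* segment containing the record's end pointer */
    XLogRecPtr  pos;    /* end pointer of the record that crossed into seg */
    bool        used;
} Boundary;

static Boundary   boundaries[MAX_BOUNDARIES];
static XLogSegNo  last_notified;            /* models XLogCtl->lastNotifiedSeg */
static XLogSegNo  notified[MAX_NOTIFIED];   /* segments that would get .ready files */
static int        n_notified;

static void
model_reset(XLogSegNo initial_last_notified)
{
    for (int i = 0; i < MAX_BOUNDARIES; i++)
        boundaries[i].used = false;
    last_notified = initial_last_notified;
    n_notified = 0;
}

/* Like the XLogInsertRecord() hunk: remember records whose start and end segments differ. */
static void
register_record(XLogRecPtr start, XLogRecPtr end)
{
    XLogSegNo   start_seg = start / SEG_SIZE;
    XLogSegNo   end_seg = end / SEG_SIZE;    /* XLByteToSeg(), not XLByteToPrevSeg() */

    if (start_seg == end_seg)
        return;
    for (int i = 0; i < MAX_BOUNDARIES; i++)
    {
        if (!boundaries[i].used)
        {
            boundaries[i] = (Boundary) {end_seg, end, true};
            return;
        }
    }
    assert(!"boundary table full");
}

/* Like NotifySegmentsReadyForArchive(), given the current flush pointer. */
static void
notify_ready(XLogRecPtr flushed)
{
    XLogSegNo   flushed_seg = flushed / SEG_SIZE;
    XLogSegNo   best = 0;
    bool        found = false;

    /* Fast exit: everything before the segment containing "flushed" is done. */
    if (last_notified + 1 >= flushed_seg)
        return;

    /* Latest boundary in (last_notified, flushed_seg] whose record is fully flushed. */
    for (int i = 0; i < MAX_BOUNDARIES; i++)
    {
        Boundary   *b = &boundaries[i];

        if (b->used && b->seg > last_notified && b->seg <= flushed_seg &&
            flushed >= b->pos && (!found || b->seg > best))
        {
            best = b->seg;
            found = true;
        }
    }
    if (!found)
        return;

    /* Every segment before the boundary's segment is now safe to archive. */
    for (XLogSegNo seg = last_notified + 1; seg < best; seg++)
    {
        assert(n_notified < MAX_NOTIFIED);
        notified[n_notified++] = seg;
    }
    last_notified = best - 1;

    /* Drop boundaries that can no longer matter. */
    for (int i = 0; i < MAX_BOUNDARIES; i++)
        if (boundaries[i].used && boundaries[i].seg <= best)
            boundaries[i].used = false;
}
```

Take a record that starts in segment 1 and ends 200 bytes into segment 2. Calling notify_ready() with a flush pointer 100 bytes into segment 2 reports nothing, while a flush pointer at the record's end reports segment 1. A record that ends exactly at a segment boundary has its end pointer in the next segment. That is why the model, like the patch, uses plain division (as XLByteToSeg() does) rather than XLByteToPrevSeg().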