On 2021-Sep-14, Alvaro Herrera wrote: > On 2021-Sep-08, Kyotaro Horiguchi wrote: > > > Thanks! As my understanding the new record add the ability to > > cross-check between a teard-off contrecord and the new record inserted > > after the teard-off record. I didn't test the version by myself but > > the previous version implemented the essential machinery and that > > won't change fundamentally by the new record. > > > > So I think the current patch deserves to see the algorithm actually > > works against the problem. > > Here's a version with the new record type. It passes check-world, and > it seems to work correctly to prevent overwrite of the tail end of a > segment containing a broken record. This is very much WIP still; > comments are missing and I haven't tried to implement any sort of > verification that the record being aborted is the right one.
Here's the attachment I forgot earlier. -- Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/ "[PostgreSQL] is a great group; in my opinion it is THE best open source development communities in existence anywhere." (Lamar Owen)
>From 3de9660b2e570604412d435023dcbd13355fc123 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Thu, 2 Sep 2021 17:21:46 -0400 Subject: [PATCH v5] Implement FIRST_IS_ABORTED_CONTRECORD --- src/backend/access/rmgrdesc/xlogdesc.c | 11 +++ src/backend/access/transam/xlog.c | 91 +++++++++++++++++++++++-- src/backend/access/transam/xloginsert.c | 3 + src/backend/access/transam/xlogreader.c | 39 ++++++++++- src/include/access/xlog.h | 1 + src/include/access/xlog_internal.h | 20 +++++- src/include/access/xlogreader.h | 3 + src/include/catalog/pg_control.h | 1 + 8 files changed, 159 insertions(+), 10 deletions(-) diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index e6090a9dad..0be382f8a5 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -139,6 +139,14 @@ xlog_desc(StringInfo buf, XLogReaderState *record) xlrec.ThisTimeLineID, xlrec.PrevTimeLineID, timestamptz_to_str(xlrec.end_time)); } + else if (info == XLOG_OVERWRITE_CONTRECORD) + { + xl_overwrite_contrecord xlrec; + + memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord)); + appendStringInfo(buf, "lsn %X/%X", + LSN_FORMAT_ARGS(xlrec.overwritten_lsn)); + } } const char * @@ -178,6 +186,9 @@ xlog_identify(uint8 info) case XLOG_END_OF_RECOVERY: id = "END_OF_RECOVERY"; break; + case XLOG_OVERWRITE_CONTRECORD: + id = "OVERWRITE_CONTRECORD"; + break; case XLOG_FPI: id = "FPI"; break; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e51a7a749d..73cfd64125 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -586,6 +586,8 @@ typedef struct XLogCtlData XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */ + XLogRecPtr abortedContrecordPtr; /* LSN of incomplete record at end of + * WAL */ /* Fake LSN counter, for unlogged relations. 
Protected by ulsn_lck. */ XLogRecPtr unloggedLSN; @@ -894,6 +896,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI); static void LocalSetXLogInsertAllowed(void); static void CreateEndOfRecoveryRecord(void); +static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); @@ -950,6 +953,7 @@ static void rm_redo_error_callback(void *arg); static int get_sync_bit(int method); static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch, + bool set_aborted_partial, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos); static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, @@ -1120,8 +1124,9 @@ XLogInsertRecord(XLogRecData *rdata, * All the record data, including the header, is now ready to be * inserted. Copy the record in the space reserved. */ - CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata, - StartPos, EndPos); + CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, + flags & XLOG_SET_ABORTED_PARTIAL, + rdata, StartPos, EndPos); /* * Unless record is flagged as not important, update LSN of last @@ -1504,7 +1509,8 @@ checkXLogConsistency(XLogReaderState *record) * area in the WAL. */ static void -CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, +CopyXLogRecordToWAL(int write_len, bool isLogSwitch, + bool set_aborted_partial, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos) { char *currpos; @@ -1560,6 +1566,10 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, pagehdr->xlp_rem_len = write_len - written; pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD; + /* XXX can only happen once in the loop. Verify? 
*/ + if (set_aborted_partial) + pagehdr->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL; + + /* skip over the page header */ + if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0) + { @@ -4394,6 +4404,19 @@ ReadRecord(XLogReaderState *xlogreader, int emode, EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) { + /* + * When WAL ends in an incomplete record, keep track of it. After + * recovery is done, we'll write a record to tell downstream + * WAL readers that that portion is to be ignored. + */ + if (InRecovery && + !XLogRecPtrIsInvalid(xlogreader->abortedContrecordPtr)) + { + SpinLockAcquire(&XLogCtl->info_lck); + XLogCtl->abortedContrecordPtr = xlogreader->abortedContrecordPtr; + SpinLockRelease(&XLogCtl->info_lck); + } + if (readFile >= 0) { close(readFile); @@ -6502,6 +6525,7 @@ StartupXLOG(void) bool haveTblspcMap = false; XLogRecPtr RecPtr, checkPointLoc, + aborted_lsn, EndOfLog; TimeLineID EndOfLogTLI; TimeLineID PrevTimeLineID; @@ -7655,8 +7679,9 @@ StartupXLOG(void) /* * Kill WAL receiver, if it's still running, before we continue to write - * the startup checkpoint record. It will trump over the checkpoint and - * subsequent records if it's still alive when we start writing WAL. + * the startup checkpoint and aborted-contrecord records. It will trump + * over these records and subsequent ones if it's still alive when we + * start writing WAL. */ XLogShutdownWalRcv(); @@ -7689,8 +7714,12 @@ StartupXLOG(void) StandbyMode = false; /* - * Re-fetch the last valid or last applied record, so we can identify the - * exact endpoint of what we consider the valid portion of WAL. + * Determine where to start writing WAL next. + * + * First re-fetch the last valid or last applied record, so we can identify + * the exact endpoint of what we consider the valid portion of WAL. If + * recovery ended in an incomplete record, EndOfLog is later moved to where + * its missing piece was expected, and a WAL record about that is written.
*/ XLogBeginRead(xlogreader, LastRec); record = ReadRecord(xlogreader, PANIC, false); @@ -7821,6 +7850,17 @@ StartupXLOG(void) XLogCtl->ThisTimeLineID = ThisTimeLineID; XLogCtl->PrevTimeLineID = PrevTimeLineID; + /* If WAL ended in an incomplete record, start writing where its missing contrecord was expected */ + SpinLockAcquire(&XLogCtl->info_lck); + if (!XLogRecPtrIsInvalid(XLogCtl->abortedContrecordPtr)) + { + aborted_lsn = EndOfLog = XLogCtl->abortedContrecordPtr; + XLogCtl->abortedContrecordPtr = InvalidXLogRecPtr; + } + else + aborted_lsn = InvalidXLogRecPtr; + SpinLockRelease(&XLogCtl->info_lck); + /* * Prepare to write WAL starting at EndOfLog location, and init xlog * buffer cache using the block containing the last record from the @@ -7880,6 +7920,8 @@ StartupXLOG(void) */ Insert->fullPageWrites = lastFullPageWrites; LocalSetXLogInsertAllowed(); + if (!XLogRecPtrIsInvalid(aborted_lsn)) + CreateOverwriteContrecordRecord(aborted_lsn); UpdateFullPageWrites(); LocalXLogInsertAllowed = -1; @@ -9365,6 +9407,37 @@ CreateEndOfRecoveryRecord(void) LocalXLogInsertAllowed = -1; /* return to "check" state */ } +/* At end of recovery, log that the missing contrecord expected at aborted_lsn is abandoned */ +static XLogRecPtr +CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn) +{ + xl_overwrite_contrecord xlrec; + XLogRecPtr recptr; + + /* sanity check */ + if (!RecoveryInProgress()) + elog(ERROR, "can only be used at end of recovery"); + + xlrec.overwritten_lsn = aborted_lsn; + /* XXX include the CRC-so-far?
*/ + + START_CRIT_SECTION(); + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord)); + + /* this record overwrites the tail of the incomplete record */ + XLogSetRecordFlags(XLOG_SET_ABORTED_PARTIAL); + + recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD); + + XLogFlush(recptr); + + END_CRIT_SECTION(); + + return recptr; +} + /* * Flush all data in shared memory to disk, and fsync * @@ -10295,6 +10368,10 @@ xlog_redo(XLogReaderState *record) RecoveryRestartPoint(&checkPoint); } + else if (info == XLOG_OVERWRITE_CONTRECORD) + { + /* XXX verify the LSN of the record we're "aborting" */ + } else if (info == XLOG_END_OF_RECOVERY) { xl_end_of_recovery xlrec; diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index e596a0470a..23c46ba21f 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -409,6 +409,9 @@ XLogRegisterBufData(uint8 block_id, char *data, int len) * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for * durability, which allows to avoid triggering WAL archiving and other * background activity. + * - XLOG_INCLUDE_XID, to include the XID of the top-level transaction. + * - XLOG_SET_ABORTED_PARTIAL, to signal that XLP_FIRST_IS_ABORTED_PARTIAL + * is to be set.
*/ void XLogSetRecordFlags(uint8 flags) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 5cf74e181a..dbfa6d3562 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) total_len; uint32 targetRecOff; uint32 pageHeaderSize; + bool assembled; bool gotheader; int readOff; @@ -293,6 +294,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) state->errormsg_buf[0] = '\0'; ResetDecoder(state); + state->abortedContrecordPtr = InvalidXLogRecPtr; RecPtr = state->EndRecPtr; @@ -319,7 +321,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) randAccess = true; } +restart: state->currRecPtr = RecPtr; + assembled = false; targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); targetRecOff = RecPtr % XLOG_BLCKSZ; @@ -415,6 +419,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) char *buffer; uint32 gotlen; + assembled = true; + /* * Enlarge readRecordBuf as needed. */ @@ -442,14 +448,28 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) readOff = ReadPageInternal(state, targetPagePtr, Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); - if (readOff < 0) goto err; Assert(SizeOfXLogShortPHD <= readOff); - /* Check that the continuation on next page looks valid */ pageHeader = (XLogPageHeader) state->readBuf; + + /* + * If we were expecting a continuation record and got an "aborted + * partial" flag, that means the continuation record was lost. + * Ignore the record we were reading, since we now know it's broken + * and lost forever, and restart the read by assuming the address + * to read is the location where we found this flag. 
+ */ + if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL) + { + ResetDecoder(state); + RecPtr = targetPagePtr; + goto restart; + } + + /* Check that the continuation on next page looks valid */ if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) { report_invalid_record(state, @@ -551,6 +571,21 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) return NULL; err: + if (assembled) + { + /* + * We get here when a record that spans multiple pages needs to be + * assembled, but something went wrong -- perhaps a contrecord piece + * was lost. We deal with this by setting abortedContrecordPtr to the + * location of the piece we failed to read, or the start of the page + * we read where validation failed. If caller is WAL replay, it will + * know that recovery ended and that this is where to start writing + * future WAL marking the next piece with XLP_FIRST_IS_ABORTED_PARTIAL, + * which will in turn signal downstream WAL consumers that the broken + * WAL record here is to be ignored. + */ + state->abortedContrecordPtr = targetPagePtr; + } /* * Invalidate the read state. 
We might read from a different source after diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0a8ede700d..caed9294a8 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -213,6 +213,7 @@ extern bool XLOG_DEBUG; #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ #define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */ +#define XLOG_SET_ABORTED_PARTIAL 0x08 /* set the aborted-partial page flag */ /* Checkpoint statistics */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 3b5eceff65..0fb44ba37d 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -76,8 +76,20 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader; #define XLP_LONG_HEADER 0x0002 /* This flag indicates backup blocks starting in this page are optional */ #define XLP_BKP_REMOVABLE 0x0004 +/* + * This page-header flag marks a page whose first record replaces a + * missing contrecord. When during WAL replay we expect a continuation + * record at the start of a page that is not there, recovery ends, and + * the XLOG_OVERWRITE_CONTRECORD record written at that position is + * marked with this flag, which tells WAL readers that the incomplete + * record is to be skipped and that WAL reading is to be resumed here. + * This is useful for downstream consumers of WAL which have already + * received (the first part of) the original broken WAL record, such as + * via archive_command or physical streaming replication, which we cannot "rewind". + */ +#define XLP_FIRST_IS_ABORTED_PARTIAL 0x0008 /* All defined flag bits in xlp_info (used for validity checking of header) */ -#define XLP_ALL_FLAGS 0x0007 +#define XLP_ALL_FLAGS 0x000F #define XLogPageHeaderSize(hdr) \ (((hdr)->xlp_info & XLP_LONG_HEADER) ?
SizeOfXLogLongPHD : SizeOfXLogShortPHD) @@ -249,6 +261,12 @@ typedef struct xl_restore_point char rp_name[MAXFNAMELEN]; } xl_restore_point; +/* Overwrite of prior contrecord */ +typedef struct xl_overwrite_contrecord +{ + XLogRecPtr overwritten_lsn; +} xl_overwrite_contrecord; + /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */ typedef struct xl_end_of_recovery { diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 21d200d3df..96e5eab1c9 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -175,6 +175,9 @@ struct XLogReaderState XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ + /* start of page where a missing contrecord was expected at end of WAL */ + XLogRecPtr abortedContrecordPtr; + /* ---------------------------------------- * Decoded representation of current record diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index e3f48158ce..179efa9f4a 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -76,6 +76,7 @@ typedef struct CheckPoint #define XLOG_END_OF_RECOVERY 0x90 #define XLOG_FPI_FOR_HINT 0xA0 #define XLOG_FPI 0xB0 +#define XLOG_OVERWRITE_CONTRECORD 0xC0 /* -- 2.30.2