On 11/6/24 21:30, Zhijie Hou (Fujitsu) wrote:
Thanks for the patch! I am reading the patch and noticed a few minor things. 1. + /* + * This is a local transaction. Make sure that the xact_time + * higher than any timestamp we have seen thus far. + * + * TODO: This is not postmaster restart safe. If the local + * system clock is further behind other nodes than it takes + * for the postmaster to restart (time between it stops + * accepting new transactions and time when it becomes ready + * to accept new transactions), local transactions will not + * be bumped into the future correctly. + */ The TODO section mentions other nodes, but I believe the patch currently does not handle timestamps from other nodes. Should we either remove this part or add a brief explanation to clarify the relationship with other nodes?
That TODO is actually obsolete. I understood from Amit Kapila that the community does assume that NTP synchronization is good enough, and it indeed is. Even my servers here at home use a GPS-based NTP server connected to the LAN and are usually in sync to within single-digit microseconds. I removed it.
2. +/* + * Hook function to be called while holding the WAL insert spinlock. + * to adjust commit timestamps via Lamport clock if needed. + */ The second line could be improved: "to adjust commit timestamps .." => "It adjusts commit timestamps ..."
How about: /* * Hook function to be called while holding the WAL insert spinlock. * It guarantees that commit timestamps are advancing in LSN order. */ static void EnsureMonotonicTransactionStopTimestamp(void *data); Thank you for looking at this and for your input. New patch attached. Best Regards, Jan
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b7ebcc2a55..0406dc44be 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -370,6 +370,12 @@ static void ShowTransactionStateRec(const char *str, TransactionState s); static const char *BlockStateAsString(TBlockState blockState); static const char *TransStateAsString(TransState state); +/* + * Hook function to be called while holding the WAL insert spinlock. + * It guarantees that commit timestamps are advancing in LSN order. + */ +static void EnsureMonotonicTransactionStopTimestamp(void *data); + /* ---------------------------------------------------------------- * transaction state accessors @@ -2214,6 +2220,13 @@ StartTransaction(void) if (TransactionTimeout > 0) enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout); + /* + * Reset XLogReserveInsertHook (function called while holding + * the WAL insert spinlock) + */ + XLogReserveInsertHook = NULL; + XLogReserveInsertHookData = NULL; + ShowTransactionState("StartTransaction"); } @@ -5831,6 +5844,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; uint8 info; + XLogRecPtr result; Assert(CritSectionCount > 0); @@ -5974,7 +5988,19 @@ XactLogCommitRecord(TimestampTz commit_time, /* we allow filtering by xacts */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsert(RM_XACT_ID, info); + /* + * Install our hook for the call to ReserveXLogInsertLocation() so that + * we can modify the xactStopTimestamp and the xact_time of the xlrec + * while holding the lock that determines the commit-LSN to ensure + * the commit timestamps are monotonically increasing.
+ */ + XLogReserveInsertHook = EnsureMonotonicTransactionStopTimestamp; + XLogReserveInsertHookData = (void *)&xlrec; + result = XLogInsert(RM_XACT_ID, info); + XLogReserveInsertHook = NULL; + XLogReserveInsertHookData = NULL; + + return result; } /* @@ -6445,3 +6471,33 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * Hook function used in XactLogCommitRecord() to ensure that the + * commit timestamp is monotonically increasing in commit-LSN order. + */ +static void +EnsureMonotonicTransactionStopTimestamp(void *data) +{ + xl_xact_commit *xlrec = (xl_xact_commit *)data; + TimestampTz logical_clock; + + logical_clock = XLogGetLastTransactionStopTimestamp(); + + /* + * This is a local transaction. Make sure that the xact_time + * is higher than any timestamp we have seen thus far. + */ + if (logical_clock >= xlrec->xact_time) + { + logical_clock++; + xlrec->xact_time = logical_clock; + xactStopTimestamp = logical_clock; + + XLogReserveInsertHookModifiedRecord = true; + } + else + logical_clock = xlrec->xact_time; + + XLogSetLastTransactionStopTimestamp(logical_clock); +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6f58412bca..bb70a7de09 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -155,6 +155,16 @@ int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; */ int CheckPointSegments; +/* + * Hook to be called inside ReserveXLogInsertLocation()/ReserveXLogSwitch() for + * operations that need to be performed while holding the WAL + * insert spinlock. Currently this is used to guarantee a monotonically + * increasing commit timestamp in LSN order.
+ */ +XLogReserveInsertHookType XLogReserveInsertHook = NULL; +void *XLogReserveInsertHookData = NULL; +bool XLogReserveInsertHookModifiedRecord = false; + /* Estimated distance between checkpoints, in bytes */ static double CheckPointDistanceEstimate = 0; static double PrevCheckPointDistance = 0; @@ -551,6 +561,14 @@ typedef struct XLogCtlData XLogRecPtr lastFpwDisableRecPtr; slock_t info_lck; /* locks shared variables shown above */ + + /* + * This is our shared, logical clock that we use to force + * commit timestamps to be monotonically increasing in + * commit-LSN order. This is protected by the WAL insert + * spinlock (Insert->insertpos_lck), not by info_lck. + */ + TimestampTz lastTransactionStopTimestamp; } XLogCtlData; /* @@ -700,6 +718,7 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos, TimeLineID tli); +static void XLogRecordCorrectCRC(XLogRecData *rdata); static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, @@ -778,6 +797,12 @@ XLogInsertRecord(XLogRecData *rdata, if (!XLogInsertAllowed()) elog(ERROR, "cannot make new WAL entries during recovery"); + /* + * Make sure the flag telling that ReserveXLog...() modified the + * record is false at this point. + */ + XLogReserveInsertHookModifiedRecord = false; + /* * Given that we're not in recovery, InsertTimeLineID is set and can't * change, so we can read it without a lock. @@ -906,6 +931,16 @@ XLogInsertRecord(XLogRecData *rdata, if (inserted) { + /* + * If our reserve hook modified the XLog record, recalculate the + * CRC of the record data (the header is folded in below). + */ + if (XLogReserveInsertHookModifiedRecord) + { + XLogRecordCorrectCRC(rdata); + XLogReserveInsertHookModifiedRecord = false; + } + /* * Now that xl_prev has been filled in, calculate CRC of the record * header.
@@ -1086,6 +1121,25 @@ XLogInsertRecord(XLogRecData *rdata, return EndPos; } +/* + * Recompute the partial CRC over the record data (everything after the + * XLogRecord header) in case it was altered by XLogReserveInsertHook. + */ +static void +XLogRecordCorrectCRC(XLogRecData *rdata) +{ + XLogRecData *rdt; + XLogRecord *rechdr = (XLogRecord *)rdata->data; + pg_crc32c rdata_crc; + + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, rdata->data + SizeOfXLogRecord, rdata->len - SizeOfXLogRecord); + for (rdt = rdata->next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); + + rechdr->xl_crc = rdata_crc; +} + /* * Reserves the right amount of space for a record of given size from the WAL. * *StartPos is set to the beginning of the reserved section, *EndPos to @@ -1130,6 +1184,12 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, */ SpinLockAcquire(&Insert->insertpos_lck); + /* + * If set, call the XLogReserveInsertHook function + */ + if (XLogReserveInsertHook != NULL) + XLogReserveInsertHook(XLogReserveInsertHookData); + startbytepos = Insert->CurrBytePos; endbytepos = startbytepos + size; prevbytepos = Insert->PrevBytePos; @@ -1189,6 +1249,12 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) return false; } + /* + * If set, call the XLogReserveInsertHook function + */ + if (XLogReserveInsertHook != NULL) + XLogReserveInsertHook(XLogReserveInsertHookData); + endbytepos = startbytepos + size; prevbytepos = Insert->PrevBytePos; @@ -9510,3 +9576,19 @@ SetWalWriterSleeping(bool sleeping) XLogCtl->WalWriterSleeping = sleeping; SpinLockRelease(&XLogCtl->info_lck); } + +/* + * Get/set the last-transaction-stop-timestamp in shared memory. + * Caller must hold the WAL insert spinlock (Insert->insertpos_lck).
+ */ +TimestampTz +XLogGetLastTransactionStopTimestamp(void) +{ + return XLogCtl->lastTransactionStopTimestamp; +} + +void +XLogSetLastTransactionStopTimestamp(TimestampTz ts) +{ + XLogCtl->lastTransactionStopTimestamp = ts; +} diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 34ad46c067..187d1dc9bc 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -199,6 +199,17 @@ typedef enum WALAvailability struct XLogRecData; struct XLogReaderState; +/* + * Hook called while holding the lock that determines + * the LSN order of WAL records. We currently use this to ensure that + * commit timestamps are monotonically increasing in their LSN + * order. + */ +typedef void (*XLogReserveInsertHookType)(void *data); +extern XLogReserveInsertHookType XLogReserveInsertHook; +extern void *XLogReserveInsertHookData; +extern bool XLogReserveInsertHookModifiedRecord; + extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, uint8 flags, @@ -270,6 +281,13 @@ extern void SetInstallXLogFileSegmentActive(void); extern bool IsInstallXLogFileSegmentActive(void); extern void XLogShutdownWalRcv(void); +/* + * Functions to access the last commit Lamport timestamp held in + * XLogCtl. + */ +extern TimestampTz XLogGetLastTransactionStopTimestamp(void); +extern void XLogSetLastTransactionStopTimestamp(TimestampTz ts); + /* * Routines to start, stop, and get status of a base backup. */