On 11/6/24 21:30, Zhijie Hou (Fujitsu) wrote:

Thanks for the patch! I am reading the patch and noticed a few minor things.

1.
+       /*
+        * This is a local transaction. Make sure that the xact_time
+        * higher than any timestamp we have seen thus far.
+        *
+        * TODO: This is not postmaster restart safe. If the local
+        * system clock is further behind other nodes than it takes
+        * for the postmaster to restart (time between it stops
+        * accepting new transactions and time when it becomes ready
+        * to accept new transactions), local transactions will not
+        * be bumped into the future correctly.
+        */

The TODO section mentions other nodes, but I believe the patch currently does
not handle timestamps for other nodes. Should we either remove this part or
add a brief explanation to clarify the relationship with other nodes?

That TODO is actually obsolete. I understood from Amit Kapila that the community does assume that NTP synchronization is good enough, and indeed it is. Even my servers here at home use a GPS-based NTP server connected to the LAN and are usually in sync to within single-digit microseconds. I removed it.


2.
+/*
+ * Hook function to be called while holding the WAL insert spinlock.
+ * to adjust commit timestamps via Lamport clock if needed.
+ */

The second line could be improved:
"to adjust commit timestamps .." => "It adjusts commit timestamps ..."

How about

/*
 * Hook function to be called while holding the WAL insert spinlock.
 * It guarantees that commit timestamps are advancing in LSN order.
 */
static void EnsureMonotonicTransactionStopTimestamp(void *data);


Thank you for looking at this and for your input. A new patch is attached.


Best Regards, Jan
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7ebcc2a55..0406dc44be 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -370,6 +370,12 @@ static void ShowTransactionStateRec(const char *str, TransactionState s);
 static const char *BlockStateAsString(TBlockState blockState);
 static const char *TransStateAsString(TransState state);
 
+/*
+ * XLogReserveInsertHook installed by XactLogCommitRecord(); runs under the
+ * WAL insert spinlock and keeps commit timestamps increasing in LSN order.
+ */
+static void EnsureMonotonicTransactionStopTimestamp(void *data);
+
 
 /* ----------------------------------------------------------------
  *	transaction state accessors
@@ -2214,6 +2220,13 @@ StartTransaction(void)
 	if (TransactionTimeout > 0)
 		enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout);
 
+	/*
+	 * Make sure no XLogReserveInsertHook (which would be called while
+	 * holding the WAL insert spinlock) is left installed at this point.
+	 */
+	XLogReserveInsertHook = NULL;
+	XLogReserveInsertHookData = NULL;
+
 	ShowTransactionState("StartTransaction");
 }
 
@@ -5831,6 +5844,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
@@ -5974,7 +5988,19 @@ XactLogCommitRecord(TimestampTz commit_time,
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/*
+	 * Install our hook for ReserveXLogInsertLocation() so that xlrec.xact_time
+	 * and xactStopTimestamp can be bumped, under the spinlock that assigns
+	 * the commit LSN, to keep commit timestamps monotonic in LSN order.
+	 * NOTE(review): the commit_time argument itself is never adjusted.
+	 */
+	XLogReserveInsertHook = EnsureMonotonicTransactionStopTimestamp;
+	XLogReserveInsertHookData = (void *)&xlrec;
+	result = XLogInsert(RM_XACT_ID, info);
+	XLogReserveInsertHook = NULL;	/* uninstall right after use */
+	XLogReserveInsertHookData = NULL;
+
+	return result;
 }
 
 /*
@@ -6445,3 +6471,33 @@ xact_redo(XLogReaderState *record)
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * Reserve-insert hook used by XactLogCommitRecord(): bumps the commit
+ * record's xact_time so it is strictly greater than the previous one.
+ */
+static void
+EnsureMonotonicTransactionStopTimestamp(void *data)
+{
+	xl_xact_commit	   *xlrec = (xl_xact_commit *)data;
+	TimestampTz			logical_clock;
+
+	logical_clock = XLogGetLastTransactionStopTimestamp();
+
+	/*
+	 * Runs under the WAL insert spinlock: make xact_time strictly greater
+	 * than any commit timestamp assigned earlier in commit-LSN order.
+	 */
+	if (logical_clock >= xlrec->xact_time)
+	{
+		logical_clock++;
+		xlrec->xact_time = logical_clock;
+		xactStopTimestamp = logical_clock;
+
+		XLogReserveInsertHookModifiedRecord = true;	/* record CRC must be redone */
+	}
+	else
+		logical_clock = xlrec->xact_time;
+
+	XLogSetLastTransactionStopTimestamp(logical_clock);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f58412bca..bb70a7de09 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -155,6 +155,16 @@ int			wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
  */
 int			CheckPointSegments;
 
+/*
+ * Hook called by ReserveXLogInsertLocation() and ReserveXLogSwitch() while
+ * holding the WAL insert spinlock, with XLogReserveInsertHookData as its
+ * argument.  A hook that alters the record data must set
+ * XLogReserveInsertHookModifiedRecord so the record's CRC is recomputed.
+ */
+XLogReserveInsertHookType	XLogReserveInsertHook = NULL;
+void					   *XLogReserveInsertHookData = NULL;
+bool						XLogReserveInsertHookModifiedRecord = false;
+
 /* Estimated distance between checkpoints, in bytes */
 static double CheckPointDistanceEstimate = 0;
 static double PrevCheckPointDistance = 0;
@@ -551,6 +561,14 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastFpwDisableRecPtr;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
+
+	/*
+	 * Shared logical clock used to force commit timestamps to be
+	 * monotonically increasing in commit-LSN order.  It is protected
+	 * by the WAL insert spinlock (insertpos_lck), not by info_lck
+	 * above.
+	 */
+	TimestampTz	lastTransactionStopTimestamp;
 } XLogCtlData;
 
 /*
@@ -700,6 +718,7 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
 								XLogRecData *rdata,
 								XLogRecPtr StartPos, XLogRecPtr EndPos,
 								TimeLineID tli);
+static void XLogRecordCorrectCRC(XLogRecData *rdata);
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -778,6 +797,12 @@ XLogInsertRecord(XLogRecData *rdata,
 	if (!XLogInsertAllowed())
 		elog(ERROR, "cannot make new WAL entries during recovery");
 
+	/*
+	 * Clear the flag the reserve hook sets when it alters the record, so
+	 * a stale value cannot trigger a needless CRC recomputation below.
+	 */
+	XLogReserveInsertHookModifiedRecord = false;
+
 	/*
 	 * Given that we're not in recovery, InsertTimeLineID is set and can't
 	 * change, so we can read it without a lock.
@@ -906,6 +931,16 @@ XLogInsertRecord(XLogRecData *rdata,
 
 	if (inserted)
 	{
+		/*
+		 * If the reserve hook modified the record data, recompute the data
+		 * CRC; the header CRC is then folded in below as usual.
+		 */
+		if (XLogReserveInsertHookModifiedRecord)
+		{
+			XLogRecordCorrectCRC(rdata);
+			XLogReserveInsertHookModifiedRecord = false;
+		}
+
 		/*
 		 * Now that xl_prev has been filled in, calculate CRC of the record
 		 * header.
@@ -1086,6 +1121,25 @@ XLogInsertRecord(XLogRecData *rdata,
 	return EndPos;
 }
 
+/*
+ * Recompute the CRC over the record data following the fixed-size header,
+ * after the ReserveXLogInsertLocation() hook altered it.
+ */
+static void
+XLogRecordCorrectCRC(XLogRecData *rdata)
+{
+	XLogRecData    *rdt;
+	XLogRecord	   *rechdr = (XLogRecord *)rdata->data;
+	pg_crc32c       rdata_crc;
+
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, rdata->data + SizeOfXLogRecord, rdata->len - SizeOfXLogRecord);
+	for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+
+	rechdr->xl_crc = rdata_crc;	/* not finalized; caller adds header CRC */
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1130,6 +1184,12 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	 */
 	SpinLockAcquire(&Insert->insertpos_lck);
 
+	/*
+	 * If installed, call XLogReserveInsertHook while holding insertpos_lck.
+	 */
+	if (XLogReserveInsertHook != NULL)
+		XLogReserveInsertHook(XLogReserveInsertHookData);
+
 	startbytepos = Insert->CurrBytePos;
 	endbytepos = startbytepos + size;
 	prevbytepos = Insert->PrevBytePos;
@@ -1189,6 +1249,12 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 		return false;
 	}
 
+	/*
+	 * If installed, call XLogReserveInsertHook, as ReserveXLogInsertLocation does.
+	 */
+	if (XLogReserveInsertHook != NULL)
+		XLogReserveInsertHook(XLogReserveInsertHookData);
+
 	endbytepos = startbytepos + size;
 	prevbytepos = Insert->PrevBytePos;
 
@@ -9510,3 +9576,19 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * Get/set the shared logical clock (XLogCtl->lastTransactionStopTimestamp).
+ * The caller must hold the WAL insert spinlock (Insert->insertpos_lck).
+ */
+TimestampTz
+XLogGetLastTransactionStopTimestamp(void)
+{
+	return XLogCtl->lastTransactionStopTimestamp;
+}
+
+void
+XLogSetLastTransactionStopTimestamp(TimestampTz ts)
+{
+	XLogCtl->lastTransactionStopTimestamp = ts;
+}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 34ad46c067..187d1dc9bc 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,6 +199,17 @@ typedef enum WALAvailability
 struct XLogRecData;
 struct XLogReaderState;
 
+/*
+ * Hook called while holding the spinlock that determines the LSN order
+ * of WAL records.  We currently use it to ensure that commit timestamps
+ * are monotonically increasing in commit-LSN order.  Because a spinlock
+ * is held, the hook must be short and must not elog/ereport.
+ */
+typedef void (*XLogReserveInsertHookType)(void *data);
+extern XLogReserveInsertHookType XLogReserveInsertHook;
+extern void *XLogReserveInsertHookData;
+extern bool XLogReserveInsertHookModifiedRecord;
+
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
 								   XLogRecPtr fpw_lsn,
 								   uint8 flags,
@@ -270,6 +281,13 @@ extern void SetInstallXLogFileSegmentActive(void);
 extern bool IsInstallXLogFileSegmentActive(void);
 extern void XLogShutdownWalRcv(void);
 
+/*
+ * Accessors for the shared logical clock (last commit timestamp) kept in
+ * XLogCtl; callers must hold the WAL insert spinlock.
+ */
+extern TimestampTz XLogGetLastTransactionStopTimestamp(void);
+extern void XLogSetLastTransactionStopTimestamp(TimestampTz ts);
+
 /*
  * Routines to start, stop, and get status of a base backup.
  */

Reply via email to