From 0923c1196bd8e4611de0a613a752e197d6c19a0f Mon Sep 17 00:00:00 2001
From: Jakub Wartak <jakub.wartak@enterprisedb.com>
Date: Thu, 26 Jan 2023 11:50:10 +0000
Subject: [PATCH] WIP: Syncrep and improving latency due to WAL throttling

---
 src/backend/access/transam/xact.c |  2 ++
 src/backend/access/transam/xlog.c | 42 +++++++++++++++++++++++++++++++
 src/backend/tcop/postgres.c       |  3 +++
 src/backend/utils/init/globals.c  |  1 +
 src/include/access/xlog.h         |  2 ++
 src/include/miscadmin.h           |  1 +
 6 files changed, 51 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d85e313908..05d56d65f9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2395,6 +2395,7 @@ CommitTransaction(void)
 
 	XactTopFullTransactionId = InvalidFullTransactionId;
 	nParallelCurrentXids = 0;
+	backendWalInserted = 0;
 
 	/*
 	 * done with commit processing, set current transaction state back to
@@ -2685,6 +2686,7 @@ PrepareTransaction(void)
 
 	XactTopFullTransactionId = InvalidFullTransactionId;
 	nParallelCurrentXids = 0;
+	backendWalInserted = 0;
 
 	/*
 	 * done with 1st phase commit processing, set current transaction state
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fb4c860bde..0add515b91 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1,4 +1,5 @@
 /*-------------------------------------------------------------------------
+
  *
  * xlog.c
  *		PostgreSQL write-ahead log manager
@@ -638,6 +639,8 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+uint32	backendWalInserted = 0;
+
 static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
 										XLogRecPtr EndOfLog,
 										TimeLineID newTLI);
@@ -1022,6 +1025,16 @@ XLogInsertRecord(XLogRecData *rdata,
 		pgWalUsage.wal_bytes += rechdr->xl_tot_len;
 		pgWalUsage.wal_records++;
 		pgWalUsage.wal_fpi += num_fpi;
+
+		/* WAL throttling */
+		backendWalInserted += rechdr->xl_tot_len;
+		if ((synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_APPLY || synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+			/*synchronous_commit_flush_wal_after > 0 &&*/
+			backendWalInserted > 256L * 1024L)
+		{
+			InterruptPending=true;
+			XLogDelayPending=true;
+		}
 	}
 
 	return EndPos;
@@ -8952,3 +8965,32 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+
+/*
+ * Called from ProcessMessageInterrupts() to avoid waiting while being in critical section
+ */ 
+void HandleXLogDelayPending()
+{
+	/* flush only up to the last fully filled page */
+	XLogRecPtr 	LastFullyWrittenXLogPage = XactLastRecEnd - (XactLastRecEnd % XLOG_BLCKSZ);
+	XLogDelayPending = false;
+
+	//HOLD_INTERRUPTS();
+
+	/* XXX Debug for now */
+	elog(WARNING, "throttling WAL down on this session (backendWalInserted=%d, LSN=%X/%X flushingTo=%X/%X)", 
+		backendWalInserted, 
+		LSN_FORMAT_ARGS(XactLastRecEnd),
+		LSN_FORMAT_ARGS(LastFullyWrittenXLogPage));
+
+	/* XXX: refactor SyncRepWaitForLSN() to have different waitevent than default WAIT_EVENT_SYNC_REP  */
+	/* maybe new WAIT_EVENT_SYNC_REP_BIG or something like that */
+	XLogFlush(LastFullyWrittenXLogPage);
+	SyncRepWaitForLSN(LastFullyWrittenXLogPage, false);
+	elog(WARNING, "throttling WAL down on this session - end");
+	backendWalInserted = 0;
+
+	//RESUME_INTERRUPTS();
+}
+
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 470b734e9e..9eaf2df1ce 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3393,6 +3393,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		HandleParallelApplyMessages();
+
+	if (XLogDelayPending)
+		HandleXLogDelayPending();
 }
 
 /*
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 1b1d814254..8ed66b2eae 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -37,6 +37,7 @@ volatile sig_atomic_t IdleSessionTimeoutPending = false;
 volatile sig_atomic_t ProcSignalBarrierPending = false;
 volatile sig_atomic_t LogMemoryContextPending = false;
 volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
+volatile sig_atomic_t XLogDelayPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..1c0d35ea91 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -51,6 +51,7 @@ extern PGDLLIMPORT char *wal_consistency_checking_string;
 extern PGDLLIMPORT bool log_checkpoints;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT uint32 backendWalInserted;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
@@ -246,6 +247,7 @@ extern TimeLineID GetWALInsertionTimeLine(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
+extern void HandleXLogDelayPending(void);
 
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 96b3a1e1a0..0fbb600d65 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -95,6 +95,7 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
+extern PGDLLIMPORT volatile sig_atomic_t XLogDelayPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
-- 
2.30.2

