Hi,

I keep getting occasional complaints about the impact of large/bulk
transactions on latency of small OLTP transactions, so I'd like to
revive this thread a bit and move it forward.

Attached is a rebased v3, followed by a 0002 patch with some review
comments, missing comments and minor tweaks. More about that later ...

It's been a couple of months, and there has been a fair amount of
discussion and changes earlier, so I guess it makes sense to post a
summary, stating the purpose (and scope), and then go through the various
open questions etc.


goals
-----
The goal is to limit the impact of large transactions (producing a lot
of WAL) on small OLTP transactions, in a cluster with a sync replica.
Imagine a backend executing single-row inserts, or something like that.
Each commit waits for the replica to confirm the WAL. That may be
expensive, but the cost is mostly determined by the network round trip.

But then a large transaction comes along and inserts a lot of WAL
(imagine a COPY which inserts 100MB of data, VACUUM, CREATE INDEX and so
on). A small transaction may insert a COMMIT record right after this WAL
chunk, and locally that's (mostly) fine. But with the sync replica it's
much worse - we don't send WAL until it's flushed locally, and then we
need to wait for the WAL to be sent, applied and confirmed by the
replica. This takes time (depending on the bandwidth), and it may not
happen until the small transaction does COMMIT (because we may not flush
WAL from in-progress transactions very often).
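
Here is a back-of-the-envelope model that puts rough numbers on "depending on the bandwidth". It is an assumption, not something measured. The small transaction's commit pays one round trip plus the time needed to transfer the WAL queued ahead of its COMMIT record. The model ignores apply time on the replica and any overlap between sending and flushing. The function name is made up for illustration.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Rough model (assumption): commit latency with a sync replica is the
 * network round trip plus the time to transfer the WAL queued ahead of
 * the COMMIT record. Replica-side apply time is ignored.
 */
double
estimated_commit_latency_ms(uint64_t pending_wal_bytes,
							double bandwidth_bytes_per_sec,
							double rtt_ms)
{
	assert(bandwidth_bytes_per_sec > 0);

	return rtt_ms + (double) pending_wal_bytes / bandwidth_bytes_per_sec * 1000.0;
}
```

For example, take 100MB of WAL (counted as 104857600 bytes) queued on a 1Gbit/s link (125000000 bytes/s). That works out to about 839ms on top of the round trip.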

Jakub Wartak presented some examples of the impact when he started this
thread, and it can be rather bad. Particularly for latency-sensitive
applications. I plan to do more experiments with the current patch, but
I don't have the results yet.


scope
-----
Now, let's talk about scope - what the patch does not aim to do. The
patch is explicitly intended for syncrep clusters, not async. There have
been proposals to also support throttling for async replicas, logical
replication etc. I suppose all of that could be implemented, and I do
see the benefit of defining some sort of maximum lag even for async
replicas. But the agreement was to focus on the syncrep case, where it's
particularly painful, and perhaps extend it in the future.

I believe adding throttling for physical async replication should not be
difficult - in principle we need to determine how far the replica got,
and compare it to the local LSN. But there's likely complexity with
defining which async replicas to look at, inventing a sensible way to
configure this, etc. It'd be helpful if people interested in that
feature took a look at this patch and tried extending it.
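
Below is a minimal sketch of that comparison for async replicas. It assumes we already know each interesting replica's last reported flush LSN. How that list gets built (slots, a new GUC, ...) is exactly the open question, so ReplicaProgress and both functions are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical per-replica progress, e.g. the flush LSN it last reported. */
typedef struct ReplicaProgress
{
	const char *name;
	XLogRecPtr	flush_lsn;
} ReplicaProgress;

/* Lag in bytes of one replica relative to the local LSN. */
uint64_t
replica_lag_bytes(XLogRecPtr local_lsn, XLogRecPtr replica_lsn)
{
	/* a replica should never be ahead, but don't underflow if it is */
	return (replica_lsn >= local_lsn) ? 0 : local_lsn - replica_lsn;
}

/* True if any of the listed replicas is further behind than max_lag_bytes. */
bool
async_lag_exceeded(XLogRecPtr local_lsn, const ReplicaProgress *replicas,
				   size_t nreplicas, uint64_t max_lag_bytes)
{
	assert(replicas != NULL || nreplicas == 0);

	for (size_t i = 0; i < nreplicas; i++)
	{
		if (replica_lag_bytes(local_lsn, replicas[i].flush_lsn) > max_lag_bytes)
			return true;
	}
	return false;
}
```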

It's not clear to me what to do about disconnected replicas, though. We
may not even know about them, if there's no slot (and how would we know
what the slot is for). So maybe this would need a new GUC listing the
interesting replicas. But then all of them would need to be connected,
which is an availability issue.

I'm not sure about logical replication, but I guess we could treat it
similarly to async.

But what I think would need to be different is the handling of small
transactions. For syncrep we automatically wait for those at commit,
which means automatic throttling. But for async (and logical), it's
trivial to cause ever-increasing lag with only tiny transactions, thanks
to the single-process replay, so maybe we'd need to throttle those too.
(The recovery prefetching improved this for async quite a bit, ofc.)


implementation
--------------
The implementation is fairly straightforward, and happens in two places.
XLogInsertRecord() decides whether throttling might be needed for this
backend, and then HandleXLogDelayPending() does the wait.

XLogInsertRecord() checks if the backend produced a certain amount of WAL
(1MB, for example). We do this because we don't want to do the expensive
stuff in HandleXLogDelayPending() too often (e.g. after every XLOG
record).
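
The insert-side check roughly boils down to the following. This is a simplified sketch with made-up names: ThrottleCounter and count_wal_insert stand in for backendWalInserted and XLogDelayPending. It uses a 64-bit counter, while the patch uses uint32.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical backend-local state mirroring backendWalInserted/XLogDelayPending. */
typedef struct ThrottleCounter
{
	uint64_t	wal_inserted;	/* bytes since the last throttle point */
	bool		delay_pending;	/* acted upon later, outside the insert path */
} ThrottleCounter;

/*
 * Called for every inserted record. Only cheap arithmetic happens here;
 * the expensive flush + wait is deferred, because the caller may hold locks.
 */
void
count_wal_insert(ThrottleCounter *tc, uint32_t rec_len,
				 int threshold_kb, bool syncrep_active)
{
	assert(threshold_kb >= 0);

	tc->wal_inserted += rec_len;
	if (syncrep_active && threshold_kb > 0 &&
		tc->wal_inserted >= (uint64_t) threshold_kb * 1024)
		tc->delay_pending = true;
}
```

With threshold_kb = 1024, the flag gets set once the backend has inserted 1MB since the last throttle point. Nothing expensive happens until the flag is acted upon.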

HandleXLogDelayPending() picks a suitable LSN, flushes it and then also
waits for the sync replica, as if it was a commit. This limits the lag,
i.e. the amount of WAL that the small transaction will need to wait for
to be replicated and confirmed by the replica.
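
The deferred part can be modeled as shown below. It's a toy model, not the patch code. sim_flush() and sim_wait_for_standby() are hypothetical stand-ins for XLogFlush() and SyncRepWaitForLSN(). The counter is reset explicitly here, whereas the patch resets backendWalInserted inside XLogFlush().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Simplified, hypothetical model of the LSNs the throttling wait depends on. */
typedef struct ThrottleSim
{
	XLogRecPtr	insert_lsn;		/* end of WAL inserted locally */
	XLogRecPtr	flushed_lsn;	/* WAL durable on the primary */
	XLogRecPtr	confirmed_lsn;	/* WAL confirmed by the sync standby */
	uint64_t	wal_inserted;	/* bytes since the last throttle point */
	bool		delay_pending;	/* set by the insert-side check */
} ThrottleSim;

/* Stand-in for XLogFlush(): flush position only moves forward. */
static void
sim_flush(ThrottleSim *s, XLogRecPtr lsn)
{
	if (lsn > s->flushed_lsn)
		s->flushed_lsn = lsn;
}

/*
 * Stand-in for SyncRepWaitForLSN(): the real call blocks until the standby
 * confirms, the model simply assumes the confirmation arrived.
 */
static void
sim_wait_for_standby(ThrottleSim *s, XLogRecPtr lsn)
{
	assert(lsn <= s->flushed_lsn);	/* WAL is only sent once flushed locally */
	if (lsn > s->confirmed_lsn)
		s->confirmed_lsn = lsn;
}

/* Deferred part of the throttling: flush, wait as if committing, reset. */
void
handle_delay_pending(ThrottleSim *s, XLogRecPtr target)
{
	if (!s->delay_pending)
		return;

	/* can't flush (or wait for) WAL that was not inserted yet */
	if (target > s->insert_lsn)
		target = s->insert_lsn;

	sim_flush(s, target);
	sim_wait_for_standby(s, target);

	s->wal_inserted = 0;
	s->delay_pending = false;
}
```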

There was a fair amount of discussion about how to pick the LSN. I think
the agreement is we certainly can't pick the current LSN (because that
would lead to write amplification for the partially filled page), and we
probably even want to back off a bit more, to make it more likely the LSN
is already flushed. So for example with the threshold set to 8MB we
might go back 1MB, or something like that. That'd still limit the lag.
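
Picking the LSN could work like this: back off from the last record end, then round down to a WAL page boundary. The sketch assumes the default 8kB XLOG_BLCKSZ. It returns 0 (InvalidXLogRecPtr) when there's nothing old enough to wait for. The function name and the backoff parameter are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define InvalidXLogRecPtr	((XLogRecPtr) 0)
#define XLOG_BLCKSZ			8192	/* default WAL page size (assumption) */

/*
 * Pick the LSN to flush and wait for: back off from the last record end,
 * then round down to a WAL page boundary, so that the partially filled
 * page at the insert position is not flushed over and over.
 */
XLogRecPtr
pick_throttle_lsn(XLogRecPtr last_rec_end, uint64_t backoff_bytes)
{
	XLogRecPtr	target;

	if (last_rec_end <= backoff_bytes)
		return InvalidXLogRecPtr;	/* nothing old enough to wait for */

	target = last_rec_end - backoff_bytes;
	return target - (target % XLOG_BLCKSZ);
}
```

For example, suppose the last record ends at 10MB + 100 bytes and the backoff is 1MB. The function then returns exactly 9MB, which is already page-aligned.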


problems
--------
Now let's talk about some problems - both conceptual and technical
(essentially review comments for the patch).

1) The goal of the patch is to limit the impact on latency, but the
relationship between WAL amounts and latency may not be linear. But we
don't have a good way to predict latency, and WAL lag is the only thing
we have, so there's that. Ultimately, it's a best effort.

2) The throttling is per backend. That makes it simple, but it means
that it's hard to enforce a global lag limit. Imagine the limit is 8MB,
and with a single backend that works fine - the lag should not exceed
the 8MB value. But if there are N backends, the lag could be up to
N times 8MB, I believe. That's a bit annoying, but I guess the only
solution would be to have some autovacuum-like cost balancing, with all
backends (or at least those running large stuff) doing the checks more
often. I'm not sure we want to do that.

3) The actual throttling (flush and wait for syncrep) happens in
ProcessInterrupts(), which mostly works but it has two drawbacks:

 * It may not happen "early enough" if the backend inserts a lot of
XLOG records without processing interrupts in between.

 * It may happen "too early" if the backend inserts enough WAL to need
throttling (i.e. sets XLogDelayPending), but then after processing
interrupts it would be busy with other stuff, not inserting more WAL.

I think ideally we'd do the throttling right before inserting the next
XLOG record, but there's no convenient place, I think. We'd need to
annotate a lot of places, etc. So maybe ProcessInterrupts() is a
reasonable approximation.

We may need to add CHECK_FOR_INTERRUPTS() to a couple more places, but
that seems reasonable.

4) I'm not sure I understand why we need XactLastThrottledRecEnd. Why
can't we just use XLogRecEnd?

5) I think the way XLogFlush() resets backendWalInserted is a bit wrong.
Imagine a backend generates a fair amount of WAL, and then calls
XLogFlush(lsn). Why is it OK to set backendWalInserted=0 when we don't
know if the generated WAL was before the "lsn"? I suppose we don't use
very old lsn values for flushing, but I don't know if this drift could
accumulate over time, or cause some other issues.

6) Why did XLogInsertRecord() skip SYNCHRONOUS_COMMIT_REMOTE_FLUSH, i.e.
the default synchronous_commit = on?

7) I find the "synchronous_commit_wal_throttle_threshold" name
annoyingly long, so I renamed the variable to just
"wal_throttle_threshold". I've also renamed the GUC to
"wal_throttle_after", and I wonder if it should be in GUC_UNIT_BLOCKS,
just like the other _after options. But those changes are more a matter
of taste, feel free to ignore this.


missing pieces
--------------
The thing that's missing is that some processes (like aggressive
anti-wraparound autovacuum) should not be throttled. If people set the
GUC in postgresql.conf, that'll affect those processes too, so I guess
we should explicitly reset the GUC for those processes. I wonder if
there are other cases that should not be throttled.
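
One way to express "reset the GUC for those processes" is to compute an effective threshold that treats exempt processes as if throttling were disabled. ProcessKind and the function are hypothetical. In the backend, this would presumably check whether we're an autovacuum worker running an anti-wraparound vacuum.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical description of the current process. */
typedef struct ProcessKind
{
	bool		is_autovacuum_worker;
	bool		for_wraparound;		/* aggressive anti-wraparound vacuum */
} ProcessKind;

/*
 * Effective throttle threshold: behaves as if the GUC were reset to 0
 * (disabled) for processes that must not be slowed down.
 */
int
effective_throttle_threshold_kb(int guc_threshold_kb, const ProcessKind *proc)
{
	assert(guc_threshold_kb >= 0);

	if (proc->is_autovacuum_worker && proc->for_wraparound)
		return 0;

	return guc_threshold_kb;
}
```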


tangents
--------
While discussing this with Andres a while ago, he mentioned a somewhat
orthogonal idea - sending unflushed data to the replica.

We currently never send unflushed data to the replica, which makes sense
because this data is not durable and if the primary crashes/restarts,
this data will disappear. But it also means there may be a fairly large
chunk of WAL data that we may need to send at COMMIT and wait for the
confirmation.

He suggested we might actually send the data to the replica, but the
replica would know this data is not flushed yet and so would not do the
recovery etc. And at commit we could just send a request to flush,
without having to transfer the data at that moment.

I don't have a very good intuition about how large the effect would be,
i.e. how much unflushed WAL data could accumulate on the primary
(kilobytes/megabytes?), and how big the difference is between sending a
couple of kilobytes and just a request to flush.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From a46f9619a67d1f0847705e953db9240e612fbc59 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Sat, 4 Nov 2023 15:05:59 +0100
Subject: [PATCH v4 1/2] v3

---
 src/backend/access/heap/vacuumlazy.c          |  7 ++-
 src/backend/access/transam/xlog.c             | 55 +++++++++++++++++++
 src/backend/catalog/system_views.sql          |  1 +
 src/backend/commands/explain.c                |  5 ++
 src/backend/executor/instrument.c             |  2 +
 src/backend/replication/syncrep.c             | 17 ++++--
 src/backend/tcop/postgres.c                   |  3 +
 src/backend/utils/activity/pgstat_wal.c       |  1 +
 .../utils/activity/wait_event_names.txt       |  1 +
 src/backend/utils/adt/pgstatfuncs.c           | 10 +++-
 src/backend/utils/init/globals.c              |  1 +
 src/backend/utils/misc/guc_tables.c           | 12 ++++
 src/include/access/xlog.h                     |  3 +
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/executor/instrument.h             |  1 +
 src/include/miscadmin.h                       |  2 +
 src/include/pgstat.h                          |  3 +-
 src/test/regress/expected/rules.out           |  3 +-
 18 files changed, 120 insertions(+), 13 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 6985d299b2..d9cc06f620 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -762,10 +762,15 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 (long long) PageMissOp,
 							 (long long) PageDirtyOp);
 			appendStringInfo(&buf,
-							 _("WAL usage: %lld records, %lld full page images, %llu bytes\n"),
+							 _("WAL usage: %lld records, %lld full page images, %llu bytes"),
 							 (long long) walusage.wal_records,
 							 (long long) walusage.wal_fpi,
 							 (unsigned long long) walusage.wal_bytes);
+			if(walusage.wal_throttled > 0)
+				appendStringInfo(&buf, _("%lld times throttled"), (long long) walusage.wal_throttled);
+			else
+				appendStringInfo(&buf, _("\n"));
+
 			appendStringInfo(&buf, _("system usage: %s"), pg_rusage_show(&ru0));
 
 			ereport(verbose ? INFO : LOG,
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 40461923ea..3947f889de 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1,4 +1,5 @@
 /*-------------------------------------------------------------------------
+
  *
  * xlog.c
  *		PostgreSQL write-ahead log manager
@@ -73,6 +74,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "portability/instr_time.h"
 #include "port/atomics.h"
 #include "port/pg_iovec.h"
 #include "postmaster/bgwriter.h"
@@ -82,6 +84,7 @@
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -108,6 +111,7 @@
 #include "utils/timestamp.h"
 #include "utils/varlena.h"
 
+
 extern uint32 bootstrap_data_checksum_version;
 
 /* timeline ID to be used when bootstrapping */
@@ -138,6 +142,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int		synchronous_commit_wal_throttle_threshold = 0; /* kb */
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -252,10 +257,14 @@ static int	LocalXLogInsertAllowed = -1;
  * parallel backends may have written WAL records at later LSNs than the value
  * stored here.  The parallel leader advances its own copy, when necessary,
  * in WaitForParallelWorkersToFinish.
+ *
+ * XactLastThrottledRecEnd points to the last XLOG record that should be throttled
+ * as the additional WAL records could be generated before processing interrupts.
  */
 XLogRecPtr	ProcLastRecPtr = InvalidXLogRecPtr;
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
 XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
+static XLogRecPtr	XactLastThrottledRecEnd = InvalidXLogRecPtr;
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -648,6 +657,8 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+uint32	backendWalInserted = 0;
+
 static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
 										XLogRecPtr EndOfLog,
 										TimeLineID newTLI);
@@ -1073,6 +1084,24 @@ XLogInsertRecord(XLogRecData *rdata,
 		pgWalUsage.wal_bytes += rechdr->xl_tot_len;
 		pgWalUsage.wal_records++;
 		pgWalUsage.wal_fpi += num_fpi;
+
+		/* WAL throttling: we can slow down (some) backends generating a lot of WAL
+		 * in syncrep scenario by waiting for standby confirmation. This allows
+		 * prioritzation of other backends over this backend in bandwidth constrained
+		 * WAN scenarios. Such throttled down backends are going to be visible with
+		 * "SyncRepThrottled" wait event.
+		 */
+		backendWalInserted += rechdr->xl_tot_len;
+		if ((synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_APPLY ||
+			synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+			synchronous_commit_wal_throttle_threshold > 0 &&
+			backendWalInserted > synchronous_commit_wal_throttle_threshold * 1024L)
+		{
+			XactLastThrottledRecEnd = XactLastRecEnd;
+			InterruptPending = true;
+			XLogDelayPending = true;
+			pgWalUsage.wal_throttled++;
+		}
 	}
 
 	return EndPos;
@@ -2607,6 +2636,9 @@ XLogFlush(XLogRecPtr record)
 		return;
 	}
 
+	/* reset WAL throttling bytes counter */
+	backendWalInserted = 0;
+
 	/* Quick exit if already known flushed */
 	if (record <= LogwrtResult.Flush)
 		return;
@@ -9109,3 +9141,26 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+
+/*
+ * Called from ProcessMessageInterrupts() to avoid waiting while
+ * being in critical section. Performing those directly from XLogInsertRecord()
+ * would cause locks to be held for longer duration.
+ */
+void
+HandleXLogDelayPending()
+{
+	/* flush only up to the last fully filled page to avoid repeating flushing
+	 * of the same page multiple times */
+	XLogRecPtr 	LastFullyWrittenXLogPage = XactLastThrottledRecEnd -
+		(XactLastThrottledRecEnd % XLOG_BLCKSZ);
+
+	Assert(synchronous_commit_wal_throttle_threshold > 0);
+	Assert(backendWalInserted > synchronous_commit_wal_throttle_threshold * 1024L);
+	Assert(XactLastThrottledRecEnd != InvalidXLogRecPtr);
+
+	XLogFlush(LastFullyWrittenXLogPage);
+	SyncRepWaitForLSN(LastFullyWrittenXLogPage, false);
+	XLogDelayPending = false;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 886f175fc2..4ba74365d3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1153,6 +1153,7 @@ CREATE VIEW pg_stat_wal AS
         w.wal_sync,
         w.wal_write_time,
         w.wal_sync_time,
+        w.wal_throttled,
         w.stats_reset
     FROM pg_stat_get_wal() w;
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index f1d71bc54e..0ec58c1397 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3752,6 +3752,9 @@ show_wal_usage(ExplainState *es, const WalUsage *usage)
 			if (usage->wal_bytes > 0)
 				appendStringInfo(es->str, " bytes=" UINT64_FORMAT,
 								 usage->wal_bytes);
+			if (usage->wal_throttled > 0)
+				appendStringInfo(es->str, " throttled=%lld",
+								 (long long) usage->wal_throttled);
 			appendStringInfoChar(es->str, '\n');
 		}
 	}
@@ -3763,6 +3766,8 @@ show_wal_usage(ExplainState *es, const WalUsage *usage)
 							   usage->wal_fpi, es);
 		ExplainPropertyUInteger("WAL Bytes", NULL,
 								usage->wal_bytes, es);
+		ExplainPropertyUInteger("WAL Throttled", NULL,
+								usage->wal_throttled, es);
 	}
 }
 
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index c383f34c06..b834247e95 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -280,6 +280,7 @@ WalUsageAdd(WalUsage *dst, WalUsage *add)
 	dst->wal_bytes += add->wal_bytes;
 	dst->wal_records += add->wal_records;
 	dst->wal_fpi += add->wal_fpi;
+	dst->wal_throttled += add->wal_throttled;
 }
 
 void
@@ -288,4 +289,5 @@ WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
 	dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
 	dst->wal_records += add->wal_records - sub->wal_records;
 	dst->wal_fpi += add->wal_fpi - sub->wal_fpi;
+	dst->wal_throttled += add->wal_throttled - sub->wal_throttled;
 }
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 0ea71b5c43..e0fd6cfb19 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -145,6 +145,7 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
  * to be flushed if synchronous_commit is set to the higher level of
  * remote_apply, because only commit records provide apply feedback.
  */
+
 void
 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 {
@@ -153,9 +154,10 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	/*
 	 * This should be called while holding interrupts during a transaction
 	 * commit to prevent the follow-up shared memory queue cleanups to be
-	 * influenced by external interruptions.
+	 * influenced by external interruptions. The only exception is WAL throttling
+	 * where this could be called without holding interrupts.
 	 */
-	Assert(InterruptHoldoffCount > 0);
+	Assert(XLogDelayPending == true || InterruptHoldoffCount > 0);
 
 	/*
 	 * Fast exit if user has not requested sync replication, or there are no
@@ -229,6 +231,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	for (;;)
 	{
 		int			rc;
+		uint32			wait_event;
 
 		/* Must reset the latch before testing state. */
 		ResetLatch(MyLatch);
@@ -282,12 +285,18 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 			break;
 		}
 
+		/* XLogDelayPending flag that is used here is being reset afterwards in
+		 * in HandleXLogDelayPending()
+		 */
+		if(XLogDelayPending == true)
+			wait_event = WAIT_EVENT_SYNC_REP_THROTTLED;
+		else
+			wait_event = WAIT_EVENT_SYNC_REP;
 		/*
 		 * Wait on latch.  Any condition that should wake us up will set the
 		 * latch, so no need for timeout.
 		 */
-		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
-					   WAIT_EVENT_SYNC_REP);
+		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, wait_event);
 
 		/*
 		 * If the postmaster dies, we'll probably never get an acknowledgment,
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 6a070b5d8c..c3f3d0e86c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3459,6 +3459,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		HandleParallelApplyMessages();
+
+	if (XLogDelayPending)
+		HandleXLogDelayPending();
 }
 
 /*
diff --git a/src/backend/utils/activity/pgstat_wal.c b/src/backend/utils/activity/pgstat_wal.c
index 6a81b78135..537876d446 100644
--- a/src/backend/utils/activity/pgstat_wal.c
+++ b/src/backend/utils/activity/pgstat_wal.c
@@ -114,6 +114,7 @@ pgstat_flush_wal(bool nowait)
 	WALSTAT_ACC(wal_records, wal_usage_diff);
 	WALSTAT_ACC(wal_fpi, wal_usage_diff);
 	WALSTAT_ACC(wal_bytes, wal_usage_diff);
+	WALSTAT_ACC(wal_throttled, wal_usage_diff);
 	WALSTAT_ACC(wal_buffers_full, PendingWalStats);
 	WALSTAT_ACC(wal_write, PendingWalStats);
 	WALSTAT_ACC(wal_sync, PendingWalStats);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index d7995931bd..229297ba1c 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -140,6 +140,7 @@ REPLICATION_SLOT_DROP	"Waiting for a replication slot to become inactive so it c
 RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT	"Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP	"Waiting for confirmation from a remote server during synchronous replication."
+SYNC_REP_THROTTLED	"Waiting for sync replica lag to reduce."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at end of a parallel operation."
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6468b6a805..5b18823270 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1446,7 +1446,7 @@ pg_stat_get_io(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_wal(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_COLS	9
+#define PG_STAT_GET_WAL_COLS	10
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_WAL_COLS] = {0};
 	bool		nulls[PG_STAT_GET_WAL_COLS] = {0};
@@ -1471,7 +1471,9 @@ pg_stat_get_wal(PG_FUNCTION_ARGS)
 					   FLOAT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "wal_sync_time",
 					   FLOAT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "wal_throttled",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 
 	BlessTupleDesc(tupdesc);
@@ -1498,7 +1500,9 @@ pg_stat_get_wal(PG_FUNCTION_ARGS)
 	values[6] = Float8GetDatum(((double) wal_stats->wal_write_time) / 1000.0);
 	values[7] = Float8GetDatum(((double) wal_stats->wal_sync_time) / 1000.0);
 
-	values[8] = TimestampTzGetDatum(wal_stats->stat_reset_timestamp);
+	values[8] = Int64GetDatum(wal_stats->wal_throttled);
+
+	values[9] = TimestampTzGetDatum(wal_stats->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 60bc1217fb..ccbbfae56a 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -40,6 +40,7 @@ volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
+bool XLogDelayPending = false;
 
 int			MyProcPid;
 pg_time_t	MyStartTime;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 7605eff9b9..5ebd269304 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2882,6 +2882,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"synchronous_commit_wal_throttle_threshold", PGC_USERSET, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates "
+						"which it waits for synchronous commit confiration even without commit"),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&synchronous_commit_wal_throttle_threshold,
+		0, 0, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"extra_float_digits", PGC_USERSET, CLIENT_CONN_LOCALE,
 			gettext_noop("Sets the number of digits displayed for floating-point values."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index a14126d164..895ca62ca6 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -54,6 +54,8 @@ extern PGDLLIMPORT char *wal_consistency_checking_string;
 extern PGDLLIMPORT bool log_checkpoints;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int synchronous_commit_wal_throttle_threshold;
+extern PGDLLIMPORT uint32 backendWalInserted;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
@@ -250,6 +252,7 @@ extern TimeLineID GetWALInsertionTimeLine(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
+extern void HandleXLogDelayPending(void);
 
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bc41e92677..6c5506f10b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5754,9 +5754,9 @@
 { oid => '1136', descr => 'statistics: information about WAL activity',
   proname => 'pg_stat_get_wal', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int8,int8,numeric,int8,int8,int8,float8,float8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o}',
-  proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}',
+  proallargtypes => '{int8,int8,numeric,int8,int8,int8,float8,float8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,wal_throttled,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 { oid => '6248', descr => 'statistics: information about WAL prefetching',
   proname => 'pg_stat_get_recovery_prefetch', prorows => '1', proretset => 't',
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index d5d69941c5..7a06ef3258 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -53,6 +53,7 @@ typedef struct WalUsage
 	int64		wal_records;	/* # of WAL records produced */
 	int64		wal_fpi;		/* # of WAL full page images produced */
 	uint64		wal_bytes;		/* size of WAL records produced */
+	int64		wal_throttled;		/* # of times WAL throttling was engaged*/
 } WalUsage;
 
 /* Flag bits included in InstrAlloc's instrument_options bitmask */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index f0cc651435..aa56eeae3b 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -95,6 +95,8 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
+/* this doesn't need to be volatile sig_atomic_t as it set in XLogInsertRecord() */
+extern PGDLLIMPORT bool XLogDelayPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e6fd20f1ce..6302dfad61 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -235,7 +235,7 @@ typedef struct PgStat_TableXactStatus
  * ------------------------------------------------------------
  */
 
-#define PGSTAT_FILE_FORMAT_ID	0x01A5BCAC
+#define PGSTAT_FILE_FORMAT_ID	0x0907AFBD
 
 typedef struct PgStat_ArchiverStats
 {
@@ -434,6 +434,7 @@ typedef struct PgStat_WalStats
 	PgStat_Counter wal_sync;
 	PgStat_Counter wal_write_time;
 	PgStat_Counter wal_sync_time;
+	PgStat_Counter wal_throttled; /* how many times backend was throttled */
 	TimestampTz stat_reset_timestamp;
 } PgStat_WalStats;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 798d1610f2..1cc56b85fa 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2229,8 +2229,9 @@ pg_stat_wal| SELECT wal_records,
     wal_sync,
     wal_write_time,
     wal_sync_time,
+    wal_throttled,
     stats_reset
-   FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, stats_reset);
+   FROM pg_stat_get_wal() w(wal_records, wal_fpi, wal_bytes, wal_buffers_full, wal_write, wal_sync, wal_write_time, wal_sync_time, wal_throttled, stats_reset);
 pg_stat_wal_receiver| SELECT pid,
     status,
     receive_start_lsn,
-- 
2.41.0

From 126d2ab89716e93325f98203877df2d8fdbce51f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Sat, 4 Nov 2023 17:53:16 +0100
Subject: [PATCH v4 2/2] review

---
 src/backend/access/heap/vacuumlazy.c |   2 +-
 src/backend/access/transam/xlog.c    | 103 ++++++++++++++++++++-------
 src/backend/replication/syncrep.c    |  18 +++--
 src/backend/utils/misc/guc_tables.c  |   6 +-
 src/include/access/xlog.h            |   2 +-
 src/include/executor/instrument.h    |   2 +-
 src/include/miscadmin.h              |   2 +-
 7 files changed, 97 insertions(+), 38 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index d9cc06f620..123539d0fc 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -767,7 +767,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 							 (long long) walusage.wal_fpi,
 							 (unsigned long long) walusage.wal_bytes);
 			if(walusage.wal_throttled > 0)
-				appendStringInfo(&buf, _("%lld times throttled"), (long long) walusage.wal_throttled);
+				appendStringInfo(&buf, _("%lld times throttled\n"), (long long) walusage.wal_throttled);
 			else
 				appendStringInfo(&buf, _("\n"));
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3947f889de..7a549f8c3f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1,5 +1,4 @@
 /*-------------------------------------------------------------------------
-
  *
  * xlog.c
  *		PostgreSQL write-ahead log manager
@@ -111,7 +110,6 @@
 #include "utils/timestamp.h"
 #include "utils/varlena.h"
 
-
 extern uint32 bootstrap_data_checksum_version;
 
 /* timeline ID to be used when bootstrapping */
@@ -142,7 +140,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
-int		synchronous_commit_wal_throttle_threshold = 0; /* kb */
+int			wal_throttle_threshold = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -260,6 +258,8 @@ static int	LocalXLogInsertAllowed = -1;
  *
  * XactLastThrottledRecEnd points to the last XLOG record that should be throttled
  * as the additional WAL records could be generated before processing interrupts.
+ *
+ * XXX I'm not sure I understand what "record to be throttled" means?
  */
 XLogRecPtr	ProcLastRecPtr = InvalidXLogRecPtr;
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
@@ -657,6 +657,12 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+/*
+ * Amount of WAL inserted in this backend since either the transaction
+ * start or throttle point (where we reset the counter to 0).
+ *
+ * XXX Not sure it should refer to "backend", it's really about xact, no?
+ */
 uint32	backendWalInserted = 0;
 
 static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
@@ -1085,17 +1091,33 @@ XLogInsertRecord(XLogRecData *rdata,
 		pgWalUsage.wal_records++;
 		pgWalUsage.wal_fpi += num_fpi;
 
-		/* WAL throttling: we can slow down (some) backends generating a lot of WAL
-		 * in syncrep scenario by waiting for standby confirmation. This allows
-		 * prioritzation of other backends over this backend in bandwidth constrained
-		 * WAN scenarios. Such throttled down backends are going to be visible with
-		 * "SyncRepThrottled" wait event.
+		/*
+		 * Decide if we need to throttle this backend, so that it does not write
+		 * WAL too fast, causing lag against the sync standby (which in turn
+		 * increases latency for standby confirmations). We may be holding locks
+		 * and blocking interrupts here, so we only make the decision, but the
+		 * wait (for sync standby confirmation) happens elsewhere.
+		 *
+		 * The throttling is applied only to large transactions (producing more
+		 * than wal_throttle_threshold kilobytes of WAL). Throttled backends
+		 * can be identified by a new wait event SYNC_REP_THROTTLED.
+		 *
+		 * Small transactions (by amount of produced WAL) are still subject to
+		 * the sync replication, so the same wait happens at commit time.
+		 *
+		 * XXX Not sure this is the right place for a comment explaining how the
+		 * throttling works. This place is way too low level, and rather far from
+		 * the place where the wait actually happens.
+		 *
+		 * XXX Should this be done even if XLogDelayPending is already set? Maybe
+		 * that should only update XactLastThrottledRecEnd, without incrementing
+		 * the pgWalUsage.wal_throttled counter?
 		 */
 		backendWalInserted += rechdr->xl_tot_len;
-		if ((synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_APPLY ||
-			synchronous_commit == SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
-			synchronous_commit_wal_throttle_threshold > 0 &&
-			backendWalInserted > synchronous_commit_wal_throttle_threshold * 1024L)
+
+		if ((synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_WRITE) &&
+			(wal_throttle_threshold > 0) &&
+			(backendWalInserted >= wal_throttle_threshold * 1024L))
 		{
 			XactLastThrottledRecEnd = XactLastRecEnd;
 			InterruptPending = true;
@@ -2636,7 +2658,22 @@ XLogFlush(XLogRecPtr record)
 		return;
 	}
 
-	/* reset WAL throttling bytes counter */
+	/*
+	 * Reset WAL throttling bytes counter.
+	 *
+	 * XXX I think this is somewhat wrong. The LSN we want to flush may be
+	 * somewhere in the past, before some (most) of the generated WAL. For
+	 * example, assume the backend just wrote 1MB, but then we ask for a flush
+	 * up to a position 1MB back. Most of the generated WAL is likely after
+	 * the flushed LSN, yet we're setting this to 0. (I don't know how common
+	 * such a situation is; probably not very.)
+	 *
+	 * I don't think we can / should track the amount of WAL for arbitrary LSN
+	 * values, but maybe we should track the last flush LSN position, and
+	 * then linearly approximate how much of the counted WAL lies past it.
+	 *
+	 * XXX For the approximation we should probably use LogwrtResult.Flush.
+	 */
 	backendWalInserted = 0;
 
 	/* Quick exit if already known flushed */
@@ -9142,25 +9179,41 @@ SetWalWriterSleeping(bool sleeping)
 	SpinLockRelease(&XLogCtl->info_lck);
 }
 
-
 /*
- * Called from ProcessMessageInterrupts() to avoid waiting while
- * being in critical section. Performing those directly from XLogInsertRecord()
- * would cause locks to be held for longer duration.
+ * HandleXLogDelayPending
+ *		Throttle backends generating large amounts of WAL.
+ *
+ * The throttling is implemented by waiting for a sync replica confirmation for
+ * a convenient LSN position. In particular, we do not wait for the current LSN,
+ * which may be in a partially filled WAL page (and we don't want to write this
+ * one out - we'd have to write it out again, causing write amplification).
+ * Instead, we move back to the end of the last fully filled WAL page.
+ *
+ * Called from ProcessMessageInterrupts() to avoid syncrep waits in XLogInsert(),
+ * which runs in a critical section with interrupts held off (so it would be
+ * impossible to cancel the wait if it gets stuck). Also, there may be locks
+ * held, and we don't want to hold them longer just because of the wait.
+ *
+ * XXX Andres suggested we actually go back a couple pages, to increase the
+ * probability the LSN was already flushed (obviously, this depends on how much
+ * lag we allow).
+ *
+ * XXX Not sure why we use XactLastThrottledRecEnd and not simply XLogRecEnd?
  */
 void
 HandleXLogDelayPending()
 {
-	/* flush only up to the last fully filled page to avoid repeating flushing
-	 * of the same page multiple times */
-	XLogRecPtr 	LastFullyWrittenXLogPage = XactLastThrottledRecEnd -
-		(XactLastThrottledRecEnd % XLOG_BLCKSZ);
+	XLogRecPtr 	lsn;
+
+	/* calculate last fully filled page */
+	lsn = XactLastThrottledRecEnd - (XactLastThrottledRecEnd % XLOG_BLCKSZ);
 
-	Assert(synchronous_commit_wal_throttle_threshold > 0);
-	Assert(backendWalInserted > synchronous_commit_wal_throttle_threshold * 1024L);
+	Assert(wal_throttle_threshold > 0);
+	Assert(backendWalInserted >= wal_throttle_threshold * 1024L);
 	Assert(XactLastThrottledRecEnd != InvalidXLogRecPtr);
 
-	XLogFlush(LastFullyWrittenXLogPage);
-	SyncRepWaitForLSN(LastFullyWrittenXLogPage, false);
+	XLogFlush(lsn);
+	SyncRepWaitForLSN(lsn, false);
+
 	XLogDelayPending = false;
 }
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index e0fd6cfb19..152c51fe29 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -144,8 +144,11 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
  * represents a commit record.  If it doesn't, then we wait only for the WAL
  * to be flushed if synchronous_commit is set to the higher level of
  * remote_apply, because only commit records provide apply feedback.
+ *
+ * This may be called either when waiting for PREPARE/COMMIT, or because of WAL
+ * throttling (in which case the flag XLogDelayPending is set to true). We use
+ * different wait events for these cases.
  */
-
 void
 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 {
@@ -231,7 +234,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	for (;;)
 	{
 		int			rc;
-		uint32			wait_event;
+		uint32		wait_event;
 
 		/* Must reset the latch before testing state. */
 		ResetLatch(MyLatch);
@@ -285,18 +288,21 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 			break;
 		}
 
-		/* XLogDelayPending flag that is used here is being reset afterwards in
-		 * in HandleXLogDelayPending()
+		/*
+		 * XLogDelayPending means this syncrep wait happens because of WAL
+		 * throttling. The flag is reset in HandleXLogDelayPending() later.
 		 */
-		if(XLogDelayPending == true)
+		if (XLogDelayPending)
 			wait_event = WAIT_EVENT_SYNC_REP_THROTTLED;
 		else
 			wait_event = WAIT_EVENT_SYNC_REP;
+
 		/*
 		 * Wait on latch.  Any condition that should wake us up will set the
 		 * latch, so no need for timeout.
 		 */
-		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, wait_event);
+		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
+					   wait_event);
 
 		/*
 		 * If the postmaster dies, we'll probably never get an acknowledgment,
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 5ebd269304..0e6bd25a32 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2883,13 +2883,13 @@ struct config_int ConfigureNamesInt[] =
 	},
 
 	{
-		{"synchronous_commit_wal_throttle_threshold", PGC_USERSET, REPLICATION_SENDING,
+		{"wal_throttle_after", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum amount of WAL in kilobytes a backend generates "
-						"which it waits for synchronous commit confiration even without commit"),
+						"before waiting for sync standby, to limit the replication lag."),
 			NULL,
 			GUC_UNIT_KB
 		},
-		&synchronous_commit_wal_throttle_threshold,
+		&wal_throttle_threshold,
 		0, 0, MAX_KILOBYTES,
 		NULL, NULL, NULL
 	},
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 895ca62ca6..9d1175edab 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -54,7 +54,7 @@ extern PGDLLIMPORT char *wal_consistency_checking_string;
 extern PGDLLIMPORT bool log_checkpoints;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
-extern PGDLLIMPORT int synchronous_commit_wal_throttle_threshold;
+extern PGDLLIMPORT int wal_throttle_threshold;
 extern PGDLLIMPORT uint32 backendWalInserted;
 
 extern PGDLLIMPORT int CheckPointSegments;
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 7a06ef3258..58eca5b7a1 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -53,7 +53,7 @@ typedef struct WalUsage
 	int64		wal_records;	/* # of WAL records produced */
 	int64		wal_fpi;		/* # of WAL full page images produced */
 	uint64		wal_bytes;		/* size of WAL records produced */
-	int64		wal_throttled;		/* # of times WAL throttling was engaged*/
+	int64		wal_throttled;		/* # of times WAL throttling was engaged */
 } WalUsage;
 
 /* Flag bits included in InstrAlloc's instrument_options bitmask */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index aa56eeae3b..44641e0e4c 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -95,7 +95,7 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
 extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
 extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
-/* this doesn't need to be volatile sig_atomic_t as it set in XLogInsertRecord() */
+/* doesn't need to be volatile sig_atomic_t as it's not set by a signal handler */
 extern PGDLLIMPORT bool XLogDelayPending;
 
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
-- 
2.41.0
