From 6ecd080e9b8e735645b32ac42a65f37d751af599 Mon Sep 17 00:00:00 2001
From: Maxim Orlov <orlovmg@gmail.com>
Date: Thu, 10 Apr 2025 21:39:29 +0800
Subject: [PATCH v5] Process sync requests incrementally in AbsorbSyncRequests

If the number of sync requests is large enough, the palloc() call in
AbsorbSyncRequests() will attempt to allocate more than 1 GB of memory,
resulting in failure. This can lead to an infinite loop in the checkpointer
process, as it repeatedly fails to absorb the pending requests.

To avoid this, we process requests incrementally in batches. This patch
introduces a bounded ring buffer for storing checkpointer sync requests,
ensuring that memory usage stays within a safe range and avoiding
excessive allocations.

Original author: Maxim Orlov <orlovmg@gmail.com>
Patch updated and revised by: Xuneng Zhou <xunengzhou@gmail.com>
---
 src/backend/postmaster/checkpointer.c | 150 ++++++++++++++++++-------
 1 file changed, 109 insertions(+), 41 deletions(-)

diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index d3cb3f1891c..914454d909e 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -127,6 +127,11 @@ typedef struct

 	int			num_requests;	/* current # of requests */
 	int			max_requests;	/* allocated array size */
+
+	int			head;			/* Index of the first request in the ring
+								 * buffer */
+	int			tail;			/* Index just past the last request in the
+								 * ring buffer (next insertion point) */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;

@@ -135,6 +140,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB		1000

+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
@@ -974,7 +985,8 @@ CheckpointerShmemInit(void)
 		 */
 		MemSet(CheckpointerShmem, 0, size);
 		SpinLockInit(&CheckpointerShmem->ckpt_lck);
-		CheckpointerShmem->max_requests = NBuffers;
+		CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+		CheckpointerShmem->head = CheckpointerShmem->tail = 0;
 		ConditionVariableInit(&CheckpointerShmem->start_cv);
 		ConditionVariableInit(&CheckpointerShmem->done_cv);
 	}
@@ -1152,6 +1164,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
 	CheckpointerRequest *request;
 	bool		too_full;
+	int			insert_pos;

 	if (!IsUnderPostmaster)
 		return false;			/* probably shouldn't even get here */
@@ -1175,10 +1188,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 	}

 	/* OK, insert request */
-	request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+	insert_pos = CheckpointerShmem->tail;
+	request = &CheckpointerShmem->requests[insert_pos];
 	request->ftag = *ftag;
 	request->type = type;

+	CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+	CheckpointerShmem->num_requests++;
+
 	/* If queue is more than half full, nudge the checkpointer to empty it */
 	too_full = (CheckpointerShmem->num_requests >=
 				CheckpointerShmem->max_requests / 2);
@@ -1220,12 +1237,16 @@ CompactCheckpointerRequestQueue(void)
 	struct CheckpointerSlotMapping
 	{
 		CheckpointerRequest request;
-		int			slot;
+		int			ring_idx;
 	};

-	int			n,
-				preserve_count;
+	int			n;
 	int			num_skipped = 0;
+	int			head;
+	int			max_requests;
+	int			num_requests;
+	int			read_idx,
+				write_idx;
 	HASHCTL		ctl;
 	HTAB	   *htab;
 	bool	   *skip_slot;
@@ -1237,8 +1258,13 @@ CompactCheckpointerRequestQueue(void)
 	if (CritSectionCount > 0)
 		return false;

+	max_requests = CheckpointerShmem->max_requests;
+	num_requests = CheckpointerShmem->num_requests;
+
 	/* Initialize skip_slot array */
-	skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+	skip_slot = palloc0(sizeof(bool) * max_requests);
+
+	head = CheckpointerShmem->head;

 	/* Initialize temporary hash table */
 	ctl.keysize = sizeof(CheckpointerRequest);
@@ -1262,7 +1288,8 @@ CompactCheckpointerRequestQueue(void)
 	 * away preceding entries that would end up being canceled anyhow), but
 	 * it's not clear that the extra complexity would buy us anything.
 	 */
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
+	read_idx = head;
+	for (n = 0; n < num_requests; n++)
 	{
 		CheckpointerRequest *request;
 		struct CheckpointerSlotMapping *slotmap;
@@ -1275,16 +1302,19 @@ CompactCheckpointerRequestQueue(void)
 		 * CheckpointerShmemInit.  Note also that RelFileLocator had better
 		 * contain no pad bytes.
 		 */
-		request = &CheckpointerShmem->requests[n];
+		request = &CheckpointerShmem->requests[read_idx];
 		slotmap = hash_search(htab, request, HASH_ENTER, &found);
 		if (found)
 		{
 			/* Duplicate, so mark the previous occurrence as skippable */
-			skip_slot[slotmap->slot] = true;
+			skip_slot[slotmap->ring_idx] = true;
 			num_skipped++;
 		}
 		/* Remember slot containing latest occurrence of this request value */
-		slotmap->slot = n;
+		slotmap->ring_idx = read_idx;
+
+		/* Move to the next request in the ring buffer */
+		read_idx = (read_idx + 1) % max_requests;
 	}

 	/* Done with the hash table. */
@@ -1298,17 +1328,34 @@ CompactCheckpointerRequestQueue(void)
 	}

 	/* We found some duplicates; remove them. */
-	preserve_count = 0;
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
+	read_idx = write_idx = head;
+	for (n = 0; n < num_requests; n++)
 	{
-		if (skip_slot[n])
-			continue;
-		CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+		/* If this slot is NOT skipped, keep it */
+		if (!skip_slot[read_idx])
+		{
+			/* If the read and write positions are different, copy the request */
+			if (write_idx != read_idx)
+				CheckpointerShmem->requests[write_idx] =
+					CheckpointerShmem->requests[read_idx];
+
+			/* Advance the write position */
+			write_idx = (write_idx + 1) % max_requests;
+		}
+
+		read_idx = (read_idx + 1) % max_requests;
 	}
+
+	/*
+	 * Update ring buffer state: head remains the same, tail moves, count
+	 * decreases
+	 */
+	CheckpointerShmem->tail = write_idx;
+	CheckpointerShmem->num_requests -= num_skipped;
+
 	ereport(DEBUG1,
 			(errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-							 CheckpointerShmem->num_requests, preserve_count)));
-	CheckpointerShmem->num_requests = preserve_count;
+							 num_requests, CheckpointerShmem->num_requests)));

 	/* Cleanup. */
 	pfree(skip_slot);
@@ -1329,40 +1376,61 @@ AbsorbSyncRequests(void)
 {
 	CheckpointerRequest *requests = NULL;
 	CheckpointerRequest *request;
-	int			n;
+	int			n,
+				i;
+	bool		loop;

 	if (!AmCheckpointerProcess())
 		return;

-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-	/*
-	 * We try to avoid holding the lock for a long time by copying the request
-	 * array, and processing the requests after releasing the lock.
-	 *
-	 * Once we have cleared the requests from shared memory, we have to PANIC
-	 * if we then fail to absorb them (eg, because our hashtable runs out of
-	 * memory).  This is because the system cannot run safely if we are unable
-	 * to fsync what we have been told to fsync.  Fortunately, the hashtable
-	 * is so small that the problem is quite unlikely to arise in practice.
-	 */
-	n = CheckpointerShmem->num_requests;
-	if (n > 0)
+	do
 	{
-		requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-		memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-	}
+		LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+		/*
+		 * We try to avoid holding the lock for a long time by copying the
+		 * request array, and processing the requests after releasing the
+		 * lock.
+		 *
+		 * Once we have cleared the requests from shared memory, we have to
+		 * PANIC if we then fail to absorb them (eg, because our hashtable
+		 * runs out of memory).  This is because the system cannot run safely
+		 * if we are unable to fsync what we have been told to fsync.
+		 * Fortunately, the hashtable is so small that the problem is quite
+		 * unlikely to arise in practice.
+		 *
+		 * Note: we cannot palloc more than 1 GB of memory at once, so make
+		 * sure that the maximum number of elements will fit in the requests
+		 * buffer.
+		 */
+		n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+		if (n > 0)
+		{
+			if (!requests)
+				requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
+
+			for (i = 0; i < n; i++)
+			{
+				requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+				CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+			}
+
+			CheckpointerShmem->num_requests -= n;
+
+		}
+
+		START_CRIT_SECTION();
+
+		/* Are there any requests in the queue? If so, keep going. */
+		loop = CheckpointerShmem->num_requests != 0;
+
+		LWLockRelease(CheckpointerCommLock);
+
+		for (request = requests; n > 0; request++, n--)
+			RememberSyncRequest(&request->ftag, request->type);
+
+		END_CRIT_SECTION();
+	} while (loop);

-	START_CRIT_SECTION();
-
-	CheckpointerShmem->num_requests = 0;
-
-	LWLockRelease(CheckpointerCommLock);
-
-	for (request = requests; n > 0; request++, n--)
-		RememberSyncRequest(&request->ftag, request->type);
-
-	END_CRIT_SECTION();
-
 	if (requests)
 		pfree(requests);
 -- 
2.48.1
