Hi,

I am working on using the read stream in autoprewarm. I observed ~10%
performance gain with this change. The patch is attached.

The downside of the read stream approach is that a new read stream
object needs to be created for each database, relation and fork. I was
wondering if this would cause a regression, but it did not (at least
according to the results of my testing). Another downside could be the
code getting complicated.

For the testing,
- I created 50 databases, each of them having 50 tables, and the
size of each table is 520KB.
    - patched: 51157 ms
    - master: 56769 ms
- I created 5 databases, each of them having 1 table, and the size
of each table is 3GB.
    - patched: 32679 ms
    - master: 36706 ms

I put a debugging message with timing information in the
autoprewarm_database_main() function, then ran autoprewarm 100 times
(by restarting the server) and cleared the OS cache before each
restart. Also, I ensured that the block number of the buffer returned
by the read stream API is correct. I am not sure if that much
testing is enough for this kind of change.

Any feedback would be appreciated.

--
Regards,
Nazir Bilal Yavuz
Microsoft
From c5e286612912ba6840d967812171162a948153e4 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Wed, 7 Aug 2024 17:27:50 +0300
Subject: [PATCH v1] Use read stream in autoprewarm

Instead of reading blocks with ReadBufferExtended(), create a read
stream object for each database, relation and fork and use it.

This change provides about 10% performance improvement.
---
 contrib/pg_prewarm/autoprewarm.c | 102 +++++++++++++++++++++++++++++--
 1 file changed, 97 insertions(+), 5 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index d061731706a..96e93c46f85 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -44,6 +44,7 @@
 #include "storage/lwlock.h"
 #include "storage/proc.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
@@ -429,6 +430,58 @@ apw_load_buffers(void)
 						apw_state->prewarmed_blocks, num_elements)));
 }
 
+struct apw_read_stream_private	/* state for apw_read_stream_next_block() */
+{
+	bool		first_block;	/* true until the callback returns a block */
+	int			max_pos;		/* block_info index at which to stop */
+	int			pos;			/* block_info index of the next record */
+	BlockInfoRecord *block_info;	/* array of block records to prewarm */
+	BlockNumber nblocks_in_fork;	/* block count of the fork being read */
+
+};
+/* Read stream callback: next block to prewarm, or InvalidBlockNumber to stop. */
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+	bool	   *rs_have_free_buffer = per_buffer_data;
+	BlockInfoRecord *old_blk;
+	BlockInfoRecord *cur_blk;
+	/* Per-buffer flag: stays true unless we stop for lack of free buffers. */
+	*rs_have_free_buffer = true;
+	/* NOTE(review): confirm the caller can read the flag if no buffer is returned. */
+	if (!have_free_buffer())
+	{
+		*rs_have_free_buffer = false;
+		return InvalidBlockNumber;
+	}
+	/* Every record up to max_pos has been consumed. */
+	if (p->pos == p->max_pos)
+		return InvalidBlockNumber;
+	/* The first block is returned without the checks applied further down. */
+	if (p->first_block)
+	{
+		p->first_block = false;
+		return p->block_info[p->pos++].blocknum;
+	}
+	/* Compare the next record with the one that was returned last. */
+	old_blk = &(p->block_info[p->pos - 1]);
+	cur_blk = &(p->block_info[p->pos]);
+	/* Same database, fork and file, and block inside the fork: keep going. */
+	if (old_blk->database == cur_blk->database &&
+		old_blk->forknum == cur_blk->forknum &&
+		old_blk->filenumber == cur_blk->filenumber &&
+		cur_blk->blocknum < p->nblocks_in_fork)
+	{
+		p->pos++;
+		return cur_blk->blocknum;
+	}
+	/* Otherwise stop; pos is left pointing at the record that was not used. */
+	return InvalidBlockNumber;
+}
+
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
  * those got grouped with this database).
@@ -442,6 +495,9 @@ autoprewarm_database_main(Datum main_arg)
 	BlockNumber nblocks = 0;
 	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	ReadStream *stream = NULL;
+	struct apw_read_stream_private p;
+	bool	   *rs_have_free_buffer;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -458,13 +514,16 @@ autoprewarm_database_main(Datum main_arg)
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;
 
+	p.block_info = block_info;
+	p.max_pos = apw_state->prewarm_stop_idx;
+
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
 	 * buffers.
 	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	for (; pos < apw_state->prewarm_stop_idx; pos++)
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
+		BlockInfoRecord *blk = &block_info[pos];
 		Buffer		buf;
 
 		CHECK_FOR_INTERRUPTS();
@@ -477,6 +536,18 @@ autoprewarm_database_main(Datum main_arg)
 			old_blk->database != 0)
 			break;
 
+		/*
+		 * If stream needs to be created again, end it before closing the old
+		 * relation.
+		 */
+		if (stream && (old_blk == NULL ||
+					   old_blk->filenumber != blk->filenumber ||
+					   old_blk->forknum != blk->forknum))
+		{
+			Assert(read_stream_next_buffer(stream, (void **) &rs_have_free_buffer) == InvalidBuffer);
+			read_stream_end(stream);
+		}
+
 		/*
 		 * As soon as we encounter a block of a new relation, close the old
 		 * relation. Note that rel will be NULL if try_relation_open failed
@@ -513,7 +584,10 @@ autoprewarm_database_main(Datum main_arg)
 			continue;
 		}
 
-		/* Once per fork, check for fork existence and size. */
+		/*
+		 * Once per fork, check for fork existence and size. Then create a
+		 * read stream if the fork exists.
+		 */
 		if (old_blk == NULL ||
 			old_blk->filenumber != blk->filenumber ||
 			old_blk->forknum != blk->forknum)
@@ -525,7 +599,21 @@ autoprewarm_database_main(Datum main_arg)
 			if (blk->forknum > InvalidForkNumber &&
 				blk->forknum <= MAX_FORKNUM &&
 				smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
 				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
+
+				/* Create read stream. */
+				p.nblocks_in_fork = nblocks;
+				p.pos = pos;
+				p.first_block = true;
+				stream = read_stream_begin_relation(READ_STREAM_FULL,
+													NULL,
+													rel,
+													blk->forknum,
+													apw_read_stream_next_block,
+													&p,
+													sizeof(bool));
+			}
 			else
 				nblocks = 0;
 		}
@@ -539,16 +627,20 @@ autoprewarm_database_main(Datum main_arg)
 		}
 
 		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
+		buf = read_stream_next_buffer(stream, (void **) &rs_have_free_buffer);
 		if (BufferIsValid(buf))
 		{
 			apw_state->prewarmed_blocks++;
 			ReleaseBuffer(buf);
 		}
+		/* There are no free buffers left in shared buffers, break the loop. */
+		else if (!(*rs_have_free_buffer))
+			break;
 
 		old_blk = blk;
 	}
+	Assert(read_stream_next_buffer(stream, (void **) &rs_have_free_buffer) == InvalidBuffer);
+	read_stream_end(stream);
 
 	dsm_detach(seg);
 
-- 
2.45.2

Reply via email to