Hi,

Thank you for the review!

On Fri, 12 Jul 2024 at 02:52, Noah Misch <n...@leadboat.com> wrote:
>
> On Tue, Apr 16, 2024 at 02:12:19PM +0300, Nazir Bilal Yavuz wrote:
> > I am working on using read streams in the CREATE DATABASE command when the
> > strategy is wal_log. RelationCopyStorageUsingBuffer() function is used in
> > this context. This function reads source buffers then copies them to the
>
> Please rebase.  I applied this to 40126ac for review purposes.

Rebased.

> > Subject: [PATCH v1 1/3] Refactor PinBufferForBlock() to remove if checks about
> >  persistence
> >
> > There are if checks in PinBufferForBlock() function to set persistence
> > of the relation and this function is called for the each block in the
> > relation. Instead of that, set persistence of the relation before
> > PinBufferForBlock() function.
>
> I tried with the following additional patch to see if PinBufferForBlock() ever
> gets invalid smgr_relpersistence:
>
> ====
> --- a/src/backend/storage/buffer/bufmgr.c
> +++ b/src/backend/storage/buffer/bufmgr.c
> @@ -1098,6 +1098,11 @@ PinBufferForBlock(Relation rel,
>
>         Assert(blockNum != P_NEW);
>
> +       if (!(smgr_persistence == RELPERSISTENCE_TEMP ||
> +                 smgr_persistence == RELPERSISTENCE_PERMANENT ||
> +                 smgr_persistence == RELPERSISTENCE_UNLOGGED))
> +               elog(WARNING, "unexpected relpersistence %d", smgr_persistence);
> +
>         if (smgr_persistence == RELPERSISTENCE_TEMP)
>         {
>                 io_context = IOCONTEXT_NORMAL;
> ====
>
> That still gets relpersistence==0 in various src/test/regress cases.  I think
> the intent was to prevent that.  If not, please add a comment about when
> relpersistence==0 is still allowed.

I fixed it. It was caused by the (mode == RBM_ZERO_AND_CLEANUP_LOCK ||
mode == RBM_ZERO_AND_LOCK) path in ReadBuffer_common(), where the
persistence was not resolved before being passed to PinBufferForBlock().
I also added an assert to PinBufferForBlock() to catch this problem.
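For reference, here is a minimal standalone sketch of the invariant the fix is meant to establish: persistence is resolved once on every entry path, and the per-block function only asserts it. resolve_persistence(), persistence_is_valid() and pin_block() are hypothetical stand-ins, not the bufmgr.c code; the 'p', 'u' and 't' values mirror RELPERSISTENCE_PERMANENT, RELPERSISTENCE_UNLOGGED and RELPERSISTENCE_TEMP from pg_class.h.

```c
#include <assert.h>
#include <stdbool.h>

/* Values mirror pg_class.h; defined locally so the sketch is standalone. */
#define RELPERSISTENCE_PERMANENT 'p'
#define RELPERSISTENCE_UNLOGGED  'u'
#define RELPERSISTENCE_TEMP      't'

/*
 * Hypothetical helper mirroring the if/else chain that the patch moves into
 * ReadBuffer_common().  has_rel stands in for "rel != NULL".
 */
static char
resolve_persistence(bool has_rel, char rel_persistence, char smgr_persistence)
{
	if (has_rel)
		return rel_persistence;
	if (smgr_persistence == 0)
		return RELPERSISTENCE_PERMANENT;	/* no Relation: assume permanent */
	return smgr_persistence;
}

static bool
persistence_is_valid(char persistence)
{
	return persistence == RELPERSISTENCE_TEMP ||
		persistence == RELPERSISTENCE_PERMANENT ||
		persistence == RELPERSISTENCE_UNLOGGED;
}

/*
 * Hypothetical per-block path: it asserts the invariant instead of
 * re-deriving persistence.  Returns 1 if the block would go to local
 * (backend-private) buffers, as temp relations do.
 */
static int
pin_block(char persistence, unsigned block)
{
	assert(persistence_is_valid(persistence));
	(void) block;
	return persistence == RELPERSISTENCE_TEMP;
}
```

With this split, a caller that forgets to resolve persistence (passing 0) trips the assertion in assert-enabled builds instead of silently being treated as permanent.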

> > --- a/src/backend/storage/aio/read_stream.c
> > +++ b/src/backend/storage/aio/read_stream.c
> > @@ -549,7 +549,7 @@ read_stream_begin_relation(int flags,
> >       {
> >               stream->ios[i].op.rel = rel;
> >               stream->ios[i].op.smgr = RelationGetSmgr(rel);
> > -             stream->ios[i].op.smgr_persistence = 0;
> > +             stream->ios[i].op.smgr_persistence = rel->rd_rel->relpersistence;
>
> Does the following comment in ReadBuffersOperation need an update?
>
>         /*
>          * The following members should be set by the caller.  If only smgr is
>          * provided without rel, then smgr_persistence can be set to override the
>          * default assumption of RELPERSISTENCE_PERMANENT.
>          */
>

I believe it does not need to be updated, but I renamed
'ReadBuffersOperation.smgr_persistence' to
'ReadBuffersOperation.persistence', so this comment is updated as
well. I think the new name fits better because the persistence does
not need to come from smgr; it could come from the relation, too. Do
you think this is a good idea? If so, does it need a separate commit?
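To illustrate the rationale for the rename, here is a rough standalone sketch in which two constructors fill the same persistence field, one from the relation and one from a caller-supplied value. FakeReadOp, op_from_relation() and op_from_smgr() are hypothetical and only loosely mirror ReadBuffersOperation, read_stream_begin_relation() and the read_stream_begin_smgr_relation() added in patch 2/3.

```c
#include <assert.h>
#include <stdbool.h>

/* Values mirror pg_class.h; defined locally so the sketch is standalone. */
#define RELPERSISTENCE_PERMANENT 'p'
#define RELPERSISTENCE_UNLOGGED  'u'

/* Hypothetical, trimmed-down stand-in for ReadBuffersOperation. */
typedef struct FakeReadOp
{
	bool		has_rel;		/* stands in for "rel != NULL" */
	char		persistence;	/* always resolved by the caller */
} FakeReadOp;

/* Relation path: persistence comes from the relation itself. */
static FakeReadOp
op_from_relation(char relpersistence)
{
	FakeReadOp	op = {true, relpersistence};

	return op;
}

/* SMgrRelation path: caller-supplied value, 0 meaning "assume permanent". */
static FakeReadOp
op_from_smgr(char smgr_persistence)
{
	FakeReadOp	op = {false,
		smgr_persistence ? smgr_persistence : RELPERSISTENCE_PERMANENT};

	return op;
}
```

Since only one of the two paths involves smgr, a neutral field name describes both.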

> > --- a/src/backend/storage/buffer/bufmgr.c
> > +++ b/src/backend/storage/buffer/bufmgr.c
>
> > +/*
> > + * Helper struct for read stream object used in
> > + * RelationCopyStorageUsingBuffer() function.
> > + */
> > +struct copy_storage_using_buffer_read_stream_private
> > +{
> > +     BlockNumber blocknum;
> > +     int64           last_block;
> > +};
>
> Why is last_block an int64, not a BlockNumber?

You are right, the type of last_block should be BlockNumber; done. I
copied it from the pg_prewarm_read_stream_private struct, and I guess
the same change should be applied there as well, but that is outside
the topic of this thread, so I have not updated it yet.
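A standalone sketch of why BlockNumber is wide enough here, assuming PostgreSQL's definitions (BlockNumber is a uint32, InvalidBlockNumber is 0xFFFFFFFF and MaxBlockNumber is 0xFFFFFFFE): last_block is an exclusive bound (nblocks), so the callback only ever returns values below it and cannot hand out InvalidBlockNumber by accident. The struct and function names are hypothetical, and the ReadStream and per_buffer_data parameters of the real callback are dropped to keep it self-contained.

```c
#include <assert.h>
#include <stdint.h>

/* Mirrors PostgreSQL's block.h; defined locally for a standalone build. */
typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* Hypothetical private state for the block-number callback. */
struct copy_private
{
	BlockNumber blocknum;		/* next block to hand out */
	BlockNumber last_block;		/* exclusive upper bound, i.e. nblocks */
};

/* Returns successive block numbers, then InvalidBlockNumber forever. */
static BlockNumber
next_block(void *callback_private_data)
{
	struct copy_private *p = callback_private_data;

	if (p->blocknum < p->last_block)
		return p->blocknum++;
	return InvalidBlockNumber;
}
```

Even at the largest possible relation size the highest value returned is 0xFFFFFFFE, so no signed 64-bit type is needed.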

> > @@ -4667,19 +4698,31 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
>
> >       /* Iterate over each block of the source relation file. */
> >       for (blkno = 0; blkno < nblocks; blkno++)
> >       {
> >               CHECK_FOR_INTERRUPTS();
> >
> >               /* Read block from source relation. */
> > -             srcBuf = ReadBufferWithoutRelcache(srclocator, forkNum, blkno,
> > -                                                RBM_NORMAL, bstrategy_src,
> > -                                                permanent);
> > +             srcBuf = read_stream_next_buffer(src_stream, NULL);
> >               LockBuffer(srcBuf, BUFFER_LOCK_SHARE);
>
> I think this should check for read_stream_next_buffer() returning
> InvalidBuffer.  pg_prewarm doesn't, but the other callers do, and I think the
> other callers are a better model.  LockBuffer() doesn't check the
> InvalidBuffer case, so let's avoid the style of using a
> read_stream_next_buffer() return value without checking.

There is an assert in LockBuffer() that checks for InvalidBuffer. If
that is not enough, we could add an explicit check for InvalidBuffer,
but what should we do in that case? It should never happen, so
erroring out may be a good idea.
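One possible shape for the explicit check, as a standalone sketch: the loop stops as soon as the stream returns InvalidBuffer instead of passing it on to LockBuffer(). FakeStream, fake_stream_next_buffer() and copy_blocks() are hypothetical; Buffer as an int with InvalidBuffer == 0 mirrors PostgreSQL's buf.h, and the -1 return stands in for what would presumably be an elog(ERROR, ...) in bufmgr.c.

```c
#include <assert.h>
#include <stdio.h>

/* Mirrors PostgreSQL's buf.h; defined locally for a standalone build. */
typedef int Buffer;
#define InvalidBuffer 0

/* Hypothetical stream: hands out buffers 1..available, then InvalidBuffer. */
typedef struct FakeStream
{
	int			next;
	int			available;
} FakeStream;

static Buffer
fake_stream_next_buffer(FakeStream *s)
{
	if (s->next < s->available)
		return ++s->next;
	return InvalidBuffer;
}

/*
 * Consume exactly nblocks buffers.  Returns 0 on success and -1 if the
 * stream ends early or still has buffers left afterwards.
 */
static int
copy_blocks(FakeStream *s, int nblocks)
{
	for (int blkno = 0; blkno < nblocks; blkno++)
	{
		Buffer		buf = fake_stream_next_buffer(s);

		if (buf == InvalidBuffer)
		{
			/* Never hand InvalidBuffer to the locking/copy code. */
			fprintf(stderr, "stream ended early at block %d of %d\n",
					blkno, nblocks);
			return -1;
		}
		/* ... lock buf, copy the page, release buf ... */
	}
	/* The patch asserts exhaustion here; this sketch reports it instead. */
	return fake_stream_next_buffer(s) == InvalidBuffer ? 0 : -1;
}
```

Reporting the failure keeps release builds, where the Assert in LockBuffer() is compiled out, from operating on an invalid buffer.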

Updated patches are attached (without InvalidBuffer check for now).

--
Regards,
Nazir Bilal Yavuz
Microsoft
From 977cea9451e602d8d3d5e4f0cf3cd7dfea11879e Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Sun, 7 Apr 2024 22:33:36 +0300
Subject: [PATCH v2 1/3] Refactor PinBufferForBlock() to remove if checks about
 persistence

PinBufferForBlock() contains if checks to determine the persistence of
the relation, and it is called for each block in the relation.
Instead, resolve the persistence of the relation once, before calling
PinBufferForBlock().
---
 src/include/storage/bufmgr.h          |  4 ++--
 src/backend/storage/aio/read_stream.c |  2 +-
 src/backend/storage/buffer/bufmgr.c   | 32 +++++++++++++--------------
 3 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a1e71013d32..ba9a1a289a9 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -116,12 +116,12 @@ struct ReadBuffersOperation
 {
 	/*
 	 * The following members should be set by the caller.  If only smgr is
-	 * provided without rel, then smgr_persistence can be set to override the
+	 * provided without rel, then persistence can be set to override the
 	 * default assumption of RELPERSISTENCE_PERMANENT.
 	 */
 	Relation	rel;
 	struct SMgrRelationData *smgr;
-	char		smgr_persistence;
+	char		persistence;
 	ForkNumber	forknum;
 	BufferAccessStrategy strategy;
 
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 74b9bae6313..58221649f27 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -551,7 +551,7 @@ read_stream_begin_relation(int flags,
 	{
 		stream->ios[i].op.rel = rel;
 		stream->ios[i].op.smgr = RelationGetSmgr(rel);
-		stream->ios[i].op.smgr_persistence = 0;
+		stream->ios[i].op.persistence = rel->rd_rel->relpersistence;
 		stream->ios[i].op.forknum = forknum;
 		stream->ios[i].op.strategy = strategy;
 	}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 61816730955..7cbcdc63a6f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1104,7 +1104,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 static pg_attribute_always_inline Buffer
 PinBufferForBlock(Relation rel,
 				  SMgrRelation smgr,
-				  char smgr_persistence,
+				  char persistence,
 				  ForkNumber forkNum,
 				  BlockNumber blockNum,
 				  BufferAccessStrategy strategy,
@@ -1113,22 +1113,12 @@ PinBufferForBlock(Relation rel,
 	BufferDesc *bufHdr;
 	IOContext	io_context;
 	IOObject	io_object;
-	char		persistence;
 
 	Assert(blockNum != P_NEW);
 
-	/*
-	 * If there is no Relation it usually implies recovery and thus permanent,
-	 * but we take an argument because CreateAndCopyRelationData can reach us
-	 * with only an SMgrRelation for an unlogged relation that we don't want
-	 * to flag with BM_PERMANENT.
-	 */
-	if (rel)
-		persistence = rel->rd_rel->relpersistence;
-	else if (smgr_persistence == 0)
-		persistence = RELPERSISTENCE_PERMANENT;
-	else
-		persistence = smgr_persistence;
+	Assert((persistence == RELPERSISTENCE_TEMP ||
+			persistence == RELPERSISTENCE_PERMANENT ||
+			persistence == RELPERSISTENCE_UNLOGGED));
 
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1203,6 +1193,7 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 	ReadBuffersOperation operation;
 	Buffer		buffer;
 	int			flags;
+	char		persistence;
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1224,12 +1215,19 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		return ExtendBufferedRel(BMR_REL(rel), forkNum, strategy, flags);
 	}
 
+	if (rel)
+		persistence = rel->rd_rel->relpersistence;
+	else if (smgr_persistence == 0)
+		persistence = RELPERSISTENCE_PERMANENT;
+	else
+		persistence = smgr_persistence;
+
 	if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK ||
 				 mode == RBM_ZERO_AND_LOCK))
 	{
 		bool		found;
 
-		buffer = PinBufferForBlock(rel, smgr, smgr_persistence,
+		buffer = PinBufferForBlock(rel, smgr, persistence,
 								   forkNum, blockNum, strategy, &found);
 		ZeroAndLockBuffer(buffer, mode, found);
 		return buffer;
@@ -1241,7 +1239,7 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		flags = 0;
 	operation.smgr = smgr;
 	operation.rel = rel;
-	operation.smgr_persistence = smgr_persistence;
+	operation.persistence = persistence;
 	operation.forknum = forkNum;
 	operation.strategy = strategy;
 	if (StartReadBuffer(&operation,
@@ -1272,7 +1270,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 
 		buffers[i] = PinBufferForBlock(operation->rel,
 									   operation->smgr,
-									   operation->smgr_persistence,
+									   operation->persistence,
 									   operation->forknum,
 									   blockNum + i,
 									   operation->strategy,
-- 
2.45.2

From 53c2bd12d71e521d975002fb03ef53b74389ccb7 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Mon, 8 Apr 2024 00:14:52 +0300
Subject: [PATCH v2 2/3] Add a way to create read stream object by using
 SMgrRelation

Currently a read stream object can be created only from a Relation;
there is no way to create one from an SMgrRelation.

To achieve that, read_stream_begin_impl() is introduced and the
contents of read_stream_begin_relation() are moved into it.

Both read_stream_begin_relation() and read_stream_begin_smgr_relation()
call read_stream_begin_impl(), passing a Relation and an SMgrRelation
respectively.
---
 src/include/storage/read_stream.h     | 10 ++++
 src/backend/storage/aio/read_stream.c | 83 ++++++++++++++++++++++-----
 2 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index f676d2cc20a..4e599904f26 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -15,6 +15,7 @@
 #define READ_STREAM_H
 
 #include "storage/bufmgr.h"
+#include "storage/smgr.h"
 
 /* Default tuning, reasonable for many users. */
 #define READ_STREAM_DEFAULT 0x00
@@ -57,6 +58,14 @@ extern ReadStream *read_stream_begin_relation(int flags,
 											  void *callback_private_data,
 											  size_t per_buffer_data_size);
 extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data);
+extern ReadStream *read_stream_begin_smgr_relation(int flags,
+												   BufferAccessStrategy strategy,
+												   SMgrRelation smgr,
+												   char smgr_persistence,
+												   ForkNumber forknum,
+												   ReadStreamBlockNumberCB callback,
+												   void *callback_private_data,
+												   size_t per_buffer_data_size);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 58221649f27..cdec918d29d 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -406,14 +406,16 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
  * write extra data for each block into the space provided to it.  It will
  * also receive callback_private_data for its own purposes.
  */
-ReadStream *
-read_stream_begin_relation(int flags,
-						   BufferAccessStrategy strategy,
-						   Relation rel,
-						   ForkNumber forknum,
-						   ReadStreamBlockNumberCB callback,
-						   void *callback_private_data,
-						   size_t per_buffer_data_size)
+static ReadStream *
+read_stream_begin_impl(int flags,
+					   BufferAccessStrategy strategy,
+					   Relation rel,
+					   SMgrRelation smgr,
+					   char smgr_persistence,
+					   ForkNumber forknum,
+					   ReadStreamBlockNumberCB callback,
+					   void *callback_private_data,
+					   size_t per_buffer_data_size)
 {
 	ReadStream *stream;
 	size_t		size;
@@ -422,9 +424,6 @@ read_stream_begin_relation(int flags,
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
 	Oid			tablespace_id;
-	SMgrRelation smgr;
-
-	smgr = RelationGetSmgr(rel);
 
 	/*
 	 * Decide how many I/Os we will allow to run at the same time.  That
@@ -434,7 +433,7 @@ read_stream_begin_relation(int flags,
 	 */
 	tablespace_id = smgr->smgr_rlocator.locator.spcOid;
 	if (!OidIsValid(MyDatabaseId) ||
-		IsCatalogRelation(rel) ||
+		(rel && IsCatalogRelation(rel)) ||
 		IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
 	{
 		/*
@@ -550,8 +549,8 @@ read_stream_begin_relation(int flags,
 	for (int i = 0; i < max_ios; ++i)
 	{
 		stream->ios[i].op.rel = rel;
-		stream->ios[i].op.smgr = RelationGetSmgr(rel);
-		stream->ios[i].op.persistence = rel->rd_rel->relpersistence;
+		stream->ios[i].op.smgr = smgr;
+		stream->ios[i].op.persistence = smgr_persistence;
 		stream->ios[i].op.forknum = forknum;
 		stream->ios[i].op.strategy = strategy;
 	}
@@ -559,6 +558,62 @@ read_stream_begin_relation(int flags,
 	return stream;
 }
 
+/*
+ * Create a new read stream for reading a relation.
+ * See read_stream_begin_impl() for the detailed explanation.
+ */
+ReadStream *
+read_stream_begin_relation(int flags,
+						   BufferAccessStrategy strategy,
+						   Relation rel,
+						   ForkNumber forknum,
+						   ReadStreamBlockNumberCB callback,
+						   void *callback_private_data,
+						   size_t per_buffer_data_size)
+{
+	return read_stream_begin_impl(flags,
+								  strategy,
+								  rel,
+								  RelationGetSmgr(rel),
+								  rel->rd_rel->relpersistence,
+								  forknum,
+								  callback,
+								  callback_private_data,
+								  per_buffer_data_size);
+}
+
+/*
+ * Create a new read stream for reading an SMgr relation.
+ * See read_stream_begin_impl() for the detailed explanation.
+ */
+ReadStream *
+read_stream_begin_smgr_relation(int flags,
+								BufferAccessStrategy strategy,
+								SMgrRelation smgr,
+								char smgr_persistence,
+								ForkNumber forknum,
+								ReadStreamBlockNumberCB callback,
+								void *callback_private_data,
+								size_t per_buffer_data_size)
+{
+	char		persistence;
+
+	if (smgr_persistence == 0)
+		persistence = RELPERSISTENCE_PERMANENT;
+	else
+		persistence = smgr_persistence;
+
+	return read_stream_begin_impl(flags,
+								  strategy,
+								  NULL,
+								  smgr,
+								  persistence,
+								  forknum,
+								  callback,
+								  callback_private_data,
+								  per_buffer_data_size);
+}
+
 /*
  * Pull one pinned buffer out of a stream.  Each call returns successive
  * blocks in the order specified by the callback.  If per_buffer_data_size was
-- 
2.45.2

From 31a2d8a7eef71bcd56804535f4e8afa90c8e86d3 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Tue, 16 Jul 2024 13:22:15 +0300
Subject: [PATCH v2 3/3] Use read streams in CREATE DATABASE when strategy is
 wal_log

The CREATE DATABASE command uses RelationCopyStorageUsingBuffer() to
copy buffers when the strategy is wal_log. This function reads source
buffers and then copies them to the destination buffers. Read streams
are used only for reading the source buffers, because the destination
buffers are obtained with the 'RBM_ZERO_AND_LOCK' mode and are zeroed
rather than read from disk, so streaming them would bring no benefit.
---
 src/backend/storage/buffer/bufmgr.c | 53 ++++++++++++++++++++++++++---
 1 file changed, 49 insertions(+), 4 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7cbcdc63a6f..2acc109c4db 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -54,6 +54,7 @@
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "storage/standby.h"
 #include "utils/memdebug.h"
@@ -135,6 +136,33 @@ typedef struct SMgrSortArray
 	SMgrRelation srel;
 } SMgrSortArray;
 
+/*
+ * Helper struct for read stream object used in
+ * RelationCopyStorageUsingBuffer() function.
+ */
+struct copy_storage_using_buffer_read_stream_private
+{
+	BlockNumber blocknum;
+	BlockNumber last_block;
+};
+
+/*
+ * Callback function to get next block for read stream object used in
+ * RelationCopyStorageUsingBuffer() function.
+ */
+static BlockNumber
+copy_storage_using_buffer_read_stream_next_block(ReadStream *stream,
+												 void *callback_private_data,
+												 void *per_buffer_data)
+{
+	struct copy_storage_using_buffer_read_stream_private *p = callback_private_data;
+
+	if (p->blocknum < p->last_block)
+		return p->blocknum++;
+
+	return InvalidBlockNumber;
+}
+
 /* GUC variables */
 bool		zero_damaged_pages = false;
 int			bgwriter_lru_maxpages = 100;
@@ -4688,6 +4716,9 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
 	PGIOAlignedBlock buf;
 	BufferAccessStrategy bstrategy_src;
 	BufferAccessStrategy bstrategy_dst;
+	struct copy_storage_using_buffer_read_stream_private p;
+	ReadStream *src_stream;
+	SMgrRelation src_smgr;
 
 	/*
 	 * In general, we want to write WAL whenever wal_level > 'minimal', but we
@@ -4716,19 +4747,31 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
 	bstrategy_src = GetAccessStrategy(BAS_BULKREAD);
 	bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE);
 
+	/* Initialize streaming read */
+	p.blocknum = 0;
+	p.last_block = nblocks;
+	src_smgr = smgropen(srclocator, INVALID_PROC_NUMBER);
+	src_stream = read_stream_begin_smgr_relation(READ_STREAM_FULL,
+												 bstrategy_src,
+												 src_smgr,
+												 permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED,
+												 forkNum,
+												 copy_storage_using_buffer_read_stream_next_block,
+												 &p,
+												 0);
+
 	/* Iterate over each block of the source relation file. */
 	for (blkno = 0; blkno < nblocks; blkno++)
 	{
 		CHECK_FOR_INTERRUPTS();
 
 		/* Read block from source relation. */
-		srcBuf = ReadBufferWithoutRelcache(srclocator, forkNum, blkno,
-										   RBM_NORMAL, bstrategy_src,
-										   permanent);
+		srcBuf = read_stream_next_buffer(src_stream, NULL);
 		LockBuffer(srcBuf, BUFFER_LOCK_SHARE);
 		srcPage = BufferGetPage(srcBuf);
 
-		dstBuf = ReadBufferWithoutRelcache(dstlocator, forkNum, blkno,
+		dstBuf = ReadBufferWithoutRelcache(dstlocator, forkNum,
+										   BufferGetBlockNumber(srcBuf),
 										   RBM_ZERO_AND_LOCK, bstrategy_dst,
 										   permanent);
 		dstPage = BufferGetPage(dstBuf);
@@ -4748,6 +4791,8 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
 		UnlockReleaseBuffer(dstBuf);
 		UnlockReleaseBuffer(srcBuf);
 	}
+	Assert(read_stream_next_buffer(src_stream, NULL) == InvalidBuffer);
+	read_stream_end(src_stream);
 
 	FreeAccessStrategy(bstrategy_src);
 	FreeAccessStrategy(bstrategy_dst);
-- 
2.45.2
