On Fri, Feb 28, 2025 at 11:58 AM Melanie Plageman
<melanieplage...@gmail.com> wrote:
> On Thu, Feb 27, 2025 at 1:08 PM Tom Lane <t...@sss.pgh.pa.us> wrote:
> > I wonder if it'd be a good idea to add something like
> >
> >                 Assert(stream->distance == 1);
> >                 Assert(stream->pending_read_nblocks == 0);
> >                 Assert(stream->per_buffer_data_size == 0);
> > +               Assert(per_buffer_data == NULL);
> >
> > in read_stream_next_buffer.  I doubt that this will shut Coverity
> > up, but it would help to catch caller coding errors, i.e. passing
> > a per_buffer_data pointer when there's no per-buffer data.
>
> I think this is a good stopgap. I was discussing adding this assert
> off-list with Thomas and he wanted to detail his more ambitious plans
> for type safety improvements in the read stream API. Less on the order
> of a redesign and more like a separate read_stream_next_buffer()s for
> when there is per buffer data and when there isn't. And a by-value and
> by-reference version for the one where there is data.

Here's what I had in mind.  Is it better?
From 68e9424b590051959142917459eb4ea074589b79 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 28 Feb 2025 10:48:29 +1300
Subject: [PATCH 1/2] Improve API for retrieving data from read streams.

Dealing with the per_buffer_data argument to read_stream_next_buffer()
has proven a bit clunky.  Provide some new wrapper functions/macros:

    buffer = read_stream_get_buffer(rs);
    buffer = read_stream_get_buffer_and_value(rs, &my_int);
    buffer = read_stream_get_buffer_and_pointer(rs, &my_pointer_to_int);

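These improve readability, and add some type safety through assertions: a
run-time check of the per-buffer data size (using the new function
read_stream_per_buffer_data_size()) and, where the compiler supports it, a
compile-time check that the output pointer has the expected level of
indirection.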
---
 contrib/pg_prewarm/pg_prewarm.c          |  4 +-
 contrib/pg_visibility/pg_visibility.c    |  6 +--
 src/backend/access/heap/heapam.c         |  2 +-
 src/backend/access/heap/heapam_handler.c |  2 +-
 src/backend/access/heap/vacuumlazy.c     |  6 +--
 src/backend/storage/aio/read_stream.c    | 12 +++++
 src/backend/storage/buffer/bufmgr.c      |  4 +-
 src/include/storage/read_stream.h        | 64 ++++++++++++++++++++++++
 8 files changed, 87 insertions(+), 13 deletions(-)

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index a2f0ac4af0c..f6ae266d7b0 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -208,11 +208,11 @@ pg_prewarm(PG_FUNCTION_ARGS)
 			Buffer		buf;
 
 			CHECK_FOR_INTERRUPTS();
-			buf = read_stream_next_buffer(stream, NULL);
+			buf = read_stream_get_buffer(stream);
 			ReleaseBuffer(buf);
 			++blocks_done;
 		}
-		Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+		Assert(read_stream_get_buffer(stream) == InvalidBuffer);
 		read_stream_end(stream);
 	}
 
diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 7f268a18a74..e7187a46c9d 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -556,7 +556,7 @@ collect_visibility_data(Oid relid, bool include_pd)
 			Buffer		buffer;
 			Page		page;
 
-			buffer = read_stream_next_buffer(stream, NULL);
+			buffer = read_stream_get_buffer(stream);
 			LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
 			page = BufferGetPage(buffer);
@@ -569,7 +569,7 @@ collect_visibility_data(Oid relid, bool include_pd)
 
 	if (include_pd)
 	{
-		Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+		Assert(read_stream_get_buffer(stream) == InvalidBuffer);
 		read_stream_end(stream);
 	}
 
@@ -752,7 +752,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 										0);
 
 	/* Loop over every block in the relation. */
-	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+	while ((buffer = read_stream_get_buffer(stream)) != InvalidBuffer)
 	{
 		bool		check_frozen = all_frozen;
 		bool		check_visible = all_visible;
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index fa7935a0ed3..86f280069e0 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -609,7 +609,7 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 
 	scan->rs_dir = dir;
 
-	scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+	scan->rs_cbuf = read_stream_get_buffer(scan->rs_read_stream);
 	if (BufferIsValid(scan->rs_cbuf))
 		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index e78682c3cef..7487896b06c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1010,7 +1010,7 @@ heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
 	 * re-acquire sharelock for each tuple, but since we aren't doing much
 	 * work per tuple, the extra lock traffic is probably better avoided.
 	 */
-	hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
+	hscan->rs_cbuf = read_stream_get_buffer(stream);
 	if (!BufferIsValid(hscan->rs_cbuf))
 		return false;
 
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 1af18a78a2b..ac7a4d8c21d 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1230,7 +1230,6 @@ lazy_scan_heap(LVRelState *vacrel)
 		Page		page;
 		uint8		blk_info = 0;
 		bool		has_lpdead_items;
-		void	   *per_buffer_data = NULL;
 		bool		vm_page_frozen = false;
 		bool		got_cleanup_lock = false;
 
@@ -1287,13 +1286,12 @@ lazy_scan_heap(LVRelState *vacrel)
 										 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
 		}
 
-		buf = read_stream_next_buffer(stream, &per_buffer_data);
+		buf = read_stream_get_buffer_and_value(stream, &blk_info);
 
 		/* The relation is exhausted. */
 		if (!BufferIsValid(buf))
 			break;
 
-		blk_info = *((uint8 *) per_buffer_data);
 		CheckBufferIsPinnedOnce(buf);
 		page = BufferGetPage(buf);
 		blkno = BufferGetBlockNumber(buf);
@@ -2740,7 +2738,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 
 		vacuum_delay_point(false);
 
-		buf = read_stream_next_buffer(stream, (void **) &iter_result);
+		buf = read_stream_get_buffer_and_pointer(stream, &iter_result);
 
 		/* The relation is exhausted */
 		if (!BufferIsValid(buf))
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 04bdb5e6d4b..0f1332c46f6 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -615,6 +615,9 @@ read_stream_begin_smgr_relation(int flags,
  * valid until the next call to read_stream_next_buffer().  When the stream
  * runs out of data, InvalidBuffer is returned.  The caller may decide to end
  * the stream early at any time by calling read_stream_end().
+ *
+ * See read_stream.h for read_stream_get_buffer() and variants that provide
+ * some degree of type safety for the per_buffer_data argument.
  */
 Buffer
 read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
@@ -840,6 +843,15 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Return the configured per-buffer data size, for use in assertions.
+ */
+size_t
+read_stream_per_buffer_data_size(ReadStream *stream)
+{
+	return stream->per_buffer_data_size;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks.  This can be used to clear an
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7915ed624c1..f4cedd15109 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4690,7 +4690,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
 		CHECK_FOR_INTERRUPTS();
 
 		/* Read block from source relation. */
-		srcBuf = read_stream_next_buffer(src_stream, NULL);
+		srcBuf = read_stream_get_buffer(src_stream);
 		LockBuffer(srcBuf, BUFFER_LOCK_SHARE);
 		srcPage = BufferGetPage(srcBuf);
 
@@ -4715,7 +4715,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator,
 		UnlockReleaseBuffer(dstBuf);
 		UnlockReleaseBuffer(srcBuf);
 	}
-	Assert(read_stream_next_buffer(src_stream, NULL) == InvalidBuffer);
+	Assert(read_stream_get_buffer(src_stream) == InvalidBuffer);
 	read_stream_end(src_stream);
 
 	FreeAccessStrategy(bstrategy_src);
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index c11d8ce3300..68c9340b0e3 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -70,6 +70,7 @@ extern ReadStream *read_stream_begin_relation(int flags,
 extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data);
 extern BlockNumber read_stream_next_block(ReadStream *stream,
 										  BufferAccessStrategy *strategy);
+extern size_t read_stream_per_buffer_data_size(ReadStream *stream);
 extern ReadStream *read_stream_begin_smgr_relation(int flags,
 												   BufferAccessStrategy strategy,
 												   SMgrRelation smgr,
@@ -81,4 +82,67 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
+/*
+ * Get the next buffer from a stream that is not using per-buffer data.
+ */
+static inline Buffer
+read_stream_get_buffer(ReadStream *stream)
+{
+	Assert(read_stream_per_buffer_data_size(stream) == 0);
+	return read_stream_next_buffer(stream, NULL);
+}
+
+/*
+ * Inlinable helper for read_stream_get_buffer_and_value() macro.
+ */
+static inline Buffer
+read_stream_get_buffer_and_value_with_size(ReadStream *stream,
+										   void *output_data,
+										   size_t output_data_size)
+{
+	Buffer		buffer;
+	void	   *per_buffer_data;
+
+	Assert(read_stream_per_buffer_data_size(stream) == output_data_size);
+	buffer = read_stream_next_buffer(stream, &per_buffer_data);
+	if (buffer != InvalidBuffer)
+		memcpy(output_data, per_buffer_data, output_data_size);
+
+	return buffer;
+}
+
+/*
+ * Get the next buffer and a copy of the associated per-buffer data.
+ * InvalidBuffer means end-of-stream, and in that case the per-buffer data is
+ * undefined.  Example of use:
+ *
+ * int my_int;
+ *
+ * buf = read_stream_get_buffer_and_value(stream, &my_int);
+ */
+#define read_stream_get_buffer_and_value(stream, vp) \
+	read_stream_get_buffer_and_value_with_size((stream), (vp), sizeof(*(vp)))
+
+/*
+ * Get the next buffer and a pointer to the associated per-buffer data.  This
+ * avoids casts, while still checking that we have the expected level of
+ * indirection.  InvalidBuffer means end-of-stream, and in that case the output
+ * pointer is undefined.  Otherwise the output pointer should only be
+ * dereferenced up until the next call.  For example:
+ *
+ * int *my_int_p;
+ *
+ * buf = read_stream_get_buffer_and_pointer(stream, &my_int_p);
+ */
+#if HAVE__BUILTIN_TYPES_COMPATIBLE_P
+#define read_stream_get_buffer_and_pointer(stream, pointer) \
+	(StaticAssertExpr(!__builtin_types_compatible_p(__typeof__(**(pointer)), \
+													void), \
+					  "expected pointer to pointer to non-void"), \
+	 read_stream_next_buffer((stream), ((void **) (pointer))))
+#else
+#define read_stream_get_buffer_and_pointer(stream, pointer) \
+	read_stream_next_buffer((stream), ((void **) (pointer)))
+#endif
+
 #endif							/* READ_STREAM_H */
-- 
2.48.1

From 02bb13b80abc20cd8221287b4c908f9c7c0dde23 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 28 Feb 2025 12:53:23 +1300
Subject: [PATCH 2/2] Improve API for storing data in read streams.

Read stream callbacks receive a void pointer into the per-buffer data
queue so that they can store data there for later retrieval by the buffer
consumer.  We can improve readability and safety a bit by replacing
cast-and-assign or raw memcpy() with:

	read_stream_put_value(stream, per_buffer_data, my_int);

This form infers the size and asserts that the storage space matches,
generally mirroring the read_stream_get_buffer_and_value() call used for
retrieving the streamed data later.
---
 src/backend/access/heap/vacuumlazy.c | 6 +++---
 src/include/storage/read_stream.h    | 9 +++++++++
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index ac7a4d8c21d..9563906fb27 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1612,7 +1612,7 @@ heap_vac_scan_next_block(ReadStream *stream,
 		 */
 		vacrel->current_block = next_block;
 		blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM;
-		*((uint8 *) per_buffer_data) = blk_info;
+		read_stream_put_value(stream, per_buffer_data, blk_info);
 		return vacrel->current_block;
 	}
 	else
@@ -1628,7 +1628,7 @@ heap_vac_scan_next_block(ReadStream *stream,
 			blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM;
 		if (vacrel->next_unskippable_eager_scanned)
 			blk_info |= VAC_BLK_WAS_EAGER_SCANNED;
-		*((uint8 *) per_buffer_data) = blk_info;
+		read_stream_put_value(stream, per_buffer_data, blk_info);
 		return vacrel->current_block;
 	}
 }
@@ -2671,7 +2671,7 @@ vacuum_reap_lp_read_stream_next(ReadStream *stream,
 	 * Save the TidStoreIterResult for later, so we can extract the offsets.
 	 * It is safe to copy the result, according to TidStoreIterateNext().
 	 */
-	memcpy(per_buffer_data, iter_result, sizeof(*iter_result));
+	read_stream_put_value(stream, per_buffer_data, *iter_result);
 
 	return iter_result->blkno;
 }
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 68c9340b0e3..673dd75ccaf 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -145,4 +145,13 @@ read_stream_get_buffer_and_value_with_size(ReadStream *stream,
 	read_stream_next_buffer((stream), ((void **) (pointer)))
 #endif
 
+/*
+ * Set the per-buffer data by value.  This can be called from inside a
+ * callback that is returning block numbers.  It asserts that the value's size
+ * matches the available space.
+ */
+#define read_stream_put_value(stream, per_buffer_data, value) \
+	(AssertMacro(sizeof(value) == read_stream_per_buffer_data_size(stream)), \
+	 memcpy((per_buffer_data), &(value), sizeof(value)))
+
 #endif							/* READ_STREAM_H */
-- 
2.48.1

Reply via email to