On Mon, Mar 17, 2025 at 9:47 AM Matheus Alcantara
<matheusssil...@gmail.com> wrote:
>
> Sorry for the delay, attached v4 with the remaining fixes.

Thanks for the patch.

I started reviewing this with the intent to commit it. But, I decided
while studying it that I want to separate the SKIP_PAGES_NONE case and
the other cases into two callbacks. I think it is easier to read the
skip pages callback this way. The SKIP_PAGES_NONE case just reads all
blocks in the range, so we can use the existing default callback,
block_range_read_stream_cb(). Then the callback for the
SKIP_PAGES_ALL_VISIBLE and SKIP_PAGES_ALL_FROZEN options can be clear
and simple.

I've attached two versions with this proposed structure.

amcheck-readstream-1callback.patch implements this with one callback
and has the amcheck-specific callback private data struct subclass
BlockRangeReadStreamPrivate by embedding it as its first member (I
called it heapamcheck_rs_perblock_data).

amcheck-readstream-2callbacks.patch wraps block_range_read_stream_cb()
in an amcheck-specific callback and creates a
BlockRangeReadStreamPrivate, filling it in from the
heapamcheck_rs_perblock_data, to pass as callback_private_data. Because
this version is more explicit, it is safer: with the one-callback
version, we don't have any type checking facilities that will alert us
if someone adds a member above the BlockRangeReadStreamPrivate in
heapamcheck_rs_perblock_data. But, I'm open to feedback.

- Melanie
From c28f070d76d713c9547a679f9d6f09c6c9d531fb Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths....@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH] Use read stream on amcheck

ci-os-only:
---
 contrib/amcheck/verify_heapam.c | 114 +++++++++++++++++++++++++-------
 1 file changed, 90 insertions(+), 24 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..caef0a3989a 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
@@ -185,6 +186,58 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
 										 HeapCheckContext *ctx,
 										 XidCommitStatus *status);
 
+typedef struct heapamcheck_rs_perblock_data
+{
+	BlockNumber last_exclusive;
+	BlockNumber current_blocknum;
+	SkipPages	skip_option;
+	Relation	rel;
+	Buffer	   *vmbuffer;
+}			heapamcheck_rs_perblock_data;
+
+static BlockNumber
+heapamcheck_rs_next_block_noskips(ReadStream *stream,
+								  void *callback_private_data,
+								  void *per_buffer_data)
+{
+	heapamcheck_rs_perblock_data *p = callback_private_data;
+	BlockRangeReadStreamPrivate range = {
+		.current_blocknum = p->current_blocknum,
+		.last_exclusive = p->last_exclusive
+	};
+
+	return block_range_read_stream_cb(stream, (void *) &range, NULL);
+}
+
+static BlockNumber
+heapamcheck_rs_next_block_skips(ReadStream *stream,
+								void *callback_private_data,
+								void *per_buffer_data)
+{
+	heapamcheck_rs_perblock_data *p = callback_private_data;
+
+	for (BlockNumber i; (i = p->current_blocknum++) < p->last_exclusive;)
+	{
+		int32		mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
+
+		if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+		{
+			if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+				continue;
+		}
+
+		if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+		{
+			if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+				continue;
+		}
+
+		return i;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * Scan and report corruption in heap pages, optionally reconciling toasted
  * attributes with entries in the associated toast table.  Intended to be
@@ -231,6 +284,10 @@ verify_heapam(PG_FUNCTION_ARGS)
 	BlockNumber last_block;
 	BlockNumber nblocks;
 	const char *skip;
+	ReadStream *stream;
+	int			read_stream_flags;
+	ReadStreamBlockNumberCB cb;
+	heapamcheck_rs_perblock_data rsdata;
 
 	/* Check supplied arguments */
 	if (PG_ARGISNULL(0))
@@ -404,7 +461,32 @@ verify_heapam(PG_FUNCTION_ARGS)
 	if (TransactionIdIsNormal(ctx.relfrozenxid))
 		ctx.oldest_xid = ctx.relfrozenxid;
 
-	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+	rsdata.current_blocknum = first_block;
+	rsdata.last_exclusive = last_block + 1;
+	rsdata.skip_option = skip_option;
+	rsdata.rel = ctx.rel;
+	rsdata.vmbuffer = &vmbuffer;
+
+	if (skip_option == SKIP_PAGES_NONE)
+	{
+		cb = heapamcheck_rs_next_block_noskips;
+		read_stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+	}
+	else
+	{
+		cb = heapamcheck_rs_next_block_skips;
+		read_stream_flags = READ_STREAM_DEFAULT;
+	}
+
+	stream = read_stream_begin_relation(read_stream_flags,
+										ctx.bstrategy,
+										ctx.rel,
+										MAIN_FORKNUM,
+										cb,
+										&rsdata,
+										0);
+
+	while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 	{
 		OffsetNumber maxoff;
 		OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +499,11 @@ verify_heapam(PG_FUNCTION_ARGS)
 
 		memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
 
-		/* Optionally skip over all-frozen or all-visible blocks */
-		if (skip_option != SKIP_PAGES_NONE)
-		{
-			int32		mapbits;
-
-			mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
-													   &vmbuffer);
-			if (skip_option == SKIP_PAGES_ALL_FROZEN)
-			{
-				if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
-					continue;
-			}
-
-			if (skip_option == SKIP_PAGES_ALL_VISIBLE)
-			{
-				if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
-					continue;
-			}
-		}
-
-		/* Read and lock the next page. */
-		ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
-										RBM_NORMAL, ctx.bstrategy);
+		/* Lock the next page. */
+		Assert(BufferIsValid(ctx.buffer));
 		LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+		ctx.blkno = BufferGetBlockNumber(ctx.buffer);
 		ctx.page = BufferGetPage(ctx.buffer);
 
 		/* Perform tuple checks */
@@ -798,6 +861,9 @@ verify_heapam(PG_FUNCTION_ARGS)
 		if (on_error_stop && ctx.is_corrupt)
 			break;
 	}
+	/* Ensure that the stream is completely read */
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
 
 	if (vmbuffer != InvalidBuffer)
 		ReleaseBuffer(vmbuffer);
-- 
2.34.1

From f4e0c14631a11d60e5b518db63c232ec5fcc0048 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths....@pm.me>
Date: Fri, 29 Nov 2024 18:52:43 -0300
Subject: [PATCH] Use read stream on amcheck

ci-os-only:
---
 contrib/amcheck/verify_heapam.c | 99 +++++++++++++++++++++++++--------
 1 file changed, 75 insertions(+), 24 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 827312306f6..2b323b6f4e4 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -25,6 +25,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
@@ -185,6 +186,43 @@ static XidBoundsViolation get_xid_status(TransactionId xid,
 										 HeapCheckContext *ctx,
 										 XidCommitStatus *status);
 
+typedef struct heapamcheck_rs_perblock_data
+{
+	BlockRangeReadStreamPrivate range;
+	SkipPages	skip_option;
+	Relation	rel;
+	Buffer	   *vmbuffer;
+} heapamcheck_rs_perblock_data;
+
+static BlockNumber
+heapam_read_stream_next_block(ReadStream *stream,
+							  void *callback_private_data,
+							  void *per_buffer_data)
+{
+	heapamcheck_rs_perblock_data *p = callback_private_data;
+
+	for (BlockNumber i; (i = p->range.current_blocknum++) < p->range.last_exclusive;)
+	{
+		int32		mapbits = visibilitymap_get_status(p->rel, i, p->vmbuffer);
+
+		if (p->skip_option == SKIP_PAGES_ALL_FROZEN)
+		{
+			if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
+				continue;
+		}
+
+		if (p->skip_option == SKIP_PAGES_ALL_VISIBLE)
+		{
+			if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
+				continue;
+		}
+
+		return i;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * Scan and report corruption in heap pages, optionally reconciling toasted
  * attributes with entries in the associated toast table.  Intended to be
@@ -231,6 +269,10 @@ verify_heapam(PG_FUNCTION_ARGS)
 	BlockNumber last_block;
 	BlockNumber nblocks;
 	const char *skip;
+	ReadStream *stream;
+	int			read_stream_flags;
+	ReadStreamBlockNumberCB cb;
+	heapamcheck_rs_perblock_data rsdata;
 
 	/* Check supplied arguments */
 	if (PG_ARGISNULL(0))
@@ -404,7 +446,32 @@ verify_heapam(PG_FUNCTION_ARGS)
 	if (TransactionIdIsNormal(ctx.relfrozenxid))
 		ctx.oldest_xid = ctx.relfrozenxid;
 
-	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
+	rsdata.range.current_blocknum = first_block;
+	rsdata.range.last_exclusive = last_block + 1;
+	rsdata.skip_option = skip_option;
+	rsdata.rel = ctx.rel;
+	rsdata.vmbuffer = &vmbuffer;
+
+	if (skip_option == SKIP_PAGES_NONE)
+	{
+		cb = block_range_read_stream_cb;
+		read_stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_FULL;
+	}
+	else
+	{
+		cb = heapam_read_stream_next_block;
+		read_stream_flags = READ_STREAM_DEFAULT;
+	}
+
+	stream = read_stream_begin_relation(read_stream_flags,
+										ctx.bstrategy,
+										ctx.rel,
+										MAIN_FORKNUM,
+										cb,
+										&rsdata,
+										0);
+
+	while ((ctx.buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 	{
 		OffsetNumber maxoff;
 		OffsetNumber predecessor[MaxOffsetNumber];
@@ -417,30 +484,11 @@ verify_heapam(PG_FUNCTION_ARGS)
 
 		memset(predecessor, 0, sizeof(OffsetNumber) * MaxOffsetNumber);
 
-		/* Optionally skip over all-frozen or all-visible blocks */
-		if (skip_option != SKIP_PAGES_NONE)
-		{
-			int32		mapbits;
-
-			mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
-													   &vmbuffer);
-			if (skip_option == SKIP_PAGES_ALL_FROZEN)
-			{
-				if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
-					continue;
-			}
-
-			if (skip_option == SKIP_PAGES_ALL_VISIBLE)
-			{
-				if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
-					continue;
-			}
-		}
-
-		/* Read and lock the next page. */
-		ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
-										RBM_NORMAL, ctx.bstrategy);
+		/* Lock the next page. */
+		Assert(BufferIsValid(ctx.buffer));
 		LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
+
+		ctx.blkno = BufferGetBlockNumber(ctx.buffer);
 		ctx.page = BufferGetPage(ctx.buffer);
 
 		/* Perform tuple checks */
@@ -798,6 +846,9 @@ verify_heapam(PG_FUNCTION_ARGS)
 		if (on_error_stop && ctx.is_corrupt)
 			break;
 	}
+	/* Ensure that the stream is completely read */
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
 
 	if (vmbuffer != InvalidBuffer)
 		ReleaseBuffer(vmbuffer);
-- 
2.34.1

Reply via email to