On Wed, Sep 7, 2022 at 2:49 AM Robert Haas <robertmh...@gmail.com> wrote:

>
> But here's one random idea: add a successor[] array and an lp_valid[]
> array. In the first loop, set lp_valid[offset] = true if it passes the
> check_lp() checks, and set successor[A] = B if A redirects to B or has
> a CTID link to B, without matching xmin/xmax. Then, in a second loop,
> iterate over the successor[] array. If successor[A] = B && lp_valid[A]
> && lp_valid[B], then check whether A.xmax = B.xmin; if so, then
> complain if predecessor[B] is already set, else set predecessor[B] =
> A. Then, in the third loop, iterate over the predecessor array just as
> you're doing now. Then it's clear that we do the lp_valid checks
> exactly once for every offset that might need them, and in order. And
> it's also clear that the predecessor-based checks can never happen
> unless the lp_valid checks passed for both of the offsets involved.
>
>
OK, I have introduced a new approach: first construct a successor array, and
then loop over the successor array to construct a predecessor array.
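
For reviewers, here is a rough standalone sketch of the second pass that
builds predecessor[] from successor[]. The types and helpers used here
(Offset, ItemSummary, build_predecessors) are simplified stand-ins invented
for illustration, not the real PostgreSQL page/tuple APIs; the actual patch
below works on ItemIds and HeapTupleHeaders and reports corruption rather
than silently skipping.

#include <stdbool.h>
#include <stdint.h>

typedef uint16_t Offset;		/* stand-in for OffsetNumber; 0 means "none" */

typedef struct
{
	bool		valid;			/* line pointer passed the basic checks */
	Offset		next;			/* redirect target or same-page t_ctid offset, 0 if none */
	uint32_t	xmin;			/* stand-ins for the tuple header fields */
	uint32_t	xmax;
} ItemSummary;					/* hypothetical per-offset summary, not an amcheck type */

/*
 * Build predecessor[] from the successor links, mirroring the patch's second
 * loop.  Arrays are indexed 1..maxoff; predecessor[] starts out zeroed.
 */
static void
build_predecessors(const ItemSummary items[], Offset maxoff, Offset predecessor[])
{
	for (Offset cur = 1; cur <= maxoff; cur++)
	{
		Offset		next = items[cur].next;

		if (next == 0 || !items[cur].valid || !items[next].valid)
			continue;			/* no link, or one end failed the basic checks */

		/* Treat it as a chain link only when xmax matches the successor's xmin. */
		if (items[cur].xmax == 0 || items[cur].xmax != items[next].xmin)
			continue;

		/* Two tuples claiming the same successor is reported as corruption. */
		if (predecessor[next] != 0)
			continue;

		predecessor[next] = cur;
	}
}

The third loop then walks predecessor[] and applies the xmin/cmin and
HOT/heap-only checks, as in the patch below.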

-- 
Regards,
Himanshu Upadhyaya
EnterpriseDB: http://www.enterprisedb.com
From 4a32d7cbdec0d090e99a9a58cb4deb5cb4c03aef Mon Sep 17 00:00:00 2001
From: Himanshu Upadhyaya <himanshu.upadhy...@enterprisedb.com>
Date: Mon, 19 Sep 2022 13:50:47 +0530
Subject: [PATCH v3] HOT chain validation in verify_heapam

---
 contrib/amcheck/verify_heapam.c | 209 ++++++++++++++++++++++++++++++++
 1 file changed, 209 insertions(+)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index c875f3e5a2..3da4c09665 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -399,6 +399,9 @@ verify_heapam(PG_FUNCTION_ARGS)
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
+		OffsetNumber predecessor[MaxOffsetNumber + 1] = {0};
+		OffsetNumber successor[MaxOffsetNumber + 1] = {0};
+		bool		lp_valid[MaxOffsetNumber + 1] = {false};
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -433,6 +436,8 @@ verify_heapam(PG_FUNCTION_ARGS)
 		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
 			 ctx.offnum = OffsetNumberNext(ctx.offnum))
 		{
+			OffsetNumber nextoffnum;
+
 			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
 
 			/* Skip over unused/dead line pointers */
@@ -469,6 +474,12 @@ verify_heapam(PG_FUNCTION_ARGS)
 					report_corruption(&ctx,
 									  psprintf("line pointer redirection to unused item at offset %u",
 											   (unsigned) rdoffnum));
+
+				/*
+				 * Record the redirect in the successor array; the redirected
+				 * tuple will be validated when we loop over the successor
+				 * array below.
+				 */
+				successor[ctx.offnum] = rdoffnum;
 				continue;
 			}
 
@@ -504,9 +515,200 @@ verify_heapam(PG_FUNCTION_ARGS)
 			/* It should be safe to examine the tuple's header, at least */
 			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
 			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
+			lp_valid[ctx.offnum] = true;
 
 			/* Ok, ready to check this next tuple */
 			check_tuple(&ctx);
+
+			/*
+			 * Now record this tuple's successor in the successor array; it
+			 * will later be used to build the predecessor array.  Building
+			 * the predecessor array requires looking at the successor
+			 * tuple's header, but that tuple has not necessarily been sanity
+			 * checked yet, so we delay predecessor construction until all
+			 * tuples have been checked.
+			 */
+			nextoffnum = ItemPointerGetOffsetNumber(&(ctx.tuphdr)->t_ctid);
+			if (ItemPointerGetBlockNumber(&(ctx.tuphdr)->t_ctid) == ctx.blkno &&
+				nextoffnum != ctx.offnum &&
+				nextoffnum >= FirstOffsetNumber && nextoffnum <= maxoff)
+			{
+				/* t_ctid points elsewhere on this page; remember the link. */
+				successor[ctx.offnum] = nextoffnum;
+			}
+		}
+
+		/*
+		 * Loop over the offsets and populate the predecessor array from the
+		 * entries in the successor array.
+		 */
+		ctx.attnum = -1;
+		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
+			 ctx.offnum = OffsetNumberNext(ctx.offnum))
+		{
+			ItemId		curr_lp;
+			ItemId		next_lp;
+			HeapTupleHeader curr_htup;
+			HeapTupleHeader next_htup;
+			TransactionId curr_xmax;
+			TransactionId next_xmin;
+			OffsetNumber nextoffnum = successor[ctx.offnum];
+
+			curr_lp = PageGetItemId(ctx.page, ctx.offnum);
+			if (nextoffnum == 0)
+			{
+				/*
+				 * Either this is the last updated tuple in the chain, or
+				 * corruption was already reported for this tuple.
+				 */
+				continue;
+			}
+			if (ItemIdIsRedirected(curr_lp) && lp_valid[nextoffnum])
+			{
+				next_lp = PageGetItemId(ctx.page, nextoffnum);
+				next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+				if (!HeapTupleHeaderIsHeapOnly(next_htup))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not a heap-only tuple",
+											   (unsigned) nextoffnum));
+				}
+				if ((next_htup->t_infomask & HEAP_UPDATED) == 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u was not created by an update",
+											   (unsigned) nextoffnum));
+				}
+				continue;
+			}
+
+			/*
+			 * Add the line pointer offset to the predecessor array when this
+			 * tuple's xmax matches the xmin of the next tuple (reached via
+			 * its t_ctid) on the same page.  Report corruption if two tuples
+			 * point to the same successor, i.e. if a tuple would get more
+			 * than one predecessor.
+			 *
+			 * We add the offset to the predecessor array irrespective of the
+			 * transaction (t_xmin) status.  Validation of the transaction
+			 * status (and all other validations) is done when we loop over
+			 * the predecessor array.
+			 */
+			if (!lp_valid[ctx.offnum] || !lp_valid[nextoffnum])
+				continue;
+
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+
+			next_lp = PageGetItemId(ctx.page, nextoffnum);
+			next_htup = (HeapTupleHeader) PageGetItem(ctx.page, next_lp);
+			next_xmin = HeapTupleHeaderGetXmin(next_htup);
+
+			/*
+			 * A non-matching xmax and xmin is not corruption; the tuples are
+			 * simply not part of the same update chain.
+			 */
+			if (TransactionIdIsValid(curr_xmax) &&
+				TransactionIdEquals(curr_xmax, next_xmin))
+			{
+				if (predecessor[nextoffnum] != 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("updated version at offset %u is also the updated version of tuple at offset %u",
+											   (unsigned) nextoffnum, (unsigned) predecessor[nextoffnum]));
+					continue;
+				}
+				predecessor[nextoffnum] = ctx.offnum;
+			}
+		}
+
+		/* Loop over the offsets and validate the data in the predecessor array. */
+		for (OffsetNumber currentoffnum = FirstOffsetNumber; currentoffnum <= maxoff;
+			 currentoffnum = OffsetNumberNext(currentoffnum))
+		{
+			HeapTupleHeader pred_htup;
+			HeapTupleHeader curr_htup;
+			TransactionId pred_xmin;
+			TransactionId curr_xmin;
+			TransactionId curr_xmax;
+			CommandId	pred_cmin;
+			CommandId	curr_cmin;
+			ItemId		pred_lp;
+			ItemId		curr_lp;
+
+			ctx.offnum = predecessor[currentoffnum];
+			ctx.attnum = -1;
+
+			if (ctx.offnum == 0)
+			{
+				/*
+				 * Either the root of the chain or an xmin-aborted tuple from
+				 * an abandoned portion of a HOT chain.
+				 */
+				continue;
+			}
+
+			/*
+			 * Validation via the predecessor array: 1) If the predecessor's
+			 * xmin is aborted or in progress, the current tuple's xmin must
+			 * be aborted or in progress respectively, and both xmins must be
+			 * equal.  2) If the predecessor's xmin is not frozen, the
+			 * current tuple's xmin must not be frozen either.  3) If the
+			 * predecessor's xmin equals the current tuple's xmin, the
+			 * current tuple's cmin must be greater than the predecessor's
+			 * cmin.  4) If the current tuple is not heap-only, its
+			 * predecessor must not be HEAP_HOT_UPDATED.  5) If the current
+			 * tuple is heap-only, its predecessor must be HEAP_HOT_UPDATED.
+			 */
+			curr_lp = PageGetItemId(ctx.page, currentoffnum);
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
+			curr_xmax = HeapTupleHeaderGetUpdateXid(curr_htup);
+			curr_cmin = HeapTupleHeaderGetRawCommandId(curr_htup);
+
+			ctx.itemid = pred_lp = PageGetItemId(ctx.page, ctx.offnum);
+			pred_htup = (HeapTupleHeader) PageGetItem(ctx.page, pred_lp);
+			pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
+			pred_cmin = HeapTupleHeaderGetRawCommandId(pred_htup);
+
+			if (!TransactionIdEquals(pred_xmin, curr_xmin) &&
+				!TransactionIdDidCommit(pred_xmin))
+			{
+				report_corruption(&ctx,
+								  psprintf("tuple with uncommitted xmin %u was updated to produce a tuple at offset %u with differing xmin %u",
+										   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
+			}
+			if (pred_xmin != FrozenTransactionId && curr_xmin == FrozenTransactionId)
+			{
+				report_corruption(&ctx,
+								  psprintf("unfrozen tuple was updated to produce a tuple at offset %u which is frozen",
+										   (unsigned) currentoffnum));
+			}
+
+			/*
+			 * It is not corruption if the current tuple was updated or
+			 * deleted by a different transaction; in that case t_cid holds
+			 * the cmax (the command id of the deleting transaction), and the
+			 * predecessor's cid need not be smaller than the current tuple's
+			 * cid.  t_cid may hold a combo command id, but we do not worry
+			 * about that here because the combo command id of the next
+			 * updated tuple (if present) must be greater than that of the
+			 * current tuple, so we skip the HEAP_COMBOCID check and simply
+			 * compare t_cid values.
+			 */
+			if (TransactionIdEquals(pred_xmin, curr_xmin) &&
+				(TransactionIdEquals(curr_xmin, curr_xmax) ||
+				 !TransactionIdIsValid(curr_xmax)) && pred_cmin >= curr_cmin)
+			{
+				report_corruption(&ctx,
+								  psprintf("tuple with xmin %u and cid %u was updated to produce a tuple at offset %u with the same xmin but cid %u",
+										   (unsigned) pred_xmin, (unsigned) pred_cmin, (unsigned) currentoffnum, (unsigned) curr_cmin));
+			}
+			if (HeapTupleHeaderIsHeapOnly(curr_htup) &&
+				!HeapTupleHeaderIsHotUpdated(pred_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("non-heap-only update produced a heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
+			if (!HeapTupleHeaderIsHeapOnly(curr_htup) &&
+				HeapTupleHeaderIsHotUpdated(pred_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("heap-only update produced a non-heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
 		}
 
 		/* clean up */
@@ -640,6 +842,7 @@ check_tuple_header(HeapCheckContext *ctx)
 {
 	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
+	TransactionId curr_xmax = HeapTupleHeaderGetUpdateXid(tuphdr);
 	bool		result = true;
 	unsigned	expected_hoff;
 
@@ -651,6 +854,12 @@ check_tuple_header(HeapCheckContext *ctx)
 		result = false;
 	}
 
+	if (!TransactionIdIsValid(curr_xmax) && HeapTupleHeaderIsHotUpdated(tuphdr))
+	{
+		report_corruption(ctx,
+						  pstrdup("tuple has been HOT updated, but xmax is 0"));
+		result = false;
+	}
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
-- 
2.25.1
