>>>>> "Amit" == Amit Kapila <amit.kapil...@gmail.com> writes:

 Amit> (c) Change the logic during rewrite such that it can detect this
 Amit> situation and skip copying the tuple in the main table,

So I dug into this one and it looks to me like the best approach. Here's
a draft patch against 10-stable; if nobody can think of any showstoppers,
I'll do the rest of the versions and commit them.

-- 
Andrew (irc:RhodiumToad)

diff --git a/src/backend/access/heap/tuptoaster.c b/src/backend/access/heap/tuptoaster.c
index e3d09db0a8..fd091f2de5 100644
--- a/src/backend/access/heap/tuptoaster.c
+++ b/src/backend/access/heap/tuptoaster.c
@@ -1455,6 +1455,50 @@ toast_get_valid_index(Oid toastoid, LOCKMODE lock)
 
 
 /* ----------
+ * toast_validate_tuple -
+ *
+ *  Given one HeapTuple (tup) and the descriptor used to deform it
+ *  (tupleDesc), test whether all of its external toast rows (if any) exist
+ *  and are the correct length.  Returns true if every checked attribute
+ *  passes toast_validate_datum, false as soon as one does not.
+ * ----------
+ */
+bool
+toast_validate_tuple(HeapTuple tup, TupleDesc tupleDesc)
+{
+	Form_pg_attribute *att = tupleDesc->attrs;
+	int			numAttrs = tupleDesc->natts;
+	int			i;
+	Datum		toast_values[MaxTupleAttributeNumber];
+	bool		toast_isnull[MaxTupleAttributeNumber];
+
+	/*
+	 * Caller can check this if they like, but it's harmless to do it here too.
+	 */
+	if (!HeapTupleHasExternal(tup))
+		return true;
+
+	/*
+	 * Break down the tuple into fields; the loop below then checks each
+	 * non-null, non-dropped attribute whose attlen is -1.
+	 */
+	Assert(numAttrs <= MaxTupleAttributeNumber);
+	heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
+
+	for (i = 0; i < numAttrs; i++)
+	{
+		if (!toast_isnull[i] &&
+			att[i]->attlen == -1 &&
+			!att[i]->attisdropped &&
+			!toast_validate_datum(toast_values[i]))
+			return false;
+	}
+
+	return true;
+}
+
+
+/* ----------
  * toast_save_datum -
  *
  *	Save one single datum into the secondary relation and return
@@ -2037,6 +2081,141 @@ toast_fetch_datum(struct varlena *attr)
 }
 
 /* ----------
+ * toast_validate_datum -
+ *
+ *  Test whether a Datum's external toast rows (if any) exist with the correct
+ *  lengths.  A value that is not an on-disk external pointer is trivially OK.
+ *
+ *  This should track toast_fetch_datum to the extent that it returns false if
+ *  toast_fetch_datum would have errored out due to missing chunks or incorrect
+ *  lengths. (Other kinds of "can't happen" errors can still be thrown.)
+ * ----------
+ */
+bool
+toast_validate_datum(Datum value)
+{
+	bool		result = true;
+	Relation	toastrel;
+	Relation   *toastidxs;
+	ScanKeyData toastkey;
+	SysScanDesc toastscan;
+	HeapTuple	ttup;
+	TupleDesc	toasttupDesc;
+	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
+	struct varatt_external toast_pointer;
+	int32		ressize;
+	int32		residx,
+				nextidx;
+	int32		numchunks;
+	Pointer		chunk;
+	bool		isnull;
+	int32		chunksize;
+	int			num_indexes;
+	int			validIndex;
+	SnapshotData SnapshotToast;
+
+	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+		return true;
+
+	/* Must copy to access aligned fields */
+	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+	ressize = toast_pointer.va_extsize;
+	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
+
+	/*
+	 * Open the toast relation and its indexes
+	 */
+	toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
+	toasttupDesc = toastrel->rd_att;
+
+	/* Look for the valid index of the toast relation */
+	validIndex = toast_open_indexes(toastrel,
+									AccessShareLock,
+									&toastidxs,
+									&num_indexes);
+
+	/*
+	 * Setup a scan key to fetch from the index by va_valueid
+	 */
+	ScanKeyInit(&toastkey,
+				(AttrNumber) 1,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(toast_pointer.va_valueid));
+
+	/*
+	 * Read the chunks by index
+	 *
+	 * Note that because the index is actually on (valueid, chunkidx) we will
+	 * see the chunks in chunkidx order, even though we didn't explicitly ask
+	 * for it.
+	 */
+	nextidx = 0;
+
+	init_toast_snapshot(&SnapshotToast);
+	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
+										   &SnapshotToast, 1, &toastkey);
+	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
+	{
+		/*
+		 * Have a chunk, extract the sequence number and the data
+		 */
+		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
+		Assert(!isnull);
+		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
+		Assert(!isnull);
+		if (!VARATT_IS_EXTENDED(chunk))
+		{
+			chunksize = VARSIZE(chunk) - VARHDRSZ;
+		}
+		else if (VARATT_IS_SHORT(chunk))
+		{
+			/* could happen due to heap_form_tuple doing its thing */
+			chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
+		}
+		else
+		{
+			/* should never happen */
+			elog(ERROR, "found toasted toast chunk for toast value %u in %s",
+				 toast_pointer.va_valueid,
+				 RelationGetRelationName(toastrel));
+			chunksize = 0;		/* keep compiler quiet */
+		}
+
+		/*
+		 * Some checks on the data we've found: chunks must arrive in
+		 * sequence from 0 and stay below numchunks, all but the last must be
+		 * full size, and the last must bring the total to exactly ressize.
+		 */
+		if (residx != nextidx ||
+			(residx < numchunks - 1 &&
+			 chunksize != TOAST_MAX_CHUNK_SIZE) ||
+			(residx == numchunks - 1 &&
+			 ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)) ||
+			(residx >= numchunks))
+		{
+			result = false;
+			break;
+		}
+
+		nextidx++;
+	}
+
+	/* Final check: we must have seen exactly numchunks chunks. */
+	if (result && nextidx != numchunks)
+		result = false;
+
+	/*
+	 * End scan and close relations
+	 */
+	systable_endscan_ordered(toastscan);
+	toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
+	heap_close(toastrel, AccessShareLock);
+
+	return result;
+}
+
+/* ----------
  * toast_fetch_datum_slice -
  *
  *	Reconstruct a segment of a Datum from the chunks saved
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index dfd089f63c..b241b67a5e 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -945,6 +945,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
 		HeapTuple	tuple;
 		Buffer		buf;
 		bool		isdead;
+		bool		validate_toast = false;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -979,6 +980,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
 				break;
 			case HEAPTUPLE_RECENTLY_DEAD:
 				tups_recently_dead += 1;
+				validate_toast = true;
 				/* fall through */
 			case HEAPTUPLE_LIVE:
 				/* Live or recently dead, must copy it */
@@ -1022,6 +1024,20 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
 
 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
+		/*
+		 * OldestXmin going backwards can mean we have a RECENTLY_DEAD row that
+		 * contains toast pointers whose toast rows have already been vacuumed
+		 * away (or in the worst but unlikely case, recycled). If so, then the
+		 * row must really be dead to all snapshots that could access it, so
+		 * treat it as DEAD instead.
+		 */
+		if (validate_toast &&
+			!toast_validate_tuple(tuple, oldTupDesc))
+		{
+			isdead = true;
+			tups_recently_dead -= 1;
+		}
+
 		if (isdead)
 		{
 			tups_vacuumed += 1;
diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h
index fd9f83ac44..83b59257ef 100644
--- a/src/include/access/tuptoaster.h
+++ b/src/include/access/tuptoaster.h
@@ -236,4 +236,22 @@ extern Size toast_datum_size(Datum value);
  */
 extern Oid	toast_get_valid_index(Oid toastoid, LOCKMODE lock);
 
+/* ----------
+ * toast_validate_tuple -
+ *
+ *  Given one HeapTuple, test whether all of its external toast rows (if any)
+ *  exist and are the correct length.
+ * ----------
+ */
+extern bool toast_validate_tuple(HeapTuple tup, TupleDesc tupleDesc);
+
+/* ----------
+ * toast_validate_datum -
+ *
+ *  Test whether a Datum's external toast rows (if any) exist with the correct
+ *  lengths.
+ * ----------
+ */
+extern bool toast_validate_datum(Datum value);
+
 #endif							/* TUPTOASTER_H */

Reply via email to