I wrote:
> In bug #18163 [1], Alexander proved the misgivings I had in [2]
> about catcache detoasting being possibly unsafe:
> ...
> Attached is a POC patch for fixing this.

The cfbot pointed out that this needed a rebase.  The attached version
is rebased only; there are no substantive changes.

                        regards, tom lane

diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index 2e2e4d9f1f..2d72e8106c 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1372,6 +1372,7 @@ SearchCatCacheMiss(CatCache *cache,
 	SysScanDesc scandesc;
 	HeapTuple	ntp;
 	CatCTup    *ct;
+	bool		stale;
 	Datum		arguments[CATCACHE_MAXKEYS];
 
 	/* Initialize local parameter array */
@@ -1380,16 +1381,6 @@ SearchCatCacheMiss(CatCache *cache,
 	arguments[2] = v3;
 	arguments[3] = v4;
 
-	/*
-	 * Ok, need to make a lookup in the relation, copy the scankey and fill
-	 * out any per-call fields.
-	 */
-	memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * nkeys);
-	cur_skey[0].sk_argument = v1;
-	cur_skey[1].sk_argument = v2;
-	cur_skey[2].sk_argument = v3;
-	cur_skey[3].sk_argument = v4;
-
 	/*
 	 * Tuple was not found in cache, so we have to try to retrieve it directly
 	 * from the relation.  If found, we will add it to the cache; if not
@@ -1404,9 +1395,28 @@ SearchCatCacheMiss(CatCache *cache,
 	 * will eventually age out of the cache, so there's no functional problem.
 	 * This case is rare enough that it's not worth expending extra cycles to
 	 * detect.
+	 *
+	 * Another case, which we *must* handle, is that the tuple could become
+	 * outdated during CatalogCacheCreateEntry's attempt to detoast it (since
+	 * AcceptInvalidationMessages can run during TOAST table access).  We do
+	 * not want to return already-stale catcache entries, so we loop around
+	 * and do the table scan again if that happens.
 	 */
 	relation = table_open(cache->cc_reloid, AccessShareLock);
 
+	do
+	{
+		/*
+		 * Ok, need to make a lookup in the relation, copy the scankey and
+		 * fill out any per-call fields.  (We must re-do this when retrying,
+		 * because systable_beginscan scribbles on the scankey.)
+		 */
+		memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * nkeys);
+		cur_skey[0].sk_argument = v1;
+		cur_skey[1].sk_argument = v2;
+		cur_skey[2].sk_argument = v3;
+		cur_skey[3].sk_argument = v4;
+
 	scandesc = systable_beginscan(relation,
 								  cache->cc_indexoid,
 								  IndexScanOK(cache, cur_skey),
@@ -1415,12 +1425,19 @@ SearchCatCacheMiss(CatCache *cache,
 								  cur_skey);
 
 	ct = NULL;
+	stale = false;
 
 	while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
 	{
 		ct = CatalogCacheCreateEntry(cache, ntp, arguments,
 									 hashValue, hashIndex,
 									 false);
+		/* upon failure, we must start the scan over */
+		if (ct == NULL)
+		{
+			stale = true;
+			break;
+		}
 		/* immediately set the refcount to 1 */
 		ResourceOwnerEnlarge(CurrentResourceOwner);
 		ct->refcount++;
@@ -1429,6 +1446,7 @@ SearchCatCacheMiss(CatCache *cache,
 	}
 
 	systable_endscan(scandesc);
+	} while (stale);
 
 	table_close(relation, AccessShareLock);
 
@@ -1451,6 +1469,9 @@ SearchCatCacheMiss(CatCache *cache,
 									 hashValue, hashIndex,
 									 true);
 
+		/* Creating a negative cache entry shouldn't fail */
+		Assert(ct != NULL);
+
 		CACHE_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
 				   cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
 		CACHE_elog(DEBUG2, "SearchCatCache(%s): put neg entry in bucket %d",
@@ -1663,7 +1684,8 @@ SearchCatCacheList(CatCache *cache,
 	 * We have to bump the member refcounts temporarily to ensure they won't
 	 * get dropped from the cache while loading other members. We use a PG_TRY
 	 * block to ensure we can undo those refcounts if we get an error before
-	 * we finish constructing the CatCList.
+	 * we finish constructing the CatCList.  ctlist must be valid throughout
+	 * the PG_TRY block.
 	 */
 	ctlist = NIL;
 
@@ -1672,19 +1694,23 @@ SearchCatCacheList(CatCache *cache,
 		ScanKeyData cur_skey[CATCACHE_MAXKEYS];
 		Relation	relation;
 		SysScanDesc scandesc;
-
-		/*
-		 * Ok, need to make a lookup in the relation, copy the scankey and
-		 * fill out any per-call fields.
-		 */
-		memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
-		cur_skey[0].sk_argument = v1;
-		cur_skey[1].sk_argument = v2;
-		cur_skey[2].sk_argument = v3;
-		cur_skey[3].sk_argument = v4;
+		bool		stale;
 
 		relation = table_open(cache->cc_reloid, AccessShareLock);
 
+		do
+		{
+			/*
+			 * Ok, need to make a lookup in the relation, copy the scankey and
+			 * fill out any per-call fields.  (We must re-do this when
+			 * retrying, because systable_beginscan scribbles on the scankey.)
+			 */
+			memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
+			cur_skey[0].sk_argument = v1;
+			cur_skey[1].sk_argument = v2;
+			cur_skey[2].sk_argument = v3;
+			cur_skey[3].sk_argument = v4;
+
 		scandesc = systable_beginscan(relation,
 									  cache->cc_indexoid,
 									  IndexScanOK(cache, cur_skey),
@@ -1695,6 +1721,8 @@ SearchCatCacheList(CatCache *cache,
 		/* The list will be ordered iff we are doing an index scan */
 		ordered = (scandesc->irel != NULL);
 
+		stale = false;
+
 		while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
 		{
 			uint32		hashValue;
@@ -1740,6 +1768,30 @@ SearchCatCacheList(CatCache *cache,
 				ct = CatalogCacheCreateEntry(cache, ntp, arguments,
 											 hashValue, hashIndex,
 											 false);
+				/* upon failure, we must start the scan over */
+				if (ct == NULL)
+				{
+					/*
+					 * Release refcounts on any items we already had.  We dare
+					 * not try to free them if they're now unreferenced, since
+					 * an error while doing that would result in the PG_CATCH
+					 * below doing extra refcount decrements.  Besides, we'll
+					 * likely re-adopt those items in the next iteration, so
+					 * it's not worth complicating matters to try to get rid
+					 * of them.
+					 */
+					foreach(ctlist_item, ctlist)
+					{
+						ct = (CatCTup *) lfirst(ctlist_item);
+						Assert(ct->c_list == NULL);
+						Assert(ct->refcount > 0);
+						ct->refcount--;
+					}
+					/* Reset ctlist in preparation for new try */
+					ctlist = NIL;
+					stale = true;
+					break;
+				}
 			}
 
 			/* Careful here: add entry to ctlist, then bump its refcount */
@@ -1749,6 +1801,7 @@ SearchCatCacheList(CatCache *cache,
 		}
 
 		systable_endscan(scandesc);
+		} while (stale);
 
 		table_close(relation, AccessShareLock);
 
@@ -1865,6 +1918,10 @@ ReleaseCatCacheListWithOwner(CatCList *list, ResourceOwner resowner)
  * CatalogCacheCreateEntry
  *		Create a new CatCTup entry, copying the given HeapTuple and other
  *		supplied data into it.  The new entry initially has refcount 0.
+ *
+ * Returns NULL if we attempt to detoast the tuple and observe that it
+ * became stale.  (This cannot happen for a negative entry.)  Caller must
+ * retry the tuple lookup in that case.
  */
 static CatCTup *
 CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
@@ -1888,9 +1945,24 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
 		 * entry, and also protects us against the possibility of the toast
 		 * tuples being freed before we attempt to fetch them, in case of
 		 * something using a slightly stale catcache entry.
+		 *
+		 * If we see SharedInvalidMessageCounter advance during detoasting,
+		 * assume that the tuple we are interested in might have been
+		 * invalidated, and report failure.  (Since we don't yet have a
+		 * catcache entry representing the tuple, it's impossible to tell for
+		 * sure, and we have to be conservative.)
 		 */
 		if (HeapTupleHasExternal(ntp))
+		{
+			uint64		inval_count = SharedInvalidMessageCounter;
+
 			dtp = toast_flatten_tuple(ntp, cache->cc_tupdesc);
+			if (inval_count != SharedInvalidMessageCounter)
+			{
+				heap_freetuple(dtp);
+				return NULL;
+			}
+		}
 		else
 			dtp = ntp;
 

Reply via email to