From d9fe2afcc9abe096334c058585cc577a1af86221 Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Thu, 23 Apr 2020 21:11:58 +0900
Subject: [PATCH v2] ri_triggers.c: Use root constraint OID as key to
 ri_query_cache

All child constraints of a given foreign key constraint can use the
same RI query and the resulting plan, that is, no need to create as
many copies of the query and the plan as there are partitions, as
happens now due to the child constraint OID being used in the key
for ri_query_cache.

Keisuke Kuroda, Amit Langote
---
 src/backend/utils/adt/ri_triggers.c | 75 ++++++++++++++++++++++++++++++++++---
 1 file changed, 69 insertions(+), 6 deletions(-)

diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 02b1a38..223225a 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -95,13 +95,21 @@
  * RI_ConstraintInfo
  *
  * Information extracted from an FK pg_constraint entry.  This is cached in
- * ri_constraint_cache.
+ * ri_constraint_cache using constraint_id or constraint_root_id if the latter
+ * is not 0.
  */
 typedef struct RI_ConstraintInfo
 {
-	Oid			constraint_id;	/* OID of pg_constraint entry (hash key) */
+	Oid			constraint_id;	/* OID of pg_constraint entry */
+	Oid			constraint_parent;	/* OID of parent of this constraint */
+	Oid			constraint_root_id;	/* OID of the constraint in some ancestor
+									 * of the partition (most commonly root
+									 * ancestor) from which this constraint was
+									 * inherited; 0 if the constraint has no
+									 * parent */
 	bool		valid;			/* successfully initialized? */
-	uint32		oidHashValue;	/* hash value of pg_constraint OID */
+	uint32		oidHashValue;	/* hash value of constraint_id */
+	uint32		rootHashValue;	/* hash value of constraint_root_id */
 	NameData	conname;		/* name of the FK constraint */
 	Oid			pk_relid;		/* referenced relation */
 	Oid			fk_relid;		/* referencing relation */
@@ -221,6 +229,8 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 							   Relation pk_rel, Relation fk_rel,
 							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
 							   int queryno, bool partgone) pg_attribute_noreturn();
+static Oid get_ri_constraint_root(const RI_ConstraintInfo *riinfo);
+static Oid get_ri_constraint_root_recurse(Oid constrOid);
 
 
 /*
@@ -1892,7 +1902,7 @@ ri_GenerateQualCollation(StringInfo buf, Oid collation)
  *	Construct a hashtable key for a prepared SPI plan of an FK constraint.
  *
  *		key: output argument, *key is filled in based on the other arguments
- *		riinfo: info from pg_constraint entry
+ *		riinfo: info derived from pg_constraint entry
  *		constr_queryno: an internal number identifying the query type
  *			(see RI_PLAN_XXX constants at head of file)
  * ----------
@@ -1902,10 +1912,20 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 				 int32 constr_queryno)
 {
 	/*
+	 * Using constraint_root_id instead of constraint_id means that for all
+	 * partitions descending from the same ancestor in which the constraint
+	 * originates will share the same SPI plan.  It is okay to share the plan,
+	 * because the same query is used for all partitions for all RI query
+	 * types.  That is true despite the fact that riinfo->fk_attnums may
+	 * differ among partitions sharing the query, because they all refer to
+	 * the same logical column set.
+	 *
 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
 	 * to use memset to clear them.
 	 */
-	key->constr_id = riinfo->constraint_id;
+	key->constr_id = OidIsValid(riinfo->constraint_root_id) ?
+						riinfo->constraint_root_id :
+						riinfo->constraint_id;
 	key->constr_queryno = constr_queryno;
 }
 
@@ -2010,6 +2030,36 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 }
 
 /*
+ * get_ri_constraint_root
+ *		Returns a given RI constraint's root parent or 0 if it's not inherited
+ */
+static Oid
+get_ri_constraint_root(const RI_ConstraintInfo *riinfo)
+{
+	if (!OidIsValid(riinfo->constraint_parent))
+		return InvalidOid;
+
+	return get_ri_constraint_root_recurse(riinfo->constraint_parent);
+}
+
+static Oid
+get_ri_constraint_root_recurse(Oid constrOid)
+{
+	HeapTuple	tuple;
+	Oid			constrParentOid;
+
+	tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+	constrParentOid = ((Form_pg_constraint) GETSTRUCT(tuple))->conparentid;
+	ReleaseSysCache(tuple);
+	if (OidIsValid(constrParentOid))
+		return get_ri_constraint_root_recurse(constrParentOid);
+
+	return constrOid;
+}
+
+/*
  * Fetch or create the RI_ConstraintInfo struct for an FK constraint.
  */
 static const RI_ConstraintInfo *
@@ -2051,8 +2101,12 @@ ri_LoadConstraintInfo(Oid constraintOid)
 
 	/* And extract data */
 	Assert(riinfo->constraint_id == constraintOid);
+	riinfo->constraint_parent = conForm->conparentid;
+	riinfo->constraint_root_id = get_ri_constraint_root(riinfo);
 	riinfo->oidHashValue = GetSysCacheHashValue1(CONSTROID,
 												 ObjectIdGetDatum(constraintOid));
+	riinfo->rootHashValue = GetSysCacheHashValue1(CONSTROID,
+												  ObjectIdGetDatum(riinfo->constraint_root_id));
 	memcpy(&riinfo->conname, &conForm->conname, sizeof(NameData));
 	riinfo->pk_relid = conForm->confrelid;
 	riinfo->fk_relid = conForm->conrelid;
@@ -2117,7 +2171,16 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue)
 		RI_ConstraintInfo *riinfo = dlist_container(RI_ConstraintInfo,
 													valid_link, iter.cur);
 
-		if (hashvalue == 0 || riinfo->oidHashValue == hashvalue)
+		/*
+		 * We look for changes to two specific pg_constraint entries here --
+		 * the one matching the constraint given by riinfo->constraint_id and
+		 * also the one given by riinfo->constraint_root_id.  The latter too
+		 * because if its parent is updated, it is no longer the root
+		 * constraint.
+		 */
+		if (hashvalue == 0 ||
+			riinfo->oidHashValue == hashvalue ||
+			riinfo->rootHashValue == hashvalue)
 		{
 			riinfo->valid = false;
 			/* Remove invalidated entries from the list, too */
-- 
1.8.3.1

