On Mon, Mar 8, 2021 at 11:41 PM Amit Langote <amitlangot...@gmail.com> wrote:
> On Thu, Mar 4, 2021 at 5:15 AM Tom Lane <t...@sss.pgh.pa.us> wrote:
> >  I guess I'm disturbed by the idea
> > that we'd totally replace the implementation technology for only one
> > variant of foreign key checks.  That means that there'll be a lot
> > of minor details that don't act the same depending on context.  One
> > point I was just reminded of by [1] is that the SPI approach enforces
> > permissions checks on the table access, which I do not see being done
> > anywhere in your patch.  Now, maybe it's fine not to have such checks,
> > on the grounds that the existence of the RI constraint is sufficient
> > permission (the creator had to have REFERENCES permission to make it).
> > But I'm not sure about that.  Should we add SELECT permissions checks
> > to this code path to make it less different?
> >
> > In the same vein, the existing code actually runs the query as the
> > table owner (cf. SetUserIdAndSecContext in ri_PerformCheck), another
> > nicety you haven't bothered with.  Maybe that is invisible for a
> > pure SELECT query but I'm not sure I would bet on it.  At the very
> > least you're betting that the index-related operators you invoke
> > aren't going to care, and that nobody is going to try to use this
> > difference to create a security exploit via a trojan-horse index.
>
> How about we do at the top of ri_ReferencedKeyExists() what
> ri_PerformCheck() always does before executing a query, which is this:
>
>    /* Switch to proper UID to perform check as */
>    GetUserIdAndSecContext(&save_userid, &save_sec_context);
>    SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
>                           save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
>                           SECURITY_NOFORCE_RLS);
>
> And then also check the permissions of the switched user on the scan
> target relation's schema (ACL_USAGE) and the relation itself
> (ACL_SELECT).
>
> IOW, this:
>
> +   Oid         save_userid;
> +   int         save_sec_context;
> +   AclResult   aclresult;
> +
> +   /* Switch to proper UID to perform check as */
> +   GetUserIdAndSecContext(&save_userid, &save_sec_context);
> +   SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
> +                          save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
> +                          SECURITY_NOFORCE_RLS);
> +
> +   /* Check namespace permissions. */
> +   aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
> +                                     GetUserId(), ACL_USAGE);
> +   if (aclresult != ACLCHECK_OK)
> +       aclcheck_error(aclresult, OBJECT_SCHEMA,
> +                      get_namespace_name(RelationGetNamespace(pk_rel)));
> +   /* Check the user has SELECT permissions on the referenced relation. */
> +   aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
> +                                 ACL_SELECT);
> +   if (aclresult != ACLCHECK_OK)
> +       aclcheck_error(aclresult, OBJECT_TABLE,
> +                      RelationGetRelationName(pk_rel));
>
>     /*
>      * Extract the unique key from the provided slot and choose the equality
> @@ -414,6 +436,9 @@ ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
>     index_endscan(scan);
>     ExecDropSingleTupleTableSlot(outslot);
>
> +   /* Restore UID and security context */
> +   SetUserIdAndSecContext(save_userid, save_sec_context);
> +
>     /* Don't release lock until commit. */
>     index_close(idxrel, NoLock);

I've included these changes in the updated patch.

> > Shall we mention RLS restrictions?  If we don't worry about that,
> > I think REFERENCES privilege becomes a full bypass of RLS, at
> > least for unique-key columns.
>
> Seeing what check_enable_rls() does when running under the security
> context set by ri_PerformCheck(), it indeed seems that RLS is bypassed
> when executing these RI queries.  The following comment in
> check_enable_rls() seems to say so:
>
>          * InNoForceRLSOperation indicates that we should not apply RLS even
>          * if the table has FORCE RLS set - IF the current user is the owner.
>          * This is specifically to ensure that referential integrity checks
>          * are able to still run correctly.

I've added a comment to note that the new way of "selecting" the
referenced tuple effectively bypasses RLS, as is the case when
selecting via SPI.
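
For reference, that note sits just above the ownership switch in
ri_ReferencedKeyExists(), which in the attached patch reads, slightly
condensed:

        /*
         * Note that as with queries done by ri_PerformCheck(), the way we
         * select the referenced row below effectively bypasses any RLS
         * policies that may be present on the referenced table.
         */
        GetUserIdAndSecContext(&save_userid, &save_sec_context);
        SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
                               save_sec_context | SECURITY_LOCAL_USERID_CHANGE);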

> > I wonder also what happens if the referenced table isn't a plain
> > heap with a plain btree index.  Maybe you're accessing it at the
> > right level of abstraction so things will just work with some
> > other access methods, but I'm not sure about that.
>
> I believe that I've made ri_ReferencedKeyExists() use the appropriate
> APIs to scan the index, lock the returned table tuple, etc., but do
> you think we might be better served by introducing a new set of APIs
> for this use case?

I concur that, by using the interfaces defined in genam.h and
tableam.h, the patch accounts for cases involving other access
methods.
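
For instance, the index probe in ri_ReferencedKeyExists() goes only
through the generic index/table AM entry points; condensed from the
attached patch:

        scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), num_pk, 0);
        index_rescan(scan, skey, num_pk, NULL, 0);

        /* Try to find the tuple using the AM-agnostic scan interface. */
        outslot = table_slot_create(pk_rel, NULL);
        if (index_getnext_slot(scan, ForwardScanDirection, outslot))
            found = true;

        index_endscan(scan);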

That said, I had overlooked one bit in the new code that is specific
to the btree AM: the hard-coding of BTEqualStrategyNumber in the
following:

        /* Initialize the scankey. */
        ScanKeyInit(&skey[i],
                    pkattno,
                    BTEqualStrategyNumber,
                    regop,
                    pk_vals[i]);

In the updated patch, I've added code to look up the index-specific
strategy number to pass here.
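
Specifically, the scankey setup in the attached patch now reads:

        Oid         operator = eq_oprs[i];
        Oid         opfamily = idxrel->rd_opfamily[i];
        StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
        RegProcedure regop = get_opcode(operator);

        /* Initialize the scankey using the looked-up strategy number. */
        ScanKeyInit(&skey[i],
                    pkattno,
                    strat,
                    regop,
                    pk_vals[i]);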

> > Lastly, ri_PerformCheck is pretty careful about not only which
> > snapshot it uses, but which *pair* of snapshots it uses, because
> > sometimes it needs to worry about data changes since the start
> > of the transaction.  You've ignored all of that complexity AFAICS.
> > That's okay (I think) for RI_FKey_check which was passing
> > detectNewRows = false, but for sure it's not okay for
> > ri_Check_Pk_Match.  (I kind of thought we had isolation tests
> > that would catch that, but apparently not.)
>
> Okay, let me closely check the ri_Check_Pk_Match() case and see if
> there's any live bug.

As mentioned in my earlier reply, there doesn't seem to be a need for
ri_Check_Pk_Match() to set the crosscheck snapshot, as it goes
essentially unused there.
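
For context, ri_PerformCheck() chooses its pair of snapshots roughly
as follows (paraphrased from the existing sources, so details may
differ slightly):

        if (IsolationUsesXactSnapshot() && detectNewRows)
        {
            /*
             * In transaction-snapshot mode, when new rows must be detected,
             * run the query with a current snapshot and have the executor
             * cross-check its results against the transaction snapshot.
             */
            CommandCounterIncrement();  /* be sure own work is visible */
            test_snapshot = GetLatestSnapshot();
            crosscheck_snapshot = GetTransactionSnapshot();
        }
        else
        {
            /* The default SPI snapshot behavior is okay. */
            test_snapshot = InvalidSnapshot;
            crosscheck_snapshot = InvalidSnapshot;
        }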

> > So, this is a cute idea, and the speedup is pretty impressive,
> > but I don't think it's anywhere near committable.  I also wonder
> > whether we really want ri_triggers.c having its own copy of
> > low-level stuff like the tuple-locking code you copied.  Seems
> > like a likely maintenance hazard, so maybe some more refactoring
> > is needed.
>
> Okay, I will see if there's a way to avoid copying too much code.

I thought sharing the tuple-locking code with ExecLockRows(), which
seemed closest in semantics to what the new code is doing, might not
be such a bad idea, though I'm not sure I came up with a great
interface for the shared function; see the excerpt below.  Actually,
there are other places with their own copies of tuple-locking logic,
but they each deal with the locking result in their own unique ways,
so I didn't get excited about finding a way to make the new function
accommodate their needs as well.  I also admit that I may have
totally misunderstood what refactoring you were referring to in your
comment.
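
Concretely, the shared helper ends up with the following signature in
the attached patch, and ri_ReferencedKeyExists() calls it like so:

        bool
        ExecLockTableTuple(Relation relation, ItemPointer tid,
                           TupleTableSlot *slot, Snapshot snapshot,
                           CommandId cid, LockTupleMode lockmode,
                           LockWaitPolicy waitPolicy, bool *epq_needed);

        /* Lock the tuple found by the index scan, in key share mode. */
        found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
                                   GetActiveSnapshot(),
                                   GetCurrentCommandId(false),
                                   LockTupleKeyShare,
                                   LockWaitBlock, NULL);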

Updated patches attached.  Sorry about the delay.

--
Amit Langote
EDB: http://www.enterprisedb.com
From 2a13b6d04800521fcd523846186cabe9cdeea2a4 Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Thu, 10 Dec 2020 20:21:29 +0900
Subject: [PATCH v7 2/2] Avoid using SPI for some RI checks

This modifies the subroutines, called by RI trigger functions, that
check whether a given referenced value exists in the referenced
relation: they now simply scan the foreign key constraint's unique
index, instead of issuing a
`SELECT 1 FROM referenced_relation WHERE ref_key = $1` query
through SPI as done currently.  This saves a lot of work, especially
when inserting into or updating a referencing relation.
---
 src/backend/executor/nodeLockRows.c | 161 ++++----
 src/backend/utils/adt/ri_triggers.c | 616 ++++++++++++++++++----------
 src/include/executor/executor.h     |   9 +
 3 files changed, 488 insertions(+), 298 deletions(-)

diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index b2e5c30079..ae07465127 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -75,11 +75,9 @@ lnext:
 		Datum		datum;
 		bool		isNull;
 		ItemPointerData tid;
-		TM_FailureData tmfd;
 		LockTupleMode lockmode;
-		int			lockflags = 0;
-		TM_Result	test;
 		TupleTableSlot *markSlot;
+		bool		skip;
 
 		/* clear any leftover test tuple for this rel */
 		markSlot = EvalPlanQualSlot(&node->lr_epqstate, erm->relation, erm->rti);
@@ -175,74 +173,12 @@ lnext:
 				break;
 		}
 
-		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-		if (!IsolationUsesXactSnapshot())
-			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
 
-		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-								markSlot, estate->es_output_cid,
-								lockmode, erm->waitPolicy,
-								lockflags,
-								&tmfd);
-
-		switch (test)
-		{
-			case TM_WouldBlock:
-				/* couldn't lock tuple in SKIP LOCKED mode */
-				goto lnext;
-
-			case TM_SelfModified:
-
-				/*
-				 * The target tuple was already updated or deleted by the
-				 * current command, or by a later command in the current
-				 * transaction.  We *must* ignore the tuple in the former
-				 * case, so as to avoid the "Halloween problem" of repeated
-				 * update attempts.  In the latter case it might be sensible
-				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_update and heap_delete to not
-				 * complain about updating "invisible" tuples, which seems
-				 * pretty scary (table_tuple_lock will not complain, but few
-				 * callers expect TM_Invisible, and we're not one of them). So
-				 * for now, treat the tuple as deleted and do not process.
-				 */
-				goto lnext;
-
-			case TM_Ok:
-
-				/*
-				 * Got the lock successfully, the locked tuple saved in
-				 * markSlot for, if needed, EvalPlanQual testing below.
-				 */
-				if (tmfd.traversed)
-					epq_needed = true;
-				break;
-
-			case TM_Updated:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				elog(ERROR, "unexpected table_tuple_lock status: %u",
-					 test);
-				break;
-
-			case TM_Deleted:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				/* tuple was deleted so don't return it */
-				goto lnext;
-
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-
-			default:
-				elog(ERROR, "unrecognized table_tuple_lock status: %u",
-					 test);
-		}
+		skip = !ExecLockTableTuple(erm->relation, &tid, markSlot,
+								   estate->es_snapshot, estate->es_output_cid,
+								   lockmode, erm->waitPolicy, &epq_needed);
+		if (skip)
+			goto lnext;
 
 		/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
 		erm->curCtid = tid;
@@ -277,6 +213,91 @@ lnext:
 	return slot;
 }
 
+/*
+ * ExecLockTableTuple
+ * 		Locks the tuple with the given TID, using the given lock mode and
+ * 		following the given wait policy
+ *
+ * Returns true if the tuple was successfully locked.  The locked tuple is
+ * loaded into the provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *epq_needed)
+{
+	TM_FailureData tmfd;
+	int			lockflags = 0;
+	TM_Result	test;
+
+	lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+							waitPolicy, lockflags, &tmfd);
+
+	switch (test)
+	{
+		case TM_WouldBlock:
+			/* couldn't lock tuple in SKIP LOCKED mode */
+			return false;
+
+		case TM_SelfModified:
+			/*
+			 * The target tuple was already updated or deleted by the
+			 * current command, or by a later command in the current
+			 * transaction.  We *must* ignore the tuple in the former
+			 * case, so as to avoid the "Halloween problem" of repeated
+			 * update attempts.  In the latter case it might be sensible
+			 * to fetch the updated tuple instead, but doing so would
+			 * require changing heap_update and heap_delete to not
+			 * complain about updating "invisible" tuples, which seems
+			 * pretty scary (table_tuple_lock will not complain, but few
+			 * callers expect TM_Invisible, and we're not one of them). So
+			 * for now, treat the tuple as deleted and do not process.
+			 */
+			return false;
+
+		case TM_Ok:
+			/*
+			 * Got the lock successfully; the locked tuple is saved in the
+			 * slot, for EvalPlanQual testing if the caller asked for it.
+			 */
+			if (tmfd.traversed && epq_needed)
+				*epq_needed = true;
+			break;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			elog(ERROR, "unexpected table_tuple_lock status: %u",
+				 test);
+			break;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			/* tuple was deleted so don't return it */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			return false;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+			return false;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 09a2ad2881..93b994dc2b 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -23,22 +23,27 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/partition.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
+#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
+#include "partitioning/partdesc.h"
 #include "storage/bufmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -48,6 +53,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/ruleutils.h"
@@ -68,16 +74,11 @@
 #define RI_KEYS_NONE_NULL				2
 
 /* RI query type codes */
-/* these queries are executed against the PK (referenced) table: */
-#define RI_PLAN_CHECK_LOOKUPPK			1
-#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK	2
-#define RI_PLAN_LAST_ON_PK				RI_PLAN_CHECK_LOOKUPPK_FROM_PK
-/* these queries are executed against the FK (referencing) table: */
-#define RI_PLAN_CASCADE_DEL_DODELETE	3
-#define RI_PLAN_CASCADE_UPD_DOUPDATE	4
-#define RI_PLAN_RESTRICT_CHECKREF		5
-#define RI_PLAN_SETNULL_DOUPDATE		6
-#define RI_PLAN_SETDEFAULT_DOUPDATE		7
+#define RI_PLAN_CASCADE_DEL_DODELETE	1
+#define RI_PLAN_CASCADE_UPD_DOUPDATE	2
+#define RI_PLAN_RESTRICT_CHECKREF		3
+#define RI_PLAN_SETNULL_DOUPDATE		4
+#define RI_PLAN_SETDEFAULT_DOUPDATE		5
 
 #define MAX_QUOTED_NAME_LEN  (NAMEDATALEN*2+3)
 #define MAX_QUOTED_REL_NAME_LEN  (MAX_QUOTED_NAME_LEN*2)
@@ -224,8 +225,331 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 							   Relation pk_rel, Relation fk_rel,
 							   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-							   int queryno, bool partgone) pg_attribute_noreturn();
+							   bool on_fk, bool partgone) pg_attribute_noreturn();
+static Relation find_leaf_pk_rel(Relation root_pk_rel, const RI_ConstraintInfo *riinfo,
+				 Datum *pk_vals, char *pk_nulls,
+				 Oid root_idxoid, Oid *leaf_idxoid);
 
+/*
+ * Checks whether a tuple containing the same unique key as extracted from the
+ * tuple provided in 'slot' exists in 'pk_rel'.  The key is extracted using the
+ * constraint's index given in 'riinfo', which is also scanned to check the
+ * existence of the key.
+ *
+ * If 'pk_rel' is a partitioned table, the check is performed on the leaf
+ * partition that would contain the key.
+ *
+ * The provided tuple is either the one being inserted into the referencing
+ * relation ('fk_rel' is non-NULL), or the one being deleted from the
+ * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL).
+ */
+static bool
+ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
+					   TupleTableSlot *slot,
+					   const RI_ConstraintInfo *riinfo)
+{
+	Oid			constr_id = riinfo->constraint_id;
+	Oid			idxoid;
+	Relation	idxrel;
+	Relation	leaf_pk_rel = NULL;
+	int			num_pk;
+	int			i;
+	bool		found = false;
+	const Oid  *eq_oprs;
+	Datum		pk_vals[INDEX_MAX_KEYS];
+	char		pk_nulls[INDEX_MAX_KEYS];
+	ScanKeyData skey[INDEX_MAX_KEYS];
+	IndexScanDesc	scan;
+	TupleTableSlot *outslot;
+	Oid			save_userid;
+	int			save_sec_context;
+	AclResult	aclresult;
+
+	/*
+	 * Extract the unique key from the provided slot and choose the equality
+	 * operators to use when scanning the index below.
+	 */
+	if (fk_rel)
+	{
+		ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls);
+		/* Use PK = FK equality operator. */
+		eq_oprs = riinfo->pf_eq_oprs;
+
+		/*
+		 * May need to cast each of the individual values of the foreign key
+		 * to the corresponding PK column's type if the equality operator
+		 * demands it.
+		 */
+		for (i = 0; i < riinfo->nkeys; i++)
+		{
+			if (pk_nulls[i] != 'n')
+			{
+				Oid		eq_opr = eq_oprs[i];
+				Oid		typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+				RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+				if (OidIsValid(entry->cast_func_finfo.fn_oid))
+					pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+											   pk_vals[i],
+											   Int32GetDatum(-1), /* typmod */
+											   BoolGetDatum(false)); /* implicit coercion */
+			}
+		}
+	}
+	else
+	{
+		ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls);
+		/* Use PK = PK equality operator. */
+		eq_oprs = riinfo->pp_eq_oprs;
+	}
+
+	/*
+	 * Switch to the referenced table's owner to perform the operations
+	 * below as that user.  This matches what ri_PerformCheck() does.
+	 *
+	 * Note that as with queries done by ri_PerformCheck(), the way we select
+	 * the referenced row below effectively bypasses any RLS policies that may
+	 * be present on the referenced table.
+	 */
+	GetUserIdAndSecContext(&save_userid, &save_sec_context);
+	SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
+						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
+
+	/*
+	 * Also check that the new user has USAGE permission on the referenced
+	 * table's schema and SELECT permission on the table itself.
+	 */
+	aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
+									  GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(pk_rel)));
+	aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(pk_rel));
+
+	/*
+	 * Open the constraint index to be scanned.
+	 *
+	 * If the target table is partitioned, we must look up the leaf partition
+	 * and its corresponding unique index to search the keys in.
+	 */
+	idxoid = get_constraint_index(constr_id);
+	if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Oid		leaf_idxoid;
+
+		leaf_pk_rel = find_leaf_pk_rel(pk_rel, riinfo,
+									   pk_vals, pk_nulls,
+									   idxoid, &leaf_idxoid);
+
+		/*
+		 * If no suitable leaf partition exists, the key we're looking for
+		 * cannot exist either.
+		 */
+		if (leaf_pk_rel == NULL)
+			goto done;
+
+		pk_rel = leaf_pk_rel;
+		idxoid = leaf_idxoid;
+	}
+	idxrel = index_open(idxoid, RowShareLock);
+
+	/* Set up ScanKeys for the index scan. */
+	num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+	for (i = 0; i < num_pk; i++)
+	{
+		int			pkattno = i + 1;
+		Oid			operator = eq_oprs[i];
+		Oid			opfamily = idxrel->rd_opfamily[i];
+		StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
+		RegProcedure regop = get_opcode(operator);
+
+		/* Initialize the scankey. */
+		ScanKeyInit(&skey[i],
+					pkattno,
+					strat,
+					regop,
+					pk_vals[i]);
+
+		skey[i].sk_collation = idxrel->rd_indcollation[i];
+
+		/*
+		 * Check for null values.  These should not occur here, because
+		 * callers currently handle the cases in which they do.
+		 */
+		if (pk_nulls[i] == 'n')
+			skey[i].sk_flags |= SK_ISNULL;
+	}
+
+	/*
+	 * Start the scan.  Increment the command counter to make the changes
+	 * of the current command visible to the scan and to the subsequent
+	 * locking of any tuple found.
+	 */
+	CommandCounterIncrement();
+	PushActiveSnapshot(GetTransactionSnapshot());
+	scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), num_pk, 0);
+	index_rescan(scan, skey, num_pk, NULL, 0);
+
+	/* Try to find the tuple */
+	outslot = table_slot_create(pk_rel, NULL);
+	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+		found = true;
+
+	/* If we found a tuple, try to lock it in key share mode. */
+	if (found)
+		found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+								   GetActiveSnapshot(),
+								   GetCurrentCommandId(false),
+								   LockTupleKeyShare,
+								   LockWaitBlock, NULL);
+
+	PopActiveSnapshot();
+	index_endscan(scan);
+	ExecDropSingleTupleTableSlot(outslot);
+
+	/* Don't release lock until commit. */
+	index_close(idxrel, NoLock);
+
+	/* Close leaf partition relation if any. */
+	if (leaf_pk_rel)
+		table_close(leaf_pk_rel, NoLock);
+
+done:
+	/* Restore UID and security context */
+	SetUserIdAndSecContext(save_userid, save_sec_context);
+
+	return found;
+}
+
+/*
+ * Finds the leaf partition of the partitioned relation 'root_pk_rel' that
+ * might contain the specified unique key.
+ *
+ * Returns NULL if no such leaf partition is found.
+ *
+ * This works because the unique key defined on the root relation always
+ * contains the partition key columns of all ancestors leading up to a
+ * given leaf partition.
+ */
+static Relation
+find_leaf_pk_rel(Relation root_pk_rel, const RI_ConstraintInfo *riinfo,
+				 Datum *pk_vals, char *pk_nulls,
+				 Oid root_idxoid, Oid *leaf_idxoid)
+{
+	Relation	pk_rel = root_pk_rel;
+	const AttrNumber *pk_attnums = riinfo->pk_attnums;
+	Oid			constr_idxoid = root_idxoid;
+
+	*leaf_idxoid = InvalidOid;
+
+	/*
+	 * Descend through partitioned parents to find the leaf partition that
+	 * would accept a row with the provided key values.
+	 */
+	while (true)
+	{
+		PartitionKey partkey = RelationGetPartitionKey(pk_rel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(pk_rel);
+		Datum	partkey_vals[PARTITION_MAX_KEYS];
+		bool	partkey_isnull[PARTITION_MAX_KEYS];
+		AttrNumber *root_partattrs = partkey->partattrs;
+		int		i,
+				j;
+		int		partidx;
+		Oid		partoid;
+
+		/*
+		 * Collect partition key values from the unique key.
+		 *
+		 * Because we only have the root table's copy of pk_attnums, we
+		 * must map any non-root table's partition key attribute numbers
+		 * to the root table's.
+		 */
+		if (pk_rel != root_pk_rel)
+		{
+			/*
+			 * map->attnums will contain root table attribute numbers for each
+			 * attribute of the current partitioned relation.
+			 */
+			AttrMap *map = build_attrmap_by_name_if_req(RelationGetDescr(root_pk_rel),
+														RelationGetDescr(pk_rel));
+
+			if (map)
+			{
+				root_partattrs = palloc(partkey->partnatts *
+										sizeof(AttrNumber));
+				for (i = 0; i < partkey->partnatts; i++)
+				{
+					AttrNumber	partattno = partkey->partattrs[i];
+
+					root_partattrs[i] = map->attnums[partattno - 1];
+				}
+
+				free_attrmap(map);
+			}
+		}
+
+		/*
+		 * Referenced key specification does not allow expressions, so there
+		 * would not be expressions in the partition keys either.
+		 */
+		Assert(partkey->partexprs == NIL);
+		for (i = 0, j = 0; i < partkey->partnatts; i++)
+		{
+			int		k;
+
+			for (k = 0; k < riinfo->nkeys; k++)
+			{
+				if (root_partattrs[i] == pk_attnums[k])
+				{
+					partkey_vals[j] = pk_vals[k];
+					partkey_isnull[j] = (pk_nulls[k] == 'n' ? true : false);
+					j++;
+					break;
+				}
+			}
+		}
+		/* Had better have found values for all of the partition keys. */
+		Assert(j == partkey->partnatts);
+
+		if (root_partattrs != partkey->partattrs)
+			pfree(root_partattrs);
+
+		partidx = get_partition_for_tuple(partkey, partdesc,
+										  partkey_vals, partkey_isnull);
+
+		/* close any intermediate parents we opened */
+		if (pk_rel != root_pk_rel)
+			table_close(pk_rel, NoLock);
+
+		/* No partition found. */
+		if (partidx < 0)
+			return NULL;
+
+		Assert(partidx < partdesc->nparts);
+		partoid = partdesc->oids[partidx];
+
+		pk_rel = table_open(partoid, RowShareLock);
+		constr_idxoid = index_get_partition(pk_rel, constr_idxoid);
+
+		/*
+		 * If the partition is a leaf, return it; else descend into it in
+		 * the next iteration.
+		 */
+		if (partdesc->is_leaf[partidx])
+		{
+			*leaf_idxoid = constr_idxoid;
+			return pk_rel;
+		}
+	}
+
+	Assert(false);
+	return NULL;
+}
 
 /*
  * RI_FKey_check -
@@ -239,8 +563,6 @@ RI_FKey_check(TriggerData *trigdata)
 	Relation	fk_rel;
 	Relation	pk_rel;
 	TupleTableSlot *newslot;
-	RI_QueryKey qkey;
-	SPIPlanPtr	qplan;
 
 	riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
 									trigdata->tg_relation, false);
@@ -320,9 +642,9 @@ RI_FKey_check(TriggerData *trigdata)
 
 					/*
 					 * MATCH PARTIAL - all non-null columns must match. (not
-					 * implemented, can be done by modifying the query below
-					 * to only include non-null columns, or by writing a
-					 * special version here)
+					 * implemented, can be done by modifying
+					 * ri_ReferencedKeyExists() to only include non-null
+					 * columns.)
 					 */
 					break;
 					break;
 #endif
@@ -337,70 +659,12 @@ RI_FKey_check(TriggerData *trigdata)
 			break;
 	}
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
-	/* Fetch or prepare a saved plan for the real check */
-	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
-
-	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
-	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-		const char *pk_only;
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * corresponding FK attributes.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-						 pk_only, pkrelname);
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-			Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pf_eq_oprs[i],
-							paramname, fk_type);
-			querysep = "AND";
-			queryoids[i] = fk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
-	}
-
-	/*
-	 * Now check that foreign key exists in PK table
-	 */
-	ri_PerformCheck(riinfo, &qkey, qplan,
-					fk_rel, pk_rel,
-					NULL, newslot,
-					false,
-					SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
+	if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo))
+		ri_ReportViolation(riinfo,
+						   pk_rel, fk_rel,
+						   newslot,
+						   NULL,
+						   true, false);
 
 	table_close(pk_rel, RowShareLock);
 
@@ -455,81 +719,10 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 				  TupleTableSlot *oldslot,
 				  const RI_ConstraintInfo *riinfo)
 {
-	SPIPlanPtr	qplan;
-	RI_QueryKey qkey;
-	bool		result;
-
 	/* Only called for non-null rows */
 	Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
 
-	if (SPI_connect() != SPI_OK_CONNECT)
-		elog(ERROR, "SPI_connect failed");
-
-	/*
-	 * Fetch or prepare a saved plan for checking PK table with values coming
-	 * from a PK row
-	 */
-	ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
-
-	if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
-	{
-		StringInfoData querybuf;
-		char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
-		char		attname[MAX_QUOTED_NAME_LEN];
-		char		paramname[16];
-		const char *querysep;
-		const char *pk_only;
-		Oid			queryoids[RI_MAX_NUMKEYS];
-
-		/* ----------
-		 * The query string built is
-		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-		 *		   FOR KEY SHARE OF x
-		 * The type id's for the $ parameters are those of the
-		 * PK attributes themselves.
-		 * ----------
-		 */
-		initStringInfo(&querybuf);
-		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-			"" : "ONLY ";
-		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-						 pk_only, pkrelname);
-		querysep = "WHERE";
-		for (int i = 0; i < riinfo->nkeys; i++)
-		{
-			Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-
-			quoteOneName(attname,
-						 RIAttName(pk_rel, riinfo->pk_attnums[i]));
-			sprintf(paramname, "$%d", i + 1);
-			ri_GenerateQual(&querybuf, querysep,
-							attname, pk_type,
-							riinfo->pp_eq_oprs[i],
-							paramname, pk_type);
-			querysep = "AND";
-			queryoids[i] = pk_type;
-		}
-		appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-		/* Prepare and save the plan */
-		qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-							 &qkey, fk_rel, pk_rel);
-	}
-
-	/*
-	 * We have a plan now. Run it.
-	 */
-	result = ri_PerformCheck(riinfo, &qkey, qplan,
-							 fk_rel, pk_rel,
-							 oldslot, NULL,
-							 true,	/* treat like update */
-							 SPI_OK_SELECT);
-
-	if (SPI_finish() != SPI_OK_FINISH)
-		elog(ERROR, "SPI_finish failed");
-
-	return result;
+	return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo);
 }
 
 
@@ -1545,15 +1738,10 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 					 errtableconstraint(fk_rel,
 										NameStr(fake_riinfo.conname))));
 
-		/*
-		 * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
-		 * query, which isn't true, but will cause it to use
-		 * fake_riinfo.fk_attnums as we need.
-		 */
 		ri_ReportViolation(&fake_riinfo,
 						   pk_rel, fk_rel,
 						   slot, tupdesc,
-						   RI_PLAN_CHECK_LOOKUPPK, false);
+						   true, false);
 
 		ExecDropSingleTupleTableSlot(slot);
 	}
@@ -1770,7 +1958,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 			fake_riinfo.pk_attnums[i] = i + 1;
 
 		ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
-						   slot, tupdesc, 0, true);
+						   slot, tupdesc, true, true);
 	}
 
 	if (SPI_finish() != SPI_OK_FINISH)
@@ -1907,26 +2095,25 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 {
 	/*
 	 * Inherited constraints with a common ancestor can share ri_query_cache
-	 * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
-	 * Except in that case, the query processes the other table involved in
-	 * the FK constraint (i.e., not the table on which the trigger has been
-	 * fired), and so it will be the same for all members of the inheritance
-	 * tree.  So we may use the root constraint's OID in the hash key, rather
-	 * than the constraint's own OID.  This avoids creating duplicate SPI
-	 * plans, saving lots of work and memory when there are many partitions
-	 * with similar FK constraints.
+	 * entries, because each query processes the other table involved in the
+	 * FK constraint (i.e., not the table on which the trigger has been fired),
+	 * and so it will be the same for all members of the inheritance tree.  So
+	 * we may use the root constraint's OID in the hash key, rather than the
+	 * constraint's own OID.  This avoids creating duplicate SPI plans, saving
+	 * lots of work and memory when there are many partitions with similar FK
+	 * constraints.
 	 *
 	 * (Note that we must still have a separate RI_ConstraintInfo for each
 	 * constraint, because partitions can have different column orders,
 	 * resulting in different pk_attnums[] or fk_attnums[] array contents.)
 	 *
+	 * (Note also that for a standalone or non-inherited constraint,
+	 * constraint_root_id is the same as constraint_id.)
+	 *
 	 * We assume struct RI_QueryKey contains no padding bytes, else we'd need
 	 * to use memset to clear them.
 	 */
-	if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
-		key->constr_id = riinfo->constraint_root_id;
-	else
-		key->constr_id = riinfo->constraint_id;
+	key->constr_id = riinfo->constraint_root_id;
 	key->constr_queryno = constr_queryno;
 }
 
@@ -2195,19 +2382,11 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
 			 RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
 	SPIPlanPtr	qplan;
-	Relation	query_rel;
+	/* There are currently no queries that run on the PK table. */
+	Relation	query_rel = fk_rel;
 	Oid			save_userid;
 	int			save_sec_context;
 
-	/*
-	 * Use the query type code to determine whether the query is run against
-	 * the PK or FK table; we'll do the check as that table's owner
-	 */
-	if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
-		query_rel = pk_rel;
-	else
-		query_rel = fk_rel;
-
 	/* Switch to proper UID to perform check as */
 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
 	SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
@@ -2240,9 +2419,10 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 				TupleTableSlot *oldslot, TupleTableSlot *newslot,
 				bool detectNewRows, int expect_OK)
 {
-	Relation	query_rel,
-				source_rel;
-	bool		source_is_pk;
+	/* There are currently no queries that run on the PK table. */
+	Relation	query_rel = fk_rel,
+				source_rel = pk_rel;
+	bool		source_is_pk = true;
 	Snapshot	test_snapshot;
 	Snapshot	crosscheck_snapshot;
 	int			limit;
@@ -2252,33 +2432,6 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 	Datum		vals[RI_MAX_NUMKEYS * 2];
 	char		nulls[RI_MAX_NUMKEYS * 2];
 
-	/*
-	 * Use the query type code to determine whether the query is run against
-	 * the PK or FK table; we'll do the check as that table's owner
-	 */
-	if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
-		query_rel = pk_rel;
-	else
-		query_rel = fk_rel;
-
-	/*
-	 * The values for the query are taken from the table on which the trigger
-	 * is called - it is normally the other one with respect to query_rel. An
-	 * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
-	 * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
-	 * need some less klugy way to determine this.
-	 */
-	if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
-	{
-		source_rel = fk_rel;
-		source_is_pk = false;
-	}
-	else
-	{
-		source_rel = pk_rel;
-		source_is_pk = true;
-	}
-
 	/* Extract the parameters to be passed into the query */
 	if (newslot)
 	{
@@ -2355,14 +2508,12 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 				 errhint("This is most likely due to a rule having rewritten the query.")));
 
 	/* XXX wouldn't it be clearer to do this part at the caller? */
-	if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
-		expect_OK == SPI_OK_SELECT &&
-		(SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
+	if (expect_OK == SPI_OK_SELECT && SPI_processed != 0)
 		ri_ReportViolation(riinfo,
 						   pk_rel, fk_rel,
 						   newslot ? newslot : oldslot,
 						   NULL,
-						   qkey->constr_queryno, false);
+						   false, false);
 
 	return SPI_processed != 0;
 }
@@ -2393,9 +2544,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 /*
  * Produce an error report
  *
- * If the failed constraint was on insert/update to the FK table,
- * we want the key names and values extracted from there, and the error
- * message to look like 'key blah is not present in PK'.
+ * If the failed constraint was on insert/update to the FK table (on_fk is
+ * true), we want the key names and values extracted from there, and the
+ * error message to look like 'key blah is not present in PK'.
  * Otherwise, the attr names and values come from the PK table and the
  * message looks like 'key blah is still referenced from FK'.
  */
@@ -2403,22 +2554,20 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
 				   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-				   int queryno, bool partgone)
+				   bool on_fk, bool partgone)
 {
 	StringInfoData key_names;
 	StringInfoData key_values;
-	bool		onfk;
 	const int16 *attnums;
 	Oid			rel_oid;
 	AclResult	aclresult;
 	bool		has_perm = true;
 
 	/*
-	 * Determine which relation to complain about.  If tupdesc wasn't passed
-	 * by caller, assume the violator tuple came from there.
+	 * If tupdesc wasn't passed by the caller, assume the violator tuple
+	 * came from the relation we are complaining about.
 	 */
-	onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
-	if (onfk)
+	if (on_fk)
 	{
 		attnums = riinfo->fk_attnums;
 		rel_oid = fk_rel->rd_id;
@@ -2520,7 +2669,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 						   key_names.data, key_values.data,
 						   RelationGetRelationName(fk_rel)),
 				 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
-	else if (onfk)
+	else if (on_fk)
 		ereport(ERROR,
 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
 				 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -2827,7 +2976,10 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.
+ * if not.  The entry contains the FmgrInfo of the equality operator function
+ * and that of the cast function, if one is needed to convert the right
+ * operand (whose type OID has been passed) before passing it to the equality
+ * function.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -2883,8 +3035,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * No cast is needed if the values passed to the operator are
+		 * already of the expected operand type(s).  The operator can be
+		 * cross-type (such as when called by ri_ReferencedKeyExists()),
+		 * in which case we only need the cast if the right operand's type
+		 * doesn't match what the operator expects.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 071e363d54..2da52e7ba5 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -231,6 +231,15 @@ extern bool ExecShutdownNode(PlanState *node);
 extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
+/*
+ * functions in execLockRows.c
+ */
+
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *epq_needed);
+
 /* ----------------------------------------------------------------
  *		ExecProcNode
  *
-- 
2.24.1

From 489133ced948bcd6892ee7ac22f2b13962d5ee8d Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Tue, 12 Jan 2021 14:17:31 +0900
Subject: [PATCH v7 1/2] Export get_partition_for_tuple()

Currently, only execPartition.c can see it, although a subsequent
change will require it to be callable from another module.  To make
this possible, also change the interface to accept the partitioning
information using more widely available structs.
---
 src/backend/executor/execPartition.c | 14 +++++++-------
 src/include/executor/execPartition.h |  3 +++
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index b8da4c5967..f6323d3c8d 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -183,8 +183,6 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
 								  EState *estate,
 								  Datum *values,
 								  bool *isnull);
-static int	get_partition_for_tuple(PartitionDispatch pd, Datum *values,
-									bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
 												  Datum *values,
 												  bool *isnull,
@@ -331,7 +329,9 @@ ExecFindPartition(ModifyTableState *mtstate,
 		 * these values, error out.
 		 */
 		if (partdesc->nparts == 0 ||
-			(partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
+			(partidx = get_partition_for_tuple(dispatch->key,
+											   dispatch->partdesc,
+											   values, isnull)) < 0)
 		{
 			char	   *val_desc;
 
@@ -1311,13 +1311,13 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * Return value is index of the partition (>= 0 and < partdesc->nparts) if one
  * found or -1 if none found.
  */
-static int
-get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
+int
+get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull)
 {
 	int			bound_offset;
 	int			part_index = -1;
-	PartitionKey key = pd->key;
-	PartitionDesc partdesc = pd->partdesc;
 	PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
 	/* Route as appropriate based on partitioning strategy. */
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index d30ffde7d9..e5888d54f1 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -125,5 +125,8 @@ extern PartitionPruneState *ExecCreatePartitionPruneState(PlanState *planstate,
 extern Bitmapset *ExecFindMatchingSubPlans(PartitionPruneState *prunestate);
 extern Bitmapset *ExecFindInitialMatchingSubPlans(PartitionPruneState *prunestate,
 												  int nsubplans);
+extern int get_partition_for_tuple(PartitionKey key,
+						PartitionDesc partdesc,
+						Datum *values, bool *isnull);
 
 #endif							/* EXECPARTITION_H */
-- 
2.24.1
