From c93ee8b6dfd5f345603c327e82b50f1dd8f31cf0 Mon Sep 17 00:00:00 2001
From: Junwang Zhao <zhjwpku@gmail.com>
Date: Mon, 1 Dec 2025 12:16:46 +0800
Subject: [PATCH v2 1/2] Add fast path for foreign key constraint checks

Add a fast path optimization for foreign key constraint checks that
bypasses the SPI executor for simple foreign keys by directly probing
the unique index on the referenced table.

The fast path applies when the referenced table is not partitioned
and the constraint does not involve temporal semantics. It extracts
the FK value, scans the unique index directly, and locks the tuple
with KEY SHARE lock, matching SPI behavior.

This avoids SPI overhead and improves performance for bulk operations
with many FK checks.

Refactoring: Extract tuple locking logic into ExecLockTableTuple() for
reuse.

Author: Amit Langote, Junwang Zhao

Discussion:
---
 src/backend/executor/nodeLockRows.c           | 164 +++++----
 src/backend/utils/adt/ri_triggers.c           | 323 +++++++++++++++++-
 src/include/executor/executor.h               |   9 +
 .../expected/fk-concurrent-pk-upd.out         |  58 ++++
 src/test/isolation/isolation_schedule         |   1 +
 .../isolation/specs/fk-concurrent-pk-upd.spec |  42 +++
 src/test/regress/expected/foreign_key.out     |  47 +++
 src/test/regress/sql/foreign_key.sql          |  64 ++++
 8 files changed, 635 insertions(+), 73 deletions(-)
 create mode 100644 src/test/isolation/expected/fk-concurrent-pk-upd.out
 create mode 100644 src/test/isolation/specs/fk-concurrent-pk-upd.spec

diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index a8afbf93b48..06c4784c0f5 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -79,10 +79,7 @@ lnext:
 		Datum		datum;
 		bool		isNull;
 		ItemPointerData tid;
-		TM_FailureData tmfd;
 		LockTupleMode lockmode;
-		int			lockflags = 0;
-		TM_Result	test;
 		TupleTableSlot *markSlot;
 
 		/* clear any leftover test tuple for this rel */
@@ -178,74 +175,11 @@ lnext:
 				break;
 		}
 
-		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-		if (!IsolationUsesXactSnapshot())
-			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-								markSlot, estate->es_output_cid,
-								lockmode, erm->waitPolicy,
-								lockflags,
-								&tmfd);
-
-		switch (test)
-		{
-			case TM_WouldBlock:
-				/* couldn't lock tuple in SKIP LOCKED mode */
-				goto lnext;
-
-			case TM_SelfModified:
-
-				/*
-				 * The target tuple was already updated or deleted by the
-				 * current command, or by a later command in the current
-				 * transaction.  We *must* ignore the tuple in the former
-				 * case, so as to avoid the "Halloween problem" of repeated
-				 * update attempts.  In the latter case it might be sensible
-				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_update and heap_delete to not
-				 * complain about updating "invisible" tuples, which seems
-				 * pretty scary (table_tuple_lock will not complain, but few
-				 * callers expect TM_Invisible, and we're not one of them). So
-				 * for now, treat the tuple as deleted and do not process.
-				 */
-				goto lnext;
-
-			case TM_Ok:
-
-				/*
-				 * Got the lock successfully, the locked tuple saved in
-				 * markSlot for, if needed, EvalPlanQual testing below.
-				 */
-				if (tmfd.traversed)
-					epq_needed = true;
-				break;
-
-			case TM_Updated:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				elog(ERROR, "unexpected table_tuple_lock status: %u",
-					 test);
-				break;
-
-			case TM_Deleted:
-				if (IsolationUsesXactSnapshot())
-					ereport(ERROR,
-							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-							 errmsg("could not serialize access due to concurrent update")));
-				/* tuple was deleted so don't return it */
-				goto lnext;
-
-			case TM_Invisible:
-				elog(ERROR, "attempted to lock invisible tuple");
-				break;
-
-			default:
-				elog(ERROR, "unrecognized table_tuple_lock status: %u",
-					 test);
-		}
+		/* skip tuple if it couldn't be locked */
+		if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
+								estate->es_snapshot, estate->es_output_cid,
+								lockmode, erm->waitPolicy, &epq_needed))
+			goto lnext;
 
 		/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
 		erm->curCtid = tid;
@@ -280,6 +214,94 @@ lnext:
 	return slot;
 }
 
+
+/*
+ * ExecLockTableTuple
+ * 		Locks the tuple with the specified TID in the given lock mode,
+ * 		following the given wait policy
+ *
+ * Returns true if the tuple was successfully locked.  The locked tuple is
+ * loaded into the provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+				   Snapshot snapshot, CommandId cid,
+				   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+				   bool *tuple_concurrently_updated)
+{
+	TM_FailureData tmfd;
+	int			lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+	TM_Result	test;
+
+	if (tuple_concurrently_updated)
+		*tuple_concurrently_updated = false;
+
+	if (!IsolationUsesXactSnapshot())
+		lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+	test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+							waitPolicy, lockflags, &tmfd);
+
+	switch (test)
+	{
+		case TM_WouldBlock:
+			/* couldn't lock tuple in SKIP LOCKED mode */
+			return false;
+
+		case TM_SelfModified:
+			/*
+			 * The target tuple was already updated or deleted by the
+			 * current command, or by a later command in the current
+			 * transaction.  We *must* ignore the tuple in the former
+			 * case, so as to avoid the "Halloween problem" of repeated
+			 * update attempts.  In the latter case it might be sensible
+			 * to fetch the updated tuple instead, but doing so would
+			 * require changing heap_update and heap_delete to not
+			 * complain about updating "invisible" tuples, which seems
+			 * pretty scary (table_tuple_lock will not complain, but few
+			 * callers expect TM_Invisible, and we're not one of them). So
+			 * for now, treat the tuple as deleted and do not process.
+			 */
+			return false;
+
+		case TM_Ok:
+			/*
+			 * Got the lock successfully.  The locked tuple is saved in
+			 * slot for, if needed, EvalPlanQual testing by the caller.
+			 */
+			if (tmfd.traversed && tuple_concurrently_updated)
+				*tuple_concurrently_updated = true;
+			break;
+
+		case TM_Updated:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			elog(ERROR, "unexpected table_tuple_lock status: %u",
+				 test);
+			break;
+
+		case TM_Deleted:
+			if (IsolationUsesXactSnapshot())
+				ereport(ERROR,
+						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+						 errmsg("could not serialize access due to concurrent update")));
+			/* tuple was deleted so don't return it */
+			return false;
+
+		case TM_Invisible:
+			elog(ERROR, "attempted to lock invisible tuple");
+			return false;
+
+		default:
+			elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+			return false;
+	}
+
+	return true;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 059fc5ebf60..cfb85b9d753 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -24,12 +24,15 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/skey.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/index.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_namespace.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
@@ -238,6 +241,188 @@ pg_noreturn static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 										   TupleTableSlot *violatorslot, TupleDesc tupdesc,
 										   int queryno, bool is_restrict, bool partgone);
 
+static bool
+ri_fastpath_is_applicable(const RI_ConstraintInfo *riinfo, Relation pk_rel)
+{
+	/*
+	 * Partitioned referenced tables are skipped for simplicity, since
+	 * they require routing the probe through the correct partition using
+	 * PartitionDirectory.
+	 * This can be added later as a separate patch once the core mechanism
+	 * is stable.
+	 */
+	if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return false;
+
+	/*
+	 * Temporal foreign keys use range overlap and containment semantics
+	 * (&&, <@, range_agg()) that inherently involve aggregation and
+	 * multiple-row reasoning, so they stay on the SPI path.
+	 */
+	if (riinfo->hasperiod)
+		return false;
+
+	return true;
+}
+
+/*
+ * get_fkey_unique_index
+ *  Returns the unique index enforcing the given foreign key constraint
+ *
+ * XXX This is very similar to get_constraint_index; probably they should be
+ * unified.
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+	Oid			result = InvalidOid;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+	if (HeapTupleIsValid(tp))
+	{
+		Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+		if (contup->contype == CONSTRAINT_FOREIGN)
+			result = contup->conindid;
+		ReleaseSysCache(tp);
+	}
+
+	if (!OidIsValid(result))
+		elog(ERROR, "unique index not found for foreign key constraint %u",
+			 conoid);
+
+	return result;
+}
+
+/*
+ * ri_CheckPermissions
+ *   Check that the current user has permission to use the schema of and
+ *   to SELECT from 'query_rel'
+ *
+ * Replicates the permission checks that the executor performs on the SPI path.
+ */
+static void
+ri_CheckPermissions(Relation query_rel)
+{
+	AclResult aclresult;
+
+	/* USAGE on schema. */
+	aclresult = object_aclcheck(NamespaceRelationId,
+								RelationGetNamespace(query_rel),
+								GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_SCHEMA,
+					   get_namespace_name(RelationGetNamespace(query_rel)));
+
+	/* SELECT on relation. */
+	aclresult = pg_class_aclcheck(RelationGetRelid(query_rel), GetUserId(),
+								  ACL_SELECT);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_TABLE,
+					   RelationGetRelationName(query_rel));
+}
+
+/*
+ * This checks that the index key of the tuple specified in 'new_slot' matches
+ * the key that has already been found in the PK index relation 'idxrel'.
+ *
+ * Returns true if the index key of the tuple matches the existing index
+ * key, false otherwise.
+ */
+static bool
+recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
+						 TupleTableSlot *new_slot)
+{
+	IndexInfo *indexInfo = BuildIndexInfo(idxrel);
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	bool		matched = true;
+
+	/* PK indexes never have these. */
+	Assert(indexInfo->ii_Expressions == NIL &&
+		   indexInfo->ii_ExclusionOps == NULL);
+
+	/* Form the index values and isnull flags given the table tuple. */
+	FormIndexDatum(indexInfo, new_slot, NULL, values, isnull);
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		ScanKeyData		*skey = &skeys[i];
+
+		/* A PK column can never be set to NULL. */
+		Assert(!isnull[i]);
+		if (!DatumGetBool(FunctionCall2Coll(&skey->sk_func,
+											skey->sk_collation,
+											skey->sk_argument,
+											values[i])))
+		{
+			matched = false;
+			break;
+		}
+	}
+
+	return matched;
+}
+
+/*
+ * Build scan keys to probe the PK index; despite the name, no caching yet.
+ */
+static void
+build_scankeys_from_cache(const RI_ConstraintInfo *riinfo,
+						  Relation pk_rel, Relation fk_rel,
+						  Relation idx_rel, int num_pk,
+						  Datum *pk_vals, char *pk_nulls,
+						  ScanKey skeys)
+{
+	/* Use PK = FK equality operator. */
+	const Oid *eq_oprs = riinfo->pf_eq_oprs;
+
+	Assert(num_pk == riinfo->nkeys);
+
+	/*
+	 * May need to cast each of the individual values of the foreign key
+	 * to the corresponding PK column's type if the equality operator
+	 * demands it.
+	 */
+	for (int i = 0; i < riinfo->nkeys; i++)
+	{
+		if (pk_nulls[i] != 'n')
+		{
+			Oid  eq_opr = eq_oprs[i];
+			Oid  typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+			RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+			if (OidIsValid(entry->cast_func_finfo.fn_oid))
+				pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+										   pk_vals[i],
+										   Int32GetDatum(-1), /* typmod */
+										   BoolGetDatum(false)); /* implicit coercion */
+		} else {
+			Assert(false);
+		}
+	}
+
+	/*
+	 * Set up ScanKeys for the index scan. This is essentially how
+	 * ExecIndexBuildScanKeys() sets them up.
+	 */
+	for (int i = 0; i < num_pk; i++)
+	{
+		int		pkattrno = i + 1;
+		Oid		lefttype,
+				righttype;
+		Oid		operator = eq_oprs[i];
+		Oid		opfamily = idx_rel->rd_opfamily[i];
+		int  strat;
+		RegProcedure regop = get_opcode(operator);
+
+		get_op_opfamily_properties(operator, opfamily, false, &strat,
+								   &lefttype, &righttype);
+		ScanKeyEntryInitialize(&skeys[i], 0, pkattrno, strat, righttype,
+							   idx_rel->rd_indcollation[i], regop,
+							   pk_vals[i]);
+	}
+}
 
 /*
  * RI_FKey_check -
@@ -349,6 +534,132 @@ RI_FKey_check(TriggerData *trigdata)
 			break;
 	}
 
+	/* Fast path, for simple cases, probe the unique index directly */
+	if (ri_fastpath_is_applicable(riinfo, pk_rel))
+	{
+		Oid			idxoid;
+		Relation	idxrel;
+		int			num_pk;
+		Datum		pk_vals[INDEX_MAX_KEYS];
+		char		pk_nulls[INDEX_MAX_KEYS];
+		ScanKeyData	skey[INDEX_MAX_KEYS];
+		IndexScanDesc	scan;
+		TupleTableSlot *outslot;
+		Oid				saved_userid;
+		int				saved_sec_context;
+		bool			tuple_concurrently_updated;
+		int				tuples_processed = 0;
+
+		elog(DEBUG1,
+			 "RI fastpath: constraint \"%s\" using fast path",
+			 NameStr(riinfo->conname));
+
+		/*
+		 * Extract the unique key from the provided slot and choose the
+		 * equality operators to use when scanning the index below.
+		 */
+		ri_ExtractValues(fk_rel, newslot, riinfo, false, pk_vals, pk_nulls);
+
+		/*
+		 * Switch to the referenced table's owner to perform the operations below.
+		 * This matches what ri_PerformCheck() does.
+		 */
+		GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
+		SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
+							   saved_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+							   SECURITY_NOFORCE_RLS);
+		ri_CheckPermissions(pk_rel);
+
+		PushActiveSnapshot(GetTransactionSnapshot());
+		CommandCounterIncrement();
+		UpdateActiveSnapshotCommandId();
+
+		/*
+		 * Open the constraint index to be scanned.
+		 *
+		 * A partitioned 'pk_rel' was already rejected by ri_fastpath_is_applicable().
+		 */
+		idxoid = get_fkey_unique_index(riinfo->constraint_id);
+		idxrel = index_open(idxoid, RowShareLock);
+		num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+
+		build_scankeys_from_cache(riinfo, pk_rel, fk_rel, idxrel, num_pk,
+								  pk_vals, pk_nulls, skey);
+
+		scan = index_beginscan(pk_rel, idxrel, GetActiveSnapshot(), NULL, riinfo->nkeys, 0);
+
+		/* Install the ScanKeys. */
+		index_rescan(scan, skey, num_pk, NULL, 0);
+
+		/* XXX this slot should be cached to avoid creating it for each row */
+		outslot = table_slot_create(pk_rel, NULL);
+
+		/* Look for the tuple, and if found, try to lock it in key share mode. */
+		if (!index_getnext_slot(scan, ForwardScanDirection, outslot))
+			ri_ReportViolation(riinfo,
+							   pk_rel, fk_rel,
+							   newslot,
+							   NULL,
+							   RI_PLAN_CHECK_LOOKUPPK, false, false);
+
+		/*
+		 * If we fail to lock the tuple for whatever reason, assume it doesn't
+		 * exist.  If the tuple was concurrently updated, recheck below that
+		 * its key still matches the FK value.
+		 */
+		if (ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+							   GetActiveSnapshot(),
+							   GetCurrentCommandId(false),
+							   LockTupleKeyShare,
+							   LockWaitBlock,
+							   &tuple_concurrently_updated))
+		{
+			bool		matched = true;
+
+			/*
+			 * If the matched table tuple has been updated, check if the key is
+			 * still the same.
+			 *
+			 * This emulates EvalPlanQual() in the executor.
+			 */
+			if (tuple_concurrently_updated &&
+				!recheck_matched_pk_tuple(idxrel, skey, outslot))
+				matched = false;
+
+			if (matched)
+				tuples_processed = 1;
+		}
+
+		index_endscan(scan);
+		ExecDropSingleTupleTableSlot(outslot);
+
+		/* Don't release lock until commit. */
+		index_close(idxrel, NoLock);
+
+		PopActiveSnapshot();
+
+		/* Restore UID and security context */
+		SetUserIdAndSecContext(saved_userid, saved_sec_context);
+
+		if (tuples_processed == 1)
+		{
+			table_close(pk_rel, RowShareLock);
+			return PointerGetDatum(NULL);
+		}
+		else
+		{
+			ri_ReportViolation(riinfo,
+							   pk_rel, fk_rel,
+							   newslot,
+							   NULL,
+							   RI_PLAN_CHECK_LOOKUPPK, false, false);
+		}
+	}
+
+	/* Fall back to SPI */
+	elog(DEBUG1, "RI fastpath: constraint \"%s\" falling back to SPI",
+		 NameStr(riinfo->conname));
+
 	SPI_connect();
 
 	/* Fetch or prepare a saved plan for the real check */
@@ -3165,8 +3476,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
 		 * moment since that will never be generated for implicit coercions.
 		 */
 		op_input_types(eq_opr, &lefttype, &righttype);
-		Assert(lefttype == righttype);
-		if (typeid == lefttype)
+
+		/*
+		 * Don't need to cast if the values that will be passed to the
+		 * operator will be of expected operand type(s).  The operator can be
+		 * cross-type (such as on the fast path of RI_FKey_check()), in which
+		 * case, we only need the cast if the right operand value doesn't match
+		 * the type expected by the operator.
+		 */
+		if ((lefttype == righttype && typeid == lefttype) ||
+			(lefttype != righttype && typeid == righttype))
 			castfunc = InvalidOid;	/* simplest case */
 		else
 		{
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index fa2b657fb2f..8155aa7ae79 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -303,6 +303,15 @@ extern void ExecShutdownNode(PlanState *node);
 extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
+/*
+ * functions in nodeLockRows.c
+ */
+
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+							   Snapshot snapshot, CommandId cid,
+							   LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+							   bool *tuple_concurrently_updated);
+
 /* ----------------------------------------------------------------
  *		ExecProcNode
  *
diff --git a/src/test/isolation/expected/fk-concurrent-pk-upd.out b/src/test/isolation/expected/fk-concurrent-pk-upd.out
new file mode 100644
index 00000000000..9bbec638ac9
--- /dev/null
+++ b/src/test/isolation/expected/fk-concurrent-pk-upd.out
@@ -0,0 +1,58 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s2ukey s1i s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2c: COMMIT;
+step s1i: <... completed>
+ERROR:  insert or update on table "child" violates foreign key constraint "child_parent_key_fkey"
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         2|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+(0 rows)
+
+
+starting permutation: s2uaux s1i s2c s1c s2s s1s
+step s2uaux: UPDATE parent SET aux = 'bar' WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1);
+step s2c: COMMIT;
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|bar
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
+
+starting permutation: s2ukey s1i s2ukey2 s2c s1c s2s s1s
+step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
+step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
+step s2ukey2: UPDATE parent SET parent_key = 1 WHERE parent_key = 2;
+step s2c: COMMIT;
+step s1i: <... completed>
+step s1c: COMMIT;
+step s2s: SELECT * FROM parent;
+parent_key|aux
+----------+---
+         1|foo
+(1 row)
+
+step s1s: SELECT * FROM child;
+child_key|parent_key
+---------+----------
+        1|         1
+(1 row)
+
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 112f05a3677..124d4cc289f 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -37,6 +37,7 @@ test: fk-partitioned-2
 test: fk-snapshot
 test: fk-snapshot-2
 test: fk-snapshot-3
+test: fk-concurrent-pk-upd
 test: subxid-overflow
 test: eval-plan-qual
 test: eval-plan-qual-trigger
diff --git a/src/test/isolation/specs/fk-concurrent-pk-upd.spec b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
new file mode 100644
index 00000000000..cba05a85f78
--- /dev/null
+++ b/src/test/isolation/specs/fk-concurrent-pk-upd.spec
@@ -0,0 +1,42 @@
+# Tests that an INSERT on referencing table correctly fails when
+# the referenced value disappears due to a concurrent update
+setup
+{
+  CREATE TABLE parent (
+    parent_key int PRIMARY KEY,
+    aux   text NOT NULL
+  );
+
+  CREATE TABLE child (
+    child_key int PRIMARY KEY,
+    parent_key int NOT NULL REFERENCES parent
+  );
+
+  INSERT INTO parent VALUES (1, 'foo');
+}
+
+teardown
+{
+  DROP TABLE parent, child;
+}
+
+session s1
+setup  { BEGIN; }
+step s1i { INSERT INTO child VALUES (1, 1); }
+step s1c { COMMIT; }
+step s1s { SELECT * FROM child; }
+
+session s2
+setup  { BEGIN; }
+step s2ukey { UPDATE parent SET parent_key = 2 WHERE parent_key = 1; }
+step s2uaux { UPDATE parent SET aux = 'bar' WHERE parent_key = 1; }
+step s2ukey2 { UPDATE parent SET parent_key = 1 WHERE parent_key = 2; }
+step s2c { COMMIT; }
+step s2s { SELECT * FROM parent; }
+
+# fail
+permutation s2ukey s1i s2c s1c s2s s1s
+# ok
+permutation s2uaux s1i s2c s1c s2s s1s
+# ok
+permutation s2ukey s1i s2ukey2 s2c s1c s2s s1s
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 7f9e0ebb82d..eb7d393ea25 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -370,6 +370,53 @@ SELECT * FROM PKTABLE;
 DROP TABLE FKTABLE;
 DROP TABLE PKTABLE;
 --
+-- Check RLS
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
+-- Enable RLS on PKTABLE and Create policies
+ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
+CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+SET ROLE regress_foreign_key_user;
+INSERT INTO FKTABLE VALUES (3, 5);
+INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
+RESET ROLE;
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+--
+-- Check ACL
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+-- Grant usage on PKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+-- Inserting into FKTABLE should work
+INSERT INTO FKTABLE VALUES (3, 5);
+-- Revoke usage on PKTABLE from user regress_foreign_key_user
+REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
+-- Inserting into FKTABLE should fail
+INSERT INTO FKTABLE VALUES (2, 6);
+ERROR:  permission denied for table pktable
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+--
 -- Check initial check upon ALTER TABLE
 --
 CREATE TABLE PKTABLE ( ptest1 int, ptest2 int, PRIMARY KEY(ptest1, ptest2) );
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 4a6172b8e56..4b2198348d2 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -242,6 +242,70 @@ SELECT * FROM PKTABLE;
 DROP TABLE FKTABLE;
 DROP TABLE PKTABLE;
 
+--
+-- Check RLS
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+
+-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
+
+-- Enable RLS on PKTABLE and Create policies
+ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
+CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
+
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+
+SET ROLE regress_foreign_key_user;
+
+INSERT INTO FKTABLE VALUES (3, 5);
+INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
+
+RESET ROLE;
+
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+DROP USER regress_foreign_key_user;
+
+--
+-- Check ACL
+--
+CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
+CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
+
+-- Insert test data into PKTABLE
+INSERT INTO PKTABLE VALUES (1, 'Test1');
+INSERT INTO PKTABLE VALUES (2, 'Test2');
+INSERT INTO PKTABLE VALUES (3, 'Test3');
+
+-- Grant usage on PKTABLE to user regress_foreign_key_user
+CREATE USER regress_foreign_key_user NOLOGIN;
+GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
+
+ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
+
+-- Inserting into FKTABLE should work
+INSERT INTO FKTABLE VALUES (3, 5);
+
+-- Revoke usage on PKTABLE from user regress_foreign_key_user
+REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
+
+-- Inserting into FKTABLE should fail
+INSERT INTO FKTABLE VALUES (2, 6);
+
+DROP TABLE FKTABLE;
+DROP TABLE PKTABLE;
+
+DROP USER regress_foreign_key_user;
+
 --
 -- Check initial check upon ALTER TABLE
 --
-- 
2.41.0

