Hello,

On 2019-Jan-21, Amit Langote wrote:

> On 2018/12/01 4:12, Alvaro Herrera wrote:
> > Here's a more credible version of this patch series.
> 
> Are you going to post rebased patches soon?

Yes, very soon -- right now, in fact :-)

Two preliminary patches in this series are already pushed, per a nearby
bugfix.  I also split out the new index_get_partition routine to
catalog/partition.c, per comment from Jesper, and put it on its own
patch.  0003 is the interesting bits in this submission.

Note that there is a change in constraint naming on partitions.  This
affects some preexisting test output ... and I'm not liking the changes
much, anymore.  I'll have a look at how to revert to the previous
behavior.

As you noticed in the other thread, the part of the FK clone routine
that attaches to an existing constraint needed to be refactored into its
own routine.  I did that, though the split is different from what you
were proposing.

Jesper also mentioned removing the "#if 0" code.  Actually what I need
to be doing is reinstating that check in the cases where it's possible.
I haven't done that yet.

I also dropped the part that changed how psql reports constraints in \d,
since there's a separate commitfest entry for that one.

Hmm, I just noticed that there's an ereport that fails i18n by way of
using a ternary operator in the first argument of errmsg.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From b294c1060a9fd28ec2cab752354769ef928422ac Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 28 Nov 2018 11:52:00 -0300
Subject: [PATCH 1/3] Rework deleteObjectsInList to allow objtype-specific
 checks

This doesn't change any functionality yet.
---
 src/backend/catalog/dependency.c | 41 ++++++++++++++++++++++++----------------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 2c54895831..16e5fadbbe 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -228,29 +228,38 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 	int			i;
 
 	/*
-	 * Keep track of objects for event triggers, if necessary.
+	 * Invoke pre-deletion callbacks and keep track of objects for event
+	 * triggers, if necessary.
 	 */
-	if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL))
+	for (i = 0; i < targetObjects->numrefs; i++)
 	{
-		for (i = 0; i < targetObjects->numrefs; i++)
+		const ObjectAddress *thisobj = &targetObjects->refs[i];
+		Oid			objectClass = getObjectClass(thisobj);
+
+		if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL))
 		{
-			const ObjectAddress *thisobj = &targetObjects->refs[i];
-			const ObjectAddressExtra *extra = &targetObjects->extras[i];
-			bool		original = false;
-			bool		normal = false;
-
-			if (extra->flags & DEPFLAG_ORIGINAL)
-				original = true;
-			if (extra->flags & DEPFLAG_NORMAL)
-				normal = true;
-			if (extra->flags & DEPFLAG_REVERSE)
-				normal = true;
-
-			if (EventTriggerSupportsObjectClass(getObjectClass(thisobj)))
+			if (EventTriggerSupportsObjectClass(objectClass))
 			{
+				bool		original = false;
+				bool		normal = false;
+				const ObjectAddressExtra *extra = &targetObjects->extras[i];
+
+				if (extra->flags & DEPFLAG_ORIGINAL)
+					original = true;
+				if (extra->flags & DEPFLAG_NORMAL ||
+					extra->flags & DEPFLAG_REVERSE)
+					normal = true;
+
 				EventTriggerSQLDropAddObject(thisobj, original, normal);
 			}
 		}
+
+		/* Invoke class-specific pre-deletion checks */
+		switch (objectClass)
+		{
+			default:
+				break;
+		}
 	}
 
 	/*
-- 
2.11.0

>From 838e99c49839ae2b6aa45d43abb13a3156b2267b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 22 Jan 2019 18:00:31 -0300
Subject: [PATCH 2/3] index_get_partition

---
 src/backend/catalog/partition.c  | 35 +++++++++++++++++++++++++++++++++++
 src/backend/commands/tablecmds.c | 40 +++++++++++-----------------------------
 src/include/catalog/partition.h  |  1 +
 3 files changed, 47 insertions(+), 29 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index 62d1ec60ba..a2573aada5 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -148,6 +148,41 @@ get_partition_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
 }
 
 /*
+ * Return the OID of the index, in the given partition, that is a child of the
+ * given index or InvalidOid if there isn't one.
+ */
+Oid
+index_get_partition(Relation partition, Oid indexId)
+{
+	List	   *idxlist = RelationGetIndexList(partition);
+	Oid			result = InvalidOid;
+	ListCell   *l;
+
+	foreach(l, idxlist)
+	{
+		Oid			partIdx = lfirst_oid(l);
+		HeapTuple	tup;
+		bool		ispartition;
+
+		tup = SearchSysCache1(RELOID, ObjectIdGetDatum(partIdx));
+		if (!tup)
+			elog(ERROR, "cache lookup failed for relation %u", partIdx);
+		ispartition = ((Form_pg_class) GETSTRUCT(tup))->relispartition;
+		ReleaseSysCache(tup);
+		if (ispartition && get_partition_parent(partIdx) == indexId)
+		{
+			result = partIdx;
+			break;
+		}
+	}
+
+	/* free the index list on every exit path, not only when a match is found */
+	list_free(idxlist);
+
+	return result;
+}
+
+/*
  * map_partition_varattnos - maps varattno of any Vars in expr from the
  * attno's of 'from_rel' to the attno's of 'to_rel' partition, each of which
  * may be either a leaf partition or a partitioned table, but both of which
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 28a137bb53..5ee71b09fb 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -15394,36 +15394,18 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
 static void
 refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl)
 {
-	Relation	pg_inherits;
-	ScanKeyData key;
-	HeapTuple	tuple;
-	SysScanDesc scan;
+	Oid			existingIdx;
 
-	pg_inherits = table_open(InheritsRelationId, AccessShareLock);
-	ScanKeyInit(&key, Anum_pg_inherits_inhparent,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(RelationGetRelid(parentIdx)));
-	scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true,
-							  NULL, 1, &key);
-	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
-	{
-		Form_pg_inherits inhForm;
-		Oid			tab;
-
-		inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
-		tab = IndexGetRelation(inhForm->inhrelid, false);
-		if (tab == RelationGetRelid(partitionTbl))
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
-							RelationGetRelationName(partIdx),
-							RelationGetRelationName(parentIdx)),
-					 errdetail("Another index is already attached for partition \"%s\".",
-							   RelationGetRelationName(partitionTbl))));
-	}
-
-	systable_endscan(scan);
-	table_close(pg_inherits, AccessShareLock);
+	existingIdx = index_get_partition(partitionTbl,
+									  RelationGetRelid(parentIdx));
+	if (OidIsValid(existingIdx))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+						RelationGetRelationName(partIdx),
+						RelationGetRelationName(parentIdx)),
+				 errdetail("Another index is already attached for partition \"%s\".",
+						   RelationGetRelationName(partitionTbl))));
 }
 
 /*
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index 5685d2fd57..7f0b198bcb 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -35,6 +35,7 @@ typedef struct PartitionDescData
 
 extern Oid	get_partition_parent(Oid relid);
 extern List *get_partition_ancestors(Oid relid);
+extern Oid	index_get_partition(Relation partition, Oid indexId);
 extern List *map_partition_varattnos(List *expr, int fromrel_varno,
 						Relation to_rel, Relation from_rel,
 						bool *found_whole_row);
-- 
2.11.0

>From adbc09d817fb5db89c78f41e34663c9bc8f9f944 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 22 Jan 2019 18:04:37 -0300
Subject: [PATCH 3/3] Support FKs referencing partitioned tables

---
 doc/src/sgml/ref/create_table.sgml        |    7 +-
 src/backend/catalog/dependency.c          |    3 +
 src/backend/catalog/heap.c                |   22 +
 src/backend/catalog/pg_constraint.c       |   15 +-
 src/backend/commands/tablecmds.c          | 1000 ++++++++++++++++++++++-------
 src/backend/utils/adt/ri_triggers.c       |  228 ++++++-
 src/backend/utils/adt/ruleutils.c         |   18 +
 src/include/catalog/heap.h                |    2 +
 src/include/commands/tablecmds.h          |    6 +-
 src/include/commands/trigger.h            |    1 +
 src/include/utils/ruleutils.h             |    1 +
 src/test/regress/expected/foreign_key.out |  178 ++++-
 src/test/regress/sql/foreign_key.sql      |  114 +++-
 13 files changed, 1314 insertions(+), 281 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 857515ec8f..656a33f90f 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -378,9 +378,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      <para>
       Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
       however, you can define these constraints on individual partitions.
-      Also, while it's possible to define <literal>PRIMARY KEY</literal>
-      constraints on partitioned tables, creating foreign keys that
-      reference a partitioned table is not yet supported.
      </para>
 
      <para>
@@ -994,9 +991,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       addition of a foreign key constraint requires a
       <literal>SHARE ROW EXCLUSIVE</literal> lock on the referenced table.
       Note that foreign key constraints cannot be defined between temporary
-      tables and permanent tables.  Also note that while it is possible to
-      define a foreign key on a partitioned table, it is not possible to
-      declare a foreign key that references a partitioned table.
+      tables and permanent tables.
      </para>
 
      <para>
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 16e5fadbbe..70e46a52ab 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -257,6 +257,9 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 		/* Invoke class-specific pre-deletion checks */
 		switch (objectClass)
 		{
+			case OCLASS_CLASS:
+				pre_drop_class_check(thisobj->objectId, thisobj->objectSubId);
+				break;
 			default:
 				break;
 		}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 1153688a1c..fcb746c807 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1782,6 +1782,28 @@ RemoveAttrDefaultById(Oid attrdefId)
 }
 
 /*
+ * Checks to be run just before dropping a relation.
+ */
+void
+pre_drop_class_check(Oid relationId, Oid objectSubId)
+{
+	Relation	rel;
+	bool		is_leaf_partition;
+
+	/* a dropping caller already holds a strong lock, so take none here */
+	rel = relation_open(relationId, NoLock);
+	is_leaf_partition = (rel->rd_rel->relispartition &&
+						 rel->rd_rel->relkind == RELKIND_RELATION);
+
+	/*
+	 * Last chance to verify foreign keys that reference a leaf partition.
+	 */
+	if (is_leaf_partition)
+		CheckNoForeignKeyRefs(rel, true);
+	relation_close(rel, NoLock);
+}
+
+/*
  * heap_drop_with_catalog	- removes specified relation from catalogs
  *
  * Note that this routine is not responsible for dropping objects that are
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 446b54b9ff..f1e59dc7a1 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -302,10 +302,12 @@ CreateConstraintEntry(const char *constraintName,
 	if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
 	{
 		/*
-		 * Register normal dependency on the unique index that supports a
-		 * foreign-key constraint.  (Note: for indexes associated with unique
-		 * or primary-key constraints, the dependency runs the other way, and
-		 * is not made here.)
+		 * Register dependency on the unique index that supports a foreign-key
+		 * constraint.  (Note: for indexes associated with unique or
+		 * primary-key constraints, the dependency runs the other way, and is
+		 * not made here.)  For standalone constraints, this is a normal
+		 * dependency; for a constraint that is an implementation part of a
+		 * larger one, it's internal-auto.
 		 */
 		ObjectAddress relobject;
 
@@ -313,7 +315,10 @@ CreateConstraintEntry(const char *constraintName,
 		relobject.objectId = indexRelId;
 		relobject.objectSubId = 0;
 
-		recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
+		recordDependencyOn(&conobject, &relobject,
+						   OidIsValid(parentConstrId) ?
+						   DEPENDENCY_INTERNAL_AUTO :
+						   DEPENDENCY_NORMAL);
 	}
 
 	if (foreignNKeys > 0)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 5ee71b09fb..2c37903642 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -411,10 +411,32 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
 						  Relation rel, Constraint *fkconstraint, Oid parentConstr,
 						  bool recurse, bool recursing,
 						  LOCKMODE lockmode);
+static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
+					   Relation rel, Relation pkrel, Oid indexOid, Oid parentConstraint,
+					   int numfks, int16 *pkattnum, int16 *fkattnum,
+					   Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+					   bool old_check_ok);
+static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
+						Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, int numfks,
+						int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+						Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok,
+						bool create_constraint);
 static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
 						   List **cloned);
+static void CloneFkReferenced(Relation parentRel, Relation partitionRel,
+				  Relation pg_constraint);
 static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 				   Relation partRel, List *clone, List **cloned);
+static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+							  Constraint *fkconstraint, Oid constraintOid,
+							  Oid indexOid);
+static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
+							   Constraint *fkconstraint, Oid constraintOid,
+							   Oid indexOid);
+static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+							 Oid parentConstrOid, int numfks,
+							 AttrNumber *mapped_conkey, AttrNumber *confkey,
+							 Oid *conpfeqop);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
 					 DropBehavior behavior,
 					 bool recurse, bool recursing,
@@ -3463,7 +3485,8 @@ AlterTableGetLockLevel(List *cmds)
 
 				/*
 				 * Removing constraints can affect SELECTs that have been
-				 * optimised assuming the constraint holds true.
+				 * optimised assuming the constraint holds true. See also
+				 * CloneFkReferenced.
 				 */
 			case AT_DropConstraint: /* as DROP INDEX */
 			case AT_DropNotNull:	/* may change some SQL plans */
@@ -7045,11 +7068,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 			break;
 
 		case CONSTR_FOREIGN:
-
 			/*
-			 * Note that we currently never recurse for FK constraints, so the
-			 * "recurse" flag is silently ignored.
-			 *
 			 * Assign or validate constraint name
 			 */
 			if (newConstraint->conname)
@@ -7228,6 +7247,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
  * lock on the rel, and have done appropriate validity checks for it.
  * We do permissions checks here, however.
+ *
+ * When the referenced or referencing tables (or both) are partitioned,
+ * multiple pg_constraint rows are required -- one for each partitioned table
+ * and each partition on each side (fortunately, not one for every combination
+ * thereof).  We also need the appropriate triggers to be created on each leaf
+ * partition.
  */
 static ObjectAddress
 ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
@@ -7247,7 +7272,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	int			numfks,
 				numpks;
 	Oid			indexOid;
-	Oid			constrOid;
 	bool		old_check_ok;
 	ObjectAddress address;
 	ListCell   *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
@@ -7265,12 +7289,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * Validity checks (permission checks wait till we have the column
 	 * numbers)
 	 */
-	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		ereport(ERROR,
-				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("cannot reference partitioned table \"%s\"",
-						RelationGetRelationName(pkrel))));
-
 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
 		if (!recurse)
@@ -7288,7 +7306,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 					 errdetail("This feature is not yet supported on partitioned tables.")));
 	}
 
-	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("referenced relation \"%s\" is not a table",
@@ -7498,8 +7517,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
 			ereport(ERROR,
 					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("foreign key constraint \"%s\" "
-							"cannot be implemented",
+					 errmsg(fkconstraint->conname ?
+							"foreign key constraint \"%s\" cannot be implemented" :
+							"foreign key constraint cannot be implemented",
 							fkconstraint->conname),
 					 errdetail("Key columns \"%s\" and \"%s\" "
 							   "are of incompatible types: %s and %s.",
@@ -7587,15 +7607,121 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	}
 
 	/*
+	 * Create all the constraint and trigger objects, recursing to partitions
+	 * as necessary.  First handle the referenced side.
+	 */
+	address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
+									 indexOid,
+									 InvalidOid,	/* no parent constraint */
+									 numfks,
+									 pkattnum,
+									 fkattnum,
+									 pfeqoperators,
+									 ppeqoperators,
+									 ffeqoperators,
+									 old_check_ok);
+
+	/*
+	 * Now handle the referencing side.  We don't need the topmost constraint
+	 * on this side, because the constraint we created above fills that role;
+	 * if this recurses for any partitions then more constraints are added.
+	 */
+	addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
+							indexOid,
+							address.objectId,
+							numfks,
+							pkattnum,
+							fkattnum,
+							pfeqoperators,
+							ppeqoperators,
+							ffeqoperators,
+							old_check_ok,
+							false);
+
+	/*
+	 * Close pk table, but keep lock until we've committed.
+	 */
+	table_close(pkrel, NoLock);
+
+	return address;
+}
+
+/*
+ * addFkRecurseReferenced
+ *		recursive subroutine for ATAddForeignKeyConstraint, referenced side
+ *
+ * Create pg_constraint rows for the referenced side of the constraint,
+ * referencing the parent of the referencing side; also create action triggers
+ * on leaf partitions.  If the table is partitioned, recurse to handle each
+ * partition.
+ *
+ * On constraint names: It's not possible in general to give all the cascaded
+ * constraints the same name, so we don't try.  (Also, even when it *is*
+ * possible, it goes counter to the SQL-standard rule that constraint names
+ * must be unique within a schema.)  Therefore we apply the given name to the
+ * top-most constraint, and use generated names when cascading to partitions.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the root referencing relation.
+ * pkrel is the referenced relation; might be a partition, if recursing.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstraint is the OID of a parent constraint; InvalidOid if this is a
+ * top-level constraint.
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID arrays of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ */
+static ObjectAddress
+addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
+					   Relation pkrel, Oid indexOid, Oid parentConstraint,
+					   int numfks,
+					   int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+					   Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
+{
+	ObjectAddress address;
+	Oid			constrOid;
+	char	   *conname;
+
+	/*
+	 * Verify relkind for each referenced partition.  At the top level, this
+	 * is redundant with a previous check, but we need it when recursing.
+	 */
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("referenced relation \"%s\" is not a table",
+						RelationGetRelationName(pkrel))));
+
+	/*
+	 * Caller supplies us with a constraint name; however, it may be used in
+	 * this partition, so come up with a different one in that case.
+	 */
+	if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+							 RelationGetRelid(rel),
+							 fkconstraint->conname))
+		conname = ChooseConstraintName(RelationGetRelationName(rel),
+									   strVal(linitial(fkconstraint->fk_attrs)),
+									   "fkey",
+									   RelationGetNamespace(rel), NIL);
+	else
+		conname = fkconstraint->conname;
+
+	/*
 	 * Record the FK constraint in pg_constraint.
 	 */
-	constrOid = CreateConstraintEntry(fkconstraint->conname,
+	constrOid = CreateConstraintEntry(conname,
 									  RelationGetNamespace(rel),
 									  CONSTRAINT_FOREIGN,
 									  fkconstraint->deferrable,
 									  fkconstraint->initdeferred,
 									  fkconstraint->initially_valid,
-									  parentConstr,
+									  parentConstraint,
 									  RelationGetRelid(rel),
 									  fkattnum,
 									  numfks,
@@ -7607,7 +7733,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 									  pfeqoperators,
 									  ppeqoperators,
 									  ffeqoperators,
-									  numpks,
+									  numfks,
 									  fkconstraint->fk_upd_action,
 									  fkconstraint->fk_del_action,
 									  fkconstraint->fk_matchtype,
@@ -7620,98 +7746,280 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 									  false);	/* is_internal */
 	ObjectAddressSet(address, ConstraintRelationId, constrOid);
 
-	/*
-	 * Create the triggers that will enforce the constraint.  We only want the
-	 * action triggers to appear for the parent partitioned relation, even
-	 * though the constraints also exist below.
-	 */
-	createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
-							 constrOid, indexOid, !recursing);
+	/* make new constraint visible, in case we add more */
+	CommandCounterIncrement();
 
 	/*
-	 * Tell Phase 3 to check that the constraint is satisfied by existing
-	 * rows. We can skip this during table creation, when requested explicitly
-	 * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
-	 * recreating a constraint following a SET DATA TYPE operation that did
-	 * not impugn its validity.
+	 * If the referenced table is a plain relation, create the action triggers
+	 * that enforce the constraint.
 	 */
-	if (!old_check_ok && !fkconstraint->skip_validation)
+	if (pkrel->rd_rel->relkind == RELKIND_RELATION)
 	{
-		NewConstraint *newcon;
-
-		newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-		newcon->name = fkconstraint->conname;
-		newcon->contype = CONSTR_FOREIGN;
-		newcon->refrelid = RelationGetRelid(pkrel);
-		newcon->refindid = indexOid;
-		newcon->conid = constrOid;
-		newcon->qual = (Node *) fkconstraint;
-
-		tab->constraints = lappend(tab->constraints, newcon);
+		createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
+									   fkconstraint,
+									   constrOid, indexOid);
 	}
 
 	/*
-	 * When called on a partitioned table, recurse to create the constraint on
-	 * the partitions also.
+	 * If the referenced table is partitioned, recurse on ourselves to handle
+	 * each partition.  We need one pg_constraint row created for each
+	 * partition in addition to the pg_constraint row for the parent table.
 	 */
-	if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc;
-		Relation	pg_constraint;
-		List	   *cloned = NIL;
-		ListCell   *cell;
+		PartitionDesc pd = RelationGetPartitionDesc(pkrel);
+		int			i;
 
-		pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
-		partdesc = RelationGetPartitionDesc(rel);
-
-		for (i = 0; i < partdesc->nparts; i++)
+		for (i = 0; i < pd->nparts; i++)
 		{
-			Oid			partitionId = partdesc->oids[i];
-			Relation	partition = table_open(partitionId, lockmode);
+			Relation	partRel;
+			AttrNumber *map;
+			AttrNumber *mapped_pkattnum;
+			Oid			partIndexId;
 
-			CheckTableNotInUse(partition, "ALTER TABLE");
+			partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
 
-			CloneFkReferencing(pg_constraint, rel, partition,
-							   list_make1_oid(constrOid),
-							   &cloned);
+			/*
+			 * Map the attribute numbers in the referenced side of the FK
+			 * definition to match the partition's column layout.
+			 */
+			map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+													RelationGetDescr(pkrel),
+													gettext_noop("could not convert row type"));
+			if (map)
+			{
+				int			j;
 
-			table_close(partition, NoLock);
-		}
-		table_close(pg_constraint, RowExclusiveLock);
+				mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
+				for (j = 0; j < numfks; j++)
+					mapped_pkattnum[j] = map[pkattnum[j] - 1];
+			}
+			else
+				mapped_pkattnum = pkattnum;
 
-		foreach(cell, cloned)
-		{
-			ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell);
-			Relation    partition = table_open(cc->relid, lockmode);
-			AlteredTableInfo *childtab;
-			NewConstraint *newcon;
+			/* do the deed */
+			partIndexId = index_get_partition(partRel, indexOid);
+			if (!OidIsValid(partIndexId))
+				elog(ERROR, "index for %u not found in partition %s",
+					 indexOid, RelationGetRelationName(partRel));
+			addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
+								   partIndexId, constrOid, numfks,
+								   mapped_pkattnum, fkattnum,
+								   pfeqoperators, ppeqoperators, ffeqoperators,
+								   old_check_ok);
 
-			/* Find or create work queue entry for this partition */
-			childtab = ATGetQueueEntry(wqueue, partition);
-
-			newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-			newcon->name = cc->constraint->conname;
-			newcon->contype = CONSTR_FOREIGN;
-			newcon->refrelid = cc->refrelid;
-			newcon->refindid = cc->conindid;
-			newcon->conid = cc->conid;
-			newcon->qual = (Node *) fkconstraint;
-
-			childtab->constraints = lappend(childtab->constraints, newcon);
-
-			table_close(partition, lockmode);
+			/* Done -- clean up (but keep the lock) */
+			table_close(partRel, NoLock);
+			if (map)
+			{
+				pfree(mapped_pkattnum);
+				pfree(map);
+			}
 		}
 	}
 
-	/*
-	 * Close pk table, but keep lock until we've committed.
-	 */
-	table_close(pkrel, NoLock);
-
 	return address;
 }
 
 /*
+ * addFkRecurseReferencing
+ *		recursive subroutine for ATAddForeignKeyConstraint, referencing side
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the referencing relation; might be a partition, if recursing.
+ * pkrel is the root referenced relation.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of the parent constraint (there is always one).
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID arrays of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ * create_constraint indicates whether to create a constraint; if false, the
+ * given parentConstr is the origin of all others.
+ *
+ * Note we never try to use the constraint name assigned in fkconstraint.
+ */
+static void
+addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
+						Relation pkrel, Oid indexOid, Oid parentConstr,
+						int numfks, int16 *pkattnum, int16 *fkattnum,
+						Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+						bool old_check_ok, bool create_constraint)
+{
+	Oid			constrOid;
+	char	   *conname;
+
+	AssertArg(OidIsValid(parentConstr));
+
+	/*
+	 * Verify relkind for each referencing partition.  pkrel stays the same
+	 * while we recurse on this side, so the relation to test here is rel.
+	 */
+	if (rel->rd_rel->relkind != RELKIND_RELATION &&
+		rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("referencing relation \"%s\" is not a table",
+						RelationGetRelationName(rel))));
+
+	if (!create_constraint)
+		constrOid = parentConstr;
+	else
+	{
+		/* Figure out a good constraint name to use */
+		conname = ChooseConstraintName(RelationGetRelationName(rel),
+									   strVal(linitial(fkconstraint->fk_attrs)),
+									   "fkey",
+									   RelationGetNamespace(rel), NIL);
+
+		/*
+		 * Record the FK constraint in pg_constraint.
+		 */
+		constrOid = CreateConstraintEntry(conname,
+										  RelationGetNamespace(rel),
+										  CONSTRAINT_FOREIGN,
+										  fkconstraint->deferrable,
+										  fkconstraint->initdeferred,
+										  fkconstraint->initially_valid,
+										  parentConstr,
+										  RelationGetRelid(rel),
+										  fkattnum,
+										  numfks,
+										  numfks,
+										  InvalidOid,
+										  indexOid,
+										  RelationGetRelid(pkrel),
+										  pkattnum,
+										  pfeqoperators,
+										  ppeqoperators,
+										  ffeqoperators,
+										  numfks,
+										  fkconstraint->fk_upd_action,
+										  fkconstraint->fk_del_action,
+										  fkconstraint->fk_matchtype,
+										  NULL, /* no exclusion constraint */
+										  NULL, /* no check constraint */
+										  NULL,
+										  true, /* islocal */
+										  0,	/* inhcount */
+										  true, /* isnoinherit */
+										  false);	/* is_internal */
+		/* make constraint visible */
+		CommandCounterIncrement();
+	}
+
+	/*
+	 * If the referencing relation is a plain table, add the check triggers to
+	 * it and, if necessary, schedule it to be checked in Phase 3.
+	 *
+	 * If the relation is partitioned, drill down to do it to its partitions.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		createForeignKeyCheckTriggers(RelationGetRelid(rel),
+									  RelationGetRelid(pkrel),
+									  fkconstraint,
+									  constrOid,
+									  indexOid);
+
+		/*
+		 * Tell Phase 3 to check that the constraint is satisfied by existing
+		 * rows. We can skip this during table creation, when requested
+		 * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
+		 * and when we're recreating a constraint following a SET DATA TYPE
+		 * operation that did not impugn its validity.
+		 */
+		if (!old_check_ok && !fkconstraint->skip_validation)
+		{
+			NewConstraint *newcon;
+			AlteredTableInfo *tab;
+
+			tab = ATGetQueueEntry(wqueue, rel);
+
+			newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+			newcon->name = fkconstraint->conname;
+			newcon->contype = CONSTR_FOREIGN;
+			newcon->refrelid = RelationGetRelid(pkrel);
+			newcon->refindid = indexOid;
+			newcon->conid = constrOid;
+			newcon->qual = (Node *) fkconstraint;
+
+			tab->constraints = lappend(tab->constraints, newcon);
+		}
+	}
+	else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		PartitionDesc pd = RelationGetPartitionDesc(rel);
+
+		for (int i = 0; i < pd->nparts; i++)
+		{
+			Relation	partRel;
+			AttrNumber *map;
+			AttrNumber *mapped_fkattnum;
+			List	   *partFKs;
+			ListCell   *cell;
+			bool		attached;
+
+			partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
+
+			/*
+			 * Map the attribute numbers in the referencing side of the FK
+			 * definition to match the partition's column layout.
+			 */
+			map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+													RelationGetDescr(rel),
+													gettext_noop("could not convert row type"));
+			if (map)
+			{
+				mapped_fkattnum = palloc(sizeof(AttrNumber) * numfks);
+				for (int j = 0; j < numfks; j++)
+					mapped_fkattnum[j] = map[fkattnum[j] - 1];
+			}
+			else
+				mapped_fkattnum = fkattnum;
+
+			partFKs = copyObject(RelationGetFKeyList(partRel));
+			attached = false;
+			foreach(cell, partFKs)
+			{
+				ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
+
+				if (tryAttachPartitionForeignKey(fk, constrOid,
+												 numfks, pkattnum,
+												 mapped_fkattnum,
+												 pfeqoperators))
+				{
+					attached = true;
+					break;
+				}
+			}
+
+			if (!attached)
+			{
+				/* down the rabbit hole */
+				addFkRecurseReferencing(wqueue, fkconstraint, partRel, pkrel,
+										indexOid, constrOid, numfks, pkattnum,
+										mapped_fkattnum,
+										pfeqoperators, ppeqoperators, ffeqoperators,
+										old_check_ok, true);
+			}
+
+			/* Done -- clean up (but keep the lock) */
+			table_close(partRel, NoLock);
+			if (map)
+			{
+				pfree(mapped_fkattnum);
+				pfree(map);
+			}
+		}
+	}
+}
+
+/*
  * CloneForeignKeyConstraints
  *		Clone foreign keys from a partitioned table to a newly acquired
  *		partition.
@@ -7740,7 +8048,14 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
 	rel = table_open(relationId, AccessExclusiveLock);
 	pg_constraint = table_open(ConstraintRelationId, RowShareLock);
 
-	/* Obtain the list of constraints to clone or attach */
+	/*
+	 * Clone constraints where the parent is in the referenced side.
+	 */
+	CloneFkReferenced(parentRel, rel, pg_constraint);
+
+	/*
+	 * Now search for constraints where the parent is in the referencing side.
+	 */
 	ScanKeyInit(&key,
 				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
 				F_OIDEQ, ObjectIdGetDatum(parentId));
@@ -7757,13 +8072,160 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
 	/* Do the actual work, recursing to partitions as needed */
 	CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
 
-	/* We're done.  Clean up */
+	list_free(clone);
+
+	/* We're done.  Clean up, keeping locks till commit */
 	table_close(parentRel, NoLock);
-	table_close(rel, NoLock);	/* keep lock till commit */
+	table_close(rel, NoLock);
 	table_close(pg_constraint, RowShareLock);
 }
 
 /*
+ * CloneFkReferenced
+ *		Subroutine for CloneForeignKeyConstraints, referenced side
+ *
+ * Clone the FKs that reference the parent relation.  Used when partitionRel
+ * is created/attached.  (Recursion to partitions is effected by callee
+ * addFkRecurseReferenced, so this routine is not itself recursive.)
+ */
+static void
+CloneFkReferenced(Relation parentRel, Relation partitionRel,
+				  Relation pg_constraint)
+{
+	AttrNumber *attmap;
+	ListCell   *cell;
+	SysScanDesc scan;
+	ScanKeyData key[2];
+	HeapTuple	tuple;
+	List	   *clone = NIL;
+
+	/*
+	 * Search for any constraints where the parent is in the referenced
+	 * side.  However, we must ignore any constraint whose parent constraint
+	 * is also going to be cloned, to avoid duplicates.  So do it in two
+	 * steps: first construct the list of constraints to clone, then go over
+	 * that list cloning those whose parents are not in the list.  (We must
+	 * not rely on the parent being seen first, since catalog order could
+	 * return children first.)
+	 */
+	attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
+										RelationGetDescr(parentRel),
+										gettext_noop("could not convert row type"));
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
+	ScanKeyInit(&key[1],
+				Anum_pg_constraint_contype, BTEqualStrategyNumber,
+				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+	/* This is a seqscan, as we don't have a usable index ... */
+	scan = systable_beginscan(pg_constraint, InvalidOid, true,
+							  NULL, 2, key);
+	while ((tuple = systable_getnext(scan)) != NULL)
+	{
+		Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* ignore this constraint if the parent is already on the list */
+		if (list_member_oid(clone, constrForm->conparentid))
+			continue;
+
+		clone = lappend_oid(clone, constrForm->oid);
+	}
+	systable_endscan(scan);
+
+	foreach(cell, clone)
+	{
+		Oid			constrOid = lfirst_oid(cell);
+		Form_pg_constraint constrForm;
+		Form_pg_attribute att;
+		Relation	fkRel;
+		Oid			indexOid;
+		Oid			partIndexId;
+		int			numfks;
+		AttrNumber	conkey[INDEX_MAX_KEYS];
+		AttrNumber	mapped_confkey[INDEX_MAX_KEYS];
+		AttrNumber	confkey[INDEX_MAX_KEYS];
+		Oid			conpfeqop[INDEX_MAX_KEYS];
+		Oid			conppeqop[INDEX_MAX_KEYS];
+		Oid			conffeqop[INDEX_MAX_KEYS];
+		Constraint *fkconstraint;
+
+		tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* skip children whose parents are going to be cloned, as above */
+		if (list_member_oid(clone, constrForm->conparentid))
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		/*
+		 * Because we're only expanding the key space at the referenced side,
+		 * we don't need to prevent any operation in the referencing table, so
+		 * AccessShareLock suffices (assumes that dropping the constraint
+		 * acquires AEL).
+		 */
+		fkRel = table_open(constrForm->conrelid, AccessShareLock);
+
+		indexOid = constrForm->conindid;
+		DeconstructFkConstraintRow(tuple,
+								   &numfks,
+								   conkey,
+								   confkey,
+								   conpfeqop,
+								   conppeqop,
+								   conffeqop);
+		for (int i = 0; i < numfks; i++)
+			mapped_confkey[i] = attmap[confkey[i] - 1];
+
+		fkconstraint = makeNode(Constraint);
+		/* for now this is all we need */
+		fkconstraint->conname = NameStr(constrForm->conname);
+		fkconstraint->fk_upd_action = constrForm->confupdtype;
+		fkconstraint->fk_del_action = constrForm->confdeltype;
+		fkconstraint->deferrable = constrForm->condeferrable;
+		fkconstraint->initdeferred = constrForm->condeferred;
+		fkconstraint->initially_valid = true;
+		fkconstraint->fk_matchtype = constrForm->confmatchtype;
+
+		/*
+		 * We only need the first referencing column's name, which is used to
+		 * generate the constraint name.  conkey[] numbers columns of fkRel.
+		 */
+		att = TupleDescAttr(RelationGetDescr(fkRel), conkey[0] - 1);
+		fkconstraint->fk_attrs = list_make1(makeString(NameStr(att->attname)));
+
+		/*
+		 * Add the new foreign key constraint pointing to the new partition.
+		 * Because this new partition appears in the referenced side of the
+		 * constraint, we don't need to set up for Phase 3 check.
+		 */
+		partIndexId = index_get_partition(partitionRel, indexOid);
+		if (!OidIsValid(partIndexId))
+			elog(ERROR, "index for %u not found in partition %s",
+				 indexOid, RelationGetRelationName(partitionRel));
+		addFkRecurseReferenced(NULL,
+							   fkconstraint,
+							   fkRel,
+							   partitionRel,
+							   partIndexId,
+							   constrOid,
+							   numfks,
+							   mapped_confkey,
+							   conkey,
+							   conpfeqop,
+							   conppeqop,
+							   conffeqop,
+							   true);
+
+		table_close(fkRel, NoLock);
+		ReleaseSysCache(tuple);
+	}
+}
+
+/*
  * CloneFkReferencing
  *		Recursive subroutine for CloneForeignKeyConstraints, referencing side
  *
@@ -7810,7 +8272,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		Oid			conppeqop[INDEX_MAX_KEYS];
 		Oid			conffeqop[INDEX_MAX_KEYS];
 		Constraint *fkconstraint;
-		bool		attach_it;
+		bool		attached;
 		Oid			constrOid;
 		ObjectAddress parentAddr,
 					childAddr;
@@ -7829,120 +8291,43 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 			ReleaseSysCache(tuple);
 			continue;
 		}
+		if (list_member_oid(clone, constrForm->conparentid))
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
 
 		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
 
 		DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
 								   conpfeqop, conppeqop, conffeqop);
+
 		for (i = 0; i < numfks; i++)
 			mapped_conkey[i] = attmap[conkey[i] - 1];
 
 		/*
 		 * Before creating a new constraint, see whether any existing FKs are
-		 * fit for the purpose.  If one is, attach the parent constraint to it,
-		 * and don't clone anything.  This way we avoid the expensive
+		 * fit for the purpose.  If one is, attach the parent constraint to
+		 * it, and don't clone anything.  This way we avoid the expensive
 		 * verification step and don't end up with a duplicate FK.  This also
 		 * means we don't consider this constraint when recursing to
 		 * partitions.
 		 */
-		attach_it = false;
+		attached = false;
 		foreach(cell, partFKs)
 		{
 			ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
-			Form_pg_constraint partConstr;
-			HeapTuple	partcontup;
-			Relation	trigrel;
-			HeapTuple	trigtup;
-			SysScanDesc scan;
-			ScanKeyData key;
 
-			attach_it = true;
-
-			/*
-			 * Do some quick & easy initial checks.  If any of these fail, we
-			 * cannot use this constraint, but keep looking.
-			 */
-			if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
+			if (tryAttachPartitionForeignKey(fk,
+											 parentConstrOid,
+											 numfks,
+											 mapped_conkey,
+											 confkey,
+											 conpfeqop))
 			{
-				attach_it = false;
-				continue;
+				attached = true;
+				break;
 			}
-			for (i = 0; i < numfks; i++)
-			{
-				if (fk->conkey[i] != mapped_conkey[i] ||
-					fk->confkey[i] != confkey[i] ||
-					fk->conpfeqop[i] != conpfeqop[i])
-				{
-					attach_it = false;
-					break;
-				}
-			}
-			if (!attach_it)
-				continue;
-
-			/*
-			 * Looks good so far; do some more extensive checks.  Presumably
-			 * the check for 'convalidated' could be dropped, since we don't
-			 * really care about that, but let's be careful for now.
-			 */
-			partcontup = SearchSysCache1(CONSTROID,
-										 ObjectIdGetDatum(fk->conoid));
-			if (!partcontup)
-				elog(ERROR, "cache lookup failed for constraint %u",
-					 fk->conoid);
-			partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
-			if (OidIsValid(partConstr->conparentid) ||
-				!partConstr->convalidated ||
-				partConstr->condeferrable != constrForm->condeferrable ||
-				partConstr->condeferred != constrForm->condeferred ||
-				partConstr->confupdtype != constrForm->confupdtype ||
-				partConstr->confdeltype != constrForm->confdeltype ||
-				partConstr->confmatchtype != constrForm->confmatchtype)
-			{
-				ReleaseSysCache(partcontup);
-				attach_it = false;
-				continue;
-			}
-
-			ReleaseSysCache(partcontup);
-
-			/*
-			 * Looks good!  Attach this constraint.  Note that the action
-			 * triggers are no longer needed, so remove them.  We identify
-			 * them because they have our constraint OID, as well as being
-			 * on the referenced rel.
-			 */
-			trigrel = heap_open(TriggerRelationId, RowExclusiveLock);
-			ScanKeyInit(&key,
-						Anum_pg_trigger_tgconstraint,
-						BTEqualStrategyNumber, F_OIDEQ,
-						ObjectIdGetDatum(fk->conoid));
-
-			scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
-									  NULL, 1, &key);
-			while ((trigtup = systable_getnext(scan)) != NULL)
-			{
-				Form_pg_trigger	trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
-
-				if (trgform->tgconstrrelid != fk->conrelid)
-					continue;
-				if (trgform->tgrelid != fk->confrelid)
-					continue;
-
-				deleteDependencyRecordsForClass(TriggerRelationId,
-												trgform->oid,
-												ConstraintRelationId,
-												DEPENDENCY_INTERNAL);
-				CatalogTupleDelete(trigrel, &trigtup->t_self);
-			}
-
-			systable_endscan(scan);
-			heap_close(trigrel, RowExclusiveLock);
-
-			ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
-			CommandCounterIncrement();
-			attach_it = true;
-			break;
 		}
 
 		/*
@@ -7950,7 +8335,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		 * create a new one.  In fact, there's no need to recurse for this
 		 * constraint to partitions, either.
 		 */
-		if (attach_it)
+		if (attached)
 		{
 			ReleaseSysCache(tuple);
 			continue;
@@ -7997,8 +8382,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		fkconstraint->deferrable = constrForm->condeferrable;
 		fkconstraint->initdeferred = constrForm->condeferred;
 
-		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
-								 constrOid, constrForm->conindid, false);
+		/* If this is a plain relation, create the check triggers */
+		if (partRel->rd_rel->relkind == RELKIND_RELATION)
+			createForeignKeyCheckTriggers(RelationGetRelid(partRel),
+										  constrForm->confrelid,
+										  fkconstraint, constrOid,
+										  constrForm->conindid);
 
 		if (cloned)
 		{
@@ -8034,6 +8423,9 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
 		int			i;
 
+		/* make previously created constraints visible */
+		CommandCounterIncrement();
+
 		for (i = 0; i < partdesc->nparts; i++)
 		{
 			Relation	childRel;
@@ -8050,6 +8442,125 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 }
 
 /*
+ * When the parent of a partition receives a foreign key, we must propagate
+ * that foreign key to the partition.  However, the partition might already
+ * have an equivalent foreign key; this routine compares the given
+ * ForeignKeyCacheInfo (in the partition) to the FK defined by the other
+ * parameters.  If they are equivalent, create the link between the two
+ * constraints and return true.
+ *
+ * If the given FK is not equivalent, return false without changing anything;
+ * the caller must try another candidate or create a new constraint.
+ */
+static bool
+tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
+							 Oid parentConstrOid,
+							 int numfks,
+							 AttrNumber *mapped_conkey,
+							 AttrNumber *confkey,
+							 Oid *conpfeqop)
+{
+	HeapTuple	parentConstrTup;
+	Form_pg_constraint parentConstr;
+	HeapTuple	partcontup;
+	Form_pg_constraint partConstr;
+	Relation	trigrel;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	trigtup;
+
+	parentConstrTup = SearchSysCache1(CONSTROID,
+									  ObjectIdGetDatum(parentConstrOid));
+	if (!parentConstrTup)
+		elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid);
+	parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup);
+
+	/*
+	 * Do some quick & easy initial checks.  If any of these fail, we cannot
+	 * use this constraint.
+	 */
+	if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks)
+	{
+		ReleaseSysCache(parentConstrTup);
+		return false;
+	}
+	for (int i = 0; i < numfks; i++)
+	{
+		if (fk->conkey[i] != mapped_conkey[i] ||
+			fk->confkey[i] != confkey[i] ||
+			fk->conpfeqop[i] != conpfeqop[i])
+		{
+			ReleaseSysCache(parentConstrTup);
+			return false;
+		}
+	}
+
+	/*
+	 * Looks good so far; do some more extensive checks.  Presumably the check
+	 * for 'convalidated' could be dropped, since we don't really care about
+	 * that, but let's be careful for now.
+	 */
+	partcontup = SearchSysCache1(CONSTROID,
+								 ObjectIdGetDatum(fk->conoid));
+	if (!partcontup)
+		elog(ERROR, "cache lookup failed for constraint %u",
+			 fk->conoid);
+	partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+	if (OidIsValid(partConstr->conparentid) ||
+		!partConstr->convalidated ||
+		partConstr->condeferrable != parentConstr->condeferrable ||
+		partConstr->condeferred != parentConstr->condeferred ||
+		partConstr->confupdtype != parentConstr->confupdtype ||
+		partConstr->confdeltype != parentConstr->confdeltype ||
+		partConstr->confmatchtype != parentConstr->confmatchtype)
+	{
+		ReleaseSysCache(parentConstrTup);
+		ReleaseSysCache(partcontup);
+		return false;
+	}
+
+	ReleaseSysCache(partcontup);
+	ReleaseSysCache(parentConstrTup);
+
+	/*
+	 * Looks good!  Attach this constraint.  Note that the action triggers are
+	 * no longer needed, so remove them.  We identify them because they have
+	 * our constraint OID, as well as being on the referenced rel.
+	 */
+	trigrel = table_open(TriggerRelationId, RowExclusiveLock);
+	ScanKeyInit(&key,
+				Anum_pg_trigger_tgconstraint,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(fk->conoid));
+
+	scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true,
+							  NULL, 1, &key);
+	while ((trigtup = systable_getnext(scan)) != NULL)
+	{
+		Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup);
+
+		if (trgform->tgconstrrelid != fk->conrelid)
+			continue;
+		if (trgform->tgrelid != fk->confrelid)
+			continue;
+
+		deleteDependencyRecordsForClass(TriggerRelationId,
+										trgform->oid,
+										ConstraintRelationId,
+										DEPENDENCY_INTERNAL);
+		CatalogTupleDelete(trigrel, &trigtup->t_self);
+	}
+
+	systable_endscan(scan);
+	table_close(trigrel, RowExclusiveLock);
+
+	ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
+	CommandCounterIncrement();
+	return true;
+}
+
+
+/*
  * ALTER TABLE ALTER CONSTRAINT
  *
  * Update the attributes of a constraint.
@@ -9081,37 +9592,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
 }
 
 /*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
- */
-void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
-						 Oid constraintOid, Oid indexOid, bool create_action)
-{
-	/*
-	 * For the referenced side, create action triggers, if requested.  (If the
-	 * referencing side is partitioned, there is still only one trigger, which
-	 * runs on the referenced side and points to the top of the referencing
-	 * hierarchy.)
-	 */
-	if (create_action)
-		createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
-									   indexOid);
-
-	/*
-	 * For the referencing side, create the check triggers.  We only need
-	 * these on the partitions.
-	 */
-	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
-									  fkconstraint, constraintOid, indexOid);
-
-	CommandCounterIncrement();
-}
-
-/*
  * ALTER TABLE DROP CONSTRAINT
  *
  * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism.
@@ -15047,6 +15527,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 
 	partRel = table_openrv(name, ShareUpdateExclusiveLock);
 
+	/* Ensure that foreign keys still hold after this detach */
+	CheckNoForeignKeyRefs(partRel, false);
+
 	/* All inheritance related checks are performed within the function */
 	RemoveInheritance(partRel, rel);
 
@@ -15536,3 +16019,82 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
 	if (opened)
 		table_close(classRel, RowExclusiveLock);
 }
+
+/*
+ * During an operation that removes a partition from a partitioned table,
+ * verify that any foreign keys pointing to the partitioned table would not
+ * become invalid.  (Used for ALTER TABLE ... DETACH as well as DROP.)
+ * An error is raised if any row still references a key in the partition.
+ */
+void
+CheckNoForeignKeyRefs(Relation partition, bool isDrop)
+{
+	Relation	conrel;
+	SysScanDesc conscan;
+	ScanKeyData skey[2];
+	HeapTuple	contup;
+
+	/*
+	 * Nothing to do if the partition has no indexes.
+	 */
+	if (RelationGetIndexList(partition) == NIL)
+		return;
+
+	/*
+	 * When dropping, a partition that is itself partitioned can be skipped:
+	 * its own partitions will come through here too, so checking it as well
+	 * would just repeat the work.
+	 *
+	 * When detaching, only the top-level relation comes through here, so
+	 * the check has to be run regardless of its relkind.
+	 */
+	if (partition->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && isDrop)
+		return;
+
+	/* Look up the foreign keys whose referenced table is this partition */
+	conrel = table_open(ConstraintRelationId, AccessShareLock);
+	ScanKeyInit(&skey[0],
+				Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
+	ScanKeyInit(&skey[1],
+				Anum_pg_constraint_contype, BTEqualStrategyNumber,
+				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+
+	/* XXX No usable index exists for these keys, so this is a seqscan */
+	conscan = systable_beginscan(conrel, InvalidOid, true,
+								 NULL, 2, skey);
+	while ((contup = systable_getnext(conscan)) != NULL)
+	{
+		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup);
+		Relation	fkrel;
+		Trigger		faketrig;
+
+		/*
+		 * Only constraints that have a parent constraint are of interest.
+		 */
+		if (con->conparentid == InvalidOid)
+			continue;
+
+		/* lock out data changes in the referencing table until commit */
+		fkrel = table_open(con->conrelid, ShareLock);
+
+		MemSet(&faketrig, 0, sizeof(faketrig));
+		faketrig.tgoid = InvalidOid;
+		faketrig.tgname = NameStr(con->conname);
+		faketrig.tgconstraint = con->oid;
+		faketrig.tgconstrindid = con->conindid;
+		faketrig.tgconstrrelid = RelationGetRelid(partition);
+		faketrig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
+		faketrig.tgisinternal = true;
+		faketrig.tgdeferrable = false;
+		faketrig.tginitdeferred = false;
+		/* the remaining fields stay zeroed */
+
+		RI_Final_Check(&faketrig, fkrel, partition);
+
+		table_close(fkrel, NoLock);
+	}
+
+	systable_endscan(conscan);
+	table_close(conrel, AccessShareLock);
+}
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index e1aa3d0044..5a31018680 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -56,6 +56,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -240,7 +241,7 @@ static void ri_ExtractValues(Relation rel, HeapTuple tup,
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
 				   HeapTuple violator, TupleDesc tupdesc,
-				   int queryno) pg_attribute_noreturn();
+				   int queryno, bool partgone) pg_attribute_noreturn();
 
 
 /* ----------
@@ -398,18 +399,22 @@ RI_FKey_check(TriggerData *trigdata)
 		char		paramname[16];
 		const char *querysep;
 		Oid			queryoids[RI_MAX_NUMKEYS];
+		const char *pk_only;
 
 		/* ----------
 		 * The query string built is
-		 *	SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
 		 * The type id's for the $ parameters are those of the
 		 * corresponding FK attributes.
 		 * ----------
 		 */
 		initStringInfo(&querybuf);
+		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+						 pk_only, pkrelname);
 		querysep = "WHERE";
 		for (i = 0; i < riinfo->nkeys; i++)
 		{
@@ -533,19 +538,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 		char		attname[MAX_QUOTED_NAME_LEN];
 		char		paramname[16];
 		const char *querysep;
+		const char *pk_only;
 		Oid			queryoids[RI_MAX_NUMKEYS];
 
 		/* ----------
 		 * The query string built is
-		 *	SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
 		 * The type id's for the $ parameters are those of the
 		 * PK attributes themselves.
 		 * ----------
 		 */
 		initStringInfo(&querybuf);
+		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+						 pk_only, pkrelname);
 		querysep = "WHERE";
 		for (i = 0; i < riinfo->nkeys; i++)
 		{
@@ -1711,6 +1720,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	RangeTblEntry *fkrte;
 	const char *sep;
 	const char *fk_only;
+	const char *pk_only;
 	int			i;
 	int			save_nestlevel;
 	char		workmembuf[32];
@@ -1770,7 +1780,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	/*----------
 	 * The query string built is:
 	 *	SELECT fk.keycols FROM [ONLY] relname fk
-	 *	 LEFT OUTER JOIN ONLY pkrelname pk
+	 *	 LEFT OUTER JOIN [ONLY] pkrelname pk
 	 *	 ON (pk.pkkeycol1=fk.keycol1 [AND ...])
 	 *	 WHERE pk.pkkeycol1 IS NULL AND
 	 * For MATCH SIMPLE:
@@ -1797,9 +1807,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	quoteRelationName(fkrelname, fk_rel);
 	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
 		"" : "ONLY ";
+	pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
 	appendStringInfo(&querybuf,
-					 " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
-					 fk_only, fkrelname, pkrelname);
+					 " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
+					 fk_only, fkrelname, pk_only, pkrelname);
 
 	strcpy(pkattname, "pk.");
 	strcpy(fkattname, "fk.");
@@ -1952,7 +1964,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 		ri_ReportViolation(&fake_riinfo,
 						   pk_rel, fk_rel,
 						   tuple, tupdesc,
-						   RI_PLAN_CHECK_LOOKUPPK);
+						   RI_PLAN_CHECK_LOOKUPPK, false);
 	}
 
 	if (SPI_finish() != SPI_OK_FINISH)
@@ -1966,6 +1978,179 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	return true;
 }
 
+/* ----------
+ * RI_Final_Check -
+ *	Raise an error if any fk_rel row references a key in partition pk_rel.
+ */
+void
+RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
+{
+	const RI_ConstraintInfo *riinfo;
+	StringInfoData querybuf;
+	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
+	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
+	const char *sep;
+	const char *fk_only;
+	char	   *partqual;
+	int			spi_result;
+	SPIPlanPtr	qplan;
+	int			i;
+
+	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+	/* XXX handle the non-permission case?? */
+
+	/*----------
+	 * The query string built is:
+	 *  SELECT fk.keycols FROM [ONLY] relname fk
+	 *    JOIN pkrelname pk
+	 *    ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+	 *    WHERE [(<partition constraint>) AND]
+	 * For MATCH SIMPLE:
+	 *   (fk.keycol1 IS NOT NULL [AND ...])
+	 * For MATCH FULL:
+	 *   (fk.keycol1 IS NOT NULL [OR ...])
+	 *
+	 * We attach COLLATE clauses to the operators when comparing columns
+	 * that have different collations.
+	 *----------
+	 */
+	initStringInfo(&querybuf);
+	appendStringInfoString(&querybuf, "SELECT ");
+	sep = "";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		quoteOneName(fkattname,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
+		sep = ", ";
+	}
+
+	quoteRelationName(pkrelname, pk_rel);
+	quoteRelationName(fkrelname, fk_rel);
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	appendStringInfo(&querybuf,
+					 " FROM %s%s fk JOIN %s pk ON",
+					 fk_only, fkrelname, pkrelname);
+	strcpy(pkattname, "pk.");
+	strcpy(fkattname, "fk.");
+	sep = "(";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+		quoteOneName(pkattname + 3,
+					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
+		quoteOneName(fkattname + 3,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		ri_GenerateQual(&querybuf, sep,
+						pkattname, pk_type,
+						riinfo->pf_eq_oprs[i],
+						fkattname, fk_type);
+		if (pk_coll != fk_coll)
+			ri_GenerateQualCollation(&querybuf, pk_coll);
+		sep = "AND";
+	}
+
+	/* An empty partition qual would yield invalid "WHERE  AND"; omit it */
+	partqual = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
+	if (partqual != NULL && partqual[0] != '\0')
+		appendStringInfo(&querybuf, ") WHERE %s AND (", partqual);
+	else
+		appendStringInfoString(&querybuf, ") WHERE (");
+
+	sep = "";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(&querybuf,
+						 "%sfk.%s IS NOT NULL",
+						 sep, fkattname);
+		switch (riinfo->confmatchtype)
+		{
+			case FKCONSTR_MATCH_SIMPLE:
+				sep = " AND ";
+				break;
+			case FKCONSTR_MATCH_FULL:
+				sep = " OR ";
+				break;
+			case FKCONSTR_MATCH_PARTIAL:
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("MATCH PARTIAL not yet implemented")));
+				break;
+			default:
+				elog(ERROR, "unrecognized confmatchtype: %d",
+					 riinfo->confmatchtype);
+				break;
+		}
+	}
+	appendStringInfoChar(&querybuf, ')');
+
+	/* RI_Initial_Check changes work_mem here. */
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/*
+	 * Generate the plan.  We don't need to cache it, and there are no
+	 * arguments to the plan.
+	 */
+	qplan = SPI_prepare(querybuf.data, 0, NULL);
+
+	if (qplan == NULL)
+		elog(ERROR, "SPI_prepare returned %s for %s",
+			 SPI_result_code_string(SPI_result), querybuf.data);
+
+	/*
+	 * Run the plan.  For safety we force a current snapshot to be used. (In
+	 * transaction-snapshot mode, this arguably violates transaction isolation
+	 * rules, but we really haven't got much choice.) We don't need to
+	 * register the snapshot, because SPI_execute_snapshot will see to it. We
+	 * need at most one tuple returned, so pass limit = 1.
+	 */
+	spi_result = SPI_execute_snapshot(qplan,
+									  NULL, NULL,
+									  GetLatestSnapshot(),
+									  InvalidSnapshot,
+									  true, false, 1);
+
+	/* Check result */
+	if (spi_result != SPI_OK_SELECT)
+		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+
+	/* Did we find a tuple that would violate the constraint? */
+	if (SPI_processed > 0)
+	{
+		HeapTuple	tuple = SPI_tuptable->vals[0];
+		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
+		RI_ConstraintInfo fake_riinfo;
+
+		/*
+		 * The columns to look at in the result tuple are 1..N, not whatever
+		 * they are in the fk_rel.  Hack up a copy of riinfo so that
+		 * ri_ReportViolation will behave properly.  We also have to pass the
+		 * result's tupdesc to ri_ReportViolation, overriding its normal
+		 * habit of using the pk_rel or fk_rel's tupdesc.
+		 */
+		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
+		for (i = 0; i < fake_riinfo.nkeys; i++)
+			fake_riinfo.pk_attnums[i] = i + 1;
+
+		ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
+						   tuple, tupdesc, 0, true);
+	}
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+}
+
 
 /* ----------
  * Local functions below
@@ -2167,6 +2352,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 	/* Find or create a hashtable entry for the constraint */
 	riinfo = ri_LoadConstraintInfo(constraintOid);
 
+#if 0
 	/* Do some easy cross-checks against the trigger call data */
 	if (rel_is_pk)
 	{
@@ -2175,6 +2361,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
 				 trigger->tgname, RelationGetRelationName(trig_rel));
 	}
+#endif
 
 	return riinfo;
 }
@@ -2480,7 +2667,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 						   pk_rel, fk_rel,
 						   new_tuple ? new_tuple : old_tuple,
 						   NULL,
-						   qkey->constr_queryno);
+						   qkey->constr_queryno, false);
 
 	return SPI_processed != 0;
 }
@@ -2524,7 +2711,7 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
 				   HeapTuple violator, TupleDesc tupdesc,
-				   int queryno)
+				   int queryno, bool partgone)
 {
 	StringInfoData key_names;
 	StringInfoData key_values;
@@ -2564,9 +2751,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 	 *
 	 * Check table-level permissions next and, failing that, column-level
 	 * privileges.
+	 *
+	 * When a partition at the referenced side is being detached/dropped, we
+	 * needn't check, since the user must be the table owner anyway.
 	 */
-
-	if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
+	if (partgone)
+		has_perm = true;
+	else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
 	{
 		aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
 		if (aclresult != ACLCHECK_OK)
@@ -2616,7 +2807,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 		}
 	}
 
-	if (onfk)
+	if (partgone)
+		ereport(ERROR,
+				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+				 errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
+						RelationGetRelationName(pk_rel),
+						NameStr(riinfo->conname)),
+				 errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
+						   key_names.data, key_values.data,
+						   RelationGetRelationName(fk_rel))));
+	else if (onfk)
 		ereport(ERROR,
 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
 				 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 302df16b4a..0e95aa949f 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1819,6 +1819,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS)
 }
 
 /*
+ * pg_get_partconstrdef_string
+ *
+ * Fetch the partition constraint of relation 'partitionId' and deparse it to
+ * a C-string, using 'aliasname' as the relation's alias.  No pretty-printing.
+ */
+char *
+pg_get_partconstrdef_string(Oid partitionId, char *aliasname)
+{
+	Expr	   *constr_expr;
+	List	   *context;
+
+	constr_expr = get_partition_qual_relid(partitionId);
+	context = deparse_context_for(aliasname, partitionId);
+
+	return deparse_expression((Node *) constr_expr, context, true, false);
+}
+
+/*
  * pg_get_constraintdef
  *
  * Returns the definition for the constraint, ie, everything that needs to
diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h
index 625b7e5c43..f110e3e7c0 100644
--- a/src/include/catalog/heap.h
+++ b/src/include/catalog/heap.h
@@ -77,6 +77,8 @@ extern void heap_create_init_fork(Relation rel);
 
 extern void heap_drop_with_catalog(Oid relid);
 
+extern void pre_drop_class_check(Oid relationId, Oid objectSubId);
+
 extern void heap_truncate(List *relids);
 
 extern void heap_truncate_one_rel(Relation rel);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index ec3bb90b01..6d67c5e9c3 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -52,6 +52,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
 
 extern void CheckTableNotInUse(Relation rel, const char *stmt);
 
+extern void CheckNoForeignKeyRefs(Relation partition, bool isDrop);
+
 extern void ExecuteTruncate(TruncateStmt *stmt);
 extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 					DropBehavior behavior, bool restart_seqs);
@@ -76,10 +78,6 @@ extern void find_composite_type_dependencies(Oid typeOid,
 
 extern void check_of_type(HeapTuple typetuple);
 
-extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
-						 Constraint *fkconstraint, Oid constraintOid,
-						 Oid indexOid, bool create_action);
-
 extern void register_on_commit_action(Oid relid, OnCommitAction action);
 extern void remove_on_commit_action(Oid relid);
 
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index 9f212ac24b..3c9a45a072 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -263,6 +263,7 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
 							  HeapTuple old_row, HeapTuple new_row);
 extern bool RI_Initial_Check(Trigger *trigger,
 				 Relation fk_rel, Relation pk_rel);
+extern void RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel);
 
 /* result values for RI_FKey_trigger_type: */
 #define RI_TRIGGER_PK	1		/* is a trigger on the PK relation */
diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h
index 3ebc01e714..7c49e9d0a8 100644
--- a/src/include/utils/ruleutils.h
+++ b/src/include/utils/ruleutils.h
@@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid);
 extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
 
 extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
+extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
 
 extern char *pg_get_constraintdef_command(Oid constraintId);
 extern char *deparse_expression(Node *expr, List *dpcontext,
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 76040a7601..412e30a5f5 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1478,19 +1478,6 @@ drop table pktable2, fktable2;
 --
 -- Foreign keys and partitioned tables
 --
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1526,10 +1513,10 @@ DETAIL:  This feature is not yet supported on partitioned tables.
 -- these inserts, targeting both the partition directly as well as the
 -- partitioned table, should all fail
 INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
-ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey"
 DETAIL:  Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
 INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501);
-ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey"
 DETAIL:  Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
 INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
 ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
@@ -1620,12 +1607,12 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x);
 CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT;
 INSERT INTO fk_partitioned_fk_full VALUES (1, NULL);
 ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;  -- fails
-ERROR:  insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_1_x_fkey"
 DETAIL:  MATCH FULL does not allow mixing of null and nonnull key values.
 TRUNCATE fk_partitioned_fk_full;
 ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;
 INSERT INTO fk_partitioned_fk_full VALUES (1, NULL);  -- fails
-ERROR:  insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_1_x_fkey"
 DETAIL:  MATCH FULL does not allow mixing of null and nonnull key values.
 DROP TABLE fk_partitioned_fk_full;
 -- ON UPDATE SET NULL
@@ -1670,7 +1657,7 @@ INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
 -- this fails, because the defaults for the referencing table are not present
 -- in the referenced table:
 UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
-ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_3_a_fkey"
 DETAIL:  Key (a, b)=(2501, 142857) is not present in table "fk_notpartitioned_pk".
 -- but inserting the row we can make it work:
 INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857);
@@ -1930,7 +1917,7 @@ create schema fkpart1
   create table fk_part_1_1 partition of fk_part_1 for values in (1);
 alter table fkpart1.fk_part add foreign key (a) references fkpart1.pkey;
 insert into fkpart1.fk_part values (1);		-- should fail
-ERROR:  insert or update on table "fk_part_1_1" violates foreign key constraint "fk_part_a_fkey"
+ERROR:  insert or update on table "fk_part_1_1" violates foreign key constraint "fk_part_1_1_a_fkey"
 DETAIL:  Key (a)=(1) is not present in table "pkey".
 insert into fkpart1.pkey values (1);
 insert into fkpart1.fk_part values (1);
@@ -1940,12 +1927,161 @@ DETAIL:  Key (a)=(1) is still referenced from table "fk_part".
 alter table fkpart1.fk_part detach partition fkpart1.fk_part_1;
 create table fkpart1.fk_part_1_2 partition of fkpart1.fk_part_1 for values in (2);
 insert into fkpart1.fk_part_1 values (2);	-- should fail
-ERROR:  insert or update on table "fk_part_1_2" violates foreign key constraint "fk_part_a_fkey"
+ERROR:  insert or update on table "fk_part_1_2" violates foreign key constraint "fk_part_1_a_fkey"
 DETAIL:  Key (a)=(2) is not present in table "pkey".
 delete from fkpart1.pkey where a = 1;
-ERROR:  update or delete on table "pkey" violates foreign key constraint "fk_part_a_fkey" on table "fk_part_1"
+ERROR:  update or delete on table "pkey" violates foreign key constraint "fk_part_1_a_fkey" on table "fk_part_1"
 DETAIL:  Key (a)=(1) is still referenced from table "fk_part_1".
 \set VERBOSITY terse	\\ -- suppress cascade details
 drop schema fkpart0, fkpart1 cascade;
 NOTICE:  drop cascades to 5 other objects
 \set VERBOSITY default
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions
+-- added (created and attached) after creating the foreign key.
+create schema regress_fk;
+set search_path to regress_fk;
+create table pk (a int primary key) partition by range (a);
+create table pk1 partition of pk for values from (0) to (1000);
+create table pk2 (b int, a int);
+alter table pk2 drop column b;
+alter table pk2 alter a set not null;
+alter table pk attach partition pk2 for values from (1000) to (2000);
+create table fk (a int) partition by range (a);
+create table fk1 partition of fk for values from (0) to (750);
+alter table fk add foreign key (a) references pk;
+create table fk2 (b int, a int) ;
+alter table fk2 drop column b;
+alter table fk attach partition fk2 for values from (750) to (3500);
+create table pk3 partition of pk for values from (2000) to (3000);
+create table pk4 (like pk);
+alter table pk attach partition pk4 for values from (3000) to (4000);
+create table pk5 (like pk) partition by range (a);
+create table pk51 partition of pk5 for values from (4000) to (4500);
+create table pk52 partition of pk5 for values from (4500) to (5000);
+alter table pk attach partition pk5 for values from (4000) to (5000);
+create table fk3 partition of fk for values from (3500) to (5000);
+-- these should fail: referenced value not present
+insert into fk values (1);
+ERROR:  insert or update on table "fk1" violates foreign key constraint "fk1_a_fkey"
+DETAIL:  Key (a)=(1) is not present in table "pk".
+insert into fk values (1000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(1000) is not present in table "pk".
+insert into fk values (2000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(2000) is not present in table "pk".
+insert into fk values (3000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(3000) is not present in table "pk".
+insert into fk values (4000);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4000) is not present in table "pk".
+insert into fk values (4500);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4500) is not present in table "pk".
+-- insert into the referenced table, now they should work
+insert into pk values (1), (1000), (2000), (3000), (4000), (4500);
+insert into fk values (1), (1000), (2000), (3000), (4000), (4500);
+-- should fail: referencing value present
+delete from pk where a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+delete from pk where a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+delete from pk where a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+delete from pk where a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+delete from pk where a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+delete from pk where a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+update pk set a = 2 where a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+update pk set a = 1002 where a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+update pk set a = 2002 where a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+update pk set a = 3002 where a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+update pk set a = 4002 where a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+update pk set a = 4502 where a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+-- now they should work
+delete from fk;
+update pk set a = 2 where a = 1;
+delete from pk where a = 2;
+update pk set a = 1002 where a = 1000;
+delete from pk where a = 1002;
+update pk set a = 2002 where a = 2000;
+delete from pk where a = 2002;
+update pk set a = 3002 where a = 3000;
+delete from pk where a = 3002;
+update pk set a = 4002 where a = 4000;
+delete from pk where a = 4002;
+update pk set a = 4502 where a = 4500;
+delete from pk where a = 4502;
+-- dropping/detaching partitions is prevented if that would break
+-- a foreign key's existing data
+create table droppk (a int primary key) partition by range (a);
+create table droppk1 partition of droppk for values from (0) to (1000);
+create table droppk_d partition of droppk default;
+create table droppk2 partition of droppk for values from (1000) to (2000)
+  partition by range (a);
+create table droppk21 partition of droppk2 for values from (1000) to (1400);
+create table droppk2_d partition of droppk2 default;
+insert into droppk values (1), (1000), (1500), (2000);
+create table dropfk (a int references droppk);
+insert into dropfk values (1), (1000), (1500), (2000);
+-- these should all fail
+alter table droppk detach partition droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+alter table droppk2 detach partition droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+alter table droppk detach partition droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+alter table droppk detach partition droppk2;
+ERROR:  removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+alter table droppk2 detach partition droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+drop table droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+drop table droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+drop table droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+drop table droppk2;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+drop table droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+delete from dropfk;
+-- now they should all work
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+alter table droppk2 detach partition droppk21;
+drop table droppk2;
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 9ed1166c66..c0e00c80fb 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1111,18 +1111,6 @@ drop table pktable2, fktable2;
 -- Foreign keys and partitioned tables
 --
 
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1395,3 +1383,105 @@ delete from fkpart1.pkey where a = 1;
 \set VERBOSITY terse	\\ -- suppress cascade details
 drop schema fkpart0, fkpart1 cascade;
 \set VERBOSITY default
+
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions
+-- added (created and attached) after creating the foreign key.
+create schema regress_fk;
+set search_path to regress_fk;
+
+create table pk (a int primary key) partition by range (a);
+create table pk1 partition of pk for values from (0) to (1000);
+create table pk2 (b int, a int);
+alter table pk2 drop column b;
+alter table pk2 alter a set not null;
+alter table pk attach partition pk2 for values from (1000) to (2000);
+
+create table fk (a int) partition by range (a);
+create table fk1 partition of fk for values from (0) to (750);
+alter table fk add foreign key (a) references pk;
+create table fk2 (b int, a int) ;
+alter table fk2 drop column b;
+alter table fk attach partition fk2 for values from (750) to (3500);
+
+create table pk3 partition of pk for values from (2000) to (3000);
+create table pk4 (like pk);
+alter table pk attach partition pk4 for values from (3000) to (4000);
+
+create table pk5 (like pk) partition by range (a);
+create table pk51 partition of pk5 for values from (4000) to (4500);
+create table pk52 partition of pk5 for values from (4500) to (5000);
+alter table pk attach partition pk5 for values from (4000) to (5000);
+
+create table fk3 partition of fk for values from (3500) to (5000);
+
+-- these should fail: referenced value not present
+insert into fk values (1);
+insert into fk values (1000);
+insert into fk values (2000);
+insert into fk values (3000);
+insert into fk values (4000);
+insert into fk values (4500);
+-- insert into the referenced table, now they should work
+insert into pk values (1), (1000), (2000), (3000), (4000), (4500);
+insert into fk values (1), (1000), (2000), (3000), (4000), (4500);
+
+-- should fail: referencing value present
+delete from pk where a = 1;
+delete from pk where a = 1000;
+delete from pk where a = 2000;
+delete from pk where a = 3000;
+delete from pk where a = 4000;
+delete from pk where a = 4500;
+update pk set a = 2 where a = 1;
+update pk set a = 1002 where a = 1000;
+update pk set a = 2002 where a = 2000;
+update pk set a = 3002 where a = 3000;
+update pk set a = 4002 where a = 4000;
+update pk set a = 4502 where a = 4500;
+-- now they should work
+delete from fk;
+update pk set a = 2 where a = 1;
+delete from pk where a = 2;
+update pk set a = 1002 where a = 1000;
+delete from pk where a = 1002;
+update pk set a = 2002 where a = 2000;
+delete from pk where a = 2002;
+update pk set a = 3002 where a = 3000;
+delete from pk where a = 3002;
+update pk set a = 4002 where a = 4000;
+delete from pk where a = 4002;
+update pk set a = 4502 where a = 4500;
+delete from pk where a = 4502;
+
+-- dropping/detaching partitions is prevented if that would break
+-- a foreign key's existing data
+create table droppk (a int primary key) partition by range (a);
+create table droppk1 partition of droppk for values from (0) to (1000);
+create table droppk_d partition of droppk default;
+create table droppk2 partition of droppk for values from (1000) to (2000)
+  partition by range (a);
+create table droppk21 partition of droppk2 for values from (1000) to (1400);
+create table droppk2_d partition of droppk2 default;
+insert into droppk values (1), (1000), (1500), (2000);
+create table dropfk (a int references droppk);
+insert into dropfk values (1), (1000), (1500), (2000);
+-- these should all fail
+alter table droppk detach partition droppk_d;
+alter table droppk2 detach partition droppk2_d;
+alter table droppk detach partition droppk1;
+alter table droppk detach partition droppk2;
+alter table droppk2 detach partition droppk21;
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+drop table droppk2;
+drop table droppk21;
+delete from dropfk;
+-- now they should all work
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+alter table droppk2 detach partition droppk21;
+drop table droppk2;
-- 
2.11.0

Reply via email to