Here's a more credible version of this patch series.

0001 refactors some duplicative code that interprets a pg_constraint row
for a foreign key back into a Constraint node.  Only what is necessary
for current features is read.

0002 moves some code that I added to src/backend/catalog/pg_constraint.c
in 3de241dba86f so that it lives in tablecmds.c instead.  After
developing the current patch I realized that the latter is its natural
home.

0003 changes the shape of a loop in deleteObjectsInList, so that I can
add object type-specific functions to be called before deleting an
object.  This is needed by 0004 and makes no sense on its own; I would
probably push both together, but splitting it like this makes it very
obvious what it is I'm doing.

0004 adds the feature itself.

In 0004 there's also a new function "index_get_partition" which makes it
possible to simplify some code I added in 8b08f7d4820f.  I will probably
split it out and commit it separately because it's an obvious code
beautification.

0005 modifies psql to show the constraint in a different way, which
makes more sense overall: rather than show the "internal" constraint
that concerns the partition, show the top-level constraint that concerns
the ancestor table on which it is defined.  This includes naming the
table in which the constraint is defined.

There's one half of 0005 that I think should be applied to pg11, because
when partitions have foreign keys, the display looks a bit odd
currently.  The other half I would probably merge into 0004 instead of
committing separately.  Even if not backpatched, it makes sense to apply
ahead of the rest because it changes expected output for a regression
test.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 563447095c3884a00ecfd933027524e252cf35f7 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 12 Nov 2018 16:05:16 -0300
Subject: [PATCH 1/5] Refactor DeconstructConstraintRow

---
 src/backend/catalog/pg_constraint.c | 200 ++++++++++++++++++++----------------
 src/backend/utils/adt/ri_triggers.c |  89 ++--------------
 src/backend/utils/cache/relcache.c  |  61 +----------
 src/include/catalog/pg_constraint.h |   3 +
 4 files changed, 125 insertions(+), 228 deletions(-)

diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index a8194b02fa..f71d89ff17 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -446,14 +446,11 @@ static void
 clone_fk_constraints(Relation pg_constraint, Relation parentRel,
 					 Relation partRel, List *clone, List **cloned)
 {
-	TupleDesc	tupdesc;
 	AttrNumber *attmap;
 	List	   *partFKs;
 	List	   *subclone = NIL;
 	ListCell   *cell;
 
-	tupdesc = RelationGetDescr(pg_constraint);
-
 	/*
 	 * The constraint key may differ, if the columns in the partition are
 	 * different.  This map is used to convert them.
@@ -483,9 +480,6 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel,
 		int			nelem;
 		ListCell   *cell;
 		int			i;
-		ArrayType  *arr;
-		Datum		datum;
-		bool		isnull;
 
 		tuple = SearchSysCache1(CONSTROID, parentConstrOid);
 		if (!tuple)
@@ -502,93 +496,11 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel,
 
 		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
 
-		datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
-							tupdesc, &isnull);
-		if (isnull)
-			elog(ERROR, "null conkey");
-		arr = DatumGetArrayTypeP(datum);
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != INT2OID)
-			elog(ERROR, "conkey is not a 1-D smallint array");
-		memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
-
+		DeconstructConstraintRow(tuple, &nelem, conkey, confkey,
+								 conpfeqop, conppeqop, conffeqop);
 		for (i = 0; i < nelem; i++)
 			mapped_conkey[i] = attmap[conkey[i] - 1];
 
-		datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
-							tupdesc, &isnull);
-		if (isnull)
-			elog(ERROR, "null confkey");
-		arr = DatumGetArrayTypeP(datum);
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != INT2OID)
-			elog(ERROR, "confkey is not a 1-D smallint array");
-		memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
-
-		datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
-							tupdesc, &isnull);
-		if (isnull)
-			elog(ERROR, "null conpfeqop");
-		arr = DatumGetArrayTypeP(datum);
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != OIDOID)
-			elog(ERROR, "conpfeqop is not a 1-D OID array");
-		memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
-
-		datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
-							tupdesc, &isnull);
-		if (isnull)
-			elog(ERROR, "null conpfeqop");
-		arr = DatumGetArrayTypeP(datum);
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != OIDOID)
-			elog(ERROR, "conpfeqop is not a 1-D OID array");
-		memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
-
-		datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
-							tupdesc, &isnull);
-		if (isnull)
-			elog(ERROR, "null conppeqop");
-		arr = DatumGetArrayTypeP(datum);
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != OIDOID)
-			elog(ERROR, "conppeqop is not a 1-D OID array");
-		memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
-
-		datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
-							tupdesc, &isnull);
-		if (isnull)
-			elog(ERROR, "null conffeqop");
-		arr = DatumGetArrayTypeP(datum);
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != OIDOID)
-			elog(ERROR, "conffeqop is not a 1-D OID array");
-		memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
-
 		/*
 		 * Before creating a new constraint, see whether any existing FKs are
 		 * fit for the purpose.  If one is, attach the parent constraint to it,
@@ -1531,6 +1443,114 @@ get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
 }
 
 /*
+ * Extract data from the pg_constraint row of a foreign-key constraint.
+ *
+ * The last three can be passed as NULL.
+ */
+void
+DeconstructConstraintRow(HeapTuple tuple, int *numfks,
+						 AttrNumber *conkey, AttrNumber *confkey,
+						 Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
+{
+	Oid			constrId;
+	Datum		adatum;
+	bool		isNull;
+	ArrayType  *arr;
+	int			numkeys;
+
+	constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+	/*
+	 * We expect the arrays to be 1-D arrays of the right types; verify that.
+	 * We don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of values.
+	 */
+	adatum = SysCacheGetAttr(CONSTROID, tuple,
+							 Anum_pg_constraint_conkey, &isNull);
+	if (isNull)
+		elog(ERROR, "null conkey for constraint %u", constrId);
+	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
+	if (ARR_NDIM(arr) != 1 ||
+		ARR_HASNULL(arr) ||
+		ARR_ELEMTYPE(arr) != INT2OID)
+		elog(ERROR, "conkey is not a 1-D smallint array");
+	numkeys = ARR_DIMS(arr)[0];
+	if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
+		elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
+	memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
+	if ((Pointer) arr != DatumGetPointer(adatum))
+		pfree(arr);				/* free de-toasted copy, if any */
+
+	adatum = SysCacheGetAttr(CONSTROID, tuple,
+							 Anum_pg_constraint_confkey, &isNull);
+	if (isNull)
+		elog(ERROR, "null confkey for constraint %u", constrId);
+	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
+	if (ARR_NDIM(arr) != 1 ||
+		ARR_DIMS(arr)[0] != numkeys ||
+		ARR_HASNULL(arr) ||
+		ARR_ELEMTYPE(arr) != INT2OID)
+		elog(ERROR, "confkey is not a 1-D smallint array");
+	memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
+	if ((Pointer) arr != DatumGetPointer(adatum))
+		pfree(arr);				/* free de-toasted copy, if any */
+
+	if (pf_eq_oprs)
+	{
+		adatum = SysCacheGetAttr(CONSTROID, tuple,
+								 Anum_pg_constraint_conpfeqop, &isNull);
+		if (isNull)
+			elog(ERROR, "null conpfeqop for constraint %u", constrId);
+		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
+		/* see TryReuseForeignKey if you change the test below */
+		if (ARR_NDIM(arr) != 1 ||
+			ARR_DIMS(arr)[0] != numkeys ||
+			ARR_HASNULL(arr) ||
+			ARR_ELEMTYPE(arr) != OIDOID)
+			elog(ERROR, "conpfeqop is not a 1-D Oid array");
+		memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
+		if ((Pointer) arr != DatumGetPointer(adatum))
+			pfree(arr);			/* free de-toasted copy, if any */
+	}
+
+	if (pp_eq_oprs)
+	{
+		adatum = SysCacheGetAttr(CONSTROID, tuple,
+								 Anum_pg_constraint_conppeqop, &isNull);
+		if (isNull)
+			elog(ERROR, "null conppeqop for constraint %u", constrId);
+		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
+		if (ARR_NDIM(arr) != 1 ||
+			ARR_DIMS(arr)[0] != numkeys ||
+			ARR_HASNULL(arr) ||
+			ARR_ELEMTYPE(arr) != OIDOID)
+			elog(ERROR, "conppeqop is not a 1-D Oid array");
+		memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
+		if ((Pointer) arr != DatumGetPointer(adatum))
+			pfree(arr);			/* free de-toasted copy, if any */
+	}
+
+	if (ff_eq_oprs)
+	{
+		adatum = SysCacheGetAttr(CONSTROID, tuple,
+								 Anum_pg_constraint_conffeqop, &isNull);
+		if (isNull)
+			elog(ERROR, "null conffeqop for constraint %u", constrId);
+		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
+		if (ARR_NDIM(arr) != 1 ||
+			ARR_DIMS(arr)[0] != numkeys ||
+			ARR_HASNULL(arr) ||
+			ARR_ELEMTYPE(arr) != OIDOID)
+			elog(ERROR, "conffeqop is not a 1-D Oid array");
+		memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
+		if ((Pointer) arr != DatumGetPointer(adatum))
+			pfree(arr);			/* free de-toasted copy, if any */
+	}
+
+	*numfks = numkeys;
+}
+
+/*
  * Determine whether a relation can be proven functionally dependent on
  * a set of grouping columns.  If so, return true and add the pg_constraint
  * OIDs of the constraints needed for the proof to the *constraintDeps list.
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index cdda860e73..b9aef07c1a 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -2188,10 +2188,6 @@ ri_LoadConstraintInfo(Oid constraintOid)
 	bool		found;
 	HeapTuple	tup;
 	Form_pg_constraint conForm;
-	Datum		adatum;
-	bool		isNull;
-	ArrayType  *arr;
-	int			numkeys;
 
 	/*
 	 * On the first call initialize the hashtable
@@ -2233,84 +2229,13 @@ ri_LoadConstraintInfo(Oid constraintOid)
 	riinfo->confdeltype = conForm->confdeltype;
 	riinfo->confmatchtype = conForm->confmatchtype;
 
-	/*
-	 * We expect the arrays to be 1-D arrays of the right types; verify that.
-	 * We don't need to use deconstruct_array() since the array data is just
-	 * going to look like a C array of values.
-	 */
-	adatum = SysCacheGetAttr(CONSTROID, tup,
-							 Anum_pg_constraint_conkey, &isNull);
-	if (isNull)
-		elog(ERROR, "null conkey for constraint %u", constraintOid);
-	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-	if (ARR_NDIM(arr) != 1 ||
-		ARR_HASNULL(arr) ||
-		ARR_ELEMTYPE(arr) != INT2OID)
-		elog(ERROR, "conkey is not a 1-D smallint array");
-	numkeys = ARR_DIMS(arr)[0];
-	if (numkeys <= 0 || numkeys > RI_MAX_NUMKEYS)
-		elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
-	riinfo->nkeys = numkeys;
-	memcpy(riinfo->fk_attnums, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
-	if ((Pointer) arr != DatumGetPointer(adatum))
-		pfree(arr);				/* free de-toasted copy, if any */
-
-	adatum = SysCacheGetAttr(CONSTROID, tup,
-							 Anum_pg_constraint_confkey, &isNull);
-	if (isNull)
-		elog(ERROR, "null confkey for constraint %u", constraintOid);
-	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-	if (ARR_NDIM(arr) != 1 ||
-		ARR_DIMS(arr)[0] != numkeys ||
-		ARR_HASNULL(arr) ||
-		ARR_ELEMTYPE(arr) != INT2OID)
-		elog(ERROR, "confkey is not a 1-D smallint array");
-	memcpy(riinfo->pk_attnums, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
-	if ((Pointer) arr != DatumGetPointer(adatum))
-		pfree(arr);				/* free de-toasted copy, if any */
-
-	adatum = SysCacheGetAttr(CONSTROID, tup,
-							 Anum_pg_constraint_conpfeqop, &isNull);
-	if (isNull)
-		elog(ERROR, "null conpfeqop for constraint %u", constraintOid);
-	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-	/* see TryReuseForeignKey if you change the test below */
-	if (ARR_NDIM(arr) != 1 ||
-		ARR_DIMS(arr)[0] != numkeys ||
-		ARR_HASNULL(arr) ||
-		ARR_ELEMTYPE(arr) != OIDOID)
-		elog(ERROR, "conpfeqop is not a 1-D Oid array");
-	memcpy(riinfo->pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
-	if ((Pointer) arr != DatumGetPointer(adatum))
-		pfree(arr);				/* free de-toasted copy, if any */
-
-	adatum = SysCacheGetAttr(CONSTROID, tup,
-							 Anum_pg_constraint_conppeqop, &isNull);
-	if (isNull)
-		elog(ERROR, "null conppeqop for constraint %u", constraintOid);
-	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-	if (ARR_NDIM(arr) != 1 ||
-		ARR_DIMS(arr)[0] != numkeys ||
-		ARR_HASNULL(arr) ||
-		ARR_ELEMTYPE(arr) != OIDOID)
-		elog(ERROR, "conppeqop is not a 1-D Oid array");
-	memcpy(riinfo->pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
-	if ((Pointer) arr != DatumGetPointer(adatum))
-		pfree(arr);				/* free de-toasted copy, if any */
-
-	adatum = SysCacheGetAttr(CONSTROID, tup,
-							 Anum_pg_constraint_conffeqop, &isNull);
-	if (isNull)
-		elog(ERROR, "null conffeqop for constraint %u", constraintOid);
-	arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-	if (ARR_NDIM(arr) != 1 ||
-		ARR_DIMS(arr)[0] != numkeys ||
-		ARR_HASNULL(arr) ||
-		ARR_ELEMTYPE(arr) != OIDOID)
-		elog(ERROR, "conffeqop is not a 1-D Oid array");
-	memcpy(riinfo->ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
-	if ((Pointer) arr != DatumGetPointer(adatum))
-		pfree(arr);				/* free de-toasted copy, if any */
+	DeconstructConstraintRow(tup,
+							 &riinfo->nkeys,
+							 riinfo->fk_attnums,
+							 riinfo->pk_attnums,
+							 riinfo->pf_eq_oprs,
+							 riinfo->pp_eq_oprs,
+							 riinfo->ff_eq_oprs);
 
 	ReleaseSysCache(tup);
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index c3071db1cd..1dda178c5a 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -4124,10 +4124,6 @@ RelationGetFKeyList(Relation relation)
 	{
 		Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
 		ForeignKeyCacheInfo *info;
-		Datum		adatum;
-		bool		isnull;
-		ArrayType  *arr;
-		int			nelem;
 
 		/* consider only foreign keys */
 		if (constraint->contype != CONSTRAINT_FOREIGN)
@@ -4138,58 +4134,11 @@ RelationGetFKeyList(Relation relation)
 		info->conrelid = constraint->conrelid;
 		info->confrelid = constraint->confrelid;
 
-		/* Extract data from conkey field */
-		adatum = fastgetattr(htup, Anum_pg_constraint_conkey,
-							 conrel->rd_att, &isnull);
-		if (isnull)
-			elog(ERROR, "null conkey for rel %s",
-				 RelationGetRelationName(relation));
-
-		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem < 1 ||
-			nelem > INDEX_MAX_KEYS ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != INT2OID)
-			elog(ERROR, "conkey is not a 1-D smallint array");
-
-		info->nkeys = nelem;
-		memcpy(info->conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
-
-		/* Likewise for confkey */
-		adatum = fastgetattr(htup, Anum_pg_constraint_confkey,
-							 conrel->rd_att, &isnull);
-		if (isnull)
-			elog(ERROR, "null confkey for rel %s",
-				 RelationGetRelationName(relation));
-
-		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem != info->nkeys ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != INT2OID)
-			elog(ERROR, "confkey is not a 1-D smallint array");
-
-		memcpy(info->confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
-
-		/* Likewise for conpfeqop */
-		adatum = fastgetattr(htup, Anum_pg_constraint_conpfeqop,
-							 conrel->rd_att, &isnull);
-		if (isnull)
-			elog(ERROR, "null conpfeqop for rel %s",
-				 RelationGetRelationName(relation));
-
-		arr = DatumGetArrayTypeP(adatum);	/* ensure not toasted */
-		nelem = ARR_DIMS(arr)[0];
-		if (ARR_NDIM(arr) != 1 ||
-			nelem != info->nkeys ||
-			ARR_HASNULL(arr) ||
-			ARR_ELEMTYPE(arr) != OIDOID)
-			elog(ERROR, "conpfeqop is not a 1-D OID array");
-
-		memcpy(info->conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+		DeconstructConstraintRow(htup, &info->nkeys,
+								 info->conkey,
+								 info->confkey,
+								 info->conpfeqop,
+								 NULL, NULL);
 
 		/* Add FK's node to the result list */
 		result = lappend(result, info);
diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h
index 0e4007389b..8405fe2eea 100644
--- a/src/include/catalog/pg_constraint.h
+++ b/src/include/catalog/pg_constraint.h
@@ -251,6 +251,9 @@ extern Oid	get_relation_idx_constraint_oid(Oid relationId, Oid indexId);
 
 extern Bitmapset *get_primary_key_attnos(Oid relid, bool deferrableOk,
 					   Oid *constraintOid);
+extern void DeconstructConstraintRow(HeapTuple tuple, int *numfks,
+						 AttrNumber *conkey, AttrNumber *confkey,
+						 Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs);
 
 extern bool check_functional_grouping(Oid relid,
 						  Index varno, Index varlevelsup,
-- 
2.11.0

>From eb68d572c815c34500f6ed720860f24386afe9cd Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 12 Nov 2018 16:18:45 -0300
Subject: [PATCH 2/5] move CloneForeignKeyConstraints to tablecmds.c

---
 src/backend/catalog/pg_constraint.c | 304 -----------------------------------
 src/backend/commands/tablecmds.c    | 306 ++++++++++++++++++++++++++++++++++++
 src/include/catalog/pg_constraint.h |   3 -
 3 files changed, 306 insertions(+), 307 deletions(-)

diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index f71d89ff17..2833e45cff 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -39,10 +39,6 @@
 #include "utils/tqual.h"
 
 
-static void clone_fk_constraints(Relation pg_constraint, Relation parentRel,
-					 Relation partRel, List *clone, List **cloned);
-
-
 /*
  * CreateConstraintEntry
  *	Create a constraint table entry.
@@ -378,306 +374,6 @@ CreateConstraintEntry(const char *constraintName,
 }
 
 /*
- * CloneForeignKeyConstraints
- *		Clone foreign keys from a partitioned table to a newly acquired
- *		partition.
- *
- * relationId is a partition of parentId, so we can be certain that it has the
- * same columns with the same datatypes.  The columns may be in different
- * order, though.
- *
- * The *cloned list is appended ClonedConstraint elements describing what was
- * created.
- */
-void
-CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
-{
-	Relation	pg_constraint;
-	Relation	parentRel;
-	Relation	rel;
-	ScanKeyData key;
-	SysScanDesc scan;
-	HeapTuple	tuple;
-	List	   *clone = NIL;
-
-	parentRel = heap_open(parentId, NoLock);	/* already got lock */
-	/* see ATAddForeignKeyConstraint about lock level */
-	rel = heap_open(relationId, AccessExclusiveLock);
-	pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
-
-	/* Obtain the list of constraints to clone or attach */
-	ScanKeyInit(&key,
-				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
-				F_OIDEQ, ObjectIdGetDatum(parentId));
-	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
-							  NULL, 1, &key);
-	while ((tuple = systable_getnext(scan)) != NULL)
-	{
-		Oid		oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
-
-		clone = lappend_oid(clone, oid);
-	}
-	systable_endscan(scan);
-
-	/* Do the actual work, recursing to partitions as needed */
-	clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned);
-
-	/* We're done.  Clean up */
-	heap_close(parentRel, NoLock);
-	heap_close(rel, NoLock);	/* keep lock till commit */
-	heap_close(pg_constraint, RowShareLock);
-}
-
-/*
- * clone_fk_constraints
- *		Recursive subroutine for CloneForeignKeyConstraints
- *
- * Clone the given list of FK constraints when a partition is attached.
- *
- * When cloning foreign keys to a partition, it may happen that equivalent
- * constraints already exist in the partition for some of them.  We can skip
- * creating a clone in that case, and instead just attach the existing
- * constraint to the one in the parent.
- *
- * This function recurses to partitions, if the new partition is partitioned;
- * of course, only do this for FKs that were actually cloned.
- */
-static void
-clone_fk_constraints(Relation pg_constraint, Relation parentRel,
-					 Relation partRel, List *clone, List **cloned)
-{
-	AttrNumber *attmap;
-	List	   *partFKs;
-	List	   *subclone = NIL;
-	ListCell   *cell;
-
-	/*
-	 * The constraint key may differ, if the columns in the partition are
-	 * different.  This map is used to convert them.
-	 */
-	attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
-										RelationGetDescr(parentRel),
-										gettext_noop("could not convert row type"));
-
-	partFKs = copyObject(RelationGetFKeyList(partRel));
-
-	foreach(cell, clone)
-	{
-		Oid			parentConstrOid = lfirst_oid(cell);
-		Form_pg_constraint constrForm;
-		HeapTuple	tuple;
-		AttrNumber	conkey[INDEX_MAX_KEYS];
-		AttrNumber	mapped_conkey[INDEX_MAX_KEYS];
-		AttrNumber	confkey[INDEX_MAX_KEYS];
-		Oid			conpfeqop[INDEX_MAX_KEYS];
-		Oid			conppeqop[INDEX_MAX_KEYS];
-		Oid			conffeqop[INDEX_MAX_KEYS];
-		Constraint *fkconstraint;
-		bool		attach_it;
-		Oid			constrOid;
-		ObjectAddress parentAddr,
-					childAddr;
-		int			nelem;
-		ListCell   *cell;
-		int			i;
-
-		tuple = SearchSysCache1(CONSTROID, parentConstrOid);
-		if (!tuple)
-			elog(ERROR, "cache lookup failed for constraint %u",
-				 parentConstrOid);
-		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
-
-		/* only foreign keys */
-		if (constrForm->contype != CONSTRAINT_FOREIGN)
-		{
-			ReleaseSysCache(tuple);
-			continue;
-		}
-
-		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
-
-		DeconstructConstraintRow(tuple, &nelem, conkey, confkey,
-								 conpfeqop, conppeqop, conffeqop);
-		for (i = 0; i < nelem; i++)
-			mapped_conkey[i] = attmap[conkey[i] - 1];
-
-		/*
-		 * Before creating a new constraint, see whether any existing FKs are
-		 * fit for the purpose.  If one is, attach the parent constraint to it,
-		 * and don't clone anything.  This way we avoid the expensive
-		 * verification step and don't end up with a duplicate FK.  This also
-		 * means we don't consider this constraint when recursing to
-		 * partitions.
-		 */
-		attach_it = false;
-		foreach(cell, partFKs)
-		{
-			ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
-			Form_pg_constraint partConstr;
-			HeapTuple	partcontup;
-
-			attach_it = true;
-
-			/*
-			 * Do some quick & easy initial checks.  If any of these fail, we
-			 * cannot use this constraint, but keep looking.
-			 */
-			if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem)
-			{
-				attach_it = false;
-				continue;
-			}
-			for (i = 0; i < nelem; i++)
-			{
-				if (fk->conkey[i] != mapped_conkey[i] ||
-					fk->confkey[i] != confkey[i] ||
-					fk->conpfeqop[i] != conpfeqop[i])
-				{
-					attach_it = false;
-					break;
-				}
-			}
-			if (!attach_it)
-				continue;
-
-			/*
-			 * Looks good so far; do some more extensive checks.  Presumably
-			 * the check for 'convalidated' could be dropped, since we don't
-			 * really care about that, but let's be careful for now.
-			 */
-			partcontup = SearchSysCache1(CONSTROID,
-										 ObjectIdGetDatum(fk->conoid));
-			if (!partcontup)
-				elog(ERROR, "cache lookup failed for constraint %u",
-					 fk->conoid);
-			partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
-			if (OidIsValid(partConstr->conparentid) ||
-				!partConstr->convalidated ||
-				partConstr->condeferrable != constrForm->condeferrable ||
-				partConstr->condeferred != constrForm->condeferred ||
-				partConstr->confupdtype != constrForm->confupdtype ||
-				partConstr->confdeltype != constrForm->confdeltype ||
-				partConstr->confmatchtype != constrForm->confmatchtype)
-			{
-				ReleaseSysCache(partcontup);
-				attach_it = false;
-				continue;
-			}
-
-			ReleaseSysCache(partcontup);
-
-			/* looks good!  Attach this constraint */
-			ConstraintSetParentConstraint(fk->conoid, constrForm->oid);
-			CommandCounterIncrement();
-			attach_it = true;
-			break;
-		}
-
-		/*
-		 * If we attached to an existing constraint, there is no need to
-		 * create a new one.  In fact, there's no need to recurse for this
-		 * constraint to partitions, either.
-		 */
-		if (attach_it)
-		{
-			ReleaseSysCache(tuple);
-			continue;
-		}
-
-		constrOid =
-			CreateConstraintEntry(NameStr(constrForm->conname),
-								  constrForm->connamespace,
-								  CONSTRAINT_FOREIGN,
-								  constrForm->condeferrable,
-								  constrForm->condeferred,
-								  constrForm->convalidated,
-								  constrForm->oid,
-								  RelationGetRelid(partRel),
-								  mapped_conkey,
-								  nelem,
-								  nelem,
-								  InvalidOid,	/* not a domain constraint */
-								  constrForm->conindid, /* same index */
-								  constrForm->confrelid,	/* same foreign rel */
-								  confkey,
-								  conpfeqop,
-								  conppeqop,
-								  conffeqop,
-								  nelem,
-								  constrForm->confupdtype,
-								  constrForm->confdeltype,
-								  constrForm->confmatchtype,
-								  NULL,
-								  NULL,
-								  NULL,
-								  false,
-								  1, false, true);
-		subclone = lappend_oid(subclone, constrOid);
-
-		ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
-		recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
-
-		fkconstraint = makeNode(Constraint);
-		/* for now this is all we need */
-		fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
-		fkconstraint->fk_upd_action = constrForm->confupdtype;
-		fkconstraint->fk_del_action = constrForm->confdeltype;
-		fkconstraint->deferrable = constrForm->condeferrable;
-		fkconstraint->initdeferred = constrForm->condeferred;
-
-		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
-								 constrOid, constrForm->conindid, false);
-
-		if (cloned)
-		{
-			ClonedConstraint *newc;
-
-			/*
-			 * Feed back caller about the constraints we created, so that they
-			 * can set up constraint verification.
-			 */
-			newc = palloc(sizeof(ClonedConstraint));
-			newc->relid = RelationGetRelid(partRel);
-			newc->refrelid = constrForm->confrelid;
-			newc->conindid = constrForm->conindid;
-			newc->conid = constrOid;
-			newc->constraint = fkconstraint;
-
-			*cloned = lappend(*cloned, newc);
-		}
-
-		ReleaseSysCache(tuple);
-	}
-
-	pfree(attmap);
-	list_free_deep(partFKs);
-
-	/*
-	 * If the partition is partitioned, recurse to handle any constraints that
-	 * were cloned.
-	 */
-	if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
-		subclone != NIL)
-	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
-		int			i;
-
-		for (i = 0; i < partdesc->nparts; i++)
-		{
-			Relation	childRel;
-
-			childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
-			clone_fk_constraints(pg_constraint,
-								 partRel,
-								 childRel,
-								 subclone,
-								 cloned);
-			heap_close(childRel, NoLock);	/* keep lock till commit */
-		}
-	}
-}
-
-/*
  * Test whether given name is currently used as a constraint name
  * for the given object (relation or domain).
  *
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 843ed48aa7..31ca651213 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -412,6 +412,10 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
 						  Relation rel, Constraint *fkconstraint, Oid parentConstr,
 						  bool recurse, bool recursing,
 						  LOCKMODE lockmode);
+static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
+						   List **cloned);
+static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
+				   Relation partRel, List *clone, List **cloned);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
 					 DropBehavior behavior,
 					 bool recurse, bool recursing,
@@ -7631,6 +7635,308 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 }
 
 /*
+ * CloneForeignKeyConstraints
+ *		Clone foreign keys from a partitioned table to a newly acquired
+ *		partition.
+ *
+ * relationId is a partition of parentId, so we can be certain that it has the
+ * same columns with the same datatypes.  The columns may be in different
+ * order, though.
+ *
+ * ClonedConstraint elements describing what was created are appended to the
+ * *cloned list, for the purposes of validating the constraint in ALTER
+ * TABLE's Phase 3.
+ */
+static void
+CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+{
+	Relation	pg_constraint;
+	Relation	parentRel;
+	Relation	rel;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	tuple;
+	List	   *clone = NIL;
+
+	parentRel = heap_open(parentId, NoLock);	/* already got lock */
+	/* see ATAddForeignKeyConstraint about lock level */
+	rel = heap_open(relationId, AccessExclusiveLock);
+	pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+
+	/* Collect OIDs of all the parent's constraints; non-FKs get skipped later */
+	ScanKeyInit(&key,
+				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(parentId));
+	scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+							  NULL, 1, &key);
+	while ((tuple = systable_getnext(scan)) != NULL)
+	{
+		Oid		oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+		clone = lappend_oid(clone, oid);
+	}
+	systable_endscan(scan);
+
+	/* Do the actual work, recursing to partitions as needed */
+	CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
+
+	/* We're done.  Clean up */
+	heap_close(parentRel, NoLock);
+	heap_close(rel, NoLock);	/* keep lock till commit */
+	heap_close(pg_constraint, RowShareLock);
+}
+
+/*
+ * CloneFkReferencing
+ *		Recursive subroutine for CloneForeignKeyConstraints, referencing side
+ *
+ * Clone the given list of FK constraints when a partition is attached on the
+ * referencing side of those constraints.
+ *
+ * When cloning foreign keys to a partition, it may happen that equivalent
+ * constraints already exist in the partition for some of them.  We can skip
+ * creating a clone in that case, and instead just attach the existing
+ * constraint to the one in the parent.
+ *
+ * This function recurses to partitions, if the new partition is partitioned;
+ * of course, only do this for FKs that were actually cloned.
+ */
+static void
+CloneFkReferencing(Relation pg_constraint, Relation parentRel,
+				   Relation partRel, List *clone, List **cloned)
+{
+	AttrNumber *attmap;
+	List	   *partFKs;
+	List	   *subclone = NIL;
+	ListCell   *cell;
+
+	/*
+	 * The constraint key may differ, if the columns in the partition are
+	 * different.  This map is used to convert them.
+	 */
+	attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
+										RelationGetDescr(parentRel),
+										gettext_noop("could not convert row type"));
+
+	partFKs = copyObject(RelationGetFKeyList(partRel));
+
+	foreach(cell, clone)
+	{
+		Oid			parentConstrOid = lfirst_oid(cell);
+		Form_pg_constraint constrForm;
+		HeapTuple	tuple;
+		int			numfks;
+		AttrNumber	conkey[INDEX_MAX_KEYS];
+		AttrNumber	mapped_conkey[INDEX_MAX_KEYS];
+		AttrNumber	confkey[INDEX_MAX_KEYS];
+		Oid			conpfeqop[INDEX_MAX_KEYS];
+		Oid			conppeqop[INDEX_MAX_KEYS];
+		Oid			conffeqop[INDEX_MAX_KEYS];
+		Constraint *fkconstraint;
+		bool		attach_it;
+		Oid			constrOid;
+		ObjectAddress parentAddr,
+					childAddr;
+		ListCell   *cell;		/* shadows the outer "cell"; scans partFKs */
+		int			i;
+
+		tuple = SearchSysCache1(CONSTROID, parentConstrOid);
+		if (!tuple)
+			elog(ERROR, "cache lookup failed for constraint %u",
+				 parentConstrOid);
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* only foreign keys */
+		if (constrForm->contype != CONSTRAINT_FOREIGN)
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+
+		DeconstructConstraintRow(tuple, &numfks, conkey, confkey,
+								 conpfeqop, conppeqop, conffeqop);
+		for (i = 0; i < numfks; i++)
+			mapped_conkey[i] = attmap[conkey[i] - 1];
+
+		/*
+		 * Before creating a new constraint, see whether any existing FKs are
+		 * fit for the purpose.  If one is, attach the parent constraint to it,
+		 * and don't clone anything.  This way we avoid the expensive
+		 * verification step and don't end up with a duplicate FK.  This also
+		 * means we don't consider this constraint when recursing to
+		 * partitions.
+		 */
+		attach_it = false;
+		foreach(cell, partFKs)
+		{
+			ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
+			Form_pg_constraint partConstr;
+			HeapTuple	partcontup;
+
+			attach_it = true;
+
+			/*
+			 * Do some quick & easy initial checks.  If any of these fail, we
+			 * cannot use this constraint, but keep looking.
+			 */
+			if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
+			{
+				attach_it = false;
+				continue;
+			}
+			for (i = 0; i < numfks; i++)
+			{
+				if (fk->conkey[i] != mapped_conkey[i] ||
+					fk->confkey[i] != confkey[i] ||
+					fk->conpfeqop[i] != conpfeqop[i])
+				{
+					attach_it = false;
+					break;
+				}
+			}
+			if (!attach_it)
+				continue;
+
+			/*
+			 * Looks good so far; do some more extensive checks.  Presumably
+			 * the check for 'convalidated' could be dropped, since we don't
+			 * really care about that, but let's be careful for now.
+			 */
+			partcontup = SearchSysCache1(CONSTROID,
+										 ObjectIdGetDatum(fk->conoid));
+			if (!partcontup)
+				elog(ERROR, "cache lookup failed for constraint %u",
+					 fk->conoid);
+			partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+			if (OidIsValid(partConstr->conparentid) ||
+				!partConstr->convalidated ||
+				partConstr->condeferrable != constrForm->condeferrable ||
+				partConstr->condeferred != constrForm->condeferred ||
+				partConstr->confupdtype != constrForm->confupdtype ||
+				partConstr->confdeltype != constrForm->confdeltype ||
+				partConstr->confmatchtype != constrForm->confmatchtype)
+			{
+				ReleaseSysCache(partcontup);
+				attach_it = false;
+				continue;
+			}
+
+			ReleaseSysCache(partcontup);
+
+			/* looks good!  Attach this constraint */
+			ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
+			CommandCounterIncrement();
+			attach_it = true;
+			break;
+		}
+
+		/*
+		 * If we attached to an existing constraint, there is no need to
+		 * create a new one.  In fact, there's no need to recurse for this
+		 * constraint to partitions, either.
+		 */
+		if (attach_it)
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		constrOid =
+			CreateConstraintEntry(NameStr(constrForm->conname),
+								  constrForm->connamespace,
+								  CONSTRAINT_FOREIGN,
+								  constrForm->condeferrable,
+								  constrForm->condeferred,
+								  constrForm->convalidated,
+								  parentConstrOid,
+								  RelationGetRelid(partRel),
+								  mapped_conkey,
+								  numfks,
+								  numfks,
+								  InvalidOid,	/* not a domain constraint */
+								  constrForm->conindid, /* same index */
+								  constrForm->confrelid,	/* same foreign rel */
+								  confkey,
+								  conpfeqop,
+								  conppeqop,
+								  conffeqop,
+								  numfks,
+								  constrForm->confupdtype,
+								  constrForm->confdeltype,
+								  constrForm->confmatchtype,
+								  NULL,
+								  NULL,
+								  NULL,
+								  false,
+								  1, false, true);
+		subclone = lappend_oid(subclone, constrOid);
+
+		ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
+		recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
+
+		fkconstraint = makeNode(Constraint);
+		/* for now this is all we need */
+		fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
+		fkconstraint->fk_upd_action = constrForm->confupdtype;
+		fkconstraint->fk_del_action = constrForm->confdeltype;
+		fkconstraint->deferrable = constrForm->condeferrable;
+		fkconstraint->initdeferred = constrForm->condeferred;
+
+		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
+								 constrOid, constrForm->conindid, false);
+
+		if (cloned)
+		{
+			ClonedConstraint *newc;
+
+			/*
+			 * Report the constraints we created back to the caller, so that
+			 * it can set up constraint verification.
+			 */
+			newc = palloc(sizeof(ClonedConstraint));
+			newc->relid = RelationGetRelid(partRel);
+			newc->refrelid = constrForm->confrelid;
+			newc->conindid = constrForm->conindid;
+			newc->conid = constrOid;
+			newc->constraint = fkconstraint;
+
+			*cloned = lappend(*cloned, newc);
+		}
+
+		ReleaseSysCache(tuple);
+	}
+
+	pfree(attmap);
+	list_free_deep(partFKs);
+
+	/*
+	 * If the partition is partitioned, recurse to handle any constraints that
+	 * were cloned.
+	 */
+	if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+		subclone != NIL)
+	{
+		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
+		int			i;
+
+		for (i = 0; i < partdesc->nparts; i++)
+		{
+			Relation	childRel;
+
+			childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
+			CloneFkReferencing(pg_constraint,
+							   partRel,
+							   childRel,
+							   subclone,
+							   cloned);
+			heap_close(childRel, NoLock);	/* keep lock till commit */
+		}
+	}
+}
+
+/*
  * ALTER TABLE ALTER CONSTRAINT
  *
  * Update the attributes of a constraint.
diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h
index 8405fe2eea..8b2af36ba7 100644
--- a/src/include/catalog/pg_constraint.h
+++ b/src/include/catalog/pg_constraint.h
@@ -226,9 +226,6 @@ extern Oid CreateConstraintEntry(const char *constraintName,
 					  bool conNoInherit,
 					  bool is_internal);
 
-extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
-						   List **cloned);
-
 extern void RemoveConstraintById(Oid conId);
 extern void RenameConstraintById(Oid conId, const char *newname);
 
-- 
2.11.0

>From b1024f44489f4d610e3658fa91642bf090692faf Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 28 Nov 2018 11:52:00 -0300
Subject: [PATCH 3/5] Rework deleteObjectsInList to allow objtype-specific
 checks

This doesn't change any functionality yet.
---
 src/backend/catalog/dependency.c | 41 ++++++++++++++++++++++++----------------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 7dfa3278a5..c7d3b8a654 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -219,29 +219,38 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 	int			i;
 
 	/*
-	 * Keep track of objects for event triggers, if necessary.
+	 * Invoke pre-deletion callbacks and keep track of objects for event
+	 * triggers, if necessary.
 	 */
-	if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL))
+	for (i = 0; i < targetObjects->numrefs; i++)
 	{
-		for (i = 0; i < targetObjects->numrefs; i++)
+		const ObjectAddress *thisobj = &targetObjects->refs[i];
+		Oid			objectClass = getObjectClass(thisobj);
+
+		if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL))
 		{
-			const ObjectAddress *thisobj = &targetObjects->refs[i];
-			const ObjectAddressExtra *extra = &targetObjects->extras[i];
-			bool		original = false;
-			bool		normal = false;
-
-			if (extra->flags & DEPFLAG_ORIGINAL)
-				original = true;
-			if (extra->flags & DEPFLAG_NORMAL)
-				normal = true;
-			if (extra->flags & DEPFLAG_REVERSE)
-				normal = true;
-
-			if (EventTriggerSupportsObjectClass(getObjectClass(thisobj)))
+			if (EventTriggerSupportsObjectClass(objectClass))
 			{
+				bool		original = false;
+				bool		normal = false;
+				const ObjectAddressExtra *extra = &targetObjects->extras[i];
+
+				if (extra->flags & DEPFLAG_ORIGINAL)
+					original = true;
+				if (extra->flags & DEPFLAG_NORMAL ||
+					extra->flags & DEPFLAG_REVERSE)
+					normal = true;
+
 				EventTriggerSQLDropAddObject(thisobj, original, normal);
 			}
 		}
+
+		/* Invoke class-specific pre-deletion checks */
+		switch (objectClass)
+		{
+			default:
+				break;
+		}
 	}
 
 	/*
-- 
2.11.0

>From 44300eb0dfdf2221ed8fffd200fc7d6b958d090e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 30 Aug 2018 06:01:38 -0300
Subject: [PATCH 4/5] Support foreign key referencing partitioned tables

---
 src/backend/catalog/dependency.c          |   3 +
 src/backend/catalog/heap.c                |  18 +
 src/backend/catalog/pg_constraint.c       |  15 +-
 src/backend/commands/tablecmds.c          | 813 +++++++++++++++++++++++++-----
 src/backend/utils/adt/ri_triggers.c       | 228 ++++++++-
 src/backend/utils/adt/ruleutils.c         |  18 +
 src/include/catalog/heap.h                |   2 +
 src/include/commands/tablecmds.h          |   6 +-
 src/include/commands/trigger.h            |   1 +
 src/include/utils/ruleutils.h             |   1 +
 src/test/regress/expected/foreign_key.out | 168 +++++-
 src/test/regress/sql/foreign_key.sql      | 114 ++++-
 12 files changed, 1201 insertions(+), 186 deletions(-)

diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index c7d3b8a654..35c648fb21 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -248,6 +248,9 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 		/* Invoke class-specific pre-deletion checks */
 		switch (objectClass)
 		{
+			case OCLASS_CLASS:
+				pre_drop_class_check(thisobj->objectId, thisobj->objectSubId);
+				break;
 			default:
 				break;
 		}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 11debaa780..55f9e58a39 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1788,6 +1788,24 @@ RemoveAttrDefaultById(Oid attrdefId)
 }
 
 /*
+ * Pre-drop check: run CheckNoForeignKeyRefs on plain-table partitions.
+ */
+void
+pre_drop_class_check(Oid relationId, Oid objectSubId)
+{
+	Relation	relation;
+
+	/* caller must hold strong lock already, if they're dropping */
+	relation = relation_open(relationId, NoLock);
+
+	if (relation->rd_rel->relkind == RELKIND_RELATION &&
+		relation->rd_rel->relispartition)
+		CheckNoForeignKeyRefs(relation, true);
+
+	relation_close(relation, NoLock);
+}
+
+/*
  * heap_drop_with_catalog	- removes specified relation from catalogs
  *
  * Note that this routine is not responsible for dropping objects that are
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 2833e45cff..1c5017eccc 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -303,10 +303,12 @@ CreateConstraintEntry(const char *constraintName,
 	if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
 	{
 		/*
-		 * Register normal dependency on the unique index that supports a
-		 * foreign-key constraint.  (Note: for indexes associated with unique
-		 * or primary-key constraints, the dependency runs the other way, and
-		 * is not made here.)
+		 * Register dependency on the unique index that supports a foreign-key
+		 * constraint.  (Note: for indexes associated with unique or
+		 * primary-key constraints, the dependency runs the other way, and is
+		 * not made here.)  For standalone constraints, this is a normal
+		 * dependency; for a constraint that is an implementation part of a
+		 * larger one, it's internal-auto.
 		 */
 		ObjectAddress relobject;
 
@@ -314,7 +316,10 @@ CreateConstraintEntry(const char *constraintName,
 		relobject.objectId = indexRelId;
 		relobject.objectSubId = 0;
 
-		recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
+		recordDependencyOn(&conobject, &relobject,
+						   OidIsValid(parentConstrId) ?
+						   DEPENDENCY_INTERNAL_AUTO :
+						   DEPENDENCY_NORMAL);
 	}
 
 	if (foreignNKeys > 0)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 31ca651213..74109557ae 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -412,10 +412,28 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
 						  Relation rel, Constraint *fkconstraint, Oid parentConstr,
 						  bool recurse, bool recursing,
 						  LOCKMODE lockmode);
+static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
+					   Relation rel, Relation pkrel, Oid indexOid, Oid parentConstraint,
+					   int numfks, int16 *pkattnum, int16 *fkattnum,
+					   Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+					   bool old_check_ok);
+static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
+						Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, int numfks,
+						int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+						Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok,
+						bool create_constraint);
 static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
 						   List **cloned);
+static void CloneFkReferenced(Relation parentRel, Relation partitionRel,
+				  Relation pg_constraint);
 static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 				   Relation partRel, List *clone, List **cloned);
+static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+							  Constraint *fkconstraint, Oid constraintOid,
+							  Oid indexOid);
+static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
+							   Constraint *fkconstraint, Oid constraintOid,
+							   Oid indexOid);
 static void ATExecDropConstraint(Relation rel, const char *constrName,
 					 DropBehavior behavior,
 					 bool recurse, bool recursing,
@@ -3407,7 +3425,8 @@ AlterTableGetLockLevel(List *cmds)
 
 				/*
 				 * Removing constraints can affect SELECTs that have been
-				 * optimised assuming the constraint holds true.
+				 * optimised assuming the constraint holds true. See also
+				 * CloneFkReferenced.
 				 */
 			case AT_DropConstraint: /* as DROP INDEX */
 			case AT_DropNotNull:	/* may change some SQL plans */
@@ -6987,11 +7006,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 			break;
 
 		case CONSTR_FOREIGN:
-
 			/*
-			 * Note that we currently never recurse for FK constraints, so the
-			 * "recurse" flag is silently ignored.
-			 *
 			 * Assign or validate constraint name
 			 */
 			if (newConstraint->conname)
@@ -7170,6 +7185,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
  * Subroutine for ATExecAddConstraint.  Must already hold exclusive
  * lock on the rel, and have done appropriate validity checks for it.
  * We do permissions checks here, however.
+ *
+ * When the referenced or referencing tables (or both) are partitioned,
+ * multiple pg_constraint rows are required -- one for each partitioned table
+ * and each partition on each side (fortunately, not one for every combination
+ * thereof).  We also need the appropriate triggers to be created on each leaf
+ * partition.
  */
 static ObjectAddress
 ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
@@ -7189,7 +7210,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	int			numfks,
 				numpks;
 	Oid			indexOid;
-	Oid			constrOid;
 	bool		old_check_ok;
 	ObjectAddress address;
 	ListCell   *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
@@ -7207,12 +7227,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * Validity checks (permission checks wait till we have the column
 	 * numbers)
 	 */
-	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-		ereport(ERROR,
-				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("cannot reference partitioned table \"%s\"",
-						RelationGetRelationName(pkrel))));
-
 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
 		if (!recurse)
@@ -7230,7 +7244,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 					 errdetail("This feature is not yet supported on partitioned tables.")));
 	}
 
-	if (pkrel->rd_rel->relkind != RELKIND_RELATION)
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("referenced relation \"%s\" is not a table",
@@ -7440,8 +7455,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
 			ereport(ERROR,
 					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("foreign key constraint \"%s\" "
-							"cannot be implemented",
+					 errmsg(fkconstraint->conname ?
+							"foreign key constraint \"%s\" cannot be implemented" :
+							"foreign key constraint cannot be implemented",
 							fkconstraint->conname),
 					 errdetail("Key columns \"%s\" and \"%s\" "
 							   "are of incompatible types: %s and %s.",
@@ -7529,15 +7545,160 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	}
 
 	/*
+	 * Create all the constraint and trigger objects, recursing to partitions
+	 * as necessary.  First handle the referenced side.
+	 */
+	address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
+									 indexOid,
+									 InvalidOid,	/* no parent constraint */
+									 numfks,
+									 pkattnum,
+									 fkattnum,
+									 pfeqoperators,
+									 ppeqoperators,
+									 ffeqoperators,
+									 old_check_ok);
+
+	/*
+	 * Now handle the referencing side.  We don't need the topmost constraint
+	 * on this side, because the constraint we created above fills that role;
+	 * if this recurses for any partitions then more constraints are added.
+	 */
+	addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
+							indexOid,
+							address.objectId,
+							numfks,
+							pkattnum,
+							fkattnum,
+							pfeqoperators,
+							ppeqoperators,
+							ffeqoperators,
+							old_check_ok,
+							false);
+
+	/*
+	 * Close pk table, but keep lock until we've committed.
+	 */
+	heap_close(pkrel, NoLock);
+
+	return address;
+}
+
+/*
+ * index_get_partition
+ *		Return the OID of the index on "partition" whose parent is indexId,
+ *		or InvalidOid if the partition has no such index.
+ *
+ * XXX maybe this should be in catalog/partition.c?  Not for the moment only
+ * because all the callers are here ...
+ */
+static Oid
+index_get_partition(Relation partition, Oid indexId)
+{
+	List	   *idxlist = RelationGetIndexList(partition);
+	Oid			result = InvalidOid;
+	ListCell   *l;
+
+	foreach(l, idxlist)
+	{
+		Oid			partIdx = lfirst_oid(l);
+		HeapTuple	tup;
+		bool		ispartition;
+
+		tup = SearchSysCache1(RELOID, ObjectIdGetDatum(partIdx));
+		if (!tup)
+			elog(ERROR, "cache lookup failed for relation %u", partIdx);
+		ispartition = ((Form_pg_class) GETSTRUCT(tup))->relispartition;
+		ReleaseSysCache(tup);
+		if (ispartition && get_partition_parent(partIdx) == indexId)
+		{
+			result = partIdx;
+			break;
+		}
+	}
+
+	list_free(idxlist);		/* free on every exit path, not only on a match */
+
+	return result;
+}
+
+/*
+ * addFkRecurseReferenced
+ *		recursive subroutine for ATAddForeignKeyConstraint, referenced side
+ *
+ * Create pg_constraint rows for the referenced side of the constraint,
+ * referencing the parent of the referencing side; also create action triggers
+ * on leaf partitions.  If the table is partitioned, recurse to handle each
+ * partition.
+ *
+ * On constraint names: It's not possible in general to give all the cascaded
+ * constraints the same name, so we don't try.  (Also, even when it *is*
+ * possible, it goes counter to the SQL-standard rule that constraint names
+ * must be unique within a schema.)  Therefore we apply the given name to the
+ * top-most constraint, and use generated names when cascading to partitions.
+ *
+ * wqueue is the ALTER TABLE work queue; can be NULL when not running as part
+ * of an ALTER TABLE sequence.
+ * fkconstraint is the constraint being added.
+ * rel is the root referencing relation.
+ * pkrel is the referenced relation; might be a partition, if recursing.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstraint is the OID of a parent constraint; InvalidOid if this is a
+ * top-level constraint.
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID arrays of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ */
+static ObjectAddress
+addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
+					   Relation pkrel, Oid indexOid, Oid parentConstraint,
+					   int numfks,
+					   int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
+					   Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
+{
+	ObjectAddress address;
+	Oid			constrOid;
+	char	   *conname;
+
+	/*
+	 * Verify relkind for each referenced partition.  At the top level, this
+	 * is redundant with a previous check, but we need it when recursing.
+	 */
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("referenced relation \"%s\" is not a table",
+						RelationGetRelationName(pkrel))));
+
+	/*
+	 * Caller supplies us with a constraint name; however, it may be
+	 * used in this partition, so come up with a different one in that
+	 * case.
+	 */
+	if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+							 RelationGetRelid(rel),
+							 fkconstraint->conname))
+		conname = ChooseConstraintName(RelationGetRelationName(rel),
+									   strVal(linitial(fkconstraint->fk_attrs)),
+									   "fkey",
+									   RelationGetNamespace(rel), NIL);
+	else
+		conname = fkconstraint->conname;
+
+	/*
 	 * Record the FK constraint in pg_constraint.
 	 */
-	constrOid = CreateConstraintEntry(fkconstraint->conname,
+	constrOid = CreateConstraintEntry(conname,
 									  RelationGetNamespace(rel),
 									  CONSTRAINT_FOREIGN,
 									  fkconstraint->deferrable,
 									  fkconstraint->initdeferred,
 									  fkconstraint->initially_valid,
-									  parentConstr,
+									  parentConstraint,
 									  RelationGetRelid(rel),
 									  fkattnum,
 									  numfks,
@@ -7549,7 +7710,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 									  pfeqoperators,
 									  ppeqoperators,
 									  ffeqoperators,
-									  numpks,
+									  numfks,
 									  fkconstraint->fk_upd_action,
 									  fkconstraint->fk_del_action,
 									  fkconstraint->fk_matchtype,
@@ -7562,79 +7723,262 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 									  false);	/* is_internal */
 	ObjectAddressSet(address, ConstraintRelationId, constrOid);
 
-	/*
-	 * Create the triggers that will enforce the constraint.  We only want the
-	 * action triggers to appear for the parent partitioned relation, even
-	 * though the constraints also exist below.
-	 */
-	createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
-							 constrOid, indexOid, !recursing);
+	/* make new constraint visible, in case we add more */
+	CommandCounterIncrement();
 
 	/*
-	 * Tell Phase 3 to check that the constraint is satisfied by existing
-	 * rows. We can skip this during table creation, when requested explicitly
-	 * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
-	 * recreating a constraint following a SET DATA TYPE operation that did
-	 * not impugn its validity.
+	 * If the referenced table is a plain relation, create the action triggers
+	 * that enforce the constraint.
 	 */
-	if (!old_check_ok && !fkconstraint->skip_validation)
+	if (pkrel->rd_rel->relkind == RELKIND_RELATION)
 	{
-		NewConstraint *newcon;
-
-		newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
-		newcon->name = fkconstraint->conname;
-		newcon->contype = CONSTR_FOREIGN;
-		newcon->refrelid = RelationGetRelid(pkrel);
-		newcon->refindid = indexOid;
-		newcon->conid = constrOid;
-		newcon->qual = (Node *) fkconstraint;
-
-		tab->constraints = lappend(tab->constraints, newcon);
+		createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
+									   fkconstraint,
+									   constrOid, indexOid);
 	}
 
 	/*
-	 * When called on a partitioned table, recurse to create the constraint on
-	 * the partitions also.
+	 * If the referenced table is partitioned, recurse on ourselves to handle
+	 * each partition.  We need one pg_constraint row created for each
+	 * partition in addition to the pg_constraint row for the parent table.
 	 */
-	if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc;
+		PartitionDesc pd = RelationGetPartitionDesc(pkrel);
+		int			i;
 
-		partdesc = RelationGetPartitionDesc(rel);
-
-		for (i = 0; i < partdesc->nparts; i++)
+		for (i = 0; i < pd->nparts; i++)
 		{
-			Oid			partitionId = partdesc->oids[i];
-			Relation	partition = heap_open(partitionId, lockmode);
-			AlteredTableInfo *childtab;
-			ObjectAddress childAddr;
+			Relation	partRel;
+			AttrNumber *map;
+			AttrNumber *mapped_pkattnum;
+			Oid			partIndexId;
 
-			CheckTableNotInUse(partition, "ALTER TABLE");
+			partRel = heap_open(pd->oids[i], ShareRowExclusiveLock);
 
-			/* Find or create work queue entry for this table */
-			childtab = ATGetQueueEntry(wqueue, partition);
+			/*
+			 * Map the attribute numbers in the referenced side of the FK
+			 * definition to match the partition's column layout.
+			 */
+			map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+													RelationGetDescr(pkrel),
+													gettext_noop("could not convert row type"));
+			if (map)
+			{
+				int			j;
 
-			childAddr =
-				ATAddForeignKeyConstraint(wqueue, childtab, partition,
-										  fkconstraint, constrOid,
-										  recurse, true, lockmode);
+				mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
+				for (j = 0; j < numfks; j++)
+					mapped_pkattnum[j] = map[pkattnum[j] - 1];
+			}
+			else
+				mapped_pkattnum = pkattnum;
 
-			/* Record this constraint as dependent on the parent one */
-			recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO);
+			/* do the deed */
+			partIndexId = index_get_partition(partRel, indexOid);
+			if (!OidIsValid(partIndexId))
+				elog(ERROR, "index for %u not found in partition %s",
+					 indexOid, RelationGetRelationName(partRel));
+			addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
+								   partIndexId, constrOid, numfks,
+								   mapped_pkattnum, fkattnum,
+								   pfeqoperators, ppeqoperators, ffeqoperators,
+								   old_check_ok);
 
-			heap_close(partition, NoLock);
+			/* Done -- clean up (but keep the lock) */
+			heap_close(partRel, NoLock);
+			if (map)
+			{
+				pfree(mapped_pkattnum);
+				pfree(map);
+			}
 		}
 	}
 
-	/*
-	 * Close pk table, but keep lock until we've committed.
-	 */
-	heap_close(pkrel, NoLock);
-
 	return address;
 }
 
 /*
+ * addFkRecurseReferencing
+ *		recursive subroutine for ATAddForeignKeyConstraint, referencing side
+ *
+ * wqueue is the ALTER TABLE work queue; it is consulted only when a Phase 3
+ * check must be queued, so it can be NULL when no validation is needed.
+ * fkconstraint is the constraint being added.
+ * rel is the referencing relation; might be a partition, if recursing.
+ * pkrel is the root referenced relation.
+ * indexOid is the OID of the index (on pkrel) implementing this constraint.
+ * parentConstr is the OID of the parent constraint (there is always one).
+ * numfks is the number of columns in the foreign key
+ * pkattnum is the attnum array of referenced attributes.
+ * fkattnum is the attnum array of referencing attributes.
+ * pf/pp/ffeqoperators are OID arrays of operators between columns.
+ * old_check_ok signals that this constraint replaces an existing one that
+ * was already validated (thus this one doesn't need validation).
+ * create_constraint indicates whether to create a constraint; if false, the
+ * given parentConstr is the origin of all others.
+ * Note that rows created here always get a generated name; the name in
+ * fkconstraint is only copied into the Phase 3 validation work item.
+ */
+static void
+addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
+						Relation pkrel, Oid indexOid, Oid parentConstr,
+						int numfks, int16 *pkattnum, int16 *fkattnum,
+						Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
+						bool old_check_ok, bool create_constraint)
+{
+	Oid			constrOid;		/* assigned in both branches below */
+	char	   *conname;
+
+	AssertArg(OidIsValid(parentConstr));
+
+	/*
+	 * Verify relkind of the referenced relation.  NOTE(review): pkrel is the
+	 * same at every recursion level; "rel" itself is not checked -- confirm.
+	 */
+	if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
+		pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("referenced relation \"%s\" is not a table",
+						RelationGetRelationName(pkrel))));
+
+	if (!create_constraint)
+		constrOid = parentConstr;
+	else
+	{
+		/* Figure out a good constraint name to use */
+		conname = ChooseConstraintName(RelationGetRelationName(rel),
+									   strVal(linitial(fkconstraint->fk_attrs)),
+									   "fkey",
+									   RelationGetNamespace(rel), NIL);
+
+		/*
+		 * Record the FK constraint in pg_constraint.
+		 */
+		constrOid = CreateConstraintEntry(conname,
+										  RelationGetNamespace(rel),
+										  CONSTRAINT_FOREIGN,
+										  fkconstraint->deferrable,
+										  fkconstraint->initdeferred,
+										  fkconstraint->initially_valid,
+										  parentConstr,
+										  RelationGetRelid(rel),
+										  fkattnum,
+										  numfks,
+										  numfks,
+										  InvalidOid,
+										  indexOid,
+										  RelationGetRelid(pkrel),
+										  pkattnum,
+										  pfeqoperators,
+										  ppeqoperators,
+										  ffeqoperators,
+										  numfks,
+										  fkconstraint->fk_upd_action,
+										  fkconstraint->fk_del_action,
+										  fkconstraint->fk_matchtype,
+										  NULL, /* no exclusion constraint */
+										  NULL, /* no check constraint */
+										  NULL,
+										  true, /* islocal */
+										  0,	/* inhcount */
+										  true, /* isnoinherit */
+										  false);	/* is_internal */
+		/* make constraint visible */
+		CommandCounterIncrement();
+	}
+
+	/*
+	 * If the referencing relation is a plain table, add the check triggers to
+	 * it and, if necessary, schedule it to be checked in Phase 3.  If it is
+	 * partitioned, drill down to do it to its partitions.  Any other relkind
+	 * is silently left with neither triggers nor recursion.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		createForeignKeyCheckTriggers(RelationGetRelid(rel),
+									  RelationGetRelid(pkrel),
+									  fkconstraint,
+									  constrOid,
+									  indexOid);
+
+		/*
+		 * Tell Phase 3 to check that the constraint is satisfied by existing
+		 * rows. We can skip this during table creation, when requested
+		 * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
+		 * and when we're recreating a constraint following a SET DATA TYPE
+		 * operation that did not impugn its validity.
+		 */
+		if (!old_check_ok && !fkconstraint->skip_validation)
+		{
+			NewConstraint *newcon;
+			AlteredTableInfo *tab;
+
+			tab = ATGetQueueEntry(wqueue, rel);
+
+			newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+			newcon->name = fkconstraint->conname;
+			newcon->contype = CONSTR_FOREIGN;
+			newcon->refrelid = RelationGetRelid(pkrel);
+			newcon->refindid = indexOid;
+			newcon->conid = constrOid;
+			newcon->qual = (Node *) fkconstraint;
+
+			tab->constraints = lappend(tab->constraints, newcon);
+		}
+	}
+	else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		PartitionDesc pd;
+		int			i;
+
+		pd = RelationGetPartitionDesc(rel);
+		for (i = 0; i < pd->nparts; i++)
+		{
+			Relation	partRel;
+			AttrNumber *map;
+			AttrNumber *mapped_fkattnum;
+
+			partRel = heap_open(pd->oids[i], ShareRowExclusiveLock);
+
+			/*
+			 * Map the attribute numbers in the referencing side of the FK
+			 * definition to match the partition's column layout.
+			 */
+			map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
+													RelationGetDescr(rel),
+													gettext_noop("could not convert row type"));
+			if (map)
+			{
+				int			j;
+
+				mapped_fkattnum = palloc(sizeof(AttrNumber) * numfks);
+				for (j = 0; j < numfks; j++)
+					mapped_fkattnum[j] = map[fkattnum[j] - 1];
+			}
+			else
+				mapped_fkattnum = fkattnum;
+
+			/* recurse, making this level's constraint the parent of the child's */
+			addFkRecurseReferencing(wqueue, fkconstraint, partRel, pkrel,
+									indexOid, constrOid, numfks, pkattnum,
+									mapped_fkattnum,
+									pfeqoperators, ppeqoperators, ffeqoperators,
+									old_check_ok, true);
+
+			/* Done -- clean up (but keep the lock) */
+			heap_close(partRel, NoLock);
+			if (map)
+			{
+				pfree(mapped_fkattnum);
+				pfree(map);
+			}
+		}
+	}
+}
+
+/*
  * CloneForeignKeyConstraints
  *		Clone foreign keys from a partitioned table to a newly acquired
  *		partition.
@@ -7663,7 +8007,14 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
 	rel = heap_open(relationId, AccessExclusiveLock);
 	pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
 
-	/* Obtain the list of constraints to clone or attach */
+	/*
+	 * Clone constraints where the parent is in the referenced side.
+	 */
+	CloneFkReferenced(parentRel, rel, pg_constraint);
+
+	/*
+	 * Now search for constraints where the parent is in the referencing side.
+	 */
 	ScanKeyInit(&key,
 				Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
 				F_OIDEQ, ObjectIdGetDatum(parentId));
@@ -7680,13 +8031,160 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
 	/* Do the actual work, recursing to partitions as needed */
 	CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
 
-	/* We're done.  Clean up */
+	list_free(clone);
+
+	/* We're done.  Clean up, keeping locks till commit */
 	heap_close(parentRel, NoLock);
-	heap_close(rel, NoLock);	/* keep lock till commit */
+	heap_close(rel, NoLock);
 	heap_close(pg_constraint, RowShareLock);
 }
 
 /*
+ * CloneFkReferenced
+ *		Subroutine for CloneForeignKeyConstraints, referenced side
+ *
+ * Clone the FKs that reference parentRel so that they also cover partitionRel,
+ * which is being created/attached; pg_constraint is the opened catalog.
+ * (Recursion to sub-partitions is left to callee addFkRecurseReferenced.)
+ */
+static void
+CloneFkReferenced(Relation parentRel, Relation partitionRel,
+				  Relation pg_constraint)
+{
+	AttrNumber *attmap;
+	ListCell   *cell;
+	SysScanDesc scan;
+	ScanKeyData key[2];
+	HeapTuple	tuple;
+	List	   *clone = NIL;
+
+	/*
+	 * Search for any constraints where this partition is in the referenced
+	 * side.  However, we must ignore any constraint whose parent constraint
+	 * is also going to be cloned, to avoid duplicates.  So do it in two
+	 * steps: first construct the list of constraints to clone, then go over
+	 * that list cloning those whose parents are not in the list.  (We must
+	 * not rely on the parent being seen first, since catalog order could
+	 * return children first.)
+	 */
+	attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
+										RelationGetDescr(parentRel),
+										gettext_noop("could not convert row type"));
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
+	ScanKeyInit(&key[1],
+				Anum_pg_constraint_contype, BTEqualStrategyNumber,
+				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+	/* This is a seqscan, as we don't have a usable index ... */
+	scan = systable_beginscan(pg_constraint, InvalidOid, true,
+							  NULL, 2, key);
+	while ((tuple = systable_getnext(scan)) != NULL)
+	{
+		Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* ignore this constraint if the parent is already on the list */
+		if (list_member_oid(clone, constrForm->conparentid))
+			continue;
+
+		clone = lappend_oid(clone, constrForm->oid);
+	}
+	systable_endscan(scan);
+
+	foreach(cell, clone)
+	{
+		Oid			constrOid = lfirst_oid(cell);
+		Form_pg_constraint constrForm;
+		Form_pg_attribute att;
+		Relation	fkRel;
+		Oid			indexOid;
+		Oid			partIndexId;
+		int			numfks;
+		AttrNumber	conkey[INDEX_MAX_KEYS];
+		AttrNumber	mapped_confkey[INDEX_MAX_KEYS];
+		AttrNumber	confkey[INDEX_MAX_KEYS];
+		Oid			conpfeqop[INDEX_MAX_KEYS];
+		Oid			conppeqop[INDEX_MAX_KEYS];
+		Oid			conffeqop[INDEX_MAX_KEYS];
+		Constraint *fkconstraint;
+
+		tuple = SearchSysCache1(CONSTROID, constrOid);
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for constraint %u", constrOid);
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		/* skip children whose parents are going to be cloned, as above */
+		if (list_member_oid(clone, constrForm->conparentid))
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
+
+		/*
+		 * Because we're only expanding the key space at the referenced side,
+		 * we don't need to prevent any operation in the referencing table, so
+		 * AccessShareLock suffices (assumes that dropping the constraint
+		 * acquires AEL).
+		 */
+		fkRel = heap_open(constrForm->conrelid, AccessShareLock);
+
+		indexOid = constrForm->conindid;
+		DeconstructConstraintRow(tuple,
+								 &numfks,
+								 conkey,
+								 confkey,
+								 conpfeqop,
+								 conppeqop,
+								 conffeqop);
+		for (int i = 0; i < numfks; i++)
+			mapped_confkey[i] = attmap[confkey[i] - 1];
+
+		fkconstraint = makeNode(Constraint);
+		/* for now this is all we need */
+		fkconstraint->conname = NameStr(constrForm->conname);
+		fkconstraint->fk_upd_action = constrForm->confupdtype;
+		fkconstraint->fk_del_action = constrForm->confdeltype;
+		fkconstraint->deferrable = constrForm->condeferrable;
+		fkconstraint->initdeferred = constrForm->condeferred;
+		fkconstraint->initially_valid = true;
+		fkconstraint->fk_matchtype = constrForm->confmatchtype;
+
+		/*
+		 * We only need the first column name, which is used to generate the
+		 * constraint name.  conkey[] holds attnums of the referencing table.
+		 */
+		att = TupleDescAttr(RelationGetDescr(fkRel), conkey[0] - 1);
+		fkconstraint->fk_attrs = list_make1(makeString(NameStr(att->attname)));
+
+		/*
+		 * Add the new foreign key constraint pointing to the new partition.
+		 * Because this new partition appears in the referenced side of the
+		 * constraint, we don't need to set up for Phase 3 check.
+		 */
+		partIndexId = index_get_partition(partitionRel, indexOid);
+		if (!OidIsValid(partIndexId))
+			elog(ERROR, "index for %u not found in partition %s",
+				 indexOid, RelationGetRelationName(partitionRel));
+		addFkRecurseReferenced(NULL,
+							   fkconstraint,
+							   fkRel,
+							   partitionRel,
+							   partIndexId,
+							   constrOid,
+							   numfks,
+							   mapped_confkey,
+							   conkey,
+							   conpfeqop,
+							   conppeqop,
+							   conffeqop,
+							   true);
+
+		heap_close(fkRel, NoLock);
+		ReleaseSysCache(tuple);
+	}
+}
+
+/*
  * CloneFkReferencing
  *		Recursive subroutine for CloneForeignKeyConstraints, referencing side
  *
@@ -7752,18 +8250,24 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 			ReleaseSysCache(tuple);
 			continue;
 		}
+		if (list_member_oid(clone, constrForm->conparentid))
+		{
+			ReleaseSysCache(tuple);
+			continue;
+		}
 
 		ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
 
 		DeconstructConstraintRow(tuple, &numfks, conkey, confkey,
 								 conpfeqop, conppeqop, conffeqop);
+
 		for (i = 0; i < numfks; i++)
 			mapped_conkey[i] = attmap[conkey[i] - 1];
 
 		/*
 		 * Before creating a new constraint, see whether any existing FKs are
-		 * fit for the purpose.  If one is, attach the parent constraint to it,
-		 * and don't clone anything.  This way we avoid the expensive
+		 * fit for the purpose.  If one is, attach the parent constraint to
+		 * it, and don't clone anything.  This way we avoid the expensive
 		 * verification step and don't end up with a duplicate FK.  This also
 		 * means we don't consider this constraint when recursing to
 		 * partitions.
@@ -7884,8 +8388,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		fkconstraint->deferrable = constrForm->condeferrable;
 		fkconstraint->initdeferred = constrForm->condeferred;
 
-		createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
-								 constrOid, constrForm->conindid, false);
+		/* If this is a plain relation, create the check triggers */
+		if (partRel->rd_rel->relkind == RELKIND_RELATION)
+			createForeignKeyCheckTriggers(RelationGetRelid(partRel),
+										  constrForm->confrelid,
+										  fkconstraint, constrOid,
+										  constrForm->conindid);
 
 		if (cloned)
 		{
@@ -7921,6 +8429,9 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
 		PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
 		int			i;
 
+		/* make previously created constraints visible */
+		CommandCounterIncrement();
+
 		for (i = 0; i < partdesc->nparts; i++)
 		{
 			Relation	childRel;
@@ -8968,37 +9479,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
 }
 
 /*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
- */
-void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
-						 Oid constraintOid, Oid indexOid, bool create_action)
-{
-	/*
-	 * For the referenced side, create action triggers, if requested.  (If the
-	 * referencing side is partitioned, there is still only one trigger, which
-	 * runs on the referenced side and points to the top of the referencing
-	 * hierarchy.)
-	 */
-	if (create_action)
-		createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
-									   indexOid);
-
-	/*
-	 * For the referencing side, create the check triggers.  We only need
-	 * these on the partitions.
-	 */
-	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
-									  fkconstraint, constraintOid, indexOid);
-
-	CommandCounterIncrement();
-}
-
-/*
  * ALTER TABLE DROP CONSTRAINT
  *
  * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism.
@@ -14848,6 +15328,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 
 	partRel = heap_openrv(name, AccessShareLock);
 
+	/* Ensure that foreign keys still hold after this detach */
+	CheckNoForeignKeyRefs(partRel, false);
+
 	/* All inheritance related checks are performed within the function */
 	RemoveInheritance(partRel, rel);
 
@@ -15164,36 +15647,17 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
 static void
 refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl)
 {
-	Relation	pg_inherits;
-	ScanKeyData key;
-	HeapTuple	tuple;
-	SysScanDesc scan;
+	Oid		existingIdx;
 
-	pg_inherits = heap_open(InheritsRelationId, AccessShareLock);
-	ScanKeyInit(&key, Anum_pg_inherits_inhparent,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(RelationGetRelid(parentIdx)));
-	scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true,
-							  NULL, 1, &key);
-	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
-	{
-		Form_pg_inherits inhForm;
-		Oid			tab;
-
-		inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
-		tab = IndexGetRelation(inhForm->inhrelid, false);
-		if (tab == RelationGetRelid(partitionTbl))
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
-							RelationGetRelationName(partIdx),
-							RelationGetRelationName(parentIdx)),
-					 errdetail("Another index is already attached for partition \"%s\".",
-							   RelationGetRelationName(partitionTbl))));
-	}
-
-	systable_endscan(scan);
-	heap_close(pg_inherits, AccessShareLock);
+	existingIdx = index_get_partition(partitionTbl, RelationGetRelid(parentIdx));
+	if (OidIsValid(existingIdx))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
+						RelationGetRelationName(partIdx),
+						RelationGetRelationName(parentIdx)),
+				 errdetail("Another index is already attached for partition \"%s\".",
+						   RelationGetRelationName(partitionTbl))));
 }
 
 /*
@@ -15324,3 +15788,82 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
 	if (opened)
 		heap_close(classRel, RowExclusiveLock);
 }
+
+/*
+ * During an operation that removes a partition from a partitioned table,
+ * verify that any foreign keys pointing to the partitioned table would not
+ * become invalid.  (Used for ALTER TABLE ... DETACH as well as DROP; isDrop
+ * tells which.)  An error is raised if any referenced values exist.
+ */
+void
+CheckNoForeignKeyRefs(Relation partition, bool isDrop)
+{
+	Relation	pg_constraint;
+	HeapTuple	tuple;
+	SysScanDesc scan;
+	ScanKeyData key[2];
+
+	/*
+	 * Quick exit if there are no indexes.
+	 */
+	if (RelationGetIndexList(partition) == NIL)
+		return;
+
+	/*
+	 * In the DROP case, we can skip this check when this is a partitioned
+	 * partition, because its partitions will go through this also, and we'd
+	 * run the check twice uselessly.
+	 *
+	 * In the DETACH case, this is only called for the top-level relation,
+	 * so we must run it nevertheless.
+	 */
+	if (isDrop && partition->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return;
+
+	/* Search for constraints referencing this table */
+	pg_constraint = heap_open(ConstraintRelationId, AccessShareLock);
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
+	ScanKeyInit(&key[1],
+				Anum_pg_constraint_contype, BTEqualStrategyNumber,
+				F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
+
+	/* XXX This is a seqscan, as we don't have a usable index */
+	scan = systable_beginscan(pg_constraint, InvalidOid, true,
+							  NULL, 2, key);
+	while ((tuple = systable_getnext(scan)) != NULL)
+	{
+		Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+		Trigger		trig;
+		Relation	rel;
+
+		/*
+		 * Only process constraints that have a parent (valid conparentid).
+		 */
+		if (!OidIsValid(constrForm->conparentid))
+			continue;
+
+		/* prevent data changes into the referencing table until commit */
+		rel = heap_open(constrForm->conrelid, ShareLock);
+
+		MemSet(&trig, 0, sizeof(trig));
+		trig.tgoid = InvalidOid;
+		trig.tgname = NameStr(constrForm->conname);
+		trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
+		trig.tgisinternal = true;
+		trig.tgconstrrelid = RelationGetRelid(partition);
+		trig.tgconstrindid = constrForm->conindid;
+		trig.tgconstraint = constrForm->oid;
+		trig.tgdeferrable = false;
+		trig.tginitdeferred = false;
+		/* we needn't fill in remaining fields */
+
+		RI_Final_Check(&trig, rel, partition);
+
+		heap_close(rel, NoLock);
+	}
+
+	systable_endscan(scan);
+	heap_close(pg_constraint, AccessShareLock);
+}
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index b9aef07c1a..d7e03adeae 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -54,6 +54,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
+#include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
@@ -239,7 +240,7 @@ static void ri_ExtractValues(Relation rel, HeapTuple tup,
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
 				   HeapTuple violator, TupleDesc tupdesc,
-				   int queryno) pg_attribute_noreturn();
+				   int queryno, bool partgone) pg_attribute_noreturn();
 
 
 /* ----------
@@ -397,18 +398,22 @@ RI_FKey_check(TriggerData *trigdata)
 		char		paramname[16];
 		const char *querysep;
 		Oid			queryoids[RI_MAX_NUMKEYS];
+		const char *pk_only;
 
 		/* ----------
 		 * The query string built is
-		 *	SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
 		 * The type id's for the $ parameters are those of the
 		 * corresponding FK attributes.
 		 * ----------
 		 */
 		initStringInfo(&querybuf);
+		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+						 pk_only, pkrelname);
 		querysep = "WHERE";
 		for (i = 0; i < riinfo->nkeys; i++)
 		{
@@ -532,19 +537,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
 		char		attname[MAX_QUOTED_NAME_LEN];
 		char		paramname[16];
 		const char *querysep;
+		const char *pk_only;
 		Oid			queryoids[RI_MAX_NUMKEYS];
 
 		/* ----------
 		 * The query string built is
-		 *	SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
+		 *	SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
 		 *		   FOR KEY SHARE OF x
 		 * The type id's for the $ parameters are those of the
 		 * PK attributes themselves.
 		 * ----------
 		 */
 		initStringInfo(&querybuf);
+		pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+			"" : "ONLY ";
 		quoteRelationName(pkrelname, pk_rel);
-		appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
+		appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+						 pk_only, pkrelname);
 		querysep = "WHERE";
 		for (i = 0; i < riinfo->nkeys; i++)
 		{
@@ -1710,6 +1719,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	RangeTblEntry *fkrte;
 	const char *sep;
 	const char *fk_only;
+	const char *pk_only;
 	int			i;
 	int			save_nestlevel;
 	char		workmembuf[32];
@@ -1769,7 +1779,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	/*----------
 	 * The query string built is:
 	 *	SELECT fk.keycols FROM [ONLY] relname fk
-	 *	 LEFT OUTER JOIN ONLY pkrelname pk
+	 *	 LEFT OUTER JOIN [ONLY] pkrelname pk
 	 *	 ON (pk.pkkeycol1=fk.keycol1 [AND ...])
 	 *	 WHERE pk.pkkeycol1 IS NULL AND
 	 * For MATCH SIMPLE:
@@ -1796,9 +1806,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	quoteRelationName(fkrelname, fk_rel);
 	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
 		"" : "ONLY ";
+	pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
 	appendStringInfo(&querybuf,
-					 " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
-					 fk_only, fkrelname, pkrelname);
+					 " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
+					 fk_only, fkrelname, pk_only, pkrelname);
 
 	strcpy(pkattname, "pk.");
 	strcpy(fkattname, "fk.");
@@ -1951,7 +1963,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 		ri_ReportViolation(&fake_riinfo,
 						   pk_rel, fk_rel,
 						   tuple, tupdesc,
-						   RI_PLAN_CHECK_LOOKUPPK);
+						   RI_PLAN_CHECK_LOOKUPPK, false);
 	}
 
 	if (SPI_finish() != SPI_OK_FINISH)
@@ -1965,6 +1977,179 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
 	return true;
 }
 
+/* ----------
+ * RI_Final_Check -
+ *	Raise an FK violation if any fk_rel row references a row in pk_rel.
+ */
+void
+RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
+{
+	const RI_ConstraintInfo *riinfo;
+	StringInfoData querybuf;
+	char		pkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
+	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
+	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
+	const char *sep;
+	const char *fk_only;
+	int			spi_result;
+	SPIPlanPtr	qplan;
+	int			i;
+
+	riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
+
+	/* XXX handle the non-permission case?? */
+
+	/*----------
+	 * The query string built is:
+	 *  SELECT fk.keycols FROM [ONLY] relname fk
+	 *    JOIN pkrelname pk
+	 *    ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+	 *    WHERE (<partition constraint>) AND
+	 * For MATCH SIMPLE:
+	 *   (fk.keycol1 IS NOT NULL [AND ...])
+	 * For MATCH FULL:
+	 *   (fk.keycol1 IS NOT NULL [OR ...])
+	 *
+	 * We attach COLLATE clauses to the operators when comparing columns
+	 * that have different collations.
+	 *----------
+	 */
+	initStringInfo(&querybuf);
+	appendStringInfoString(&querybuf, "SELECT ");
+	sep = "";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		quoteOneName(fkattname,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
+		sep = ", ";
+	}
+
+	quoteRelationName(pkrelname, pk_rel);
+	quoteRelationName(fkrelname, fk_rel);
+	fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+		"" : "ONLY ";
+	appendStringInfo(&querybuf,
+					 " FROM %s%s fk JOIN %s pk ON",
+					 fk_only, fkrelname, pkrelname);
+	strcpy(pkattname, "pk.");
+	strcpy(fkattname, "fk.");
+	sep = "(";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		Oid			pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+		Oid			pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
+		Oid			fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
+
+		quoteOneName(pkattname + 3,
+					 RIAttName(pk_rel, riinfo->pk_attnums[i]));
+		quoteOneName(fkattname + 3,
+					 RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		ri_GenerateQual(&querybuf, sep,
+						pkattname, pk_type,
+						riinfo->pf_eq_oprs[i],
+						fkattname, fk_type);
+		if (pk_coll != fk_coll)
+			ri_GenerateQualCollation(&querybuf, pk_coll);
+		sep = "AND";
+	}
+
+	appendStringInfo(&querybuf, ") WHERE %s AND (",
+					 pg_get_partconstrdef_string(RelationGetRelid(pk_rel),
+												 "pk"));
+
+	sep = "";
+	for (i = 0; i < riinfo->nkeys; i++)
+	{
+		quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
+		appendStringInfo(&querybuf,
+						 "%sfk.%s IS NOT NULL",
+						 sep, fkattname);
+		switch (riinfo->confmatchtype)
+		{
+			case FKCONSTR_MATCH_SIMPLE:
+				sep = " AND ";
+				break;
+			case FKCONSTR_MATCH_FULL:
+				sep = " OR ";
+				break;
+			case FKCONSTR_MATCH_PARTIAL:
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("MATCH PARTIAL not yet implemented")));
+				break;
+			default:
+				elog(ERROR, "unrecognized confmatchtype: %d",
+					 riinfo->confmatchtype);
+				break;
+		}
+	}
+	appendStringInfoChar(&querybuf, ')');
+
+	/*
+	 * RI_Initial_Check changes work_mem here; this function does not.
+	 */
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/*
+	 * Generate the plan.  We don't need to cache it, and there are no
+	 * arguments to the plan.
+	 */
+	qplan = SPI_prepare(querybuf.data, 0, NULL);
+
+	if (qplan == NULL)
+		elog(ERROR, "SPI_prepare returned %s for %s",
+			 SPI_result_code_string(SPI_result), querybuf.data);
+
+	/*
+	 * Run the plan.  For safety we force a current snapshot to be used. (In
+	 * transaction-snapshot mode, this arguably violates transaction isolation
+	 * rules, but we really haven't got much choice.) We don't need to
+	 * register the snapshot, because SPI_execute_snapshot will see to it. We
+	 * need at most one tuple returned, so pass limit = 1.
+	 */
+	spi_result = SPI_execute_snapshot(qplan,
+									  NULL, NULL,
+									  GetLatestSnapshot(),
+									  InvalidSnapshot,
+									  true, false, 1);
+
+	/* Check result */
+	if (spi_result != SPI_OK_SELECT)
+		elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
+
+	/* Did we find a tuple that would violate the constraint? */
+	if (SPI_processed > 0)
+	{
+		HeapTuple	tuple = SPI_tuptable->vals[0];
+		TupleDesc	tupdesc = SPI_tuptable->tupdesc;
+		RI_ConstraintInfo fake_riinfo;
+
+		/*
+		 * The columns to look at in the result tuple are 1..N, not whatever
+		 * they are in the fk_rel.  Hack up a copy of riinfo (its pk_attnums
+		 * array) so that ri_ReportViolation will behave properly.
+		 *
+		 * In addition to this, we have to pass the correct tupdesc to
+		 * ri_ReportViolation, overriding its normal habit of using the pk_rel
+		 * or fk_rel's tupdesc.
+		 */
+		memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
+		for (i = 0; i < fake_riinfo.nkeys; i++)
+			fake_riinfo.pk_attnums[i] = i + 1;
+
+		ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
+						   tuple, tupdesc, 0, true);
+	}
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+}
+
 
 /* ----------
  * Local functions below
@@ -2166,6 +2351,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 	/* Find or create a hashtable entry for the constraint */
 	riinfo = ri_LoadConstraintInfo(constraintOid);
 
+#if 0
 	/* Do some easy cross-checks against the trigger call data */
 	if (rel_is_pk)
 	{
@@ -2174,6 +2360,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
 			elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
 				 trigger->tgname, RelationGetRelationName(trig_rel));
 	}
+#endif
 
 	return riinfo;
 }
@@ -2479,7 +2666,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
 						   pk_rel, fk_rel,
 						   new_tuple ? new_tuple : old_tuple,
 						   NULL,
-						   qkey->constr_queryno);
+						   qkey->constr_queryno, false);
 
 	return SPI_processed != 0;
 }
@@ -2523,7 +2710,7 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 				   Relation pk_rel, Relation fk_rel,
 				   HeapTuple violator, TupleDesc tupdesc,
-				   int queryno)
+				   int queryno, bool partgone)
 {
 	StringInfoData key_names;
 	StringInfoData key_values;
@@ -2563,9 +2750,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 	 *
 	 * Check table-level permissions next and, failing that, column-level
 	 * privileges.
+	 *
+	 * When a partition on the referenced side is being detached/dropped, we
+	 * needn't check permissions, since the user must be the table owner anyway.
 	 */
-
-	if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
+	if (partgone)
+		has_perm = true;
+	else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
 	{
 		aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
 		if (aclresult != ACLCHECK_OK)
@@ -2615,7 +2806,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
 		}
 	}
 
-	if (onfk)
+	if (partgone)
+		ereport(ERROR,
+				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+				 errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
+						RelationGetRelationName(pk_rel),
+						NameStr(riinfo->conname)),
+				 errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
+						   key_names.data, key_values.data,
+						   RelationGetRelationName(fk_rel))));
+	else if (onfk)
 		ereport(ERROR,
 				(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
 				 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 4857caecaa..b2deee6696 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1818,6 +1818,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS)
 }
 
 /*
+ * pg_get_partconstrdef_string
+ *
+ * Deparse the partition constraint of relation 'partitionId' into a C-string,
+ * using 'aliasname' as the relation's alias.  No pretty-printing.
+ */
+char *
+pg_get_partconstrdef_string(Oid partitionId, char *aliasname)
+{
+	Expr	   *constr_expr;
+	List	   *context;
+
+	constr_expr = get_partition_qual_relid(partitionId);
+	context = deparse_context_for(aliasname, partitionId);
+
+	return deparse_expression((Node *) constr_expr, context, true, false);
+}
+
+/*
  * pg_get_constraintdef
  *
  * Returns the definition for the constraint, ie, everything that needs to
diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h
index 56a341a622..f7cf97554a 100644
--- a/src/include/catalog/heap.h
+++ b/src/include/catalog/heap.h
@@ -77,6 +77,8 @@ extern void heap_create_init_fork(Relation rel);
 
 extern void heap_drop_with_catalog(Oid relid);
 
+extern void pre_drop_class_check(Oid relationId, Oid objectSubId);
+
 extern void heap_truncate(List *relids);
 
 extern void heap_truncate_one_rel(Relation rel);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 2afcd5be44..350674bf67 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -52,6 +52,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
 
 extern void CheckTableNotInUse(Relation rel, const char *stmt);
 
+extern void CheckNoForeignKeyRefs(Relation partition, bool isDrop);
+
 extern void ExecuteTruncate(TruncateStmt *stmt);
 extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 					DropBehavior behavior, bool restart_seqs);
@@ -76,10 +78,6 @@ extern void find_composite_type_dependencies(Oid typeOid,
 
 extern void check_of_type(HeapTuple typetuple);
 
-extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
-						 Constraint *fkconstraint, Oid constraintOid,
-						 Oid indexOid, bool create_action);
-
 extern void register_on_commit_action(Oid relid, OnCommitAction action);
 extern void remove_on_commit_action(Oid relid);
 
diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h
index 1031448c14..3183d8b14a 100644
--- a/src/include/commands/trigger.h
+++ b/src/include/commands/trigger.h
@@ -263,6 +263,7 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
 							  HeapTuple old_row, HeapTuple new_row);
 extern bool RI_Initial_Check(Trigger *trigger,
 				 Relation fk_rel, Relation pk_rel);
+extern void RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel);
 
 /* result values for RI_FKey_trigger_type: */
 #define RI_TRIGGER_PK	1		/* is a trigger on the PK relation */
diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h
index 9f9b029ab8..dd8ea7861d 100644
--- a/src/include/utils/ruleutils.h
+++ b/src/include/utils/ruleutils.h
@@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid);
 extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
 
 extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
+extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
 
 extern char *pg_get_constraintdef_command(Oid constraintId);
 extern char *deparse_expression(Node *expr, List *dpcontext,
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 085c9aba11..66c083a8e1 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1431,19 +1431,6 @@ drop table pktable2, fktable2;
 --
 -- Foreign keys and partitioned tables
 --
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-ERROR:  cannot reference partitioned table "fk_partitioned_pk"
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1479,10 +1466,10 @@ DETAIL:  This feature is not yet supported on partitioned tables.
 -- these inserts, targeting both the partition directly as well as the
 -- partitioned table, should all fail
 INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501);
-ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey"
 DETAIL:  Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
 INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501);
-ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey"
 DETAIL:  Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk".
 INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501);
 ERROR:  insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey"
@@ -1609,7 +1596,7 @@ INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503);
 -- this fails, because the defaults for the referencing table are not present
 -- in the referenced table:
 UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502;
-ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey"
+ERROR:  insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_3_a_fkey"
 DETAIL:  Key (a, b)=(2501, 142857) is not present in table "fk_notpartitioned_pk".
 -- but inserting the row we can make it work:
 INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857);
@@ -1781,3 +1768,152 @@ INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
 ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
   FOR VALUES IN (1600);
 -- leave these tables around intentionally
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions
+-- added (created and attached) after creating the foreign key.
+create schema regress_fk;
+set search_path to regress_fk;
+create table pk (a int primary key) partition by range (a);
+create table pk1 partition of pk for values from (0) to (1000);
+create table pk2 (b int, a int);
+alter table pk2 drop column b;
+alter table pk2 alter a set not null;
+alter table pk attach partition pk2 for values from (1000) to (2000);
+create table fk (a int) partition by range (a);
+create table fk1 partition of fk for values from (0) to (750);
+alter table fk add foreign key (a) references pk;
+create table fk2 (b int, a int) ;
+alter table fk2 drop column b;
+alter table fk attach partition fk2 for values from (750) to (3500);
+create table pk3 partition of pk for values from (2000) to (3000);
+create table pk4 (like pk);
+alter table pk attach partition pk4 for values from (3000) to (4000);
+create table pk5 (like pk) partition by range (a);
+create table pk51 partition of pk5 for values from (4000) to (4500);
+create table pk52 partition of pk5 for values from (4500) to (5000);
+alter table pk attach partition pk5 for values from (4000) to (5000);
+create table fk3 partition of fk for values from (3500) to (5000);
+-- these should fail: referenced value not present
+insert into fk values (1);
+ERROR:  insert or update on table "fk1" violates foreign key constraint "fk1_a_fkey"
+DETAIL:  Key (a)=(1) is not present in table "pk".
+insert into fk values (1000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(1000) is not present in table "pk".
+insert into fk values (2000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(2000) is not present in table "pk".
+insert into fk values (3000);
+ERROR:  insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(3000) is not present in table "pk".
+insert into fk values (4000);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4000) is not present in table "pk".
+insert into fk values (4500);
+ERROR:  insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
+DETAIL:  Key (a)=(4500) is not present in table "pk".
+-- insert into the referenced table, now they should work
+insert into pk values (1), (1000), (2000), (3000), (4000), (4500);
+insert into fk values (1), (1000), (2000), (3000), (4000), (4500);
+-- should fail: referencing value present
+delete from pk where a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+delete from pk where a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+delete from pk where a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+delete from pk where a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+delete from pk where a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+delete from pk where a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+update pk set a = 2 where a = 1;
+ERROR:  update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
+DETAIL:  Key (a)=(1) is still referenced from table "fk".
+update pk set a = 1002 where a = 1000;
+ERROR:  update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
+DETAIL:  Key (a)=(1000) is still referenced from table "fk".
+update pk set a = 2002 where a = 2000;
+ERROR:  update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
+DETAIL:  Key (a)=(2000) is still referenced from table "fk".
+update pk set a = 3002 where a = 3000;
+ERROR:  update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
+DETAIL:  Key (a)=(3000) is still referenced from table "fk".
+update pk set a = 4002 where a = 4000;
+ERROR:  update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
+DETAIL:  Key (a)=(4000) is still referenced from table "fk".
+update pk set a = 4502 where a = 4500;
+ERROR:  update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
+DETAIL:  Key (a)=(4500) is still referenced from table "fk".
+-- now they should work
+delete from fk;
+update pk set a = 2 where a = 1;
+delete from pk where a = 2;
+update pk set a = 1002 where a = 1000;
+delete from pk where a = 1002;
+update pk set a = 2002 where a = 2000;
+delete from pk where a = 2002;
+update pk set a = 3002 where a = 3000;
+delete from pk where a = 3002;
+update pk set a = 4002 where a = 4000;
+delete from pk where a = 4002;
+update pk set a = 4502 where a = 4500;
+delete from pk where a = 4502;
+-- dropping/detaching partitions is prevented if that would break
+-- a foreign key's existing data
+create table droppk (a int primary key) partition by range (a);
+create table droppk1 partition of droppk for values from (0) to (1000);
+create table droppk_d partition of droppk default;
+create table droppk2 partition of droppk for values from (1000) to (2000)
+  partition by range (a);
+create table droppk21 partition of droppk2 for values from (1000) to (1400);
+create table droppk2_d partition of droppk2 default;
+insert into droppk values (1), (1000), (1500), (2000);
+create table dropfk (a int references droppk);
+insert into dropfk values (1), (1000), (1500), (2000);
+-- these should all fail
+alter table droppk detach partition droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+alter table droppk2 detach partition droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+alter table droppk detach partition droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+alter table droppk detach partition droppk2;
+ERROR:  removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+alter table droppk2 detach partition droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+drop table droppk_d;
+ERROR:  removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
+DETAIL:  Key (a)=(2000) still referenced from table "dropfk".
+drop table droppk2_d;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+drop table droppk1;
+ERROR:  removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
+DETAIL:  Key (a)=(1) still referenced from table "dropfk".
+drop table droppk2;
+ERROR:  removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
+DETAIL:  Key (a)=(1500) still referenced from table "dropfk".
+drop table droppk21;
+ERROR:  removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
+DETAIL:  Key (a)=(1000) still referenced from table "dropfk".
+delete from dropfk;
+-- now they should all work
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+alter table droppk2 detach partition droppk21;
+drop table droppk2;
diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql
index 068ab2aab7..6416523b94 100644
--- a/src/test/regress/sql/foreign_key.sql
+++ b/src/test/regress/sql/foreign_key.sql
@@ -1071,18 +1071,6 @@ drop table pktable2, fktable2;
 -- Foreign keys and partitioned tables
 --
 
--- partitioned table in the referenced side are not allowed
-CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
-  PARTITION BY RANGE (a, b);
--- verify with create table first ...
-CREATE TABLE fk_notpartitioned_fk (a int, b int,
-  FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
--- and then with alter table.
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
-ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
-  REFERENCES fk_partitioned_pk;
-DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-
 -- Creation of a partitioned hierarchy with irregular definitions
 CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
   PRIMARY KEY (a, b));
@@ -1289,3 +1277,105 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
   FOR VALUES IN (1600);
 
 -- leave these tables around intentionally
+
+-- Test a partitioned table as referenced table.
+-- Verify basic functionality with a regular partition creation and a partition
+-- with a different column layout, as well as partitions
+-- added (created and attached) after creating the foreign key.
+create schema regress_fk;
+set search_path to regress_fk;
+
+create table pk (a int primary key) partition by range (a);
+create table pk1 partition of pk for values from (0) to (1000);
+create table pk2 (b int, a int);
+alter table pk2 drop column b;
+alter table pk2 alter a set not null;
+alter table pk attach partition pk2 for values from (1000) to (2000);
+
+create table fk (a int) partition by range (a);
+create table fk1 partition of fk for values from (0) to (750);
+alter table fk add foreign key (a) references pk;
+create table fk2 (b int, a int) ;
+alter table fk2 drop column b;
+alter table fk attach partition fk2 for values from (750) to (3500);
+
+create table pk3 partition of pk for values from (2000) to (3000);
+create table pk4 (like pk);
+alter table pk attach partition pk4 for values from (3000) to (4000);
+
+create table pk5 (like pk) partition by range (a);
+create table pk51 partition of pk5 for values from (4000) to (4500);
+create table pk52 partition of pk5 for values from (4500) to (5000);
+alter table pk attach partition pk5 for values from (4000) to (5000);
+
+create table fk3 partition of fk for values from (3500) to (5000);
+
+-- these should fail: referenced value not present
+insert into fk values (1);
+insert into fk values (1000);
+insert into fk values (2000);
+insert into fk values (3000);
+insert into fk values (4000);
+insert into fk values (4500);
+-- insert into the referenced table, now they should work
+insert into pk values (1), (1000), (2000), (3000), (4000), (4500);
+insert into fk values (1), (1000), (2000), (3000), (4000), (4500);
+
+-- should fail: referencing value present
+delete from pk where a = 1;
+delete from pk where a = 1000;
+delete from pk where a = 2000;
+delete from pk where a = 3000;
+delete from pk where a = 4000;
+delete from pk where a = 4500;
+update pk set a = 2 where a = 1;
+update pk set a = 1002 where a = 1000;
+update pk set a = 2002 where a = 2000;
+update pk set a = 3002 where a = 3000;
+update pk set a = 4002 where a = 4000;
+update pk set a = 4502 where a = 4500;
+-- now they should work
+delete from fk;
+update pk set a = 2 where a = 1;
+delete from pk where a = 2;
+update pk set a = 1002 where a = 1000;
+delete from pk where a = 1002;
+update pk set a = 2002 where a = 2000;
+delete from pk where a = 2002;
+update pk set a = 3002 where a = 3000;
+delete from pk where a = 3002;
+update pk set a = 4002 where a = 4000;
+delete from pk where a = 4002;
+update pk set a = 4502 where a = 4500;
+delete from pk where a = 4502;
+
+-- dropping/detaching partitions is prevented if that would break
+-- a foreign key's existing data
+create table droppk (a int primary key) partition by range (a);
+create table droppk1 partition of droppk for values from (0) to (1000);
+create table droppk_d partition of droppk default;
+create table droppk2 partition of droppk for values from (1000) to (2000)
+  partition by range (a);
+create table droppk21 partition of droppk2 for values from (1000) to (1400);
+create table droppk2_d partition of droppk2 default;
+insert into droppk values (1), (1000), (1500), (2000);
+create table dropfk (a int references droppk);
+insert into dropfk values (1), (1000), (1500), (2000);
+-- these should all fail
+alter table droppk detach partition droppk_d;
+alter table droppk2 detach partition droppk2_d;
+alter table droppk detach partition droppk1;
+alter table droppk detach partition droppk2;
+alter table droppk2 detach partition droppk21;
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+drop table droppk2;
+drop table droppk21;
+delete from dropfk;
+-- now they should all work
+drop table droppk_d;
+drop table droppk2_d;
+drop table droppk1;
+alter table droppk2 detach partition droppk21;
+drop table droppk2;
-- 
2.11.0

>From 87053b456eae36811d643e5ce8c1b03eae5b7091 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 22 Nov 2018 19:30:22 -0300
Subject: [PATCH 5/5] Have psql not display redundant FKs

---
 src/bin/psql/describe.c                   | 61 +++++++++++++++++++++++--------
 src/test/regress/expected/foreign_key.out | 16 ++++----
 2 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 0a181b01d9..1ed73afaa8 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2382,17 +2382,28 @@ describeOneTableDetails(const char *schemaname,
 		/*
 		 * Print foreign-key constraints (there are none if no triggers,
 		 * except if the table is partitioned, in which case the triggers
-		 * appear in the partitions)
+		 * appear in the partitions).  Note we put the constraints defined
+		 * in this table first, followed by the constraints defined in
+		 * ancestor partitioned tables.  XXX maybe these should be displayed
+		 * in hierarchy order instead.
 		 */
 		if (tableinfo.hastriggers ||
 			tableinfo.relkind == RELKIND_PARTITIONED_TABLE)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT conname,\n"
-							  "  pg_catalog.pg_get_constraintdef(r.oid, true) as condef\n"
-							  "FROM pg_catalog.pg_constraint r\n"
-							  "WHERE r.conrelid = '%s' AND r.contype = 'f' ORDER BY 1;",
-							  oid);
+							  "WITH RECURSIVE constraints (oid, parent, conname, conrelid) as (\n"
+							  "   SELECT oid, conparentid, conname, conrelid\n"
+							  "     FROM pg_catalog.pg_constraint WHERE contype = 'f' AND conrelid = '%s'\n"
+							  "  UNION\n"
+							  "   SELECT pc.oid, pc.conparentid, pc.conname, pc.conrelid\n"
+							  "     FROM constraints, pg_constraint pc\n"
+							  "    WHERE pc.oid = constraints.parent\n"
+							  ") SELECT conrelid = '%s' as conislocal, conname,\n"
+							  "         pg_catalog.pg_get_constraintdef(oid), conrelid::pg_catalog.regclass\n"
+							  "    FROM constraints\n"
+							  "   WHERE parent = 0\n"
+							  "ORDER BY conislocal DESC, conname;",
+							  oid, oid);
 			result = PSQLexec(buf.data);
 			if (!result)
 				goto error_return;
@@ -2404,10 +2415,20 @@ describeOneTableDetails(const char *schemaname,
 				printTableAddFooter(&cont, _("Foreign-key constraints:"));
 				for (i = 0; i < tuples; i++)
 				{
-					/* untranslated constraint name and def */
-					printfPQExpBuffer(&buf, "    \"%s\" %s",
-									  PQgetvalue(result, i, 0),
-									  PQgetvalue(result, i, 1));
+					/*
+					 * Print untranslated constraint name and definition.
+					 * Use a "TABLE tab" prefix when the constraint is
+					 * defined in a parent partitioned table.
+					 */
+					if (strcmp(PQgetvalue(result, i, 0), "f") == 0)
+						printfPQExpBuffer(&buf, "    TABLE \"%s\" CONSTRAINT \"%s\" %s",
+										  PQgetvalue(result, i, 3),
+										  PQgetvalue(result, i, 1),
+										  PQgetvalue(result, i, 2));
+					else
+						printfPQExpBuffer(&buf, "    \"%s\" %s",
+										  PQgetvalue(result, i, 1),
+										  PQgetvalue(result, i, 2));
 
 					printTableAddFooter(&cont, buf.data);
 				}
@@ -2416,13 +2437,21 @@ describeOneTableDetails(const char *schemaname,
 		}
 
 		/* print incoming foreign-key references (none if no triggers) */
-		if (tableinfo.hastriggers)
+		if (tableinfo.hastriggers ||
+			tableinfo.relkind == RELKIND_PARTITIONED_TABLE)
 		{
 			printfPQExpBuffer(&buf,
-							  "SELECT conname, conrelid::pg_catalog.regclass,\n"
-							  "  pg_catalog.pg_get_constraintdef(c.oid, true) as condef\n"
-							  "FROM pg_catalog.pg_constraint c\n"
-							  "WHERE c.confrelid = '%s' AND c.contype = 'f' ORDER BY 1;",
+							  "WITH RECURSIVE constraints (oid, parent, conname, conrelid, confrelid) as (\n"
+							  "   SELECT oid, conparentid, conname, conrelid, confrelid\n"
+							  "     FROM pg_catalog.pg_constraint WHERE contype = 'f' AND confrelid = '%s'\n"
+							  "  UNION\n"
+							  "   SELECT pc.oid, pc.conparentid, pc.conname, pc.conrelid, pc.confrelid\n"
+							  "     FROM constraints, pg_constraint pc\n"
+							  "    WHERE pc.oid = constraints.parent\n"
+							  ") SELECT conrelid::pg_catalog.regclass, conname, pg_catalog.pg_get_constraintdef(oid)\n"
+							  "    FROM constraints\n"
+							  "   WHERE parent = 0\n"
+							  "ORDER BY conname;",
 							  oid);
 			result = PSQLexec(buf.data);
 			if (!result)
@@ -2436,8 +2465,8 @@ describeOneTableDetails(const char *schemaname,
 				for (i = 0; i < tuples; i++)
 				{
 					printfPQExpBuffer(&buf, "    TABLE \"%s\" CONSTRAINT \"%s\" %s",
-									  PQgetvalue(result, i, 1),
 									  PQgetvalue(result, i, 0),
+									  PQgetvalue(result, i, 1),
 									  PQgetvalue(result, i, 2));
 
 					printTableAddFooter(&cont, buf.data);
diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out
index 66c083a8e1..5de2449119 100644
--- a/src/test/regress/expected/foreign_key.out
+++ b/src/test/regress/expected/foreign_key.out
@@ -1667,7 +1667,7 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN
  a      | integer |           |          | 
 Partition of: fk_partitioned_fk FOR VALUES IN (1500, 1502)
 Foreign-key constraints:
-    "fk_partitioned_fk_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
 
 DROP TABLE fk_partitioned_fk_2;
 CREATE TABLE fk_partitioned_fk_4 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE) PARTITION BY RANGE (b, a);
@@ -1687,7 +1687,7 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN
 Partition of: fk_partitioned_fk FOR VALUES IN (3500, 3502)
 Partition key: RANGE (b, a)
 Foreign-key constraints:
-    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
 Number of partitions: 2 (Use \d+ to list them.)
 
 \d fk_partitioned_fk_4_1
@@ -1698,7 +1698,7 @@ Number of partitions: 2 (Use \d+ to list them.)
  b      | integer |           |          | 
 Partition of: fk_partitioned_fk_4 FOR VALUES FROM (1, 1) TO (100, 100)
 Foreign-key constraints:
-    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
 
 -- this one has an FK with mismatched properties
 \d fk_partitioned_fk_4_2
@@ -1710,7 +1710,7 @@ Foreign-key constraints:
 Partition of: fk_partitioned_fk_4 FOR VALUES FROM (100, 100) TO (1000, 1000)
 Foreign-key constraints:
     "fk_partitioned_fk_4_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL
-    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
 
 CREATE TABLE fk_partitioned_fk_5 (a int, b int,
 	FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE,
@@ -1734,7 +1734,7 @@ Partition key: RANGE (a)
 Foreign-key constraints:
     "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
     "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
-    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
 Number of partitions: 1 (Use \d+ to list them.)
 
 -- verify that it works to reattaching a child with multiple candidate
@@ -1750,9 +1750,9 @@ ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUE
 Partition of: fk_partitioned_fk_5 FOR VALUES FROM (0) TO (10)
 Foreign-key constraints:
     "fk_partitioned_fk_5_1_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b)
-    "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
-    "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
-    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk_5" CONSTRAINT "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
+    TABLE "fk_partitioned_fk_5" CONSTRAINT "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
+    TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
 
 -- verify that attaching a table checks that the existing data satisfies the
 -- constraint
-- 
2.11.0

Reply via email to