Here's a more credible version of this patch series. 0001 refactors some duplicative code that interprets a pg_constraint row for a foreign key back into a Constraint node. Only what is necessary for current features is read.
0002 moves some code that I added to src/backend/catalog/pg_constraint.c in 3de241dba86f, so that it lives in tablecmds.c instead. After developing the current patch I realized that the latter is its natural home. 0003 changes the shape of a loop in deleteObjectsInList, so that I can add object type-specific functions to be called before deleting an object. This is needed by 0004 and makes no sense on its own; I would probably push both together, but splitting it like this makes it very obvious what it is I'm doing. 0004 adds the feature itself. In 0004 there's also a new function "index_get_partition", which allows simplifying some code I added in 8b08f7d4820f. I will probably split it out and commit it separately because it's an obvious code beautification. 0005 modifies psql to show the constraint in a different way, which makes more sense overall: rather than showing the "internal" constraint that concerns the partition, it shows the top-level constraint that concerns the ancestor table on which it is defined. This includes naming the table in which the constraint is defined. There's one half of 0005 that I think should be applied to pg11, because when partitions have foreign keys, the display looks a bit odd currently. The other half I would probably merge into 0004 instead of committing separately. Even if not backpatched, it makes sense to apply ahead of the rest because it changes expected output for a regression test. -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 563447095c3884a00ecfd933027524e252cf35f7 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Mon, 12 Nov 2018 16:05:16 -0300 Subject: [PATCH 1/5] Refactor DeconstructConstraintRow --- src/backend/catalog/pg_constraint.c | 200 ++++++++++++++++++++---------------- src/backend/utils/adt/ri_triggers.c | 89 ++-------------- src/backend/utils/cache/relcache.c | 61 +---------- src/include/catalog/pg_constraint.h | 3 + 4 files changed, 125 insertions(+), 228 deletions(-) diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index a8194b02fa..f71d89ff17 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -446,14 +446,11 @@ static void clone_fk_constraints(Relation pg_constraint, Relation parentRel, Relation partRel, List *clone, List **cloned) { - TupleDesc tupdesc; AttrNumber *attmap; List *partFKs; List *subclone = NIL; ListCell *cell; - tupdesc = RelationGetDescr(pg_constraint); - /* * The constraint key may differ, if the columns in the partition are * different. This map is used to convert them. 
@@ -483,9 +480,6 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, int nelem; ListCell *cell; int i; - ArrayType *arr; - Datum datum; - bool isnull; tuple = SearchSysCache1(CONSTROID, parentConstrOid); if (!tuple) @@ -502,93 +496,11 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); - datum = fastgetattr(tuple, Anum_pg_constraint_conkey, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conkey"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); - + DeconstructConstraintRow(tuple, &nelem, conkey, confkey, + conpfeqop, conppeqop, conffeqop); for (i = 0; i < nelem; i++) mapped_conkey[i] = attmap[conkey[i] - 1]; - datum = fastgetattr(tuple, Anum_pg_constraint_confkey, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null confkey"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "confkey is not a 1-D smallint array"); - memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conpfeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D OID array"); - memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conpfeqop"); - arr = 
DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D OID array"); - memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conppeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conppeqop is not a 1-D OID array"); - memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - - datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conffeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conffeqop is not a 1-D OID array"); - memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - /* * Before creating a new constraint, see whether any existing FKs are * fit for the purpose. If one is, attach the parent constraint to it, @@ -1531,6 +1443,114 @@ get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid) } /* + * Extract data from the pg_constraint row of a foreign-key constraint. + * + * The last three can be passed as NULL. + */ +void +DeconstructConstraintRow(HeapTuple tuple, int *numfks, + AttrNumber *conkey, AttrNumber *confkey, + Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs) +{ + Oid constrId; + Datum adatum; + bool isNull; + ArrayType *arr; + int numkeys; + + constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; + + /* + * We expect the arrays to be 1-D arrays of the right types; verify that. 
+ * We don't need to use deconstruct_array() since the array data is just + * going to look like a C array of values. + */ + adatum = SysCacheGetAttr(CONSTROID, tuple, + Anum_pg_constraint_conkey, &isNull); + if (isNull) + elog(ERROR, "null conkey for constraint %u", constrId); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + if (ARR_NDIM(arr) != 1 || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != INT2OID) + elog(ERROR, "conkey is not a 1-D smallint array"); + numkeys = ARR_DIMS(arr)[0]; + if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS) + elog(ERROR, "foreign key constraint cannot have %d columns", numkeys); + memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16)); + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + + adatum = SysCacheGetAttr(CONSTROID, tuple, + Anum_pg_constraint_confkey, &isNull); + if (isNull) + elog(ERROR, "null confkey for constraint %u", constrId); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + if (ARR_NDIM(arr) != 1 || + ARR_DIMS(arr)[0] != numkeys || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != INT2OID) + elog(ERROR, "confkey is not a 1-D smallint array"); + memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16)); + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + + if (pf_eq_oprs) + { + adatum = SysCacheGetAttr(CONSTROID, tuple, + Anum_pg_constraint_conpfeqop, &isNull); + if (isNull) + elog(ERROR, "null conpfeqop for constraint %u", constrId); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + /* see TryReuseForeignKey if you change the test below */ + if (ARR_NDIM(arr) != 1 || + ARR_DIMS(arr)[0] != numkeys || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != OIDOID) + elog(ERROR, "conpfeqop is not a 1-D Oid array"); + memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid)); + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + } + + if (pp_eq_oprs) + { + 
adatum = SysCacheGetAttr(CONSTROID, tuple, + Anum_pg_constraint_conppeqop, &isNull); + if (isNull) + elog(ERROR, "null conppeqop for constraint %u", constrId); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + if (ARR_NDIM(arr) != 1 || + ARR_DIMS(arr)[0] != numkeys || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != OIDOID) + elog(ERROR, "conppeqop is not a 1-D Oid array"); + memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid)); + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + } + + if (ff_eq_oprs) + { + adatum = SysCacheGetAttr(CONSTROID, tuple, + Anum_pg_constraint_conffeqop, &isNull); + if (isNull) + elog(ERROR, "null conffeqop for constraint %u", constrId); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + if (ARR_NDIM(arr) != 1 || + ARR_DIMS(arr)[0] != numkeys || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != OIDOID) + elog(ERROR, "conffeqop is not a 1-D Oid array"); + memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid)); + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + } + + *numfks = numkeys; +} + +/* * Determine whether a relation can be proven functionally dependent on * a set of grouping columns. If so, return true and add the pg_constraint * OIDs of the constraints needed for the proof to the *constraintDeps list. 
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index cdda860e73..b9aef07c1a 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -2188,10 +2188,6 @@ ri_LoadConstraintInfo(Oid constraintOid) bool found; HeapTuple tup; Form_pg_constraint conForm; - Datum adatum; - bool isNull; - ArrayType *arr; - int numkeys; /* * On the first call initialize the hashtable @@ -2233,84 +2229,13 @@ ri_LoadConstraintInfo(Oid constraintOid) riinfo->confdeltype = conForm->confdeltype; riinfo->confmatchtype = conForm->confmatchtype; - /* - * We expect the arrays to be 1-D arrays of the right types; verify that. - * We don't need to use deconstruct_array() since the array data is just - * going to look like a C array of values. - */ - adatum = SysCacheGetAttr(CONSTROID, tup, - Anum_pg_constraint_conkey, &isNull); - if (isNull) - elog(ERROR, "null conkey for constraint %u", constraintOid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - if (ARR_NDIM(arr) != 1 || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - numkeys = ARR_DIMS(arr)[0]; - if (numkeys <= 0 || numkeys > RI_MAX_NUMKEYS) - elog(ERROR, "foreign key constraint cannot have %d columns", numkeys); - riinfo->nkeys = numkeys; - memcpy(riinfo->fk_attnums, ARR_DATA_PTR(arr), numkeys * sizeof(int16)); - if ((Pointer) arr != DatumGetPointer(adatum)) - pfree(arr); /* free de-toasted copy, if any */ - - adatum = SysCacheGetAttr(CONSTROID, tup, - Anum_pg_constraint_confkey, &isNull); - if (isNull) - elog(ERROR, "null confkey for constraint %u", constraintOid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - if (ARR_NDIM(arr) != 1 || - ARR_DIMS(arr)[0] != numkeys || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "confkey is not a 1-D smallint array"); - memcpy(riinfo->pk_attnums, ARR_DATA_PTR(arr), numkeys * sizeof(int16)); - if ((Pointer) arr != 
DatumGetPointer(adatum)) - pfree(arr); /* free de-toasted copy, if any */ - - adatum = SysCacheGetAttr(CONSTROID, tup, - Anum_pg_constraint_conpfeqop, &isNull); - if (isNull) - elog(ERROR, "null conpfeqop for constraint %u", constraintOid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - /* see TryReuseForeignKey if you change the test below */ - if (ARR_NDIM(arr) != 1 || - ARR_DIMS(arr)[0] != numkeys || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D Oid array"); - memcpy(riinfo->pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid)); - if ((Pointer) arr != DatumGetPointer(adatum)) - pfree(arr); /* free de-toasted copy, if any */ - - adatum = SysCacheGetAttr(CONSTROID, tup, - Anum_pg_constraint_conppeqop, &isNull); - if (isNull) - elog(ERROR, "null conppeqop for constraint %u", constraintOid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - if (ARR_NDIM(arr) != 1 || - ARR_DIMS(arr)[0] != numkeys || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conppeqop is not a 1-D Oid array"); - memcpy(riinfo->pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid)); - if ((Pointer) arr != DatumGetPointer(adatum)) - pfree(arr); /* free de-toasted copy, if any */ - - adatum = SysCacheGetAttr(CONSTROID, tup, - Anum_pg_constraint_conffeqop, &isNull); - if (isNull) - elog(ERROR, "null conffeqop for constraint %u", constraintOid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - if (ARR_NDIM(arr) != 1 || - ARR_DIMS(arr)[0] != numkeys || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conffeqop is not a 1-D Oid array"); - memcpy(riinfo->ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid)); - if ((Pointer) arr != DatumGetPointer(adatum)) - pfree(arr); /* free de-toasted copy, if any */ + DeconstructConstraintRow(tup, + &riinfo->nkeys, + riinfo->fk_attnums, + riinfo->pk_attnums, + riinfo->pf_eq_oprs, + riinfo->pp_eq_oprs, + riinfo->ff_eq_oprs); 
ReleaseSysCache(tup); diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index c3071db1cd..1dda178c5a 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -4124,10 +4124,6 @@ RelationGetFKeyList(Relation relation) { Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup); ForeignKeyCacheInfo *info; - Datum adatum; - bool isnull; - ArrayType *arr; - int nelem; /* consider only foreign keys */ if (constraint->contype != CONSTRAINT_FOREIGN) @@ -4138,58 +4134,11 @@ RelationGetFKeyList(Relation relation) info->conrelid = constraint->conrelid; info->confrelid = constraint->confrelid; - /* Extract data from conkey field */ - adatum = fastgetattr(htup, Anum_pg_constraint_conkey, - conrel->rd_att, &isnull); - if (isnull) - elog(ERROR, "null conkey for rel %s", - RelationGetRelationName(relation)); - - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - - info->nkeys = nelem; - memcpy(info->conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); - - /* Likewise for confkey */ - adatum = fastgetattr(htup, Anum_pg_constraint_confkey, - conrel->rd_att, &isnull); - if (isnull) - elog(ERROR, "null confkey for rel %s", - RelationGetRelationName(relation)); - - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem != info->nkeys || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "confkey is not a 1-D smallint array"); - - memcpy(info->confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); - - /* Likewise for conpfeqop */ - adatum = fastgetattr(htup, Anum_pg_constraint_conpfeqop, - conrel->rd_att, &isnull); - if (isnull) - elog(ERROR, "null conpfeqop for rel %s", - 
RelationGetRelationName(relation)); - - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem != info->nkeys || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D OID array"); - - memcpy(info->conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); + DeconstructConstraintRow(htup, &info->nkeys, + info->conkey, + info->confkey, + info->conpfeqop, + NULL, NULL); /* Add FK's node to the result list */ result = lappend(result, info); diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index 0e4007389b..8405fe2eea 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -251,6 +251,9 @@ extern Oid get_relation_idx_constraint_oid(Oid relationId, Oid indexId); extern Bitmapset *get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid); +extern void DeconstructConstraintRow(HeapTuple tuple, int *numfks, + AttrNumber *conkey, AttrNumber *confkey, + Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs); extern bool check_functional_grouping(Oid relid, Index varno, Index varlevelsup, -- 2.11.0
>From eb68d572c815c34500f6ed720860f24386afe9cd Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Mon, 12 Nov 2018 16:18:45 -0300 Subject: [PATCH 2/5] move CloneForeignKeyConstraints to tablecmds.c --- src/backend/catalog/pg_constraint.c | 304 ----------------------------------- src/backend/commands/tablecmds.c | 306 ++++++++++++++++++++++++++++++++++++ src/include/catalog/pg_constraint.h | 3 - 3 files changed, 306 insertions(+), 307 deletions(-) diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index f71d89ff17..2833e45cff 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -39,10 +39,6 @@ #include "utils/tqual.h" -static void clone_fk_constraints(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned); - - /* * CreateConstraintEntry * Create a constraint table entry. @@ -378,306 +374,6 @@ CreateConstraintEntry(const char *constraintName, } /* - * CloneForeignKeyConstraints - * Clone foreign keys from a partitioned table to a newly acquired - * partition. - * - * relationId is a partition of parentId, so we can be certain that it has the - * same columns with the same datatypes. The columns may be in different - * order, though. - * - * The *cloned list is appended ClonedConstraint elements describing what was - * created. 
- */ -void -CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) -{ - Relation pg_constraint; - Relation parentRel; - Relation rel; - ScanKeyData key; - SysScanDesc scan; - HeapTuple tuple; - List *clone = NIL; - - parentRel = heap_open(parentId, NoLock); /* already got lock */ - /* see ATAddForeignKeyConstraint about lock level */ - rel = heap_open(relationId, AccessExclusiveLock); - pg_constraint = heap_open(ConstraintRelationId, RowShareLock); - - /* Obtain the list of constraints to clone or attach */ - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(parentId)); - scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &key); - while ((tuple = systable_getnext(scan)) != NULL) - { - Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; - - clone = lappend_oid(clone, oid); - } - systable_endscan(scan); - - /* Do the actual work, recursing to partitions as needed */ - clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned); - - /* We're done. Clean up */ - heap_close(parentRel, NoLock); - heap_close(rel, NoLock); /* keep lock till commit */ - heap_close(pg_constraint, RowShareLock); -} - -/* - * clone_fk_constraints - * Recursive subroutine for CloneForeignKeyConstraints - * - * Clone the given list of FK constraints when a partition is attached. - * - * When cloning foreign keys to a partition, it may happen that equivalent - * constraints already exist in the partition for some of them. We can skip - * creating a clone in that case, and instead just attach the existing - * constraint to the one in the parent. - * - * This function recurses to partitions, if the new partition is partitioned; - * of course, only do this for FKs that were actually cloned. 
- */ -static void -clone_fk_constraints(Relation pg_constraint, Relation parentRel, - Relation partRel, List *clone, List **cloned) -{ - AttrNumber *attmap; - List *partFKs; - List *subclone = NIL; - ListCell *cell; - - /* - * The constraint key may differ, if the columns in the partition are - * different. This map is used to convert them. - */ - attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), - RelationGetDescr(parentRel), - gettext_noop("could not convert row type")); - - partFKs = copyObject(RelationGetFKeyList(partRel)); - - foreach(cell, clone) - { - Oid parentConstrOid = lfirst_oid(cell); - Form_pg_constraint constrForm; - HeapTuple tuple; - AttrNumber conkey[INDEX_MAX_KEYS]; - AttrNumber mapped_conkey[INDEX_MAX_KEYS]; - AttrNumber confkey[INDEX_MAX_KEYS]; - Oid conpfeqop[INDEX_MAX_KEYS]; - Oid conppeqop[INDEX_MAX_KEYS]; - Oid conffeqop[INDEX_MAX_KEYS]; - Constraint *fkconstraint; - bool attach_it; - Oid constrOid; - ObjectAddress parentAddr, - childAddr; - int nelem; - ListCell *cell; - int i; - - tuple = SearchSysCache1(CONSTROID, parentConstrOid); - if (!tuple) - elog(ERROR, "cache lookup failed for constraint %u", - parentConstrOid); - constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - - /* only foreign keys */ - if (constrForm->contype != CONSTRAINT_FOREIGN) - { - ReleaseSysCache(tuple); - continue; - } - - ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); - - DeconstructConstraintRow(tuple, &nelem, conkey, confkey, - conpfeqop, conppeqop, conffeqop); - for (i = 0; i < nelem; i++) - mapped_conkey[i] = attmap[conkey[i] - 1]; - - /* - * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive - * verification step and don't end up with a duplicate FK. This also - * means we don't consider this constraint when recursing to - * partitions. 
- */ - attach_it = false; - foreach(cell, partFKs) - { - ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem) - { - attach_it = false; - continue; - } - for (i = 0; i < nelem; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) - { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* looks good! Attach this constraint */ - ConstraintSetParentConstraint(fk->conoid, constrForm->oid); - CommandCounterIncrement(); - attach_it = true; - break; - } - - /* - * If we attached to an existing constraint, there is no need to - * create a new one. In fact, there's no need to recurse for this - * constraint to partitions, either. 
- */ - if (attach_it) - { - ReleaseSysCache(tuple); - continue; - } - - constrOid = - CreateConstraintEntry(NameStr(constrForm->conname), - constrForm->connamespace, - CONSTRAINT_FOREIGN, - constrForm->condeferrable, - constrForm->condeferred, - constrForm->convalidated, - constrForm->oid, - RelationGetRelid(partRel), - mapped_conkey, - nelem, - nelem, - InvalidOid, /* not a domain constraint */ - constrForm->conindid, /* same index */ - constrForm->confrelid, /* same foreign rel */ - confkey, - conpfeqop, - conppeqop, - conffeqop, - nelem, - constrForm->confupdtype, - constrForm->confdeltype, - constrForm->confmatchtype, - NULL, - NULL, - NULL, - false, - 1, false, true); - subclone = lappend_oid(subclone, constrOid); - - ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); - recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); - - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); - fkconstraint->fk_upd_action = constrForm->confupdtype; - fkconstraint->fk_del_action = constrForm->confdeltype; - fkconstraint->deferrable = constrForm->condeferrable; - fkconstraint->initdeferred = constrForm->condeferred; - - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); - - if (cloned) - { - ClonedConstraint *newc; - - /* - * Feed back caller about the constraints we created, so that they - * can set up constraint verification. - */ - newc = palloc(sizeof(ClonedConstraint)); - newc->relid = RelationGetRelid(partRel); - newc->refrelid = constrForm->confrelid; - newc->conindid = constrForm->conindid; - newc->conid = constrOid; - newc->constraint = fkconstraint; - - *cloned = lappend(*cloned, newc); - } - - ReleaseSysCache(tuple); - } - - pfree(attmap); - list_free_deep(partFKs); - - /* - * If the partition is partitioned, recurse to handle any constraints that - * were cloned. 
- */ - if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && - subclone != NIL) - { - PartitionDesc partdesc = RelationGetPartitionDesc(partRel); - int i; - - for (i = 0; i < partdesc->nparts; i++) - { - Relation childRel; - - childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); - clone_fk_constraints(pg_constraint, - partRel, - childRel, - subclone, - cloned); - heap_close(childRel, NoLock); /* keep lock till commit */ - } - } -} - -/* * Test whether given name is currently used as a constraint name * for the given object (relation or domain). * diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 843ed48aa7..31ca651213 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -412,6 +412,10 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static void CloneForeignKeyConstraints(Oid parentId, Oid relationId, + List **cloned); +static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -7631,6 +7635,308 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* + * CloneForeignKeyConstraints + * Clone foreign keys from a partitioned table to a newly acquired + * partition. + * + * relationId is a partition of parentId, so we can be certain that it has the + * same columns with the same datatypes. The columns may be in different + * order, though. + * + * The *cloned list is appended ClonedConstraint elements describing what was + * created, for the purposes of validating the constraint in ALTER TABLE's + * Phase 3. 
+ */ +static void +CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) +{ + Relation pg_constraint; + Relation parentRel; + Relation rel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tuple; + List *clone = NIL; + + parentRel = heap_open(parentId, NoLock); /* already got lock */ + /* see ATAddForeignKeyConstraint about lock level */ + rel = heap_open(relationId, AccessExclusiveLock); + pg_constraint = heap_open(ConstraintRelationId, RowShareLock); + + /* Obtain the list of constraints to clone or attach */ + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(parentId)); + scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, + NULL, 1, &key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; + + clone = lappend_oid(clone, oid); + } + systable_endscan(scan); + + /* Do the actual work, recursing to partitions as needed */ + CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + + /* We're done. Clean up */ + heap_close(parentRel, NoLock); + heap_close(rel, NoLock); /* keep lock till commit */ + heap_close(pg_constraint, RowShareLock); +} + +/* + * CloneFkReferencing + * Recursive subroutine for CloneForeignKeyConstraints, referencing side + * + * Clone the given list of FK constraints when a partition is attached on the + * referencing side of those constraints. + * + * When cloning foreign keys to a partition, it may happen that equivalent + * constraints already exist in the partition for some of them. We can skip + * creating a clone in that case, and instead just attach the existing + * constraint to the one in the parent. + * + * This function recurses to partitions, if the new partition is partitioned; + * of course, only do this for FKs that were actually cloned. 
+ */ +static void +CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned) +{ + AttrNumber *attmap; + List *partFKs; + List *subclone = NIL; + ListCell *cell; + + /* + * The constraint key may differ, if the columns in the partition are + * different. This map is used to convert them. + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + + partFKs = copyObject(RelationGetFKeyList(partRel)); + + foreach(cell, clone) + { + Oid parentConstrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + HeapTuple tuple; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_conkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + bool attach_it; + Oid constrOid; + ObjectAddress parentAddr, + childAddr; + ListCell *cell; + int i; + + tuple = SearchSysCache1(CONSTROID, parentConstrOid); + if (!tuple) + elog(ERROR, "cache lookup failed for constraint %u", + parentConstrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* only foreign keys */ + if (constrForm->contype != CONSTRAINT_FOREIGN) + { + ReleaseSysCache(tuple); + continue; + } + + ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); + + DeconstructConstraintRow(tuple, &numfks, conkey, confkey, + conpfeqop, conppeqop, conffeqop); + for (i = 0; i < numfks; i++) + mapped_conkey[i] = attmap[conkey[i] - 1]; + + /* + * Before creating a new constraint, see whether any existing FKs are + * fit for the purpose. If one is, attach the parent constraint to it, + * and don't clone anything. This way we avoid the expensive + * verification step and don't end up with a duplicate FK. This also + * means we don't consider this constraint when recursing to + * partitions. 
+ */ + attach_it = false; + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); + Form_pg_constraint partConstr; + HeapTuple partcontup; + + attach_it = true; + + /* + * Do some quick & easy initial checks. If any of these fail, we + * cannot use this constraint, but keep looking. + */ + if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) + { + attach_it = false; + continue; + } + for (i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + attach_it = false; + break; + } + } + if (!attach_it) + continue; + + /* + * Looks good so far; do some more extensive checks. Presumably + * the check for 'convalidated' could be dropped, since we don't + * really care about that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != constrForm->condeferrable || + partConstr->condeferred != constrForm->condeferred || + partConstr->confupdtype != constrForm->confupdtype || + partConstr->confdeltype != constrForm->confdeltype || + partConstr->confmatchtype != constrForm->confmatchtype) + { + ReleaseSysCache(partcontup); + attach_it = false; + continue; + } + + ReleaseSysCache(partcontup); + + /* looks good! Attach this constraint */ + ConstraintSetParentConstraint(fk->conoid, parentConstrOid); + CommandCounterIncrement(); + attach_it = true; + break; + } + + /* + * If we attached to an existing constraint, there is no need to + * create a new one. In fact, there's no need to recurse for this + * constraint to partitions, either. 
+ */ + if (attach_it) + { + ReleaseSysCache(tuple); + continue; + } + + constrOid = + CreateConstraintEntry(NameStr(constrForm->conname), + constrForm->connamespace, + CONSTRAINT_FOREIGN, + constrForm->condeferrable, + constrForm->condeferred, + constrForm->convalidated, + parentConstrOid, + RelationGetRelid(partRel), + mapped_conkey, + numfks, + numfks, + InvalidOid, /* not a domain constraint */ + constrForm->conindid, /* same index */ + constrForm->confrelid, /* same foreign rel */ + confkey, + conpfeqop, + conppeqop, + conffeqop, + numfks, + constrForm->confupdtype, + constrForm->confdeltype, + constrForm->confmatchtype, + NULL, + NULL, + NULL, + false, + 1, false, true); + subclone = lappend_oid(subclone, constrOid); + + ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); + recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + + createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, + constrOid, constrForm->conindid, false); + + if (cloned) + { + ClonedConstraint *newc; + + /* + * Feed back caller about the constraints we created, so that they + * can set up constraint verification. + */ + newc = palloc(sizeof(ClonedConstraint)); + newc->relid = RelationGetRelid(partRel); + newc->refrelid = constrForm->confrelid; + newc->conindid = constrForm->conindid; + newc->conid = constrOid; + newc->constraint = fkconstraint; + + *cloned = lappend(*cloned, newc); + } + + ReleaseSysCache(tuple); + } + + pfree(attmap); + list_free_deep(partFKs); + + /* + * If the partition is partitioned, recurse to handle any constraints that + * were cloned. 
+ */ + if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + subclone != NIL) + { + PartitionDesc partdesc = RelationGetPartitionDesc(partRel); + int i; + + for (i = 0; i < partdesc->nparts; i++) + { + Relation childRel; + + childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); + CloneFkReferencing(pg_constraint, + partRel, + childRel, + subclone, + cloned); + heap_close(childRel, NoLock); /* keep lock till commit */ + } + } +} + +/* * ALTER TABLE ALTER CONSTRAINT * * Update the attributes of a constraint. diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index 8405fe2eea..8b2af36ba7 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -226,9 +226,6 @@ extern Oid CreateConstraintEntry(const char *constraintName, bool conNoInherit, bool is_internal); -extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId, - List **cloned); - extern void RemoveConstraintById(Oid conId); extern void RenameConstraintById(Oid conId, const char *newname); -- 2.11.0
>From b1024f44489f4d610e3658fa91642bf090692faf Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Wed, 28 Nov 2018 11:52:00 -0300 Subject: [PATCH 3/5] Rework deleteObjectsInList to allow objtype-specific checks This doesn't change any functionality yet. --- src/backend/catalog/dependency.c | 41 ++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 7dfa3278a5..c7d3b8a654 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -219,29 +219,38 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel, int i; /* - * Keep track of objects for event triggers, if necessary. + * Invoke pre-deletion callbacks and keep track of objects for event + * triggers, if necessary. */ - if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL)) + for (i = 0; i < targetObjects->numrefs; i++) { - for (i = 0; i < targetObjects->numrefs; i++) + const ObjectAddress *thisobj = &targetObjects->refs[i]; + Oid objectClass = getObjectClass(thisobj); + + if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL)) { - const ObjectAddress *thisobj = &targetObjects->refs[i]; - const ObjectAddressExtra *extra = &targetObjects->extras[i]; - bool original = false; - bool normal = false; - - if (extra->flags & DEPFLAG_ORIGINAL) - original = true; - if (extra->flags & DEPFLAG_NORMAL) - normal = true; - if (extra->flags & DEPFLAG_REVERSE) - normal = true; - - if (EventTriggerSupportsObjectClass(getObjectClass(thisobj))) + if (EventTriggerSupportsObjectClass(objectClass)) { + bool original = false; + bool normal = false; + const ObjectAddressExtra *extra = &targetObjects->extras[i]; + + if (extra->flags & DEPFLAG_ORIGINAL) + original = true; + if (extra->flags & DEPFLAG_NORMAL || + extra->flags & DEPFLAG_REVERSE) + normal = true; + EventTriggerSQLDropAddObject(thisobj, original, 
normal); } } + + /* Invoke class-specific pre-deletion checks */ + switch (objectClass) + { + default: + break; + } } /* -- 2.11.0
>From 44300eb0dfdf2221ed8fffd200fc7d6b958d090e Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Thu, 30 Aug 2018 06:01:38 -0300 Subject: [PATCH 4/5] Support foreign key referencing partitioned tables --- src/backend/catalog/dependency.c | 3 + src/backend/catalog/heap.c | 18 + src/backend/catalog/pg_constraint.c | 15 +- src/backend/commands/tablecmds.c | 813 +++++++++++++++++++++++++----- src/backend/utils/adt/ri_triggers.c | 228 ++++++++- src/backend/utils/adt/ruleutils.c | 18 + src/include/catalog/heap.h | 2 + src/include/commands/tablecmds.h | 6 +- src/include/commands/trigger.h | 1 + src/include/utils/ruleutils.h | 1 + src/test/regress/expected/foreign_key.out | 168 +++++- src/test/regress/sql/foreign_key.sql | 114 ++++- 12 files changed, 1201 insertions(+), 186 deletions(-) diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index c7d3b8a654..35c648fb21 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -248,6 +248,9 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel, /* Invoke class-specific pre-deletion checks */ switch (objectClass) { + case OCLASS_CLASS: + pre_drop_class_check(thisobj->objectId, thisobj->objectSubId); + break; default: break; } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 11debaa780..55f9e58a39 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1788,6 +1788,24 @@ RemoveAttrDefaultById(Oid attrdefId) } /* + * pre_drop_class_check + * Pre-deletion check for a relation that is about to be dropped. + * + * Opens the relation without acquiring a new lock and, if it is a plain + * table that is also a partition, calls CheckNoForeignKeyRefs on it + * (presumably to reject the drop while foreign keys still reference the + * partition -- TODO confirm against that function). Other relations are + * not checked. objectSubId is accepted but currently unused. + */ +void +pre_drop_class_check(Oid relationId, Oid objectSubId) +{ + Relation relation; + + /* caller must hold strong lock already, if they're dropping */ + relation = relation_open(relationId, NoLock); + + if (relation->rd_rel->relkind == RELKIND_RELATION && + relation->rd_rel->relispartition) + CheckNoForeignKeyRefs(relation, true); + + relation_close(relation, NoLock); +} + +/* * heap_drop_with_catalog - removes specified relation from catalogs * * 
Note that this routine is not responsible for dropping objects that are diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index 2833e45cff..1c5017eccc 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -303,10 +303,12 @@ CreateConstraintEntry(const char *constraintName, if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN) { /* - * Register normal dependency on the unique index that supports a - * foreign-key constraint. (Note: for indexes associated with unique - * or primary-key constraints, the dependency runs the other way, and - * is not made here.) + * Register dependency on the unique index that supports a foreign-key + * constraint. (Note: for indexes associated with unique or + * primary-key constraints, the dependency runs the other way, and is + * not made here.) For standalone constraints, this is a normal + * dependency; for a constraint that is implementation part of a + * larger one, it's internal-auto. */ ObjectAddress relobject; @@ -314,7 +316,10 @@ CreateConstraintEntry(const char *constraintName, relobject.objectId = indexRelId; relobject.objectSubId = 0; - recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL); + recordDependencyOn(&conobject, &relobject, + OidIsValid(parentConstrId) ? 
+ DEPENDENCY_INTERNAL_AUTO : + DEPENDENCY_NORMAL); } if (foreignNKeys > 0) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 31ca651213..74109557ae 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -412,10 +412,28 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstraint, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok); +static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok, + bool create_constraint); static void CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned); +static void CloneFkReferenced(Relation parentRel, Relation partitionRel, + Relation pg_constraint); static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, Relation partRel, List *clone, List **cloned); +static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -3407,7 +3425,8 @@ AlterTableGetLockLevel(List *cmds) /* * Removing constraints can affect SELECTs that have been - * optimised assuming the constraint holds true. + * optimised assuming the constraint holds true. 
See also + * CloneFkReferenced. */ case AT_DropConstraint: /* as DROP INDEX */ case AT_DropNotNull: /* may change some SQL plans */ @@ -6987,11 +7006,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, break; case CONSTR_FOREIGN: - /* - * Note that we currently never recurse for FK constraints, so the - * "recurse" flag is silently ignored. - * * Assign or validate constraint name */ if (newConstraint->conname) @@ -7170,6 +7185,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity checks for it. * We do permissions checks here, however. + * + * When the referenced or referencing tables (or both) are partitioned, + * multiple pg_constraint rows are required -- one for each partitioned table + * and each partition on each side (fortunately, not one for every combination + * thereof). We also need the appropriate triggers to be created on each leaf + * partition. 
*/ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, @@ -7189,7 +7210,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, int numfks, numpks; Oid indexOid; - Oid constrOid; bool old_check_ok; ObjectAddress address; ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop); @@ -7207,12 +7227,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Validity checks (permission checks wait till we have the column * numbers) */ - if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot reference partitioned table \"%s\"", - RelationGetRelationName(pkrel)))); - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { if (!recurse) @@ -7230,7 +7244,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, errdetail("This feature is not yet supported on partitioned tables."))); } - if (pkrel->rd_rel->relkind != RELKIND_RELATION) + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("referenced relation \"%s\" is not a table", @@ -7440,8 +7455,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("foreign key constraint \"%s\" " - "cannot be implemented", + errmsg(fkconstraint->conname ? + "foreign key constraint \"%s\" cannot be implemented" : + "foreign key constraint cannot be implemented", fkconstraint->conname), errdetail("Key columns \"%s\" and \"%s\" " "are of incompatible types: %s and %s.", @@ -7529,15 +7545,160 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* + * Create all the constraint and trigger objects, recursing to partitions + * as necessary. 
First handle the referenced side. + */ + address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel, + indexOid, + InvalidOid, /* no parent constraint */ + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok); + + /* + * Now handle the referencing side. We don't need the topmost constraint + * on this side, because the constraint we created above fills that role; + * if this recurses for any partitions then more constraints are added. + */ + addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel, + indexOid, + address.objectId, + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + false); + + /* + * Close pk table, but keep lock until we've committed. + */ + heap_close(pkrel, NoLock); + + return address; +} + +/* + * Return the OID of the index, in the given partition, that is a child of the + * given index or InvalidOid if there isn't one. + * + * XXX maybe this should be in catalog/partition.c? Not for the moment only + * because all the callers are here ... 
+ */ +static Oid +index_get_partition(Relation partition, Oid indexId) +{ + List *idxlist = RelationGetIndexList(partition); + ListCell *l; + + foreach(l, idxlist) + { + Oid partIdx = lfirst_oid(l); + HeapTuple tup; + Form_pg_class classForm; + bool ispartition; + + tup = SearchSysCache1(RELOID, ObjectIdGetDatum(partIdx)); + if (!tup) + elog(ERROR, "cache lookup failed for relation %u", partIdx); + classForm = (Form_pg_class) GETSTRUCT(tup); + ispartition = classForm->relispartition; + ReleaseSysCache(tup); + if (!ispartition) + continue; + if (get_partition_parent(lfirst_oid(l)) == indexId) + { + list_free(idxlist); + return partIdx; + } + } + + return InvalidOid; +} + +/* + * addFkRecurseReferenced + * recursive subroutine for ATAddForeignKeyConstraint, referenced side + * + * Create pg_constraint rows for the referenced side of the constraint, + * referencing the parent of the referencing side; also create action triggers + * on leaf partitions. If the table is partitioned, recurse to handle each + * partition. + * + * On constraint names: It's not possible in general to give all the cascaded + * constraints the same name, so we don't try. (Also, even when it *is* + * possible, it goes counter to the SQL-standard rule that constraint names + * must be unique within a schema.) Therefore we apply the given name to the + * top-most constraint, and use generated names when cascading to partitions + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the root referencing relation. + * pkrel is the referenced relation; might be a partition, if recursing. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstraint is the OID of a parent constraint; InvalidOid if this is a + * top-level constraint. + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. 
+ * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + */ +static ObjectAddress +addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstraint, + int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok) +{ + ObjectAddress address; + Oid constrOid; + char *conname; + + /* + * Verify relkind for each referenced partition. At the top level, this + * is redundant with a previous check, but we need it when recursing. + */ + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("referenced relation \"%s\" is not a table", + RelationGetRelationName(pkrel)))); + + /* + * Caller supplies us with a constraint name; however, it may be + * used in this partition, so come up with a different one in that + * case. + */ + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(rel), + fkconstraint->conname)) + conname = ChooseConstraintName(RelationGetRelationName(rel), + strVal(linitial(fkconstraint->fk_attrs)), + "fkey", + RelationGetNamespace(rel), NIL); + else + conname = fkconstraint->conname; + + /* * Record the FK constraint in pg_constraint. 
*/ - constrOid = CreateConstraintEntry(fkconstraint->conname, + constrOid = CreateConstraintEntry(conname, RelationGetNamespace(rel), CONSTRAINT_FOREIGN, fkconstraint->deferrable, fkconstraint->initdeferred, fkconstraint->initially_valid, - parentConstr, + parentConstraint, RelationGetRelid(rel), fkattnum, numfks, @@ -7549,7 +7710,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, pfeqoperators, ppeqoperators, ffeqoperators, - numpks, + numfks, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, @@ -7562,79 +7723,262 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, false); /* is_internal */ ObjectAddressSet(address, ConstraintRelationId, constrOid); - /* - * Create the triggers that will enforce the constraint. We only want the - * action triggers to appear for the parent partitioned relation, even - * though the constraints also exist below. - */ - createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, - constrOid, indexOid, !recursing); + /* make new constraint visible, in case we add more */ + CommandCounterIncrement(); /* - * Tell Phase 3 to check that the constraint is satisfied by existing - * rows. We can skip this during table creation, when requested explicitly - * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're - * recreating a constraint following a SET DATA TYPE operation that did - * not impugn its validity. + * If the referenced table is a plain relation, create the action triggers + * that enforce the constraint. 
*/ - if (!old_check_ok && !fkconstraint->skip_validation) + if (pkrel->rd_rel->relkind == RELKIND_RELATION) { - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = fkconstraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = RelationGetRelid(pkrel); - newcon->refindid = indexOid; - newcon->conid = constrOid; - newcon->qual = (Node *) fkconstraint; - - tab->constraints = lappend(tab->constraints, newcon); + createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel), + fkconstraint, + constrOid, indexOid); } /* - * When called on a partitioned table, recurse to create the constraint on - * the partitions also. + * If the referenced table is partitioned, recurse on ourselves to handle + * each partition. We need one pg_constraint row created for each + * partition in addition to the pg_constraint row for the parent table. */ - if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - PartitionDesc partdesc; + PartitionDesc pd = RelationGetPartitionDesc(pkrel); + int i; - partdesc = RelationGetPartitionDesc(rel); - - for (i = 0; i < partdesc->nparts; i++) + for (i = 0; i < pd->nparts; i++) { - Oid partitionId = partdesc->oids[i]; - Relation partition = heap_open(partitionId, lockmode); - AlteredTableInfo *childtab; - ObjectAddress childAddr; + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_pkattnum; + Oid partIndexId; - CheckTableNotInUse(partition, "ALTER TABLE"); + partRel = heap_open(pd->oids[i], ShareRowExclusiveLock); - /* Find or create work queue entry for this table */ - childtab = ATGetQueueEntry(wqueue, partition); + /* + * Map the attribute numbers in the referenced side of the FK + * definition to match the partition's column layout. 
+ */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(pkrel), + gettext_noop("could not convert row type")); + if (map) + { + int j; - childAddr = - ATAddForeignKeyConstraint(wqueue, childtab, partition, - fkconstraint, constrOid, - recurse, true, lockmode); + mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks); + for (j = 0; j < numfks; j++) + mapped_pkattnum[j] = map[pkattnum[j] - 1]; + } + else + mapped_pkattnum = pkattnum; - /* Record this constraint as dependent on the parent one */ - recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO); + /* do the deed */ + partIndexId = index_get_partition(partRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partRel)); + addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel, + partIndexId, constrOid, numfks, + mapped_pkattnum, fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok); - heap_close(partition, NoLock); + /* Done -- clean up (but keep the lock) */ + heap_close(partRel, NoLock); + if (map) + { + pfree(mapped_pkattnum); + pfree(map); + } } } - /* - * Close pk table, but keep lock until we've committed. - */ - heap_close(pkrel, NoLock); - return address; } /* + * addFkRecurseReferencing + * recursive subroutine for ATAddForeignKeyConstraint, referencing side + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the referencing relation; might be a partition, if recursing. + * pkrel is the root referenced relation. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstr is the OID of the parent constraint (there is always one). + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. 
+ * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + * create_constraint indicates whether to create a constraint; if false, the + * given parentConstr is the origin of all others. + * + * Note we never try to use the constraint name assigned in fkconstraint. + */ +static void +addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok, bool create_constraint) +{ + Oid constrOid; /* uninitialized */ + char *conname; + + AssertArg(OidIsValid(parentConstr)); + + /* + * Verify relkind for each referencing partition. At the top level, this + * is redundant with a previous check, but we need it when recursing. + */ + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("referenced relation \"%s\" is not a table", + RelationGetRelationName(pkrel)))); + + if (!create_constraint) + constrOid = parentConstr; + else + { + /* Figure out a good constraint name to use */ + conname = ChooseConstraintName(RelationGetRelationName(rel), + strVal(linitial(fkconstraint->fk_attrs)), + "fkey", + RelationGetNamespace(rel), NIL); + + /* + * Record the FK constraint in pg_constraint. 
+ */ + constrOid = CreateConstraintEntry(conname, + RelationGetNamespace(rel), + CONSTRAINT_FOREIGN, + fkconstraint->deferrable, + fkconstraint->initdeferred, + fkconstraint->initially_valid, + parentConstr, + RelationGetRelid(rel), + fkattnum, + numfks, + numfks, + InvalidOid, + indexOid, + RelationGetRelid(pkrel), + pkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + numfks, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, + NULL, /* no exclusion constraint */ + NULL, /* no check constraint */ + NULL, + true, /* islocal */ + 0, /* inhcount */ + true, /* isnoinherit */ + false); /* is_internal */ + /* make constraint visible */ + CommandCounterIncrement(); + } + + /* + * If the referencing relation is a plain table, add the check triggers to + * it and, if necessary, schedule it to be checked in Phase 3. + * + * If the relation is partitioned, drill down to do it to its partitions. + */ + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + createForeignKeyCheckTriggers(RelationGetRelid(rel), + RelationGetRelid(pkrel), + fkconstraint, + constrOid, + indexOid); + + /* + * Tell Phase 3 to check that the constraint is satisfied by existing + * rows. We can skip this during table creation, when requested + * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command, + * and when we're recreating a constraint following a SET DATA TYPE + * operation that did not impugn its validity. 
+ */ + if (!old_check_ok && !fkconstraint->skip_validation) + { + NewConstraint *newcon; + AlteredTableInfo *tab; + + tab = ATGetQueueEntry(wqueue, rel); + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->conname; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->refindid = indexOid; + newcon->conid = constrOid; + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } + } + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc pd; + int i; + + pd = RelationGetPartitionDesc(rel); + for (i = 0; i < pd->nparts; i++) + { + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_fkattnum; + + partRel = heap_open(pd->oids[i], ShareRowExclusiveLock); + + /* + * Map the attribute numbers in the referencing side of the FK + * definition to match the partition's column layout. + */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(rel), + gettext_noop("could not convert row type")); + if (map) + { + int j; + + mapped_fkattnum = palloc(sizeof(AttrNumber) * numfks); + for (j = 0; j < numfks; j++) + mapped_fkattnum[j] = map[fkattnum[j] - 1]; + } + else + mapped_fkattnum = fkattnum; + + /* down the rabbit hole */ + addFkRecurseReferencing(wqueue, fkconstraint, partRel, pkrel, + indexOid, constrOid, numfks, pkattnum, + mapped_fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok, true); + + /* Done -- clean up (but keep the lock) */ + heap_close(partRel, NoLock); + if (map) + { + pfree(mapped_fkattnum); + pfree(map); + } + } + } +} + +/* * CloneForeignKeyConstraints * Clone foreign keys from a partitioned table to a newly acquired * partition. 
@@ -7663,7 +8007,14 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) rel = heap_open(relationId, AccessExclusiveLock); pg_constraint = heap_open(ConstraintRelationId, RowShareLock); - /* Obtain the list of constraints to clone or attach */ + /* + * Clone constraints where the parent is in the referenced side. + */ + CloneFkReferenced(parentRel, rel, pg_constraint); + + /* + * Now search for constraints where the parent is in the referencing side. + */ ScanKeyInit(&key, Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(parentId)); @@ -7680,13 +8031,160 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) /* Do the actual work, recursing to partitions as needed */ CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); - /* We're done. Clean up */ + list_free(clone); + + /* We're done. Clean up, keeping locks till commit */ heap_close(parentRel, NoLock); - heap_close(rel, NoLock); /* keep lock till commit */ + heap_close(rel, NoLock); heap_close(pg_constraint, RowShareLock); } /* + * CloneFkReferenced + * Subroutine for CloneForeignKeyConstraints, referenced side + * + * Clone the FKs that reference the parent relation. Used when partitionRel + * is created/attached. (Recursion to partitions is effected by callee + * addFkRecurseReferenced, so this routine is not itself recursive.) pg_constraint is the already-open catalog relation that is scanned here. + */ +static void +CloneFkReferenced(Relation parentRel, Relation partitionRel, + Relation pg_constraint) +{ + AttrNumber *attmap; + ListCell *cell; + SysScanDesc scan; + ScanKeyData key[2]; + HeapTuple tuple; + List *clone = NIL; + + /* + * Search for any constraints where this partition is in the referenced + * side. However, we must ignore any constraint whose parent constraint + * is also going to be cloned, to avoid duplicates. So do it in two + * steps: first construct the list of constraints to clone, then go over + * that list cloning those whose parents are not in the list. 
(We must + * not rely on the parent being seen first, since catalog order could + * return children first.) + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + /* This is a seqscan, as we don't have a usable index ... */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, + NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* ignore this constraint if the parent is already on the list */ + if (list_member_oid(clone, constrForm->conparentid)) + continue; + + clone = lappend_oid(clone, constrForm->oid); + } + systable_endscan(scan); + + foreach(cell, clone) + { + Oid constrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + Form_pg_attribute att; + Relation fkRel; + Oid indexOid; + Oid partIndexId; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_confkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + + tuple = SearchSysCache1(CONSTROID, constrOid); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* skip children whose parents are going to be cloned, as above */ + if (list_member_oid(clone, constrForm->conparentid)) + { + ReleaseSysCache(tuple); + continue; + } + + /* + * Because we're only expanding the key space at the referenced side, + * we don't need to prevent any operation in the referencing table, so + * AccessShareLock suffices 
(assumes that dropping the constraint + * acquires AEL). + */ + fkRel = heap_open(constrForm->conrelid, AccessShareLock); + + indexOid = constrForm->conindid; + DeconstructConstraintRow(tuple, + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop); + for (int i = 0; i < numfks; i++) + mapped_confkey[i] = attmap[confkey[i] - 1]; + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = NameStr(constrForm->conname); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->initially_valid = true; + fkconstraint->fk_matchtype = constrForm->confmatchtype; + + /* + * We only need the first referencing column's name, which is used to + * generate the constraint name; conkey[] holds attribute numbers of fkRel (the table opened from conrelid), so look the name up in fkRel's tupdesc, not the partition's. + */ + att = TupleDescAttr(RelationGetDescr(fkRel), conkey[0] - 1); + fkconstraint->fk_attrs = list_make1(makeString(NameStr(att->attname))); + + /* + * Add the new foreign key constraint pointing to the new partition. + * Because this new partition appears in the referenced side of the + * constraint, we don't need to set up for Phase 3 check. 
+ */ + partIndexId = index_get_partition(partitionRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partitionRel)); + addFkRecurseReferenced(NULL, + fkconstraint, + fkRel, + partitionRel, + partIndexId, + constrOid, + numfks, + mapped_confkey, + conkey, + conpfeqop, + conppeqop, + conffeqop, + true); + + heap_close(fkRel, NoLock); + ReleaseSysCache(tuple); + } +} + +/* * CloneFkReferencing * Recursive subroutine for CloneForeignKeyConstraints, referencing side * @@ -7752,18 +8250,24 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, ReleaseSysCache(tuple); continue; } + if (list_member_oid(clone, constrForm->conparentid)) + { + ReleaseSysCache(tuple); + continue; + } ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); DeconstructConstraintRow(tuple, &numfks, conkey, confkey, conpfeqop, conppeqop, conffeqop); + for (i = 0; i < numfks; i++) mapped_conkey[i] = attmap[conkey[i] - 1]; /* * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive + * fit for the purpose. If one is, attach the parent constraint to + * it, and don't clone anything. This way we avoid the expensive * verification step and don't end up with a duplicate FK. This also * means we don't consider this constraint when recursing to * partitions. 
@@ -7884,8 +8388,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, fkconstraint->deferrable = constrForm->condeferrable; fkconstraint->initdeferred = constrForm->condeferred; - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); + /* If this is a plain relation, create the check triggers */ + if (partRel->rd_rel->relkind == RELKIND_RELATION) + createForeignKeyCheckTriggers(RelationGetRelid(partRel), + constrForm->confrelid, + fkconstraint, constrOid, + constrForm->conindid); if (cloned) { @@ -7921,6 +8429,9 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, PartitionDesc partdesc = RelationGetPartitionDesc(partRel); int i; + /* make previously created constraints visible */ + CommandCounterIncrement(); + for (i = 0; i < partdesc->nparts; i++) { Relation childRel; @@ -8968,37 +9479,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, } /* - * Create the triggers that implement an FK constraint. - * - * NB: if you change any trigger properties here, see also - * ATExecAlterConstraint. - */ -void -createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, - Oid constraintOid, Oid indexOid, bool create_action) -{ - /* - * For the referenced side, create action triggers, if requested. (If the - * referencing side is partitioned, there is still only one trigger, which - * runs on the referenced side and points to the top of the referencing - * hierarchy.) - */ - if (create_action) - createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid, - indexOid); - - /* - * For the referencing side, create the check triggers. We only need - * these on the partitions. 
- */ - if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid, - fkconstraint, constraintOid, indexOid); - - CommandCounterIncrement(); -} - -/* * ALTER TABLE DROP CONSTRAINT * * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism. @@ -14848,6 +15328,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name) partRel = heap_openrv(name, AccessShareLock); + /* Ensure that foreign keys still hold after this detach */ + CheckNoForeignKeyRefs(partRel, false); + /* All inheritance related checks are performed within the function */ RemoveInheritance(partRel, rel); @@ -15164,36 +15647,17 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl) { - Relation pg_inherits; - ScanKeyData key; - HeapTuple tuple; - SysScanDesc scan; + Oid existingIdx; - pg_inherits = heap_open(InheritsRelationId, AccessShareLock); - ScanKeyInit(&key, Anum_pg_inherits_inhparent, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(parentIdx))); - scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true, - NULL, 1, &key); - while (HeapTupleIsValid(tuple = systable_getnext(scan))) - { - Form_pg_inherits inhForm; - Oid tab; - - inhForm = (Form_pg_inherits) GETSTRUCT(tuple); - tab = IndexGetRelation(inhForm->inhrelid, false); - if (tab == RelationGetRelid(partitionTbl)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", - RelationGetRelationName(partIdx), - RelationGetRelationName(parentIdx)), - errdetail("Another index is already attached for partition \"%s\".", - RelationGetRelationName(partitionTbl)))); - } - - systable_endscan(scan); - heap_close(pg_inherits, AccessShareLock); + existingIdx = index_get_partition(partitionTbl, RelationGetRelid(parentIdx)); + if 
(OidIsValid(existingIdx)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentIdx)), + errdetail("Another index is already attached for partition \"%s\".", + RelationGetRelationName(partitionTbl)))); } /* @@ -15324,3 +15788,82 @@ update_relispartition(Relation classRel, Oid relationId, bool newval) if (opened) heap_close(classRel, RowExclusiveLock); } + +/* + * During an operation that removes a partition from a partitioned table, + * verify that any foreign keys pointing to the partitioned table would not + * become invalid. (Used for ALTER TABLE ... DETACH as well as DROP). + * An error is raised if any referenced values exist. isDrop is true when the partition is being dropped rather than detached. + */ +void +CheckNoForeignKeyRefs(Relation partition, bool isDrop) +{ + Relation pg_constraint; + HeapTuple tuple; + SysScanDesc scan; + ScanKeyData key[2]; + + /* + * Quick exit if there are no indexes (presumably nothing can reference a table that has no unique index -- NOTE(review): confirm). + */ + if (RelationGetIndexList(partition) == NIL) + return; + + /* + * In the DROP case, we can skip this check when this is a partitioned + * partition, because its partitions will go through this also, and we'd + * run the check twice uselessly. + * + * In the DETACH case, this is only called for the top-level relation, + * so we must run it nevertheless. 
+ */ + if (isDrop && partition->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + /* Search for foreign key constraints whose referenced table (confrelid) is this one */ + pg_constraint = heap_open(ConstraintRelationId, AccessShareLock); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + + /* XXX This is a seqscan, as we don't have a usable index */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, + NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + Trigger trig; + Relation rel; + + /* + * We only need to process constraints that are part of larger ones, i.e. those whose conparentid is valid. + */ + if (!OidIsValid(constrForm->conparentid)) + continue; + + /* prevent data changes into the referencing table until commit */ + rel = heap_open(constrForm->conrelid, ShareLock); + + MemSet(&trig, 0, sizeof(trig)); + trig.tgoid = InvalidOid; + trig.tgname = NameStr(constrForm->conname); + trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN; + trig.tgisinternal = true; + trig.tgconstrrelid = RelationGetRelid(partition); + trig.tgconstrindid = constrForm->conindid; + trig.tgconstraint = constrForm->oid; + trig.tgdeferrable = false; + trig.tginitdeferred = false; + /* this is a made-up Trigger handed to RI_Final_Check; we needn't fill in remaining fields */ + + RI_Final_Check(&trig, rel, partition); + + heap_close(rel, NoLock); + } + + systable_endscan(scan); + heap_close(pg_constraint, AccessShareLock); +} diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index b9aef07c1a..d7e03adeae 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -54,6 +54,7 @@ #include "utils/memutils.h" #include 
"utils/rel.h" #include "utils/rls.h" +#include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" 
#include "utils/tqual.h" @@ -239,7 +240,7 @@ static void ri_ExtractValues(Relation rel, HeapTuple tup, static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, HeapTuple violator, TupleDesc tupdesc, - int queryno) pg_attribute_noreturn(); + int queryno, bool partgone) pg_attribute_noreturn(); /* ---------- @@ -397,18 +398,22 @@ RI_FKey_check(TriggerData *trigdata) char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; + const char *pk_only; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (i = 0; i < riinfo->nkeys; i++) { @@ -532,19 +537,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; + const char *pk_only; Oid queryoids[RI_MAX_NUMKEYS]; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? 
+ "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (i = 0; i < riinfo->nkeys; i++) { @@ -1710,6 +1719,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RangeTblEntry *fkrte; const char *sep; const char *fk_only; + const char *pk_only; int i; int save_nestlevel; char workmembuf[32]; @@ -1769,7 +1779,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) /*---------- * The query string built is: * SELECT fk.keycols FROM [ONLY] relname fk - * LEFT OUTER JOIN ONLY pkrelname pk + * LEFT OUTER JOIN [ONLY] pkrelname pk * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH SIMPLE: @@ -1796,9 +1806,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) quoteRelationName(fkrelname, fk_rel); fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? 
+ "" : "ONLY "; appendStringInfo(&querybuf, - " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON", - fk_only, fkrelname, pkrelname); + " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON", + fk_only, fkrelname, pk_only, pkrelname); strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); @@ -1951,7 +1963,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, tuple, tupdesc, - RI_PLAN_CHECK_LOOKUPPK); + RI_PLAN_CHECK_LOOKUPPK, false); } if (SPI_finish() != SPI_OK_FINISH) @@ -1965,6 +1977,186 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) return true; } +/* ---------- + * RI_Final_Check - + * Verify that no row of fk_rel references a key stored in pk_rel, a partition being removed from the referenced table; if one does, report it through ri_ReportViolation. trigger is a caller-built Trigger identifying the constraint. + */ +void +RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) +{ + const RI_ConstraintInfo *riinfo; + StringInfoData querybuf; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; + char fkrelname[MAX_QUOTED_REL_NAME_LEN]; + char pkattname[MAX_QUOTED_NAME_LEN + 3]; + char fkattname[MAX_QUOTED_NAME_LEN + 3]; + const char *sep; + const char *fk_only; + char *partconstr; + int spi_result; + SPIPlanPtr qplan; + int i; + + riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); + + /* XXX handle the non-permission case?? */ + + /*---------- + * The query string built is: + * SELECT fk.keycols FROM [ONLY] relname fk + * JOIN pkrelname pk + * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) + * WHERE [(<partition constraint>) AND] + * For MATCH SIMPLE: + * (fk.keycol1 IS NOT NULL [AND ...]) + * For MATCH FULL: + * (fk.keycol1 IS NOT NULL [OR ...]) + * + * We attach COLLATE clauses to the operators when comparing columns + * that have different collations. 
+ *---------- + */ + initStringInfo(&querybuf); + appendStringInfoString(&querybuf, "SELECT "); + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname); + sep = ", "; + } + + quoteRelationName(pkrelname, pk_rel); + quoteRelationName(fkrelname, fk_rel); + fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; + appendStringInfo(&querybuf, + " FROM %s%s fk JOIN %s pk ON", + fk_only, fkrelname, pkrelname); + strcpy(pkattname, "pk."); + strcpy(fkattname, "fk."); + sep = "("; + for (i = 0; i < riinfo->nkeys; i++) + { + Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]); + Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]); + Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]); + + quoteOneName(pkattname + 3, + RIAttName(pk_rel, riinfo->pk_attnums[i])); + quoteOneName(fkattname + 3, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + ri_GenerateQual(&querybuf, sep, + pkattname, pk_type, + riinfo->pf_eq_oprs[i], + fkattname, fk_type); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&querybuf, pk_coll); + sep = "AND"; + } + + /* + * Start the WHERE clause with the partition constraint, but leave it out + * when it is empty (presumably a lone default partition -- confirm), since an empty operand before AND would make the query unparsable. 
+ */ + partconstr = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk"); + if (partconstr != NULL && partconstr[0] != '\0') + appendStringInfo(&querybuf, ") WHERE %s AND (", partconstr); + else + appendStringInfoString(&querybuf, ") WHERE ("); + + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, + "%sfk.%s IS NOT NULL", + sep, fkattname); + switch (riinfo->confmatchtype) + { + case FKCONSTR_MATCH_SIMPLE: + sep = " AND "; + break; + case FKCONSTR_MATCH_FULL: + sep = " OR "; + break; + case FKCONSTR_MATCH_PARTIAL: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MATCH PARTIAL not yet implemented"))); + break; + default: + elog(ERROR, "unrecognized confmatchtype: %d", + riinfo->confmatchtype); + break; + } + } + 
appendStringInfoChar(&querybuf, ')'); + + /* + * RI_Initial_Check changes work_mem here. + */ + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* + * Generate the plan. We don't need to cache it, and there are no + * arguments to the plan. + */ + qplan = SPI_prepare(querybuf.data, 0, NULL); + + if (qplan == NULL) + elog(ERROR, "SPI_prepare returned %s for %s", + SPI_result_code_string(SPI_result), querybuf.data); + + /* + * Run the plan. For safety we force a current snapshot to be used. (In + * transaction-snapshot mode, this arguably violates transaction isolation + * rules, but we really haven't got much choice.) We don't need to + * register the snapshot, because SPI_execute_snapshot will see to it. We + * need at most one tuple returned, so pass limit = 1. + */ + spi_result = SPI_execute_snapshot(qplan, + NULL, NULL, + GetLatestSnapshot(), + InvalidSnapshot, + true, false, 1); + + /* Check result */ + if (spi_result != SPI_OK_SELECT) + elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result)); + + /* Did we find a tuple that would violate the constraint? */ + if (SPI_processed > 0) + { + HeapTuple tuple = SPI_tuptable->vals[0]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + RI_ConstraintInfo fake_riinfo; + + /* + * The columns to look at in the result tuple are 1..N, not whatever + * they are in the fk_rel. Hack up a copy of riinfo (its pk_attnums) so that ri_ReportViolation + * will behave properly. + * + * In addition to this, we have to pass the correct tupdesc to + * ri_ReportViolation, overriding its normal habit of using the pk_rel + * or fk_rel's tupdesc. 
+ */ + memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo)); + for (i = 0; i < fake_riinfo.nkeys; i++) + fake_riinfo.pk_attnums[i] = i + 1; + + ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, + tuple, tupdesc, 0, true); + } + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + /* ---------- * Local functions below @@ -2166,6 +2351,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk) /* Find or create a hashtable entry for the constraint */ riinfo = ri_LoadConstraintInfo(constraintOid); +#if 0 /* Do some easy cross-checks against the trigger call data */ if (rel_is_pk) { @@ -2174,6 +2360,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk) elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"", trigger->tgname, RelationGetRelationName(trig_rel)); } +#endif return riinfo; } @@ -2479,7 +2666,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, pk_rel, fk_rel, new_tuple ? new_tuple : old_tuple, NULL, - qkey->constr_queryno); + qkey->constr_queryno, false); return SPI_processed != 0; } @@ -2523,7 +2710,7 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, HeapTuple violator, TupleDesc tupdesc, - int queryno) + int queryno, bool partgone) { StringInfoData key_names; StringInfoData key_values; @@ -2563,9 +2750,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, * * Check table-level permissions next and, failing that, column-level * privileges. + * + * When a partition at the referenced side is being detached/dropped, we + * needn't check, since the user must be the table owner anyway. 
*/ - - if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) + if (partgone) + has_perm = true; + else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) { aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT); if (aclresult != ACLCHECK_OK) @@ -2615,7 +2806,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, } } - if (onfk) + if (partgone) + ereport(ERROR, + (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), + errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"", + RelationGetRelationName(pk_rel), + NameStr(riinfo->conname)), + errdetail("Key (%s)=(%s) still referenced from table \"%s\".", + key_names.data, key_values.data, + RelationGetRelationName(fk_rel)))); + else if (onfk) ereport(ERROR, (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 4857caecaa..b2deee6696 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -1818,6 +1818,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS) } /* + * pg_get_partconstrdef_string + * + * Returns the partition constraint as a C-string for the input relation, with + * the given alias. No pretty-printing. NOTE(review): presumably the result is an empty string when the relation has no partition constraint, so callers splicing it into SQL should check -- confirm. 
+ */ +char * +pg_get_partconstrdef_string(Oid partitionId, char *aliasname) +{ + Expr *constr_expr; + List *context; + + constr_expr = get_partition_qual_relid(partitionId); + context = deparse_context_for(aliasname, partitionId); + + return deparse_expression((Node *) constr_expr, context, true, false); +} + +/* * pg_get_constraintdef * * Returns the definition for the constraint, ie, everything that needs to diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 56a341a622..f7cf97554a 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -77,6 +77,8 @@ extern void heap_create_init_fork(Relation rel); extern void heap_drop_with_catalog(Oid relid); +extern void pre_drop_class_check(Oid relationId, Oid objectSubId); + extern void heap_truncate(List *relids); extern void heap_truncate_one_rel(Relation rel); diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 2afcd5be44..350674bf67 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -52,6 +52,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, extern void CheckTableNotInUse(Relation rel, const char *stmt); +extern void CheckNoForeignKeyRefs(Relation partition, bool isDrop); + extern void ExecuteTruncate(TruncateStmt *stmt); extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs); @@ -76,10 +78,6 @@ extern void find_composite_type_dependencies(Oid typeOid, extern void check_of_type(HeapTuple typetuple); -extern void createForeignKeyTriggers(Relation rel, Oid refRelOid, - Constraint *fkconstraint, Oid constraintOid, - Oid indexOid, bool create_action); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 1031448c14..3183d8b14a 100644 --- 
a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -263,6 +263,7 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel, HeapTuple old_row, HeapTuple new_row); extern bool RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel); +extern void RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel); /* result values for RI_FKey_trigger_type: */ #define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h index 9f9b029ab8..dd8ea7861d 100644 --- a/src/include/utils/ruleutils.h +++ b/src/include/utils/ruleutils.h @@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid); extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty); extern char *pg_get_partkeydef_columns(Oid relid, bool pretty); +extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname); extern char *pg_get_constraintdef_command(Oid constraintId); extern char *deparse_expression(Node *expr, List *dpcontext, diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out index 085c9aba11..66c083a8e1 100644 --- a/src/test/regress/expected/foreign_key.out +++ b/src/test/regress/expected/foreign_key.out @@ -1431,19 +1431,6 @@ drop table pktable2, fktable2; -- -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); -ERROR: cannot reference partitioned table "fk_partitioned_pk" --- and then with alter table. 
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -ERROR: cannot reference partitioned table "fk_partitioned_pk" -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1479,10 +1466,10 @@ DETAIL: This feature is not yet supported on partitioned tables. -- these inserts, targeting both the partition directly as well as the -- partitioned table, should all fail INSERT INTO fk_partitioned_fk (a,b) VALUES (500, 501); -ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey" +ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey" DETAIL: Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk". INSERT INTO fk_partitioned_fk_1 (a,b) VALUES (500, 501); -ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_a_fkey" +ERROR: insert or update on table "fk_partitioned_fk_1" violates foreign key constraint "fk_partitioned_fk_1_a_fkey" DETAIL: Key (a, b)=(500, 501) is not present in table "fk_notpartitioned_pk". 
INSERT INTO fk_partitioned_fk (a,b) VALUES (1500, 1501); ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_fkey" @@ -1609,7 +1596,7 @@ INSERT INTO fk_partitioned_fk_3 (a, b) VALUES (2502, 2503); -- this fails, because the defaults for the referencing table are not present -- in the referenced table: UPDATE fk_notpartitioned_pk SET a = 1500 WHERE a = 2502; -ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_a_fkey" +ERROR: insert or update on table "fk_partitioned_fk_3" violates foreign key constraint "fk_partitioned_fk_3_a_fkey" DETAIL: Key (a, b)=(2501, 142857) is not present in table "fk_notpartitioned_pk". -- but inserting the row we can make it work: INSERT INTO fk_notpartitioned_pk VALUES (2501, 142857); @@ -1781,3 +1768,152 @@ INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601); ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1600); -- leave these tables around intentionally +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. 
+create schema regress_fk; +set search_path to regress_fk; +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); +create table pk5 (like pk) partition by range (a); +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); +create table fk3 partition of fk for values from (3500) to (5000); +-- these should fail: referenced value not present +insert into fk values (1); +ERROR: insert or update on table "fk1" violates foreign key constraint "fk1_a_fkey" +DETAIL: Key (a)=(1) is not present in table "pk". +insert into fk values (1000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(1000) is not present in table "pk". +insert into fk values (2000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(2000) is not present in table "pk". +insert into fk values (3000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(3000) is not present in table "pk". 
+insert into fk values (4000); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4000) is not present in table "pk". +insert into fk values (4500); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4500) is not present in table "pk". +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); +-- should fail: referencing value present +delete from pk where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +delete from pk where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +delete from pk where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +delete from pk where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +delete from pk where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +delete from pk where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +update pk set a = 2 where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". 
+update pk set a = 1002 where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +update pk set a = 2002 where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +update pk set a = 3002 where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +update pk set a = 4002 where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +update pk set a = 4502 where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". 
+-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; +-- dropping/detaching partitions is prevented if that would break +-- a foreign key's existing data +create table droppk (a int primary key) partition by range (a); +create table droppk1 partition of droppk for values from (0) to (1000); +create table droppk_d partition of droppk default; +create table droppk2 partition of droppk for values from (1000) to (2000) + partition by range (a); +create table droppk21 partition of droppk2 for values from (1000) to (1400); +create table droppk2_d partition of droppk2 default; +insert into droppk values (1), (1000), (1500), (2000); +create table dropfk (a int references droppk); +insert into dropfk values (1), (1000), (1500), (2000); +-- these should all fail +alter table droppk detach partition droppk_d; +ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5" +DETAIL: Key (a)=(2000) still referenced from table "dropfk". +alter table droppk2 detach partition droppk2_d; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +alter table droppk detach partition droppk1; +ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1" +DETAIL: Key (a)=(1) still referenced from table "dropfk". +alter table droppk detach partition droppk2; +ERROR: removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". 
+alter table droppk2 detach partition droppk21; +ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +drop table droppk_d; +ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5" +DETAIL: Key (a)=(2000) still referenced from table "dropfk". +drop table droppk2_d; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +drop table droppk1; +ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1" +DETAIL: Key (a)=(1) still referenced from table "dropfk". +drop table droppk2; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +drop table droppk21; +ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +delete from dropfk; +-- now they should all work +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +alter table droppk2 detach partition droppk21; +drop table droppk2; diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql index 068ab2aab7..6416523b94 100644 --- a/src/test/regress/sql/foreign_key.sql +++ b/src/test/regress/sql/foreign_key.sql @@ -1071,18 +1071,6 @@ drop table pktable2, fktable2; -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); --- and then with alter table. 
-CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; - -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1289,3 +1277,105 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1600); -- leave these tables around intentionally + +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema regress_fk; +set search_path to regress_fk; + +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); + +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); + +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); + +create table pk5 (like pk) partition by range (a); +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); + +create table fk3 partition of fk for values from (3500) to (5000); + +-- these should fail: referenced value not present 
+insert into fk values (1); +insert into fk values (1000); +insert into fk values (2000); +insert into fk values (3000); +insert into fk values (4000); +insert into fk values (4500); +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); + +-- should fail: referencing value present +delete from pk where a = 1; +delete from pk where a = 1000; +delete from pk where a = 2000; +delete from pk where a = 3000; +delete from pk where a = 4000; +delete from pk where a = 4500; +update pk set a = 2 where a = 1; +update pk set a = 1002 where a = 1000; +update pk set a = 2002 where a = 2000; +update pk set a = 3002 where a = 3000; +update pk set a = 4002 where a = 4000; +update pk set a = 4502 where a = 4500; +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; + +-- dropping/detaching partitions is prevented if that would break +-- a foreign key's existing data +create table droppk (a int primary key) partition by range (a); +create table droppk1 partition of droppk for values from (0) to (1000); +create table droppk_d partition of droppk default; +create table droppk2 partition of droppk for values from (1000) to (2000) + partition by range (a); +create table droppk21 partition of droppk2 for values from (1000) to (1400); +create table droppk2_d partition of droppk2 default; +insert into droppk values (1), (1000), (1500), (2000); +create table dropfk (a int references droppk); +insert into dropfk values (1), (1000), (1500), (2000); 
+-- these should all fail +alter table droppk detach partition droppk_d; +alter table droppk2 detach partition droppk2_d; +alter table droppk detach partition droppk1; +alter table droppk detach partition droppk2; +alter table droppk2 detach partition droppk21; +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +drop table droppk2; +drop table droppk21; +delete from dropfk; +-- now they should all work +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +alter table droppk2 detach partition droppk21; +drop table droppk2; -- 2.11.0
>From 87053b456eae36811d643e5ce8c1b03eae5b7091 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Thu, 22 Nov 2018 19:30:22 -0300 Subject: [PATCH 5/5] Have psql not display redundant FKs --- src/bin/psql/describe.c | 61 +++++++++++++++++++++++-------- src/test/regress/expected/foreign_key.out | 16 ++++---- 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 0a181b01d9..1ed73afaa8 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -2382,17 +2382,28 @@ describeOneTableDetails(const char *schemaname, /* * Print foreign-key constraints (there are none if no triggers, * except if the table is partitioned, in which case the triggers - * appear in the partitions) + * appear in the partitions). Note we put the constraints defined + * in this table first, followed by the constraints defined in + * ancestor partitioned tables. XXX maybe should display in hierarchy + * order. */ if (tableinfo.hastriggers || tableinfo.relkind == RELKIND_PARTITIONED_TABLE) { printfPQExpBuffer(&buf, - "SELECT conname,\n" - " pg_catalog.pg_get_constraintdef(r.oid, true) as condef\n" - "FROM pg_catalog.pg_constraint r\n" - "WHERE r.conrelid = '%s' AND r.contype = 'f' ORDER BY 1;", - oid); + "WITH RECURSIVE constraints (oid, parent, conname, conrelid) as (\n" + " SELECT oid, conparentid, conname, conrelid\n" + " FROM pg_catalog.pg_constraint WHERE contype = 'f' AND conrelid = '%s'\n" + " UNION\n" + " SELECT pc.oid, pc.conparentid, pc.conname, pc.conrelid\n" + " FROM constraints, pg_constraint pc\n" + " WHERE pc.oid = constraints.parent\n" + ") SELECT conrelid = '%s' as conislocal, conname,\n" + " pg_catalog.pg_get_constraintdef(oid), conrelid::pg_catalog.regclass\n" + " FROM constraints\n" + " WHERE parent = 0\n" + "ORDER BY conislocal DESC, conname;", + oid, oid); result = PSQLexec(buf.data); if (!result) goto error_return; @@ -2404,10 +2415,20 @@ describeOneTableDetails(const char 
*schemaname, printTableAddFooter(&cont, _("Foreign-key constraints:")); for (i = 0; i < tuples; i++) { - /* untranslated constraint name and def */ - printfPQExpBuffer(&buf, " \"%s\" %s", - PQgetvalue(result, i, 0), - PQgetvalue(result, i, 1)); + /* + * Print untranslated constraint name and definition. + * Use a "TABLE tab" prefix when the constraint is + * defined in a parent partitioned table. + */ + if (strcmp(PQgetvalue(result, i, 0), "f") == 0) + printfPQExpBuffer(&buf, " TABLE \"%s\" CONSTRAINT \"%s\" %s", + PQgetvalue(result, i, 3), + PQgetvalue(result, i, 1), + PQgetvalue(result, i, 2)); + else + printfPQExpBuffer(&buf, " \"%s\" %s", + PQgetvalue(result, i, 1), + PQgetvalue(result, i, 2)); printTableAddFooter(&cont, buf.data); } @@ -2416,13 +2437,21 @@ describeOneTableDetails(const char *schemaname, } /* print incoming foreign-key references (none if no triggers) */ - if (tableinfo.hastriggers) + if (tableinfo.hastriggers || + tableinfo.relkind == RELKIND_PARTITIONED_TABLE) { printfPQExpBuffer(&buf, - "SELECT conname, conrelid::pg_catalog.regclass,\n" - " pg_catalog.pg_get_constraintdef(c.oid, true) as condef\n" - "FROM pg_catalog.pg_constraint c\n" - "WHERE c.confrelid = '%s' AND c.contype = 'f' ORDER BY 1;", + "WITH RECURSIVE constraints (oid, parent, conname, conrelid, confrelid) as (\n" + " SELECT oid, conparentid, conname, conrelid, confrelid\n" + " FROM pg_catalog.pg_constraint WHERE contype = 'f' AND confrelid = '%s'\n" + " UNION\n" + " SELECT pc.oid, pc.conparentid, pc.conname, pc.conrelid, pc.confrelid\n" + " FROM constraints, pg_constraint pc\n" + " WHERE pc.oid = constraints.parent\n" + ") SELECT conrelid::pg_catalog.regclass, conname, pg_catalog.pg_get_constraintdef(oid)\n" + " FROM constraints\n" + " WHERE parent = 0\n" + "ORDER BY conname;", oid); result = PSQLexec(buf.data); if (!result) @@ -2436,8 +2465,8 @@ describeOneTableDetails(const char *schemaname, for (i = 0; i < tuples; i++) { printfPQExpBuffer(&buf, " TABLE \"%s\" CONSTRAINT 
\"%s\" %s", - PQgetvalue(result, i, 1), PQgetvalue(result, i, 0), + PQgetvalue(result, i, 1), PQgetvalue(result, i, 2)); printTableAddFooter(&cont, buf.data); diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out index 66c083a8e1..5de2449119 100644 --- a/src/test/regress/expected/foreign_key.out +++ b/src/test/regress/expected/foreign_key.out @@ -1667,7 +1667,7 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN a | integer | | | Partition of: fk_partitioned_fk FOR VALUES IN (1500, 1502) Foreign-key constraints: - "fk_partitioned_fk_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DROP TABLE fk_partitioned_fk_2; CREATE TABLE fk_partitioned_fk_4 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE) PARTITION BY RANGE (b, a); @@ -1687,7 +1687,7 @@ ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN Partition of: fk_partitioned_fk FOR VALUES IN (3500, 3502) Partition key: RANGE (b, a) Foreign-key constraints: - "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE Number of partitions: 2 (Use \d+ to list them.) \d fk_partitioned_fk_4_1 @@ -1698,7 +1698,7 @@ Number of partitions: 2 (Use \d+ to list them.) 
b | integer | | | Partition of: fk_partitioned_fk_4 FOR VALUES FROM (1, 1) TO (100, 100) Foreign-key constraints: - "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE -- this one has an FK with mismatched properties \d fk_partitioned_fk_4_2 @@ -1710,7 +1710,7 @@ Foreign-key constraints: Partition of: fk_partitioned_fk_4 FOR VALUES FROM (100, 100) TO (1000, 1000) Foreign-key constraints: "fk_partitioned_fk_4_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL - "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE CREATE TABLE fk_partitioned_fk_5 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE, @@ -1734,7 +1734,7 @@ Partition key: RANGE (a) Foreign-key constraints: "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE - "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE Number of partitions: 1 (Use \d+ to list them.) 
-- verify that it works to reattaching a child with multiple candidate @@ -1750,9 +1750,9 @@ ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUE Partition of: fk_partitioned_fk_5 FOR VALUES FROM (0) TO (10) Foreign-key constraints: "fk_partitioned_fk_5_1_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) - "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE - "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE - "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk_5" CONSTRAINT "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE + TABLE "fk_partitioned_fk_5" CONSTRAINT "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE + TABLE "fk_partitioned_fk" CONSTRAINT "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE -- verify that attaching a table checks that the existing data satisfies the -- constraint -- 2.11.0