Here's another rebase. The code is structured somewhat differently from the previous one, mostly because of the backpatched bugfixes, but also because of the changes in dependency handling.
I think there's also at least one more bugfix to backpatch (included in 0003 here), related to whether foreign tables are allowed as having FKs, but AFAICS it just cosmetic: you get an error if you have a foreign partition and add a FK, but it says "you cannot have constraint triggers" rather than "FKs are not supported", so it's a bit odd. 0003 also includes Amit L's patch to remove the parentConstraint argument from ATExecAddForeignKey, which I suppose I should commit separately. Odd bugs fixed: a) handle the case where the default partition is the only one and it's being detached or dropped. b) when a partition was dropped/detached from the referenced side, the child constraint was left in place. I have not yet fixed the "#if 0" section. -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 127a752e859e72bdfeadb31e260389b0b0f28e8a Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Wed, 28 Nov 2018 11:52:00 -0300 Subject: [PATCH v4 1/3] Rework deleteObjectsInList to allow objtype-specific checks This doesn't change any functionality yet. --- src/backend/catalog/dependency.c | 41 +++++++++++++++++++------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 2048d71535b..0b4c47b808c 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -230,29 +230,38 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel, int i; /* - * Keep track of objects for event triggers, if necessary. + * Invoke pre-deletion callbacks and keep track of objects for event + * triggers, if necessary. */ - if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL)) + for (i = 0; i < targetObjects->numrefs; i++) { - for (i = 0; i < targetObjects->numrefs; i++) + const ObjectAddress *thisobj = &targetObjects->refs[i]; + Oid objectClass = getObjectClass(thisobj); + + if (trackDroppedObjectsNeeded() && !(flags & PERFORM_DELETION_INTERNAL)) { - const ObjectAddress *thisobj = &targetObjects->refs[i]; - const ObjectAddressExtra *extra = &targetObjects->extras[i]; - bool original = false; - bool normal = false; - - if (extra->flags & DEPFLAG_ORIGINAL) - original = true; - if (extra->flags & DEPFLAG_NORMAL) - normal = true; - if (extra->flags & DEPFLAG_REVERSE) - normal = true; - - if (EventTriggerSupportsObjectClass(getObjectClass(thisobj))) + if (EventTriggerSupportsObjectClass(objectClass)) { + bool original = false; + bool normal = false; + const ObjectAddressExtra *extra = &targetObjects->extras[i]; + + if (extra->flags & DEPFLAG_ORIGINAL) + original = true; + if (extra->flags & DEPFLAG_NORMAL || + extra->flags & DEPFLAG_REVERSE) + normal = true; + EventTriggerSQLDropAddObject(thisobj, original, normal); } } + + /* Invoke class-specific pre-deletion checks */ + switch (objectClass) + { + default: + break; + } } /* -- 2.17.1
>From ce57c2ca7fa500c6a08d26d332c1c5d94e7d6b02 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Tue, 22 Jan 2019 18:00:31 -0300 Subject: [PATCH v4 2/3] index_get_partition --- src/backend/catalog/partition.c | 35 ++++++++++++++++++++++++++++ src/backend/commands/tablecmds.c | 40 +++++++++----------------------- src/include/catalog/partition.h | 1 + 3 files changed, 47 insertions(+), 29 deletions(-) diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c index 3ccdaff8c45..f9282587f8b 100644 --- a/src/backend/catalog/partition.c +++ b/src/backend/catalog/partition.c @@ -145,6 +145,41 @@ get_partition_ancestors_worker(Relation inhRel, Oid relid, List **ancestors) get_partition_ancestors_worker(inhRel, parentOid, ancestors); } +/* + * Return the OID of the index, in the given partition, that is a child of the + * given index or InvalidOid if there isn't one. + */ +Oid +index_get_partition(Relation partition, Oid indexId) +{ + List *idxlist = RelationGetIndexList(partition); + ListCell *l; + + foreach(l, idxlist) + { + Oid partIdx = lfirst_oid(l); + HeapTuple tup; + Form_pg_class classForm; + bool ispartition; + + tup = SearchSysCache1(RELOID, ObjectIdGetDatum(partIdx)); + if (!tup) + elog(ERROR, "cache lookup failed for relation %u", partIdx); + classForm = (Form_pg_class) GETSTRUCT(tup); + ispartition = classForm->relispartition; + ReleaseSysCache(tup); + if (!ispartition) + continue; + if (get_partition_parent(lfirst_oid(l)) == indexId) + { + list_free(idxlist); + return partIdx; + } + } + + return InvalidOid; +} + /* * map_partition_varattnos - maps varattno of any Vars in expr from the * attno's of 'from_rel' to the attno's of 'to_rel' partition, each of which diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 35bdb0e0c6f..4da5aafecf5 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -15453,36 +15453,18 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl) { - Relation pg_inherits; - ScanKeyData key; - HeapTuple tuple; - SysScanDesc scan; + Oid existingIdx; - pg_inherits = table_open(InheritsRelationId, AccessShareLock); - ScanKeyInit(&key, Anum_pg_inherits_inhparent, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(parentIdx))); - scan = systable_beginscan(pg_inherits, InheritsParentIndexId, true, - NULL, 1, &key); - while (HeapTupleIsValid(tuple = systable_getnext(scan))) - { - Form_pg_inherits inhForm; - Oid tab; - - inhForm = (Form_pg_inherits) GETSTRUCT(tuple); - tab = IndexGetRelation(inhForm->inhrelid, false); - if (tab == RelationGetRelid(partitionTbl)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", - RelationGetRelationName(partIdx), - RelationGetRelationName(parentIdx)), - errdetail("Another index is already attached for partition \"%s\".", - RelationGetRelationName(partitionTbl)))); - } - - systable_endscan(scan); - table_close(pg_inherits, AccessShareLock); + existingIdx = index_get_partition(partitionTbl, + RelationGetRelid(parentIdx)); + if (OidIsValid(existingIdx)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", + RelationGetRelationName(partIdx), + RelationGetRelationName(parentIdx)), + errdetail("Another index is already attached for partition \"%s\".", + RelationGetRelationName(partitionTbl)))); } /* diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h index d84e3259835..616e18af308 100644 --- a/src/include/catalog/partition.h +++ b/src/include/catalog/partition.h @@ -21,6 +21,7 @@ extern Oid get_partition_parent(Oid relid); extern List *get_partition_ancestors(Oid relid); +extern Oid index_get_partition(Relation partition, Oid indexId); extern List *map_partition_varattnos(List *expr, int fromrel_varno, Relation to_rel, Relation from_rel, bool *found_whole_row); -- 2.17.1
>From e4338bf1310db26b22bb78ad1df6a0d56d3e2a78 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Wed, 20 Feb 2019 15:08:20 -0300 Subject: [PATCH v4 3/3] support FKs referencing partitioned tables --- doc/src/sgml/ref/create_table.sgml | 7 +- src/backend/catalog/dependency.c | 3 + src/backend/catalog/heap.c | 24 + src/backend/commands/tablecmds.c | 1020 ++++++++++++++++----- src/backend/utils/adt/ri_triggers.c | 237 ++++- src/backend/utils/adt/ruleutils.c | 18 + src/include/catalog/heap.h | 2 + src/include/commands/tablecmds.h | 8 +- src/include/commands/trigger.h | 1 + src/include/utils/ruleutils.h | 1 + src/test/regress/expected/foreign_key.out | 164 +++- src/test/regress/sql/foreign_key.sql | 114 ++- 12 files changed, 1330 insertions(+), 269 deletions(-) diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml index 22dbc07b238..2802ac14f67 100644 --- a/doc/src/sgml/ref/create_table.sgml +++ b/doc/src/sgml/ref/create_table.sgml @@ -378,9 +378,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM <para> Partitioned tables do not support <literal>EXCLUDE</literal> constraints; however, you can define these constraints on individual partitions. - Also, while it's possible to define <literal>PRIMARY KEY</literal> - constraints on partitioned tables, creating foreign keys that - reference a partitioned table is not yet supported. </para> <para> @@ -995,9 +992,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM addition of a foreign key constraint requires a <literal>SHARE ROW EXCLUSIVE</literal> lock on the referenced table. Note that foreign key constraints cannot be defined between temporary - tables and permanent tables. Also note that while it is possible to - define a foreign key on a partitioned table, it is not possible to - declare a foreign key that references a partitioned table. + tables and permanent tables. </para> <para> diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 0b4c47b808c..11ec9d2f853 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -259,6 +259,9 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel, /* Invoke class-specific pre-deletion checks */ switch (objectClass) { + case OCLASS_CLASS: + pre_drop_class_check(thisobj->objectId, thisobj->objectSubId); + break; default: break; } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 7dba4e50ddb..effe95849af 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1804,6 +1804,30 @@ RemoveAttrDefaultById(Oid attrdefId) relation_close(myrel, NoLock); } +/* + * Checks to be run before just dropping a relation. + */ +void +pre_drop_class_check(Oid relationId, Oid objectSubId) +{ + Relation relation; + + /* caller must hold strong lock already, if they're dropping */ + relation = relation_open(relationId, NoLock); + + /* + * For leaf partitions, this is our last chance to verify any foreign keys + * that may point to the partition as referenced table. + */ + if (relation->rd_rel->relkind == RELKIND_RELATION && + relation->rd_rel->relispartition) + CheckNoForeignKeyRefs(relation, + GetParentedForeignKeyRefs(relation), + true); + + relation_close(relation, NoLock); +} + /* * heap_drop_with_catalog - removes specified relation from catalogs * diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 4da5aafecf5..35ecc2700a3 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -408,10 +408,33 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstraint, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok); +static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, + Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr, int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok, + LOCKMODE lockmode); static void CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned); +static void CloneFkReferenced(Relation parentRel, Relation partitionRel, + Relation pg_constraint); static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, Relation partRel, List *clone, List **cloned); +static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, + Oid indexOid); +static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, + Oid partRelid, + Oid parentConstrOid, int numfks, + AttrNumber *mapped_conkey, AttrNumber *confkey, + Oid *conpfeqop); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -3469,7 +3492,8 @@ AlterTableGetLockLevel(List *cmds) /* * Removing constraints can affect SELECTs that have been - * optimised assuming the constraint holds true. + * optimised assuming the constraint holds true. See also + * CloneFkReferenced. */ case AT_DropConstraint: /* as DROP INDEX */ case AT_DropNotNull: /* may change some SQL plans */ @@ -7051,11 +7075,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, break; case CONSTR_FOREIGN: - /* - * Note that we currently never recurse for FK constraints, so the - * "recurse" flag is silently ignored. - * * Assign or validate constraint name */ if (newConstraint->conname) @@ -7234,6 +7254,12 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity checks for it. * We do permissions checks here, however. + * + * When the referenced or referencing tables (or both) are partitioned, + * multiple pg_constraint rows are required -- one for each partitioned table + * and each partition on each side (fortunately, not one for every combination + * thereof). We also need the appropriate triggers to be created on each leaf + * partition. */ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, @@ -7249,12 +7275,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Oid pfeqoperators[INDEX_MAX_KEYS]; Oid ppeqoperators[INDEX_MAX_KEYS]; Oid ffeqoperators[INDEX_MAX_KEYS]; - bool connoinherit; int i; int numfks, numpks; Oid indexOid; - Oid constrOid; bool old_check_ok; ObjectAddress address; ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop); @@ -7272,12 +7296,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, * Validity checks (permission checks wait till we have the column * numbers) */ - if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot reference partitioned table \"%s\"", - RelationGetRelationName(pkrel)))); - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { if (!recurse) @@ -7295,7 +7313,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, errdetail("This feature is not yet supported on partitioned tables."))); } - if (pkrel->rd_rel->relkind != RELKIND_RELATION) + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("referenced relation \"%s\" is not a table", @@ -7505,8 +7524,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("foreign key constraint \"%s\" " - "cannot be implemented", + errmsg(fkconstraint->conname ? + "foreign key constraint \"%s\" cannot be implemented" : + "foreign key constraint cannot be implemented", fkconstraint->conname), errdetail("Key columns \"%s\" and \"%s\" " "are of incompatible types: %s and %s.", @@ -7594,21 +7614,136 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* - * FKs always inherit for partitioned tables, and never for legacy - * inheritance. + * Create all the constraint and trigger objects, recursing to partitions + * as necessary. First handle the referenced side. */ - connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; + address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel, + indexOid, + InvalidOid, /* no parent constraint */ + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok); + + /* Now handle the referencing side. */ + addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel, + indexOid, + address.objectId, + numfks, + pkattnum, + fkattnum, + pfeqoperators, + ppeqoperators, + ffeqoperators, + old_check_ok, + lockmode); + + /* + * Close pk table, but keep lock until we've committed. + */ + table_close(pkrel, NoLock); + + return address; +} + +/* + * addFkRecurseReferenced + * recursive subroutine for ATAddForeignKeyConstraint, referenced side + * + * Create pg_constraint rows for the referenced side of the constraint, + * referencing the parent of the referencing side; also create action triggers + * on leaf partitions. If the table is partitioned, recurse to handle each + * partition. + * + * On constraint names: It's not possible in general to give all the cascaded + * constraints the same name, so we don't try. (Also, even when it *is* + * possible, it goes counter to the SQL-standard rule that constraint names + * must be unique within a schema.) Therefore we apply the given name to the + * top-most constraint, and use generated names when cascading to partitions + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the root referencing relation. + * pkrel is the referenced relation; might be a partition, if recursing. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstraint is the OID of a parent constraint; InvalidOid if this is a + * top-level constraint. + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. + * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + */ +static ObjectAddress +addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstraint, + int numfks, + int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators, + Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok) +{ + ObjectAddress address; + Oid constrOid; + char *conname; + bool conislocal; + int coninhcount; + bool connoinherit; + + /* + * Verify relkind for each referenced partition. At the top level, this + * is redundant with a previous check, but we need it when recursing. + */ + if (pkrel->rd_rel->relkind != RELKIND_RELATION && + pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("referenced relation \"%s\" is not a table", + RelationGetRelationName(pkrel)))); + + /* + * Caller supplies us with a constraint name; however, it may be used in + * this partition, so come up with a different one in that case. + */ + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(rel), + fkconstraint->conname)) + conname = ChooseConstraintName(RelationGetRelationName(rel), + strVal(linitial(fkconstraint->fk_attrs)), + "fkey", + RelationGetNamespace(rel), NIL); + else + conname = fkconstraint->conname; + + if (OidIsValid(parentConstraint)) + { + conislocal = false; + coninhcount = 1; + connoinherit = false; + } + else + { + conislocal = true; + coninhcount = 0; + /* + * always inherit for partitioned tables, never for legacy inheritance + */ + connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE; + } /* * Record the FK constraint in pg_constraint. */ - constrOid = CreateConstraintEntry(fkconstraint->conname, + constrOid = CreateConstraintEntry(conname, RelationGetNamespace(rel), CONSTRAINT_FOREIGN, fkconstraint->deferrable, fkconstraint->initdeferred, fkconstraint->initially_valid, - parentConstr, + parentConstraint, RelationGetRelid(rel), fkattnum, numfks, @@ -7620,72 +7755,197 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, pfeqoperators, ppeqoperators, ffeqoperators, - numpks, + numfks, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, NULL, /* no exclusion constraint */ NULL, /* no check constraint */ NULL, - true, /* islocal */ - 0, /* inhcount */ - connoinherit, /* conNoInherit */ + conislocal, /* islocal */ + coninhcount, /* inhcount */ + connoinherit, /* conNoInherit */ false); /* is_internal */ + ObjectAddressSet(address, ConstraintRelationId, constrOid); /* - * Create the triggers that will enforce the constraint. We only want the - * action triggers to appear for the parent partitioned relation, even - * though the constraints also exist below. + * Also, if this is a constraint on a partition, give it partition-type + * dependencies on the parent constraint as well as the table. */ - createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, - constrOid, indexOid, !recursing); + if (OidIsValid(parentConstraint)) + { + ObjectAddress referenced; + + ObjectAddressSet(referenced, ConstraintRelationId, parentConstraint); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, RelationGetRelid(pkrel)); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); + } + + /* make new constraint visible, in case we add more */ + CommandCounterIncrement(); /* - * Tell Phase 3 to check that the constraint is satisfied by existing - * rows. We can skip this during table creation, when requested explicitly - * by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're - * recreating a constraint following a SET DATA TYPE operation that did - * not impugn its validity. + * If the referenced table is a plain relation, create the action triggers + * that enforce the constraint. */ - if (!old_check_ok && !fkconstraint->skip_validation) + if (pkrel->rd_rel->relkind == RELKIND_RELATION) { - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = fkconstraint->conname; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = RelationGetRelid(pkrel); - newcon->refindid = indexOid; - newcon->conid = constrOid; - newcon->qual = (Node *) fkconstraint; - - tab->constraints = lappend(tab->constraints, newcon); + createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel), + fkconstraint, + constrOid, indexOid); } /* - * When called on a partitioned table, recurse to create the constraint on - * the partitions also. + * If the referenced table is partitioned, recurse on ourselves to handle + * each partition. We need one pg_constraint row created for each + * partition in addition to the pg_constraint row for the parent table. */ - if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - PartitionDesc partdesc; + PartitionDesc pd = RelationGetPartitionDesc(pkrel); + + for (int i = 0; i < pd->nparts; i++) + { + Relation partRel; + AttrNumber *map; + AttrNumber *mapped_pkattnum; + Oid partIndexId; + + partRel = table_open(pd->oids[i], ShareRowExclusiveLock); + + /* + * Map the attribute numbers in the referenced side of the FK + * definition to match the partition's column layout. + */ + map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), + RelationGetDescr(pkrel), + gettext_noop("could not convert row type")); + if (map) + { + mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks); + for (int j = 0; j < numfks; j++) + mapped_pkattnum[j] = map[pkattnum[j] - 1]; + } + else + mapped_pkattnum = pkattnum; + + /* do the deed */ + partIndexId = index_get_partition(partRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partRel)); + addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel, + partIndexId, constrOid, numfks, + mapped_pkattnum, fkattnum, + pfeqoperators, ppeqoperators, ffeqoperators, + old_check_ok); + + /* Done -- clean up (but keep the lock) */ + table_close(partRel, NoLock); + if (map) + { + pfree(mapped_pkattnum); + pfree(map); + } + } + } + + return address; +} + +/* + * addFkRecurseReferencing + * subroutine for ATAddForeignKeyConstraint, referencing side + * + * wqueue is the ALTER TABLE work queue; can be NULL when not running as part + * of an ALTER TABLE sequence. + * fkconstraint is the constraint being added. + * rel is the referencing relation; might be a partition, if recursing. + * pkrel is the root referenced relation. + * indexOid is the OID of the index (on pkrel) implementing this constraint. + * parentConstr is the OID of the parent constraint (there is always one). + * numfks is the number of columns in the foreign key + * pkattnum is the attnum array of referenced attributes. + * fkattnum is the attnum array of referencing attributes. + * pf/pp/ffeqoperators are OID array of operators between columns. + * old_check_ok signals that this constraint replaces an existing one that + * was already validated (thus this one doesn't need validation). + * lockmode is the lockmode to acquire on partitions when recursing. + * + * Note we never try to use the constraint name assigned in fkconstraint. + */ +static void +addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, + Relation pkrel, Oid indexOid, Oid parentConstr, + int numfks, int16 *pkattnum, int16 *fkattnum, + Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators, + bool old_check_ok, LOCKMODE lockmode) +{ + AssertArg(OidIsValid(parentConstr)); + + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign keys constraints are not supported on foreign tables"))); + + /* + * If the referencing relation is a plain table, add the check triggers to + * it and, if necessary, schedule it to be checked in Phase 3. + * + * If the relation is partitioned, drill down to do it to its partitions. + */ + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + createForeignKeyCheckTriggers(RelationGetRelid(rel), + RelationGetRelid(pkrel), + fkconstraint, + parentConstr, + indexOid); + + /* + * Tell Phase 3 to check that the constraint is satisfied by existing + * rows. We can skip this during table creation, when requested + * explicitly by specifying NOT VALID in an ADD FOREIGN KEY command, + * and when we're recreating a constraint following a SET DATA TYPE + * operation that did not impugn its validity. + */ + if (!old_check_ok && !fkconstraint->skip_validation) + { + NewConstraint *newcon; + AlteredTableInfo *tab; + + tab = ATGetQueueEntry(wqueue, rel); + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = get_constraint_name(parentConstr); + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->refindid = indexOid; + newcon->conid = parentConstr; + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } + } + else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc pd = RelationGetPartitionDesc(rel); Relation pg_constraint; List *cloned = NIL; ListCell *cell; pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock); - partdesc = RelationGetPartitionDesc(rel); - - for (i = 0; i < partdesc->nparts; i++) + for (int i = 0; i < pd->nparts; i++) { - Oid partitionId = partdesc->oids[i]; + Oid partitionId = pd->oids[i]; Relation partition = table_open(partitionId, lockmode); CheckTableNotInUse(partition, "ALTER TABLE"); CloneFkReferencing(pg_constraint, rel, partition, - list_make1_oid(constrOid), + list_make1_oid(parentConstr), &cloned); table_close(partition, NoLock); @@ -7712,16 +7972,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, childtab->constraints = lappend(childtab->constraints, newcon); - table_close(partition, lockmode); + table_close(partition, NoLock); } } - - /* - * Close pk table, but keep lock until we've committed. - */ - table_close(pkrel, NoLock); - - return address; } /* @@ -7749,11 +8002,21 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) List *clone = NIL; parentRel = table_open(parentId, NoLock); /* already got lock */ + /* This only works for declarative partitioning */ + Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + /* see ATAddForeignKeyConstraint about lock level */ rel = table_open(relationId, AccessExclusiveLock); pg_constraint = table_open(ConstraintRelationId, RowShareLock); - /* Obtain the list of constraints to clone or attach */ + /* + * Clone constraints where the parent is in the referenced side. + */ + CloneFkReferenced(parentRel, rel, pg_constraint); + + /* + * Now search for constraints where the parent is in the referencing side. + */ ScanKeyInit(&key, Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(parentId)); @@ -7761,21 +8024,172 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) NULL, 1, &key); while ((tuple = systable_getnext(scan)) != NULL) { - Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid; + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); - clone = lappend_oid(clone, oid); + if (constrForm->contype != CONSTRAINT_FOREIGN) + continue; + + clone = lappend_oid(clone, constrForm->oid); } systable_endscan(scan); /* Do the actual work, recursing to partitions as needed */ - CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + if (clone != NIL) + CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); - /* We're done. Clean up */ + list_free(clone); + + /* We're done. Clean up, keeping locks till commit */ table_close(parentRel, NoLock); - table_close(rel, NoLock); /* keep lock till commit */ + table_close(rel, NoLock); table_close(pg_constraint, RowShareLock); } +/* + * CloneFkReferenced + * Subroutine for CloneForeignKeyConstraints, referenced side + * + * Clone the FKs that reference the parent relation. Used when partitionRel + * is created/attached. (Recursion to partitions is effected by callee + * addFkRecurseReferenced, so this routine is not itself recursive.) + */ +static void +CloneFkReferenced(Relation parentRel, Relation partitionRel, + Relation pg_constraint) +{ + AttrNumber *attmap; + ListCell *cell; + SysScanDesc scan; + ScanKeyData key[2]; + HeapTuple tuple; + List *clone = NIL; + + /* + * Search for any constraints where this partition is in the referenced + * side. However, we must ignore any constraint whose parent constraint + * is also going to be cloned, to avoid duplicates. So do it in two + * steps: first construct the list of constraints to clone, then go over + * that list cloning those whose parents are not in the list. (We must + * not rely on the parent being seen first, since catalog order could + * return children first.) + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + /* This is a seqscan, as we don't have a usable index ... */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, + NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* ignore this constraint if the parent is already on the list */ + if (list_member_oid(clone, constrForm->conparentid)) + continue; + + clone = lappend_oid(clone, constrForm->oid); + } + systable_endscan(scan); + + foreach(cell, clone) + { + Oid constrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + Form_pg_attribute att; + Relation fkRel; + Oid indexOid; + Oid partIndexId; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_confkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + + tuple = SearchSysCache1(CONSTROID, constrOid); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* skip children whose parents are going to be cloned, as above */ + if (list_member_oid(clone, constrForm->conparentid)) + { + ReleaseSysCache(tuple); + continue; + } + + /* + * Because we're only expanding the key space at the referenced side, + * we don't need to prevent any operation in the referencing table, so + * AccessShareLock suffices (assumes that dropping the constraint + * acquires AEL). + */ + fkRel = table_open(constrForm->conrelid, AccessShareLock); + + indexOid = constrForm->conindid; + DeconstructFkConstraintRow(tuple, + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop); + for (int i = 0; i < numfks; i++) + mapped_confkey[i] = attmap[confkey[i] - 1]; + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = NameStr(constrForm->conname); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + fkconstraint->initially_valid = true; + fkconstraint->fk_matchtype = constrForm->confmatchtype; + + /* + * This is a bit grotty: we only need the first column name, which is + * used to generate the constraint name. + */ + att = TupleDescAttr(RelationGetDescr(partitionRel), conkey[0] - 1); + fkconstraint->fk_attrs = list_make1(makeString(NameStr(att->attname))); + + /* + * Add the new foreign key constraint pointing to the new partition. + * Because this new partition appears in the referenced side of the + * constraint, we don't need to set up for Phase 3 check. + */ + partIndexId = index_get_partition(partitionRel, indexOid); + if (!OidIsValid(partIndexId)) + elog(ERROR, "index for %u not found in partition %s", + indexOid, RelationGetRelationName(partitionRel)); + addFkRecurseReferenced(NULL, + fkconstraint, + fkRel, + partitionRel, + partIndexId, + constrOid, + numfks, + mapped_confkey, + conkey, + conpfeqop, + conppeqop, + conffeqop, + true); + + table_close(fkRel, NoLock); + ReleaseSysCache(tuple); + } +} + /* * CloneFkReferencing * Recursive subroutine for CloneForeignKeyConstraints, referencing side @@ -7800,6 +8214,11 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, List *subclone = NIL; ListCell *cell; + if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("foreign keys constraints are not supported on foreign tables"))); + /* * The constraint key may differ, if the columns in the partition are * different. This map is used to convert them. @@ -7823,7 +8242,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, Oid conppeqop[INDEX_MAX_KEYS]; Oid conffeqop[INDEX_MAX_KEYS]; Constraint *fkconstraint; - bool attach_it; + bool attached; Oid constrOid; ObjectAddress parentAddr, childAddr, @@ -7843,136 +8262,44 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, ReleaseSysCache(tuple); continue; } + if (list_member_oid(clone, constrForm->conparentid)) + { + ReleaseSysCache(tuple); + continue; + } ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, conpfeqop, conppeqop, conffeqop); + for (i = 0; i < numfks; i++) mapped_conkey[i] = attmap[conkey[i] - 1]; /* * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive + * fit for the purpose. If one is, attach the parent constraint to + * it, and don't clone anything. This way we avoid the expensive * verification step and don't end up with a duplicate FK. This also * means we don't consider this constraint when recursing to * partitions. */ - attach_it = false; + attached = false; foreach(cell, partFKs) { ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - Relation trigrel; - HeapTuple trigtup; - SysScanDesc scan; - ScanKeyData key; - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) + if (tryAttachPartitionForeignKey(fk, + RelationGetRelid(partRel), + parentConstrOid, + numfks, + mapped_conkey, + confkey, + conpfeqop)) { - attach_it = false; - continue; + attached = true; + break; } - for (i = 0; i < numfks; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) - { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* - * Looks good! Attach this constraint. The action triggers in - * the new partition become redundant -- the parent table already - * has equivalent ones, and those will be able to reach the - * partition. Remove the ones in the partition. We identify them - * because they have our constraint OID, as well as being on the - * referenced rel. - */ - trigrel = heap_open(TriggerRelationId, RowExclusiveLock); - ScanKeyInit(&key, - Anum_pg_trigger_tgconstraint, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(fk->conoid)); - - scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true, - NULL, 1, &key); - while ((trigtup = systable_getnext(scan)) != NULL) - { - Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup); - ObjectAddress trigger; - - if (trgform->tgconstrrelid != fk->conrelid) - continue; - if (trgform->tgrelid != fk->confrelid) - continue; - - /* - * The constraint is originally set up to contain this trigger - * as an implementation object, so there's a dependency record - * that links the two; however, since the trigger is no longer - * needed, we remove the dependency link in order to be able - * to drop the trigger while keeping the constraint intact. - */ - deleteDependencyRecordsFor(TriggerRelationId, - trgform->oid, - false); - /* make dependency deletion visible to performDeletion */ - CommandCounterIncrement(); - ObjectAddressSet(trigger, TriggerRelationId, - trgform->oid); - performDeletion(&trigger, DROP_RESTRICT, 0); - /* make trigger drop visible, in case the loop iterates */ - CommandCounterIncrement(); - } - - systable_endscan(scan); - table_close(trigrel, RowExclusiveLock); - - ConstraintSetParentConstraint(fk->conoid, parentConstrOid, - RelationGetRelid(partRel)); - CommandCounterIncrement(); - attach_it = true; - break; } /* @@ -7980,7 +8307,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, * create a new one. In fact, there's no need to recurse for this * constraint to partitions, either. */ - if (attach_it) + if (attached) { ReleaseSysCache(tuple); continue; @@ -8012,8 +8339,10 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, NULL, NULL, NULL, - false, - 1, false, true); + false, /* islocal */ + 1, /* inhcount */ + false, /* conNoInherit */ + true); subclone = lappend_oid(subclone, constrOid); /* Set up partition dependencies for the new constraint */ @@ -8033,8 +8362,14 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, fkconstraint->deferrable = constrForm->condeferrable; fkconstraint->initdeferred = constrForm->condeferred; - createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, - constrOid, constrForm->conindid, false); + /* If this is a plain relation, create the check triggers */ + if (partRel->rd_rel->relkind == RELKIND_RELATION) + createForeignKeyCheckTriggers(RelationGetRelid(partRel), + constrForm->confrelid, + fkconstraint, constrOid, + constrForm->conindid); + /* make catalog rows visible */ + CommandCounterIncrement(); if (cloned) { @@ -8070,6 +8405,9 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, PartitionDesc partdesc = RelationGetPartitionDesc(partRel); int i; + /* make previously created constraints visible */ + CommandCounterIncrement(); + for (i = 0; i < partdesc->nparts; i++) { Relation childRel; @@ -8085,6 +8423,141 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, } } +/* + * When the parent of a partition receives a foreign key, we must propagate + * that foreign key to the partition. However, the partition might already + * have an equivalent foreign key; this routine compares the given + * ForeignKeyCacheInfo (in the partition) to the FK defined by the other + * parameters. If they are equivalent, create the link between the two + * constraints and return true. + * + * If no FK in the partition matches the rest of the params, return false. + * Caller must create a new constraint. + */ +static bool +tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk, + Oid partRelid, + Oid parentConstrOid, + int numfks, + AttrNumber *mapped_conkey, + AttrNumber *confkey, + Oid *conpfeqop) +{ + HeapTuple parentConstrTup; + Form_pg_constraint parentConstr; + HeapTuple partcontup; + Form_pg_constraint partConstr; + Relation trigrel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple trigtup; + + parentConstrTup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(parentConstrOid)); + if (!parentConstrTup) + elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid); + parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup); + + /* + * Do some quick & easy initial checks. If any of these fail, we cannot + * use this constraint. + */ + if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks) + { + ReleaseSysCache(parentConstrTup); + return false; + } + for (int i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + ReleaseSysCache(parentConstrTup); + return false; + } + } + + /* + * Looks good so far; do some more extensive checks. Presumably the check + * for 'convalidated' could be dropped, since we don't really care about + * that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != parentConstr->condeferrable || + partConstr->condeferred != parentConstr->condeferred || + partConstr->confupdtype != parentConstr->confupdtype || + partConstr->confdeltype != parentConstr->confdeltype || + partConstr->confmatchtype != parentConstr->confmatchtype) + { + ReleaseSysCache(parentConstrTup); + ReleaseSysCache(partcontup); + return false; + } + + ReleaseSysCache(partcontup); + ReleaseSysCache(parentConstrTup); + + /* + * Looks good! Attach this constraint. The action triggers in the new + * partition become redundant -- the parent table already has equivalent + * ones, and those will be able to reach the partition. Remove the ones + * in the partition. We identify them because they have our constraint + * OID, as well as being on the referenced rel. + */ + trigrel = heap_open(TriggerRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_trigger_tgconstraint, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(fk->conoid)); + + scan = systable_beginscan(trigrel, TriggerConstraintIndexId, true, + NULL, 1, &key); + while ((trigtup = systable_getnext(scan)) != NULL) + { + Form_pg_trigger trgform = (Form_pg_trigger) GETSTRUCT(trigtup); + ObjectAddress trigger; + + if (trgform->tgconstrrelid != fk->conrelid) + continue; + if (trgform->tgrelid != fk->confrelid) + continue; + + /* + * The constraint is originally set up to contain this trigger as an + * implementation object, so there's a dependency record that links + * the two; however, since the trigger is no longer needed, we remove + * the dependency link in order to be able to drop the trigger while + * keeping the constraint intact. + */ + deleteDependencyRecordsFor(TriggerRelationId, + trgform->oid, + false); + /* make dependency deletion visible to performDeletion */ + CommandCounterIncrement(); + ObjectAddressSet(trigger, TriggerRelationId, + trgform->oid); + performDeletion(&trigger, DROP_RESTRICT, 0); + /* make trigger drop visible, in case the loop iterates */ + CommandCounterIncrement(); + } + + systable_endscan(scan); + table_close(trigrel, RowExclusiveLock); + + ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid); + CommandCounterIncrement(); + return true; +} + + /* * ALTER TABLE ALTER CONSTRAINT * @@ -8204,8 +8677,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd, /* * Update deferrability of RI_FKey_noaction_del, * RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd - * triggers, but not others; see createForeignKeyTriggers and - * CreateFKCheckTrigger. + * triggers, but not others; see createForeignKeyActionTriggers + * and CreateFKCheckTrigger. */ if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL && tgform->tgfoid != F_RI_FKEY_NOACTION_UPD && @@ -9116,37 +9589,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, indexOid, false); } -/* - * Create the triggers that implement an FK constraint. - * - * NB: if you change any trigger properties here, see also - * ATExecAlterConstraint. - */ -void -createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, - Oid constraintOid, Oid indexOid, bool create_action) -{ - /* - * For the referenced side, create action triggers, if requested. (If the - * referencing side is partitioned, there is still only one trigger, which - * runs on the referenced side and points to the top of the referencing - * hierarchy.) - */ - if (create_action) - createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid, - indexOid); - - /* - * For the referencing side, create the check triggers. We only need - * these on the partitions. - */ - if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid, - fkconstraint, constraintOid, indexOid); - - CommandCounterIncrement(); -} - /* * ALTER TABLE DROP CONSTRAINT * @@ -14906,6 +15348,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) RelationGetRelid(attachrel)); update_relispartition(NULL, cldIdxId, true); found = true; + + CommandCounterIncrement(); break; } } @@ -15084,6 +15528,7 @@ ATExecDetachPartition(Relation rel, RangeVar *name) Oid defaultPartOid; List *indexes; List *fks; + List *refconstraints; ListCell *cell; /* @@ -15097,6 +15542,10 @@ ATExecDetachPartition(Relation rel, RangeVar *name) partRel = table_openrv(name, ShareUpdateExclusiveLock); + /* Ensure that foreign keys still hold after this detach */ + refconstraints = GetParentedForeignKeyRefs(partRel); + CheckNoForeignKeyRefs(partRel, refconstraints, false); + /* All inheritance related checks are performed within the function */ RemoveInheritance(partRel, rel); @@ -15215,6 +15664,24 @@ ATExecDetachPartition(Relation rel, RangeVar *name) } list_free_deep(fks); + /* + * Any sub-constrains that are in the referenced-side of a larger + * constraint have to be removed. This partition is no longer part of the + * key space of the constraint. + */ + foreach(cell, refconstraints) + { + Oid constrOid = lfirst_oid(cell); + ObjectAddress constraint; + + ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid); + CommandCounterIncrement(); + + ObjectAddressSet(constraint, ConstraintRelationId, constrOid); + performDeletion(&constraint, DROP_RESTRICT, 0); + } + CommandCounterIncrement(); + /* * Invalidate the parent's relcache so that the partition is no longer * included in its partition descriptor. @@ -15595,3 +16062,118 @@ update_relispartition(Relation classRel, Oid relationId, bool newval) if (opened) table_close(classRel, RowExclusiveLock); } + +/* + * Return an OID list of constraints that reference the given relation + * that are marked as having a parent constraints. + */ +List * +GetParentedForeignKeyRefs(Relation partition) +{ + Relation pg_constraint; + HeapTuple tuple; + SysScanDesc scan; + ScanKeyData key[2]; + List *constraints = NIL; + + /* + * If no indexes, or no columns are referenceable by FKs, we can avoid the + * scan. + */ + if (RelationGetIndexList(partition) == NIL || + bms_is_empty(RelationGetIndexAttrBitmap(partition, + INDEX_ATTR_BITMAP_KEY))) + return NIL; + + /* Search for constraints referencing this table */ + pg_constraint = table_open(ConstraintRelationId, AccessShareLock); + ScanKeyInit(&key[0], + Anum_pg_constraint_confrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition))); + ScanKeyInit(&key[1], + Anum_pg_constraint_contype, BTEqualStrategyNumber, + F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN)); + + /* XXX This is a seqscan, as we don't have a usable index */ + scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* + * We only need to process constraints that are part of larger ones. + */ + if (!OidIsValid(constrForm->conparentid)) + continue; + + constraints = lappend_oid(constraints, constrForm->oid); + } + + systable_endscan(scan); + table_close(pg_constraint, AccessShareLock); + + return constraints; +} + +/* + * During an operation that removes a partition from a partitioned table + * (either a DETACH or DROP), verify that any foreign keys pointing to the + * partitioned table would not become invalid. An error raised if any + * referenced values exist. + * + * Returns a list of such constraints. + */ +void +CheckNoForeignKeyRefs(Relation partition, List *constraints, bool isDrop) +{ + ListCell *cell; + + /* + * In the DROP case, we can skip this check when this is a partitioned + * partition, because its partitions will go through this also, and we'd + * run the check twice uselessly. + * + * In the DETACH case, this is only called for the top-level relation, so + * we must run it nevertheless. + */ + if (isDrop && partition->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + foreach(cell, constraints) + { + Oid constrOid = lfirst_oid(cell); + HeapTuple tuple; + Form_pg_constraint constrForm; + Relation rel; + Trigger trig; + + tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + Assert(OidIsValid(constrForm->conparentid)); + Assert(constrForm->confrelid == RelationGetRelid(partition)); + + /* prevent data changes into the referencing table until commit */ + rel = table_open(constrForm->conrelid, ShareLock); + + MemSet(&trig, 0, sizeof(trig)); + trig.tgoid = InvalidOid; + trig.tgname = NameStr(constrForm->conname); + trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN; + trig.tgisinternal = true; + trig.tgconstrrelid = RelationGetRelid(partition); + trig.tgconstrindid = constrForm->conindid; + trig.tgconstraint = constrForm->oid; + trig.tgdeferrable = false; + trig.tginitdeferred = false; + /* we needn't fill in remaining fields */ + + RI_Final_Check(&trig, rel, partition); + + ReleaseSysCache(tuple); + + table_close(rel, NoLock); + } +} diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index e1aa3d0044f..2720ab6076b 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -56,6 +56,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/rls.h" +#include "utils/ruleutils.h" #include "utils/snapmgr.h" #include "utils/syscache.h" @@ -240,7 +241,7 @@ static void ri_ExtractValues(Relation rel, HeapTuple tup, static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, HeapTuple violator, TupleDesc tupdesc, - int queryno) pg_attribute_noreturn(); + int queryno, bool partgone) pg_attribute_noreturn(); /* ---------- @@ -398,18 +399,22 @@ RI_FKey_check(TriggerData *trigdata) char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; + const char *pk_only; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * corresponding FK attributes. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (i = 0; i < riinfo->nkeys; i++) { @@ -533,19 +538,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; + const char *pk_only; Oid queryoids[RI_MAX_NUMKEYS]; /* ---------- * The query string built is - * SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...] + * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...] * FOR KEY SHARE OF x * The type id's for the $ parameters are those of the * PK attributes themselves. * ---------- */ initStringInfo(&querybuf); + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; quoteRelationName(pkrelname, pk_rel); - appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); + appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x", + pk_only, pkrelname); querysep = "WHERE"; for (i = 0; i < riinfo->nkeys; i++) { @@ -1711,6 +1720,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RangeTblEntry *fkrte; const char *sep; const char *fk_only; + const char *pk_only; int i; int save_nestlevel; char workmembuf[32]; @@ -1770,7 +1780,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) /*---------- * The query string built is: * SELECT fk.keycols FROM [ONLY] relname fk - * LEFT OUTER JOIN ONLY pkrelname pk + * LEFT OUTER JOIN [ONLY] pkrelname pk * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH SIMPLE: @@ -1797,9 +1807,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) quoteRelationName(fkrelname, fk_rel); fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? "" : "ONLY "; + pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; appendStringInfo(&querybuf, - " FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON", - fk_only, fkrelname, pkrelname); + " FROM %s%s fk LEFT OUTER JOIN %s%s pk ON", + fk_only, fkrelname, pk_only, pkrelname); strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); @@ -1952,7 +1964,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, tuple, tupdesc, - RI_PLAN_CHECK_LOOKUPPK); + RI_PLAN_CHECK_LOOKUPPK, false); } if (SPI_finish() != SPI_OK_FINISH) @@ -1966,6 +1978,188 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) return true; } +/* ---------- + * RI_Final_Check - + * + */ +void +RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) +{ + const RI_ConstraintInfo *riinfo; + StringInfoData querybuf; + char *constraintDef; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; + char fkrelname[MAX_QUOTED_REL_NAME_LEN]; + char pkattname[MAX_QUOTED_NAME_LEN + 3]; + char fkattname[MAX_QUOTED_NAME_LEN + 3]; + const char *sep; + const char *fk_only; + int spi_result; + SPIPlanPtr qplan; + int i; + + riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); + + /* XXX handle the non-permission case?? */ + + /*---------- + * The query string built is: + * SELECT fk.keycols FROM [ONLY] relname fk + * JOIN pkrelname pk + * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) + * WHERE (<partition constraint>) AND + * For MATCH SIMPLE: + * (fk.keycol1 IS NOT NULL [AND ...]) + * For MATCH FULL: + * (fk.keycol1 IS NOT NULL [OR ...]) + * + * We attach COLLATE clauses to the operators when comparing columns + * that have different collations. + *---------- + */ + initStringInfo(&querybuf); + appendStringInfoString(&querybuf, "SELECT "); + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname); + sep = ", "; + } + + quoteRelationName(pkrelname, pk_rel); + quoteRelationName(fkrelname, fk_rel); + fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; + appendStringInfo(&querybuf, + " FROM %s%s fk JOIN %s pk ON", + fk_only, fkrelname, pkrelname); + strcpy(pkattname, "pk."); + strcpy(fkattname, "fk."); + sep = "("; + for (i = 0; i < riinfo->nkeys; i++) + { + Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); + Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]); + Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]); + Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]); + + quoteOneName(pkattname + 3, + RIAttName(pk_rel, riinfo->pk_attnums[i])); + quoteOneName(fkattname + 3, + RIAttName(fk_rel, riinfo->fk_attnums[i])); + ri_GenerateQual(&querybuf, sep, + pkattname, pk_type, + riinfo->pf_eq_oprs[i], + fkattname, fk_type); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&querybuf, pk_coll); + sep = "AND"; + } + + /* + * Start the WHERE clause with the partition constraint (except if this + * is the default partition and there's no other partition, because the + * partition constraint is the empty string in that case.) + */ + constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk"); + if (constraintDef && constraintDef[0] != '\0') + appendStringInfo(&querybuf, ") WHERE %s AND (", + constraintDef); + else + appendStringInfo(&querybuf, ") WHERE ("); + + sep = ""; + for (i = 0; i < riinfo->nkeys; i++) + { + quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i])); + appendStringInfo(&querybuf, + "%sfk.%s IS NOT NULL", + sep, fkattname); + switch (riinfo->confmatchtype) + { + case FKCONSTR_MATCH_SIMPLE: + sep = " AND "; + break; + case FKCONSTR_MATCH_FULL: + sep = " OR "; + break; + case FKCONSTR_MATCH_PARTIAL: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MATCH PARTIAL not yet implemented"))); + break; + default: + elog(ERROR, "unrecognized confmatchtype: %d", + riinfo->confmatchtype); + break; + } + } + appendStringInfoChar(&querybuf, ')'); + + /* + * RI_Initial_Check changes work_mem here. + */ + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* + * Generate the plan. We don't need to cache it, and there are no + * arguments to the plan. + */ + qplan = SPI_prepare(querybuf.data, 0, NULL); + + if (qplan == NULL) + elog(ERROR, "SPI_prepare returned %s for %s", + SPI_result_code_string(SPI_result), querybuf.data); + + /* + * Run the plan. For safety we force a current snapshot to be used. (In + * transaction-snapshot mode, this arguably violates transaction isolation + * rules, but we really haven't got much choice.) We don't need to + * register the snapshot, because SPI_execute_snapshot will see to it. We + * need at most one tuple returned, so pass limit = 1. + */ + spi_result = SPI_execute_snapshot(qplan, + NULL, NULL, + GetLatestSnapshot(), + InvalidSnapshot, + true, false, 1); + + /* Check result */ + if (spi_result != SPI_OK_SELECT) + elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result)); + + /* Did we find a tuple that would violate the constraint? */ + if (SPI_processed > 0) + { + HeapTuple tuple = SPI_tuptable->vals[0]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + RI_ConstraintInfo fake_riinfo; + + /* + * The columns to look at in the result tuple are 1..N, not whatever + * they are in the fk_rel. Hack up riinfo so that ri_ReportViolation + * will behave properly. + * + * In addition to this, we have to pass the correct tupdesc to + * ri_ReportViolation, overriding its normal habit of using the pk_rel + * or fk_rel's tupdesc. + */ + memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo)); + for (i = 0; i < fake_riinfo.nkeys; i++) + fake_riinfo.pk_attnums[i] = i + 1; + + ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel, + tuple, tupdesc, 0, true); + } + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + /* ---------- * Local functions below @@ -2167,6 +2361,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk) /* Find or create a hashtable entry for the constraint */ riinfo = ri_LoadConstraintInfo(constraintOid); +#if 0 /* Do some easy cross-checks against the trigger call data */ if (rel_is_pk) { @@ -2175,6 +2370,7 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk) elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"", trigger->tgname, RelationGetRelationName(trig_rel)); } +#endif return riinfo; } @@ -2480,7 +2676,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, pk_rel, fk_rel, new_tuple ? new_tuple : old_tuple, NULL, - qkey->constr_queryno); + qkey->constr_queryno, false); return SPI_processed != 0; } @@ -2524,7 +2720,7 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, HeapTuple violator, TupleDesc tupdesc, - int queryno) + int queryno, bool partgone) { StringInfoData key_names; StringInfoData key_values; @@ -2564,9 +2760,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, * * Check table-level permissions next and, failing that, column-level * privileges. + * + * When a partition at the referenced side is being detached/dropped, we + * needn't check, since the user must be the table owner anyway. */ - - if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) + if (partgone) + has_perm = true; + else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED) { aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT); if (aclresult != ACLCHECK_OK) @@ -2616,7 +2816,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo, } } - if (onfk) + if (partgone) + ereport(ERROR, + (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), + errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"", + RelationGetRelationName(pk_rel), + NameStr(riinfo->conname)), + errdetail("Key (%s)=(%s) still referenced from table \"%s\".", + key_names.data, key_values.data, + RelationGetRelationName(fk_rel)))); + else if (onfk) ereport(ERROR, (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 1258092dc8c..1442c2c04c8 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -1818,6 +1818,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(string_to_text(consrc)); } +/* + * pg_get_partconstrdef_string + * + * Returns the partition constraint as a C-string for the input relation, with + * the given alias. No pretty-printing. + */ +char * +pg_get_partconstrdef_string(Oid partitionId, char *aliasname) +{ + Expr *constr_expr; + List *context; + + constr_expr = get_partition_qual_relid(partitionId); + context = deparse_context_for(aliasname, partitionId); + + return deparse_expression((Node *) constr_expr, context, true, false); +} + /* * pg_get_constraintdef * diff --git a/src/include/catalog/heap.h b/src/include/catalog/heap.h index 50fb62be9d5..2627a433da6 100644 --- a/src/include/catalog/heap.h +++ b/src/include/catalog/heap.h @@ -81,6 +81,8 @@ extern void heap_create_init_fork(Relation rel); extern void heap_drop_with_catalog(Oid relid); +extern void pre_drop_class_check(Oid relationId, Oid objectSubId); + extern void heap_truncate(List *relids); extern void heap_truncate_one_rel(Relation rel); diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index ec3bb90b01b..efbb5c3a93b 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -52,6 +52,10 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, extern void CheckTableNotInUse(Relation rel, const char *stmt); +extern List *GetParentedForeignKeyRefs(Relation partition); +extern void CheckNoForeignKeyRefs(Relation partition, List *constraints, + bool isDrop); + extern void ExecuteTruncate(TruncateStmt *stmt); extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, DropBehavior behavior, bool restart_seqs); @@ -76,10 +80,6 @@ extern void find_composite_type_dependencies(Oid typeOid, extern void check_of_type(HeapTuple typetuple); -extern void createForeignKeyTriggers(Relation rel, Oid refRelOid, - Constraint *fkconstraint, Oid constraintOid, - Oid indexOid, bool create_action); - extern void register_on_commit_action(Oid relid, OnCommitAction action); extern void remove_on_commit_action(Oid relid); diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 9f212ac24bf..3c9a45a0720 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -263,6 +263,7 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel, HeapTuple old_row, HeapTuple new_row); extern bool RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel); +extern void RI_Final_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel); /* result values for RI_FKey_trigger_type: */ #define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h index 3ebc01e7147..7c49e9d0a83 100644 --- a/src/include/utils/ruleutils.h +++ b/src/include/utils/ruleutils.h @@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid); extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty); extern char *pg_get_partkeydef_columns(Oid relid, bool pretty); +extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname); extern char *pg_get_constraintdef_command(Oid constraintId); extern char *deparse_expression(Node *expr, List *dpcontext, diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out index bf2c91d9f0e..4a2f9af3287 100644 --- a/src/test/regress/expected/foreign_key.out +++ b/src/test/regress/expected/foreign_key.out @@ -1478,19 +1478,6 @@ drop table pktable2, fktable2; -- -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); -ERROR: cannot reference partitioned table "fk_partitioned_pk" --- and then with alter table. -CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -ERROR: cannot reference partitioned table "fk_partitioned_pk" -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1620,7 +1607,7 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x); CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT; INSERT INTO fk_partitioned_fk_full VALUES (1, NULL); ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL; -- fails -ERROR: insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_fkey" +ERROR: insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_fkey" DETAIL: MATCH FULL does not allow mixing of null and nonnull key values. TRUNCATE fk_partitioned_fk_full; ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL; @@ -1965,3 +1952,152 @@ ERROR: constraint "my_fkey" of relation "fk_part_1_1" does not exist drop schema fkpart0, fkpart1, fkpart2 cascade; NOTICE: drop cascades to 8 other objects \set VERBOSITY default +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema regress_fk; +set search_path to regress_fk; +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); +create table pk5 (like pk) partition by range (a); +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); +create table fk3 partition of fk for values from (3500) to (5000); +-- these should fail: referenced value not present +insert into fk values (1); +ERROR: insert or update on table "fk1" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(1) is not present in table "pk". +insert into fk values (1000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(1000) is not present in table "pk". +insert into fk values (2000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(2000) is not present in table "pk". +insert into fk values (3000); +ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(3000) is not present in table "pk". +insert into fk values (4000); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4000) is not present in table "pk". +insert into fk values (4500); +ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey" +DETAIL: Key (a)=(4500) is not present in table "pk". +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); +-- should fail: referencing value present +delete from pk where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +delete from pk where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +delete from pk where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +delete from pk where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +delete from pk where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +delete from pk where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +update pk set a = 2 where a = 1; +ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk" +DETAIL: Key (a)=(1) is still referenced from table "fk". +update pk set a = 1002 where a = 1000; +ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk" +DETAIL: Key (a)=(1000) is still referenced from table "fk". +update pk set a = 2002 where a = 2000; +ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk" +DETAIL: Key (a)=(2000) is still referenced from table "fk". +update pk set a = 3002 where a = 3000; +ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk" +DETAIL: Key (a)=(3000) is still referenced from table "fk". +update pk set a = 4002 where a = 4000; +ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk" +DETAIL: Key (a)=(4000) is still referenced from table "fk". +update pk set a = 4502 where a = 4500; +ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk" +DETAIL: Key (a)=(4500) is still referenced from table "fk". +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; +-- dropping/detaching partitions is prevented if that would break +-- a foreign key's existing data +create table droppk (a int primary key) partition by range (a); +create table droppk1 partition of droppk for values from (0) to (1000); +create table droppk_d partition of droppk default; +create table droppk2 partition of droppk for values from (1000) to (2000) + partition by range (a); +create table droppk21 partition of droppk2 for values from (1000) to (1400); +create table droppk2_d partition of droppk2 default; +insert into droppk values (1), (1000), (1500), (2000); +create table dropfk (a int references droppk); +insert into dropfk values (1), (1000), (1500), (2000); +-- these should all fail +alter table droppk detach partition droppk_d; +ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5" +DETAIL: Key (a)=(2000) still referenced from table "dropfk". +alter table droppk2 detach partition droppk2_d; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +alter table droppk detach partition droppk1; +ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1" +DETAIL: Key (a)=(1) still referenced from table "dropfk". +alter table droppk detach partition droppk2; +ERROR: removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +alter table droppk2 detach partition droppk21; +ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +drop table droppk_d; +ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5" +DETAIL: Key (a)=(2000) still referenced from table "dropfk". +drop table droppk2_d; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +drop table droppk1; +ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1" +DETAIL: Key (a)=(1) still referenced from table "dropfk". +drop table droppk2; +ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4" +DETAIL: Key (a)=(1500) still referenced from table "dropfk". +drop table droppk21; +ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3" +DETAIL: Key (a)=(1000) still referenced from table "dropfk". +delete from dropfk; +-- now they should all work +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +alter table droppk2 detach partition droppk21; +drop table droppk2; diff --git a/src/test/regress/sql/foreign_key.sql b/src/test/regress/sql/foreign_key.sql index c8d1214d02c..63c3b20f83c 100644 --- a/src/test/regress/sql/foreign_key.sql +++ b/src/test/regress/sql/foreign_key.sql @@ -1111,18 +1111,6 @@ drop table pktable2, fktable2; -- Foreign keys and partitioned tables -- --- partitioned table in the referenced side are not allowed -CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b)) - PARTITION BY RANGE (a, b); --- verify with create table first ... -CREATE TABLE fk_notpartitioned_fk (a int, b int, - FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk); --- and then with alter table. -CREATE TABLE fk_notpartitioned_fk_2 (a int, b int); -ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b) - REFERENCES fk_partitioned_pk; -DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2; - -- Creation of a partitioned hierarchy with irregular definitions CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int, PRIMARY KEY (a, b)); @@ -1409,3 +1397,105 @@ alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- doesn't exist \set VERBOSITY terse \\ -- suppress cascade details drop schema fkpart0, fkpart1, fkpart2 cascade; \set VERBOSITY default + +-- Test a partitioned table as referenced table. +-- Verify basic functionality with a regular partition creation and a partition +-- with a different column layout, as well as partitions +-- added (created and attached) after creating the foreign key. +create schema regress_fk; +set search_path to regress_fk; + +create table pk (a int primary key) partition by range (a); +create table pk1 partition of pk for values from (0) to (1000); +create table pk2 (b int, a int); +alter table pk2 drop column b; +alter table pk2 alter a set not null; +alter table pk attach partition pk2 for values from (1000) to (2000); + +create table fk (a int) partition by range (a); +create table fk1 partition of fk for values from (0) to (750); +alter table fk add foreign key (a) references pk; +create table fk2 (b int, a int) ; +alter table fk2 drop column b; +alter table fk attach partition fk2 for values from (750) to (3500); + +create table pk3 partition of pk for values from (2000) to (3000); +create table pk4 (like pk); +alter table pk attach partition pk4 for values from (3000) to (4000); + +create table pk5 (like pk) partition by range (a); +create table pk51 partition of pk5 for values from (4000) to (4500); +create table pk52 partition of pk5 for values from (4500) to (5000); +alter table pk attach partition pk5 for values from (4000) to (5000); + +create table fk3 partition of fk for values from (3500) to (5000); + +-- these should fail: referenced value not present +insert into fk values (1); +insert into fk values (1000); +insert into fk values (2000); +insert into fk values (3000); +insert into fk values (4000); +insert into fk values (4500); +-- insert into the referenced table, now they should work +insert into pk values (1), (1000), (2000), (3000), (4000), (4500); +insert into fk values (1), (1000), (2000), (3000), (4000), (4500); + +-- should fail: referencing value present +delete from pk where a = 1; +delete from pk where a = 1000; +delete from pk where a = 2000; +delete from pk where a = 3000; +delete from pk where a = 4000; +delete from pk where a = 4500; +update pk set a = 2 where a = 1; +update pk set a = 1002 where a = 1000; +update pk set a = 2002 where a = 2000; +update pk set a = 3002 where a = 3000; +update pk set a = 4002 where a = 4000; +update pk set a = 4502 where a = 4500; +-- now they should work +delete from fk; +update pk set a = 2 where a = 1; +delete from pk where a = 2; +update pk set a = 1002 where a = 1000; +delete from pk where a = 1002; +update pk set a = 2002 where a = 2000; +delete from pk where a = 2002; +update pk set a = 3002 where a = 3000; +delete from pk where a = 3002; +update pk set a = 4002 where a = 4000; +delete from pk where a = 4002; +update pk set a = 4502 where a = 4500; +delete from pk where a = 4502; + +-- dropping/detaching partitions is prevented if that would break +-- a foreign key's existing data +create table droppk (a int primary key) partition by range (a); +create table droppk1 partition of droppk for values from (0) to (1000); +create table droppk_d partition of droppk default; +create table droppk2 partition of droppk for values from (1000) to (2000) + partition by range (a); +create table droppk21 partition of droppk2 for values from (1000) to (1400); +create table droppk2_d partition of droppk2 default; +insert into droppk values (1), (1000), (1500), (2000); +create table dropfk (a int references droppk); +insert into dropfk values (1), (1000), (1500), (2000); +-- these should all fail +alter table droppk detach partition droppk_d; +alter table droppk2 detach partition droppk2_d; +alter table droppk detach partition droppk1; +alter table droppk detach partition droppk2; +alter table droppk2 detach partition droppk21; +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +drop table droppk2; +drop table droppk21; +delete from dropfk; +-- now they should all work +drop table droppk_d; +drop table droppk2_d; +drop table droppk1; +alter table droppk2 detach partition droppk21; +drop table droppk2; -- 2.17.1