Excerpts from Alvaro Herrera's message of lun jun 13 18:08:12 -0400 2011: > Excerpts from Dean Rasheed's message of sáb jun 11 09:32:15 -0400 2011:
> > I think that you also need to update the constraint exclusion code > > (get_relation_constraints() or nearby), otherwise the planner might > > exclude a relation on the basis of a CHECK constraint that is not > > currently VALID. > > Ouch, yeah, thanks for pointing that out. Fortunately the patch to fix > this is quite simple. I don't have it handy right now but I'll post it > soon. Here's the complete patch. *** a/doc/src/sgml/catalogs.sgml --- b/doc/src/sgml/catalogs.sgml *************** *** 1898,1904 **** <entry><structfield>convalidated</structfield></entry> <entry><type>bool</type></entry> <entry></entry> ! <entry>Has the constraint been validated? Can only be false for foreign keys</entry> </row> <row> --- 1898,1904 ---- <entry><structfield>convalidated</structfield></entry> <entry><type>bool</type></entry> <entry></entry> ! <entry>Has the constraint been validated? Can only be false for foreign keys and CHECK constraints</entry> </row> <row> *** a/doc/src/sgml/ref/alter_domain.sgml --- b/doc/src/sgml/ref/alter_domain.sgml *************** *** 28,37 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> { SET | DROP } NOT NULL ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ! ADD <replaceable class="PARAMETER">domain_constraint</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> DROP CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> [ RESTRICT | CASCADE ] ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> OWNER TO <replaceable class="PARAMETER">new_owner</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> SET SCHEMA <replaceable class="PARAMETER">new_schema</replaceable> --- 28,39 ---- ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> { SET | DROP } NOT NULL ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ! ADD <replaceable class="PARAMETER">domain_constraint</replaceable> [ NOT VALID ] ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> DROP CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> [ RESTRICT | CASCADE ] ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> + VALIDATE CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> + ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> OWNER TO <replaceable class="PARAMETER">new_owner</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> SET SCHEMA <replaceable class="PARAMETER">new_schema</replaceable> *************** *** 70,82 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> </varlistentry> <varlistentry> ! <term>ADD <replaceable class="PARAMETER">domain_constraint</replaceable></term> <listitem> <para> This form adds a new constraint to a domain using the same syntax as <xref linkend="SQL-CREATEDOMAIN">. ! This will only succeed if all columns using the domain satisfy the ! new constraint. </para> </listitem> </varlistentry> --- 72,88 ---- </varlistentry> <varlistentry> ! <term>ADD <replaceable class="PARAMETER">domain_constraint</replaceable> [ NOT VALID ]</term> <listitem> <para> This form adds a new constraint to a domain using the same syntax as <xref linkend="SQL-CREATEDOMAIN">. ! If NOT VALID is not specified, ! the command will only succeed if all columns using the ! domain satisfy the new constraint. ! The constraint is going to be enforced on new data inserted into columns ! using the domain in all cases, regardless of <literal>NOT VALID</>. ! <literal>NOT VALID</> is only accepted for <literal>CHECK</> constraints. </para> </listitem> </varlistentry> *************** *** 91,96 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> --- 97,113 ---- </varlistentry> <varlistentry> + <term>VALIDATE CONSTRAINT</term> + <listitem> + <para> + This form validates a constraint previously added, that is, verify that + all data in columns using the domain satisfy the specified constraint. + </para> + </listitem> + </varlistentry> + + + <varlistentry> <term>OWNER</term> <listitem> <para> *************** *** 156,161 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> --- 173,188 ---- </varlistentry> <varlistentry> + <term><replaceable class="PARAMETER">NOT VALID</replaceable></term> + <listitem> + <para> + Do not verify existing column data for constraint validity. + </para> + </listitem> + </varlistentry> + + + <varlistentry> <term><literal>CASCADE</literal></term> <listitem> <para> *************** *** 251,257 **** ALTER DOMAIN zipcode SET SCHEMA customers; <para> <command>ALTER DOMAIN</command> conforms to the <acronym>SQL</acronym> standard, ! except for the <literal>OWNER</> and <literal>SET SCHEMA</> variants, which are <productname>PostgreSQL</productname> extensions. </para> </refsect1> --- 278,286 ---- <para> <command>ALTER DOMAIN</command> conforms to the <acronym>SQL</acronym> standard, ! except for the <literal>OWNER</>, <literal>SET SCHEMA</> and ! <literal>VALIDATE CONSTRAINT</> variants, ! as well as the <literal>NOT VALID</> clause of the <literal>ADD CONSTRAINT</> variant, which are <productname>PostgreSQL</productname> extensions. </para> </refsect1> *** a/doc/src/sgml/ref/alter_table.sgml --- b/doc/src/sgml/ref/alter_table.sgml *************** *** 240,246 **** ALTER TABLE <replaceable class="PARAMETER">name</replaceable> <listitem> <para> This form adds a new constraint to a table using the same syntax as ! <xref linkend="SQL-CREATETABLE">. Newly added foreign key constraints can also be defined as <literal>NOT VALID</literal> to avoid the potentially lengthy initial check that must otherwise be performed. Constraint checks are skipped at create table time, so --- 240,246 ---- <listitem> <para> This form adds a new constraint to a table using the same syntax as ! <xref linkend="SQL-CREATETABLE">. Newly added foreign key and CHECK constraints can also be defined as <literal>NOT VALID</literal> to avoid the potentially lengthy initial check that must otherwise be performed. Constraint checks are skipped at create table time, so *************** *** 253,259 **** ALTER TABLE <replaceable class="PARAMETER">name</replaceable> <term><literal>VALIDATE CONSTRAINT</literal></term> <listitem> <para> ! This form validates a foreign key constraint that was previously created as <literal>NOT VALID</literal>. Constraints already marked valid do not cause an error response. </para> --- 253,259 ---- <term><literal>VALIDATE CONSTRAINT</literal></term> <listitem> <para> ! This form validates a foreign key or CHECK constraint that was previously created as <literal>NOT VALID</literal>. Constraints already marked valid do not cause an error response. </para> *** a/src/backend/access/common/tupdesc.c --- b/src/backend/access/common/tupdesc.c *************** *** 200,205 **** CreateTupleDescCopyConstr(TupleDesc tupdesc) --- 200,206 ---- cpy->check[i].ccname = pstrdup(constr->check[i].ccname); if (constr->check[i].ccbin) cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin); + cpy->check[i].ccvalid = constr->check[i].ccvalid; } } *** a/src/backend/catalog/heap.c --- b/src/backend/catalog/heap.c *************** *** 98,104 **** static Oid AddNewRelationType(const char *typeName, Oid new_array_type); static void RelationRemoveInheritance(Oid relid); static void StoreRelCheck(Relation rel, char *ccname, Node *expr, ! bool is_local, int inhcount); static void StoreConstraints(Relation rel, List *cooked_constraints); static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, bool allow_merge, bool is_local); --- 98,104 ---- Oid new_array_type); static void RelationRemoveInheritance(Oid relid); static void StoreRelCheck(Relation rel, char *ccname, Node *expr, ! bool is_validated, bool is_local, int inhcount); static void StoreConstraints(Relation rel, List *cooked_constraints); static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, bool allow_merge, bool is_local); *************** *** 1845,1851 **** StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr) */ static void StoreRelCheck(Relation rel, char *ccname, Node *expr, ! bool is_local, int inhcount) { char *ccbin; char *ccsrc; --- 1845,1851 ---- */ static void StoreRelCheck(Relation rel, char *ccname, Node *expr, ! bool is_validated, bool is_local, int inhcount) { char *ccbin; char *ccsrc; *************** *** 1906,1912 **** StoreRelCheck(Relation rel, char *ccname, Node *expr, CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ ! true, /* Is Validated */ RelationGetRelid(rel), /* relation */ attNos, /* attrs in the constraint */ keycount, /* # attrs in the constraint */ --- 1906,1912 ---- CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ ! is_validated, RelationGetRelid(rel), /* relation */ attNos, /* attrs in the constraint */ keycount, /* # attrs in the constraint */ *************** *** 1966,1972 **** StoreConstraints(Relation rel, List *cooked_constraints) StoreAttrDefault(rel, con->attnum, con->expr); break; case CONSTR_CHECK: ! StoreRelCheck(rel, con->name, con->expr, con->is_local, con->inhcount); numchecks++; break; --- 1966,1972 ---- StoreAttrDefault(rel, con->attnum, con->expr); break; case CONSTR_CHECK: ! StoreRelCheck(rel, con->name, con->expr, !con->skip_validation, con->is_local, con->inhcount); numchecks++; break; *************** *** 2080,2085 **** AddRelationNewConstraints(Relation rel, --- 2080,2086 ---- cooked->name = NULL; cooked->attnum = colDef->attnum; cooked->expr = expr; + cooked->skip_validation = false; cooked->is_local = is_local; cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked); *************** *** 2193,2199 **** AddRelationNewConstraints(Relation rel, /* * OK, store it. */ ! StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1); numchecks++; --- 2194,2201 ---- /* * OK, store it. */ ! StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local, ! is_local ? 0 : 1); numchecks++; *************** *** 2202,2207 **** AddRelationNewConstraints(Relation rel, --- 2204,2210 ---- cooked->name = ccname; cooked->attnum = 0; cooked->expr = expr; + cooked->skip_validation = cdef->skip_validation; cooked->is_local = is_local; cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked); *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** *** 258,264 **** static void AlterIndexNamespaces(Relation classRel, Relation rel, static void AlterSeqNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode); ! static void ATExecValidateConstraint(Relation rel, const char *constrName); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, --- 258,265 ---- static void AlterSeqNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode); ! static void ATExecValidateConstraint(Relation rel, const char *constrName, ! bool recurse, bool recursing, LOCKMODE lockmode); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, *************** *** 269,274 **** static Oid transformFkeyCheckAttrs(Relation pkrel, --- 270,277 ---- int numattrs, int16 *attnums, Oid *opclasses); static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); + static void validateCheckConstraint(char *conname, Relation rel, + HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname, Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid); *************** *** 560,565 **** DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) --- 563,569 ---- cooked->name = NULL; cooked->attnum = attnum; cooked->expr = colDef->cooked_default; + cooked->skip_validation = false; cooked->is_local = true; /* not used for defaults */ cooked->inhcount = 0; /* ditto */ cookedDefaults = lappend(cookedDefaults, cooked); *************** *** 1567,1572 **** MergeAttributes(List *schema, List *supers, char relpersistence, --- 1571,1577 ---- cooked->name = pstrdup(name); cooked->attnum = 0; /* not used for constraints */ cooked->expr = expr; + cooked->skip_validation = false; cooked->is_local = false; cooked->inhcount = 1; constraints = lappend(constraints, cooked); *************** *** 2932,2938 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAddInherit(rel); pass = AT_PASS_MISC; break; ! case AT_ValidateConstraint: case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableAlwaysTrig: case AT_EnableReplicaTrig: --- 2937,2950 ---- ATPrepAddInherit(rel); pass = AT_PASS_MISC; break; ! case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */ ! ATSimplePermissions(rel, ATT_TABLE); ! /* Recursion occurs during execution phase */ ! /* No command-specific prep needed except saving recurse flag */ ! if (recurse) ! cmd->subtype = AT_ValidateConstraintRecurse; ! pass = AT_PASS_MISC; ! break; case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableAlwaysTrig: case AT_EnableReplicaTrig: *************** *** 3097,3104 **** ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); break; ! case AT_ValidateConstraint: ! ATExecValidateConstraint(rel, cmd->name); break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, --- 3109,3120 ---- case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); break; ! case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */ ! ATExecValidateConstraint(rel, cmd->name, false, false, lockmode); ! break; ! case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with ! * recursion */ ! ATExecValidateConstraint(rel, cmd->name, true, false, lockmode); break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, *************** *** 5382,5400 **** ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, list_make1(copyObject(constr)), recursing, !recursing); ! /* Add each constraint to Phase 3's queue */ foreach(lcon, newcons) { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - NewConstraint *newcon; ! newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); ! newcon->name = ccon->name; ! newcon->contype = ccon->contype; ! /* ExecQual wants implicit-AND format */ ! newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); ! tab->constraints = lappend(tab->constraints, newcon); /* Save the actually assigned name if it was defaulted */ if (constr->conname == NULL) --- 5398,5420 ---- list_make1(copyObject(constr)), recursing, !recursing); ! /* Add each to-be-validated constraint to Phase 3's queue */ foreach(lcon, newcons) { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); ! if (!ccon->skip_validation) ! { ! NewConstraint *newcon; ! newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); ! newcon->name = ccon->name; ! newcon->contype = ccon->contype; ! /* ExecQual wants implicit-AND format */ ! newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); ! ! tab->constraints = lappend(tab->constraints, newcon); ! } /* Save the actually assigned name if it was defaulted */ if (constr->conname == NULL) *************** *** 5753,5761 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * ALTER TABLE VALIDATE CONSTRAINT */ static void ! ATExecValidateConstraint(Relation rel, const char *constrName) { Relation conrel; SysScanDesc scan; --- 5773,5787 ---- /* * ALTER TABLE VALIDATE CONSTRAINT + * + * XXX The reason we handle recursion here rather than at Phase 1 is because + * there's no good way to skip recursing when handling foreign keys: there is + * no need to lock children in that case, yet we wouldn't be able to avoid + * doing so at that level. */ static void ! ATExecValidateConstraint(Relation rel, const char *constrName, bool recurse, ! bool recursing, LOCKMODE lockmode) { Relation conrel; SysScanDesc scan; *************** *** 5779,5786 **** ATExecValidateConstraint(Relation rel, const char *constrName) while (HeapTupleIsValid(tuple = systable_getnext(scan))) { con = (Form_pg_constraint) GETSTRUCT(tuple); ! if (con->contype == CONSTRAINT_FOREIGN && ! strcmp(NameStr(con->conname), constrName) == 0) { found = true; break; --- 5805,5811 ---- while (HeapTupleIsValid(tuple = systable_getnext(scan))) { con = (Form_pg_constraint) GETSTRUCT(tuple); ! if (strcmp(NameStr(con->conname), constrName) == 0) { found = true; break; *************** *** 5790,5828 **** ATExecValidateConstraint(Relation rel, const char *constrName) if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), ! errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist", constrName, RelationGetRelationName(rel)))); if (!con->convalidated) { ! Oid conid = HeapTupleGetOid(tuple); ! HeapTuple copyTuple = heap_copytuple(tuple); ! Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); ! Relation refrel; ! /* ! * Triggers are already in place on both tables, so a concurrent write ! * that alters the result here is not possible. Normally we can run a ! * query here to do the validation, which would only require ! * AccessShareLock. In some cases, it is possible that we might need ! * to fire triggers to perform the check, so we take a lock at ! * RowShareLock level just in case. ! */ ! refrel = heap_open(con->confrelid, RowShareLock); ! validateForeignKeyConstraint((char *) constrName, rel, refrel, ! con->conindid, ! conid); /* * Now update the catalog, while we have the door open. */ copy_con->convalidated = true; simple_heap_update(conrel, ©Tuple->t_self, copyTuple); CatalogUpdateIndexes(conrel, copyTuple); heap_freetuple(copyTuple); - - heap_close(refrel, NoLock); } systable_endscan(scan); --- 5815,5918 ---- if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), ! errmsg("constraint \"%s\" of relation \"%s\" does not exist", ! constrName, RelationGetRelationName(rel)))); ! ! if (con->contype != CONSTRAINT_FOREIGN && ! con->contype != CONSTRAINT_CHECK) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint", constrName, RelationGetRelationName(rel)))); if (!con->convalidated) { ! HeapTuple copyTuple; ! Form_pg_constraint copy_con; ! if (con->contype == CONSTRAINT_FOREIGN) ! { ! Oid conid = HeapTupleGetOid(tuple); ! Relation refrel; ! /* ! * Triggers are already in place on both tables, so a concurrent write ! * that alters the result here is not possible. Normally we can run a ! * query here to do the validation, which would only require ! * AccessShareLock. In some cases, it is possible that we might need ! * to fire triggers to perform the check, so we take a lock at ! * RowShareLock level just in case. ! */ ! refrel = heap_open(con->confrelid, RowShareLock); ! ! validateForeignKeyConstraint((char *) constrName, rel, refrel, ! con->conindid, ! conid); ! heap_close(refrel, NoLock); ! ! /* ! * Foreign keys do not inherit, so we purposedly ignore the ! * recursion bit here ! */ ! } ! else if (con->contype == CONSTRAINT_CHECK) ! { ! List *children = NIL; ! ListCell *child; ! ! /* ! * If we're recursing, the parent has already done this, so skip ! * it. ! */ ! if (!recursing) ! children = find_all_inheritors(RelationGetRelid(rel), ! lockmode, NULL); ! ! /* ! * For CHECK constraints, we must ensure that we only mark the ! * constraint as validated on the parent if it's already validated ! * on the children. ! * ! * We recurse before validating on the parent, to reduce risk of ! * deadlocks. ! */ ! foreach(child, children) ! { ! Oid childoid = lfirst_oid(child); ! Relation childrel; ! ! if (childoid == RelationGetRelid(rel)) ! continue; ! ! /* ! * If we are told not to recurse, there had better not be any ! * child tables; else the addition would put them out of step. ! */ ! if (!recurse) ! ereport(ERROR, ! (errcode(ERRCODE_INVALID_TABLE_DEFINITION), ! errmsg("constraint must be validated on child tables too"))); ! ! /* find_all_inheritors already got lock */ ! childrel = heap_open(childoid, NoLock); ! ! ATExecValidateConstraint(childrel, constrName, false, ! true, lockmode); ! heap_close(childrel, NoLock); ! } ! ! validateCheckConstraint((char *) constrName, rel, tuple); ! } /* * Now update the catalog, while we have the door open. */ + copyTuple = heap_copytuple(tuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); copy_con->convalidated = true; simple_heap_update(conrel, ©Tuple->t_self, copyTuple); CatalogUpdateIndexes(conrel, copyTuple); heap_freetuple(copyTuple); } systable_endscan(scan); *************** *** 6128,6133 **** checkFkeyPermissions(Relation rel, int16 *attnums, int natts) --- 6218,6292 ---- } /* + * Scan the existing rows in a table to verify they meet a proposed + * CHECK constraint. + * + * The caller must have opened and locked the relation appropriately. + */ + static void + validateCheckConstraint(char *conname, Relation rel, HeapTuple constrtup) + { + EState *estate; + Datum val; + char *conbin; + Expr *origexpr; + List *exprstate; + TupleDesc tupdesc; + HeapScanDesc scan; + HeapTuple tuple; + ExprContext *econtext; + MemoryContext oldcxt; + TupleTableSlot *slot; + bool isnull; + + estate = CreateExecutorState(); + /* + * XXX this tuple doesn't really come from a syscache, but this doesn't + * matter to SysCacheGetAttr, because it only wants to be able to fetch the + * tupdesc + */ + val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin, + &isnull); + if (isnull) + elog(ERROR, "null conbin for constraint %u", + HeapTupleGetOid(constrtup)); + conbin = TextDatumGetCString(val); + origexpr = (Expr *) stringToNode(conbin); + exprstate = (List *) ExecPrepareExpr((Expr *) make_ands_implicit((Expr *) origexpr), estate); + + econtext = GetPerTupleExprContext(estate); + tupdesc = RelationGetDescr(rel); + slot = MakeSingleTupleTableSlot(tupdesc); + econtext->ecxt_scantuple = slot; + + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + + /* + * Switch to per-tuple memory context and reset it for each tuple + * produced, so we don't leak memory. + */ + oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + + if (!ExecQual(exprstate, econtext, true)) + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("check constraint \"%s\" is violated by some row", + conname))); + + ResetExprContext(econtext); + } + + MemoryContextSwitchTo(oldcxt); + heap_endscan(scan); + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); + } + + /* * Scan the existing rows in a table to verify they meet a proposed FK * constraint. * *** a/src/backend/commands/typecmds.c --- b/src/backend/commands/typecmds.c *************** *** 86,91 **** static Oid findTypeSendFunction(List *procname, Oid typeOid); --- 86,92 ---- static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodoutFunction(List *procname); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid); + static void validateDomainConstraint(Oid domainoid, char *ccbin); static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode); static void checkDomainOwner(HeapTuple tup); static void checkEnumOwner(HeapTuple tup); *************** *** 1941,1954 **** AlterDomainAddConstraint(List *names, Node *newConstraint) Relation typrel; HeapTuple tup; Form_pg_type typTup; - List *rels; - ListCell *rt; - EState *estate; - ExprContext *econtext; - char *ccbin; - Expr *expr; - ExprState *exprstate; Constraint *constr; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); --- 1942,1949 ---- Relation typrel; HeapTuple tup; Form_pg_type typTup; Constraint *constr; + char *ccbin; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); *************** *** 2027,2036 **** AlterDomainAddConstraint(List *names, Node *newConstraint) constr, NameStr(typTup->typname)); /* ! * Test all values stored in the attributes based on the domain the ! * constraint is being added to. */ ! expr = (Expr *) stringToNode(ccbin); /* Need an EState to run ExecEvalExpr */ estate = CreateExecutorState(); --- 2022,2150 ---- constr, NameStr(typTup->typname)); /* ! * If requested to validate the constraint, test all values stored in the ! * attributes based on the domain the constraint is being added to. */ ! if (!constr->skip_validation) ! validateDomainConstraint(domainoid, ccbin); ! ! /* Clean up */ ! heap_close(typrel, RowExclusiveLock); ! } ! ! /* ! * AlterDomainValidateConstraint ! * ! * Implements the ALTER DOMAIN .. VALIDATE CONSTRAINT statement. ! */ ! void ! AlterDomainValidateConstraint(List *names, char *constrName) ! { ! TypeName *typename; ! Oid domainoid; ! Relation typrel; ! Relation conrel; ! HeapTuple tup; ! Form_pg_type typTup; ! Form_pg_constraint con; ! Form_pg_constraint copy_con; ! char *conbin; ! SysScanDesc scan; ! Datum val; ! bool found = false; ! bool isnull; ! HeapTuple tuple; ! HeapTuple copyTuple; ! ScanKeyData key; ! ! /* Make a TypeName so we can use standard type lookup machinery */ ! typename = makeTypeNameFromNameList(names); ! domainoid = typenameTypeId(NULL, typename); ! ! /* Look up the domain in the type table */ ! typrel = heap_open(TypeRelationId, AccessShareLock); ! ! tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainoid)); ! if (!HeapTupleIsValid(tup)) ! elog(ERROR, "cache lookup failed for type %u", domainoid); ! typTup = (Form_pg_type) GETSTRUCT(tup); ! ! /* Check it's a domain and check user has permission for ALTER DOMAIN */ ! checkDomainOwner(tup); ! ! /* ! * Find and check the target constraint ! */ ! conrel = heap_open(ConstraintRelationId, RowExclusiveLock); ! ScanKeyInit(&key, ! Anum_pg_constraint_contypid, ! BTEqualStrategyNumber, F_OIDEQ, ! ObjectIdGetDatum(domainoid)); ! scan = systable_beginscan(conrel, ConstraintTypidIndexId, ! true, SnapshotNow, 1, &key); ! ! while (HeapTupleIsValid(tuple = systable_getnext(scan))) ! { ! con = (Form_pg_constraint) GETSTRUCT(tuple); ! if (strcmp(NameStr(con->conname), constrName) == 0) ! { ! found = true; ! break; ! } ! } ! ! if (!found) ! { ! con = NULL; /* keep compiler quiet */ ! ereport(ERROR, ! (errcode(ERRCODE_UNDEFINED_OBJECT), ! errmsg("constraint \"%s\" of domain \"%s\" does not exist", ! constrName, NameStr(con->conname)))); ! } ! ! if (con->contype != CONSTRAINT_CHECK) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint", ! constrName, NameStr(con->conname)))); ! ! val = SysCacheGetAttr(CONSTROID, tuple, ! Anum_pg_constraint_conbin, ! &isnull); ! if (isnull) ! elog(ERROR, "null conbin for constraint %u", ! HeapTupleGetOid(tuple)); ! conbin = TextDatumGetCString(val); ! ! validateDomainConstraint(domainoid, conbin); ! ! /* ! * Now update the catalog, while we have the door open. ! */ ! copyTuple = heap_copytuple(tuple); ! copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); ! copy_con->convalidated = true; ! simple_heap_update(conrel, ©Tuple->t_self, copyTuple); ! CatalogUpdateIndexes(conrel, copyTuple); ! heap_freetuple(copyTuple); ! ! systable_endscan(scan); ! ! heap_close(typrel, AccessShareLock); ! heap_close(conrel, RowExclusiveLock); ! ! ReleaseSysCache(tup); ! } ! ! static void ! validateDomainConstraint(Oid domainoid, char *ccbin) ! { ! Expr *expr = (Expr *) stringToNode(ccbin); ! List *rels; ! ListCell *rt; ! EState *estate; ! ExprContext *econtext; ! ExprState *exprstate; /* Need an EState to run ExecEvalExpr */ estate = CreateExecutorState(); *************** *** 2092,2102 **** AlterDomainAddConstraint(List *names, Node *newConstraint) } FreeExecutorState(estate); - - /* Clean up */ - heap_close(typrel, RowExclusiveLock); } - /* * get_rels_with_domain * --- 2206,2212 ---- *************** *** 2416,2422 **** domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ ! true, /* Is Validated */ InvalidOid, /* not a relation constraint */ NULL, 0, --- 2526,2532 ---- CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */ false, /* Is Deferred */ ! !constr->skip_validation, /* Is Validated */ InvalidOid, /* not a relation constraint */ NULL, 0, *** a/src/backend/optimizer/util/plancat.c --- b/src/backend/optimizer/util/plancat.c *************** *** 552,558 **** get_relation_data_width(Oid relid, int32 *attr_widths) /* * get_relation_constraints * ! * Retrieve the CHECK constraint expressions of the given relation. * * Returns a List (possibly empty) of constraint expressions. Each one * has been canonicalized, and its Vars are changed to have the varno --- 552,558 ---- /* * get_relation_constraints * ! * Retrieve the validated CHECK constraint expressions of the given relation. * * Returns a List (possibly empty) of constraint expressions. Each one * has been canonicalized, and its Vars are changed to have the varno *************** *** 591,596 **** get_relation_constraints(PlannerInfo *root, --- 591,603 ---- { Node *cexpr; + /* + * If this constraint hasn't been fully validated yet, we must + * ignore it here. + */ + if (!constr->check[i].ccvalid) + continue; + cexpr = stringToNode(constr->check[i].ccbin); /* *************** *** 663,669 **** get_relation_constraints(PlannerInfo *root, * * Detect whether the relation need not be scanned because it has either * self-inconsistent restrictions, or restrictions inconsistent with the ! * relation's CHECK constraints. * * Note: this examines only rel->relid, rel->reloptkind, and * rel->baserestrictinfo; therefore it can be called before filling in --- 670,676 ---- * * Detect whether the relation need not be scanned because it has either * self-inconsistent restrictions, or restrictions inconsistent with the ! * relation's validated CHECK constraints. * * Note: this examines only rel->relid, rel->reloptkind, and * rel->baserestrictinfo; therefore it can be called before filling in *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** *** 2746,2751 **** ConstraintElem: --- 2746,2753 ---- n->location = @1; n->raw_expr = $3; n->cooked_expr = NULL; + n->skip_validation = false; + n->initially_valid = true; if ($5 != 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), *************** *** 2753,2758 **** ConstraintElem: --- 2755,2771 ---- parser_errposition(@5))); $$ = (Node *)n; } + | CHECK '(' a_expr ')' NOT VALID + { + Constraint *n = makeNode(Constraint); + n->contype = CONSTR_CHECK; + n->location = @1; + n->raw_expr = $3; + n->cooked_expr = NULL; + n->skip_validation = true; + n->initially_valid = false; + $$ = (Node *)n; + } | UNIQUE '(' columnList ')' opt_definition OptConsTableSpace ConstraintAttributeSpec { *************** *** 7563,7568 **** AlterDomainStmt: --- 7576,7590 ---- n->behavior = $7; $$ = (Node *)n; } + /* ALTER DOMAIN <domain> VALIDATE CONSTRAINT <name> */ + | ALTER DOMAIN_P any_name VALIDATE CONSTRAINT name + { + AlterDomainStmt *n = makeNode(AlterDomainStmt); + n->subtype = 'V'; + n->typeName = $3; + n->name = $6; + $$ = (Node *)n; + } ; opt_as: AS {} *** a/src/backend/tcop/utility.c --- b/src/backend/tcop/utility.c *************** *** 820,825 **** standard_ProcessUtility(Node *parsetree, --- 820,829 ---- stmt->name, stmt->behavior); break; + case 'V': /* VALIDATE CONSTRAINT */ + AlterDomainValidateConstraint(stmt->typeName, + stmt->name); + break; default: /* oops */ elog(ERROR, "unrecognized alter domain type: %d", (int) stmt->subtype); *** a/src/backend/utils/cache/relcache.c --- b/src/backend/utils/cache/relcache.c *************** *** 3251,3256 **** CheckConstraintFetch(Relation relation) --- 3251,3257 ---- elog(ERROR, "unexpected constraint record found for rel %s", RelationGetRelationName(relation)); + check[found].ccvalid = conform->convalidated; check[found].ccname = MemoryContextStrdup(CacheMemoryContext, NameStr(conform->conname)); *** a/src/include/access/tupdesc.h --- b/src/include/access/tupdesc.h *************** *** 29,34 **** typedef struct constrCheck --- 29,35 ---- { char *ccname; char *ccbin; /* nodeToString representation of expr */ + bool ccvalid; } ConstrCheck; /* This structure contains constraints of a tuple */ *** a/src/include/catalog/heap.h --- b/src/include/catalog/heap.h *************** *** 30,35 **** typedef struct CookedConstraint --- 30,36 ---- char *name; /* name, or NULL if none */ AttrNumber attnum; /* which attr (only for DEFAULT) */ Node *expr; /* transformed default or check expr */ + bool skip_validation; /* skip validation? (only for CHECK) */ bool is_local; /* constraint has local (non-inherited) def */ int inhcount; /* number of times constraint is inherited */ } CookedConstraint; *** a/src/include/commands/typecmds.h --- b/src/include/commands/typecmds.h *************** *** 31,36 **** extern Oid AssignTypeArrayOid(void); --- 31,37 ---- extern void AlterDomainDefault(List *names, Node *defaultRaw); extern void AlterDomainNotNull(List *names, bool notNull); extern void AlterDomainAddConstraint(List *names, Node *constr); + extern void AlterDomainValidateConstraint(List *names, char *constrName); extern void AlterDomainDropConstraint(List *names, const char *constrName, DropBehavior behavior); *** a/src/include/nodes/parsenodes.h --- b/src/include/nodes/parsenodes.h *************** *** 1190,1195 **** typedef enum AlterTableType --- 1190,1196 ---- AT_AddConstraint, /* add constraint */ AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */ AT_ValidateConstraint, /* validate constraint */ + AT_ValidateConstraintRecurse, /* internal to commands/tablecmds.c */ AT_ProcessedConstraint, /* pre-processed add constraint (local in * parser/parse_utilcmd.c) */ AT_AddIndexConstraint, /* add constraint using existing index */ *************** *** 1543,1548 **** typedef struct Constraint --- 1544,1551 ---- char fk_matchtype; /* FULL, PARTIAL, UNSPECIFIED */ char fk_upd_action; /* ON UPDATE action */ char fk_del_action; /* ON DELETE action */ + + /* Fields used for constraints that allow a NOT VALID specification */ bool skip_validation; /* skip validation of existing rows? */ bool initially_valid; /* start the new constraint as valid */ } Constraint; *** a/src/test/regress/expected/alter_table.out --- b/src/test/regress/expected/alter_table.out *************** *** 196,205 **** DELETE FROM tmp3 where a=5; --- 196,241 ---- -- Try (and succeed) and repeat to show it works on already valid constraint ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr; + -- Try a non-verified CHECK constraint + ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail + ERROR: check constraint "b_greater_than_ten" is violated by some row + ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails + ERROR: check constraint "b_greater_than_ten" is violated by some row + DELETE FROM tmp3 WHERE NOT b > 10; + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds + -- Test inherited NOT VALID CHECK constraints + select * from tmp3; + a | b + ---+---- + 1 | 20 + (1 row) + + CREATE TABLE tmp6 () INHERITS (tmp3); + CREATE TABLE tmp7 () INHERITS (tmp3); + INSERT INTO tmp6 VALUES (6, 30), (7, 16); + ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID; + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails + ERROR: check constraint "b_le_20" is violated by some row + DELETE FROM tmp6 WHERE b > 20; + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds + -- An already validated constraint must not be revalidated + CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN $1; END; $$; + INSERT INTO tmp7 VALUES (8, 18); + ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b)); + NOTICE: boo: 18 + ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID; + NOTICE: merging constraint "identity" with inherited definition + ALTER TABLE tmp3 VALIDATE CONSTRAINT identity; + NOTICE: boo: 16 + NOTICE: boo: 20 -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- tmp4 is a,b ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; ERROR: there is no unique constraint matching given keys for referenced table "tmp4" + DROP TABLE tmp7; + DROP TABLE tmp6; DROP TABLE tmp5; DROP TABLE tmp4; DROP TABLE tmp3; *** a/src/test/regress/expected/domain.out --- b/src/test/regress/expected/domain.out *************** *** 352,357 **** alter domain con drop constraint t; --- 352,368 ---- insert into domcontest values (-5); --fails ERROR: value for domain con violates check constraint "con_check" insert into domcontest values (42); + -- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID + create domain things AS INT; + CREATE TABLE thethings (stuff things); + INSERT INTO thethings (stuff) VALUES (55); + ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11); + ERROR: column "stuff" of table "thethings" contains values that violate the new constraint + ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID; + ALTER DOMAIN things VALIDATE CONSTRAINT meow; + ERROR: column "stuff" of table "thethings" contains values that violate the new constraint + UPDATE thethings SET stuff = 10; + ALTER DOMAIN things VALIDATE CONSTRAINT meow; -- Confirm ALTER DOMAIN with RULES. create table domtab (col1 integer); create domain dom as integer; *** a/src/test/regress/sql/alter_table.sql --- b/src/test/regress/sql/alter_table.sql *************** *** 236,247 **** DELETE FROM tmp3 where a=5; --- 236,276 ---- ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr; + -- Try a non-verified CHECK constraint + ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail + ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails + DELETE FROM tmp3 WHERE NOT b > 10; + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds + + -- Test inherited NOT VALID CHECK constraints + select * from tmp3; + CREATE TABLE tmp6 () INHERITS (tmp3); + CREATE TABLE tmp7 () INHERITS (tmp3); + + INSERT INTO tmp6 VALUES (6, 30), (7, 16); + ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID; + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails + DELETE FROM tmp6 WHERE b > 20; + ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds + + -- An already validated constraint must not be revalidated + CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN $1; END; $$; + INSERT INTO tmp7 VALUES (8, 18); + ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b)); + ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID; + ALTER TABLE tmp3 VALIDATE CONSTRAINT identity; -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- tmp4 is a,b ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; + DROP TABLE tmp7; + + DROP TABLE tmp6; + DROP TABLE tmp5; DROP TABLE tmp4; *** a/src/test/regress/sql/domain.sql --- b/src/test/regress/sql/domain.sql *************** *** 259,264 **** alter domain con drop constraint t; --- 259,274 ---- insert into domcontest values (-5); --fails insert into domcontest values (42); + -- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID + create domain things AS INT; + CREATE TABLE thethings (stuff things); + INSERT INTO thethings (stuff) VALUES (55); + ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11); + ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID; + ALTER DOMAIN things VALIDATE CONSTRAINT meow; + UPDATE thethings SET stuff = 10; + ALTER DOMAIN things VALIDATE CONSTRAINT meow; + -- Confirm ALTER DOMAIN with RULES. create table domtab (col1 integer); create domain dom as integer; -- Álvaro Herrera <alvhe...@commandprompt.com> The PostgreSQL Company - Command Prompt, Inc. PostgreSQL Replication, Consulting, Custom Development, 24x7 support -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers