Wow, that's a heap of code --- that's my only comment.  :-)

---------------------------------------------------------------------------

Tom Lane wrote:
> Stephan Szabo <[EMAIL PROTECTED]> writes:
> > It's not cleaned up, but yes. It appears to work for the simple tests I've
> > done and should fall back if the permissions don't work to do a single
> > query on both tables.
> 
> Here's my code-reviewed version of the patch.  Anyone else want to take
> a look?
> 
> > I wasn't sure what to do about some of the spi error conditions.  For many
> > of them I'm just returning false now so that it will try the other
> > mechanism in case that might work. I'm not really sure if that's worth it,
> > or if I should just elog(ERROR) and give up.
> 
> I think you may as well keep it the same as the other RI routines and
> just elog() on SPI error.  If SPI is broken, the trigger procedure is
> gonna fail too.
> 
> I changed that, consolidated the error-reporting code, and fixed a couple
> other little issues, notably:
> 
> * The generated query applied ONLY to the FK table but not the PK table.
>   I assume this was just an oversight.
> 
> * The query is now run using SPI_execp_current, selecting the "current"
>   snapshot.  Without this, we could miss a violation in a serializable
>   transaction if someone else has already committed changes to either
>   relation.
>   For example:
> 
>       create pk and fk tables;
> 
>       begin serializable xact;
>       insert into pk values(1);
>       insert into fk values(1);
> 
>                                               begin;
>                                               insert into fk values(2);
>                                               commit;
> 
>       alter table fk add foreign key ...;
> 
>   The ALTER will not be blocked from acquiring exclusive lock, since
>   T2 already committed.  But if we run the query in the serializable
>   snapshot, it won't see the violating row fk=2.
> 
> The old trigger-based check avoids this error because the scan loop uses
> SnapshotNow to select live rows from the FK table.  There is a dual race
> condition where T2 deletes a row from the PK table.  In current CVS tip
> this will be detected and reported as a serialization failure, because
> T1 won't be able to get SELECT FOR UPDATE lock on the deleted row.  With
> the proposed patch you'll instead see a "no such key" failure, which I
> think is fine, even though it nominally violates serializability.
> 
> Comments?  Can anyone else do a code review (Jan??)?
> 
>                       regards, tom lane
> 
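
The snapshot issue in the example above can be illustrated with a small
toy model in C.  This is only a sketch under loud assumptions: ToyRow,
ToySnapshot, row_visible and count_orphans are hypothetical names made
up for illustration, and the visibility rule is a deliberate
simplification, not PostgreSQL's actual tuple visibility code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy row: a key value plus who inserted it and when. */
typedef struct
{
    int value;       /* key column value */
    int xact;        /* id of the inserting transaction */
    int commit_seq;  /* commit order of that transaction; 0 = uncommitted */
} ToyRow;

/* Hypothetical toy snapshot: sees its own rows and commits up to horizon. */
typedef struct
{
    int my_xact;     /* transaction taking the snapshot */
    int horizon;     /* highest commit_seq visible to this snapshot */
} ToySnapshot;

/* Simplified visibility rule (an assumption, not the real backend logic). */
bool
row_visible(const ToyRow *row, const ToySnapshot *snap)
{
    if (row->xact == snap->my_xact)
        return true;
    return row->commit_seq != 0 && row->commit_seq <= snap->horizon;
}

/* Count visible FK rows that have no visible PK row with the same value. */
int
count_orphans(const ToyRow *fk, size_t nfk,
              const ToyRow *pk, size_t npk,
              const ToySnapshot *snap)
{
    int orphans = 0;

    assert(fk != NULL && pk != NULL && snap != NULL);
    for (size_t i = 0; i < nfk; i++)
    {
        bool matched = false;

        if (!row_visible(&fk[i], snap))
            continue;
        for (size_t j = 0; j < npk; j++)
        {
            if (row_visible(&pk[j], snap) && pk[j].value == fk[i].value)
            {
                matched = true;
                break;
            }
        }
        if (!matched)
            orphans++;
    }
    return orphans;
}
```

Model T1 as xact 1 with uncommitted rows pk=1 and fk=1, and T2 as xact 2
whose row fk=2 committed at sequence 1.  A snapshot {my_xact = 1,
horizon = 0}, standing in for the serializable snapshot taken before T2
committed, finds no orphan rows.  A snapshot {my_xact = 1, horizon = 1},
standing in for a current snapshot taken at ALTER time, finds the
violating row fk=2.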

Content-Description: RIcheck.patch

> *** src/backend/commands/tablecmds.c.orig     Thu Oct  2 15:24:52 2003
> --- src/backend/commands/tablecmds.c  Sun Oct  5 16:29:51 2003
> ***************
> *** 3455,3460 ****
> --- 3455,3467 ----
>       int                     count;
>   
>       /*
> +      * See if we can do it with a single LEFT JOIN query.  A FALSE result
> +      * indicates we must proceed with the fire-the-trigger method.
> +      */
> +     if (RI_Initial_Check(fkconstraint, rel, pkrel))
> +             return;
> + 
> +     /*
>        * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
>        * as if that tuple had just been inserted.  If any of those fail, it
>        * should ereport(ERROR) and that's that.
> *** src/backend/utils/adt/ri_triggers.c.orig  Wed Oct  1 17:30:52 2003
> --- src/backend/utils/adt/ri_triggers.c       Sun Oct  5 16:42:37 2003
> ***************
> *** 40,45 ****
> --- 40,46 ----
>   #include "rewrite/rewriteHandler.h"
>   #include "utils/lsyscache.h"
>   #include "utils/typcache.h"
> + #include "utils/acl.h"
>   #include "miscadmin.h"
>   
>   
> ***************
> *** 164,170 ****
>                                Datum *vals, char *nulls);
>   static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
>                                  Relation pk_rel, Relation fk_rel,
> !                                HeapTuple violator, bool spi_err);
>   
>   
>   /* ----------
> --- 165,172 ----
>                                Datum *vals, char *nulls);
>   static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
>                                  Relation pk_rel, Relation fk_rel,
> !                                HeapTuple violator, TupleDesc tupdesc,
> !                                bool spi_err);
>   
>   
>   /* ----------
> ***************
> *** 2540,2546 ****
> --- 2542,2743 ----
>   }
>   
>   
> + /* ----------
> +  * RI_Initial_Check -
> +  *
> +  *  Check an entire table for non-matching values using a single query.
> +  *  This is not a trigger procedure, but is called during ALTER TABLE
> +  *  ADD FOREIGN KEY to validate the initial table contents.
> +  *
> +  *  We expect that an exclusive lock has been taken on rel and pkrel;
> +  *  hence, we do not need to lock individual rows for the check.
> +  *
> +  *  If the check fails because the current user doesn't have permissions
> +  *  to read both tables, return false to let our caller know that they will
> +  *  need to do something else to check the constraint.
> +  * ----------
> +  */
> + bool
> + RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
> + {
> +     const char *constrname = fkconstraint->constr_name;
> +     char            querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
> +                                             (MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)];
> +     char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
> +     char            relname[MAX_QUOTED_REL_NAME_LEN];
> +     char            attname[MAX_QUOTED_NAME_LEN];
> +     char            fkattname[MAX_QUOTED_NAME_LEN];
> +     const char *sep;
> +     List            *list;
> +     List            *list2;
> +     int                     spi_result;
> +     void            *qplan;
> + 
> +     /*
> +      * Check to make sure current user has enough permissions to do the
> +      * test query.  (If not, caller can fall back to the trigger method,
> +      * which works because it changes user IDs on the fly.)
> +      *
> +      * XXX are there any other show-stopper conditions to check?
> +      */
> +     if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
> +             return false;
> +     if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
> +             return false;
> + 
> +     /*----------
> +      * The query string built is:
> +      *  SELECT fk.keycols FROM ONLY relname fk 
> +      *   LEFT OUTER JOIN ONLY pkrelname pk 
> +      *   ON (pk.pkkeycol1=fk.keycol1 [AND ...])
> +      *   WHERE pk.pkkeycol1 IS NULL AND
> +      * For MATCH unspecified:
> +      *   (fk.keycol1 IS NOT NULL [AND ...])
> +      * For MATCH FULL:
> +      *   (fk.keycol1 IS NOT NULL [OR ...])
> +      *----------
> +      */
> + 
> +     sprintf(querystr, "SELECT ");
> +     sep="";
> +     foreach(list, fkconstraint->fk_attrs)
> +     {
> +             quoteOneName(attname, strVal(lfirst(list)));
> +             snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
> +                              "%sfk.%s", sep, attname);
> +             sep = ", ";
> +     }
> + 
> +     quoteRelationName(pkrelname, pkrel);
> +     quoteRelationName(relname, rel);
> +     snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
> +                      " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
> +                      relname, pkrelname);
> + 
> +     sep="";
> +     for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs; 
> +              list != NIL && list2 != NIL; 
> +              list=lnext(list), list2=lnext(list2))
> +     {
> +             quoteOneName(attname, strVal(lfirst(list)));
> +             quoteOneName(fkattname, strVal(lfirst(list2)));
> +             snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
> +                              "%spk.%s=fk.%s",
> +                              sep, attname, fkattname);
> +             sep = " AND ";
> +     }
> +     /*
> +      * It's sufficient to test any one pk attribute for null to detect a
> +      * join failure.
> +      */
> +     quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
> +     snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
> +                      ") WHERE pk.%s IS NULL AND (", attname);
> + 
> +     sep="";
> +     foreach(list, fkconstraint->fk_attrs)
> +     {
> +             quoteOneName(attname, strVal(lfirst(list)));
> +             snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
> +                              "%sfk.%s IS NOT NULL",
> +                              sep, attname);
> +             switch (fkconstraint->fk_matchtype)
> +             {
> +                     case FKCONSTR_MATCH_UNSPECIFIED:
> +                             sep=" AND ";
> +                             break;
> +                     case FKCONSTR_MATCH_FULL:
> +                             sep=" OR ";
> +                             break;
> +                     case FKCONSTR_MATCH_PARTIAL:
> +                             ereport(ERROR,
> +                                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> +                                              errmsg("MATCH PARTIAL not yet implemented")));
> +                             break;
> +                     default:
> +                             elog(ERROR, "unrecognized match type: %d",
> +                                      fkconstraint->fk_matchtype);
> +                             break;
> +             }
> +     }
> +     snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
> +                      ")");
> + 
> +     if (SPI_connect() != SPI_OK_CONNECT)
> +             elog(ERROR, "SPI_connect failed");
> + 
> +     /*
> +      * Generate the plan.  We don't need to cache it, and there are no
> +      * arguments to the plan. 
> +      */
> +     qplan = SPI_prepare(querystr, 0, NULL);
> + 
> +     /*
> +      * Run the plan.  For safety we force a current query snapshot to be
> +      * used.  (In serializable mode, this arguably violates serializability,
> +      * but we really haven't got much choice.)  We need at most one tuple
> +      * returned, so pass limit = 1.
> +      */
> +     spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);
>   
> +     /* Check result */
> +     if (spi_result != SPI_OK_SELECT)
> +             elog(ERROR, "SPI_execp_current returned %d", spi_result);
> + 
> +     /* Did we find a tuple violating the constraint? */
> +     if (SPI_processed > 0)
> +     {
> +             HeapTuple       tuple = SPI_tuptable->vals[0];
> +             TupleDesc       tupdesc = SPI_tuptable->tupdesc;
> +             int                     nkeys = length(fkconstraint->fk_attrs);
> +             int                     i;
> +             RI_QueryKey     qkey;
> + 
> +             /*
> +              * If it's MATCH FULL, and there are any nulls in the FK keys,
> +              * complain about that rather than the lack of a match.  MATCH FULL
> +              * disallows partially-null FK rows.
> +              */
> +             if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
> +             {
> +                     bool    isnull = false;
> + 
> +                     for (i = 1; i <= nkeys; i++)
> +                     {
> +                             (void) SPI_getbinval(tuple, tupdesc, i, &isnull);
> +                             if (isnull)
> +                                     break;
> +                     }
> +                     if (isnull)
> +                             ereport(ERROR,
> +                                             (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
> +                                              errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
> +                                                             RelationGetRelationName(rel),
> +                                                             constrname),
> +                                              errdetail("MATCH FULL does not allow mixing of null and nonnull key values.")));
> +             }
> + 
> +             /*
> +              * Although we didn't cache the query, we need to set up a fake
> +              * query key to pass to ri_ReportViolation.
> +              */
> +             MemSet(&qkey, 0, sizeof(qkey));
> +             qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
> +             qkey.nkeypairs = nkeys;
> +             for (i = 0; i < nkeys; i++)
> +                     qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
> + 
> +             ri_ReportViolation(&qkey, constrname,
> +                                                pkrel, rel,
> +                                                tuple, tupdesc,
> +                                                false);
> +     }
> + 
> +     if (SPI_finish() != SPI_OK_FINISH)
> +             elog(ERROR, "SPI_finish failed");
> + 
> +     return true;
> + }
>   
>   
>   /* ----------
> ***************
> *** 2905,2910 ****
> --- 3102,3108 ----
>               ri_ReportViolation(qkey, constrname ? constrname : "",
>                                                  pk_rel, fk_rel,
>                                                  new_tuple ? new_tuple : old_tuple,
> +                                                NULL,
>                                                  true);
>   
>       /* XXX wouldn't it be clearer to do this part at the caller? */
> ***************
> *** 2913,2918 ****
> --- 3111,3117 ----
>               ri_ReportViolation(qkey, constrname,
>                                                  pk_rel, fk_rel,
>                                                  new_tuple ? new_tuple : old_tuple,
> +                                                NULL,
>                                                  false);
>   
>       return SPI_processed != 0;
> ***************
> *** 2950,2956 ****
>   static void
>   ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
>                                  Relation pk_rel, Relation fk_rel,
> !                                HeapTuple violator, bool spi_err)
>   {
>   #define BUFLENGTH   512
>       char            key_names[BUFLENGTH];
> --- 3149,3156 ----
>   static void
>   ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
>                                  Relation pk_rel, Relation fk_rel,
> !                                HeapTuple violator, TupleDesc tupdesc,
> !                                bool spi_err)
>   {
>   #define BUFLENGTH   512
>       char            key_names[BUFLENGTH];
> ***************
> *** 2958,2964 ****
>       char       *name_ptr = key_names;
>       char       *val_ptr = key_values;
>       bool            onfk;
> -     Relation        rel;
>       int                     idx,
>                               key_idx;
>   
> --- 3158,3163 ----
> ***************
> *** 2972,2989 ****
>                                errhint("This is most likely due to a rule having rewritten the query.")));
>   
>       /*
> !      * rel is set to where the tuple description is coming from.
>        */
>       onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
>       if (onfk)
>       {
> -             rel = fk_rel;
>               key_idx = RI_KEYPAIR_FK_IDX;
>       }
>       else
>       {
> -             rel = pk_rel;
>               key_idx = RI_KEYPAIR_PK_IDX;
>       }
>   
>       /*
> --- 3171,3191 ----
>                                errhint("This is most likely due to a rule having rewritten the query.")));
>   
>       /*
> !      * Determine which relation to complain about.  If tupdesc wasn't
> !      * passed by caller, assume the violator tuple came from there.
>        */
>       onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
>       if (onfk)
>       {
>               key_idx = RI_KEYPAIR_FK_IDX;
> +             if (tupdesc == NULL)
> +                     tupdesc = fk_rel->rd_att;
>       }
>       else
>       {
>               key_idx = RI_KEYPAIR_PK_IDX;
> +             if (tupdesc == NULL)
> +                     tupdesc = pk_rel->rd_att;
>       }
>   
>       /*
> ***************
> *** 3008,3015 ****
>               char       *name,
>                                  *val;
>   
> !             name = SPI_fname(rel->rd_att, fnum);
> !             val = SPI_getvalue(violator, rel->rd_att, fnum);
>               if (!val)
>                       val = "null";
>   
> --- 3210,3217 ----
>               char       *name,
>                                  *val;
>   
> !             name = SPI_fname(tupdesc, fnum);
> !             val = SPI_getvalue(violator, tupdesc, fnum);
>               if (!val)
>                       val = "null";
>   
> *** src/include/commands/trigger.h.orig       Sun Aug  3 23:01:31 2003
> --- src/include/commands/trigger.h    Sun Oct  5 16:11:44 2003
> ***************
> *** 197,201 ****
> --- 197,204 ----
>    * in utils/adt/ri_triggers.c
>    */
>   extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
> + extern bool RI_Initial_Check(FkConstraint *fkconstraint, 
> +                                                      Relation rel, 
> +                                                      Relation pkrel);
>   
>   #endif   /* TRIGGER_H */
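
For anyone who wants to see the shape of the generated query without
reading through the diff, here is a standalone C sketch of the string
building done in RI_Initial_Check.  The names appendf, build_check_query
and MatchType are hypothetical and exist only in this sketch.  Unlike
the patch, it does no identifier quoting (the patch uses quoteOneName
and quoteRelationName) and never touches SPI; it only assembles the
LEFT JOIN text described in the patch comment.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the FKCONSTR_MATCH_* constants. */
typedef enum { MATCH_UNSPECIFIED, MATCH_FULL } MatchType;

/* Append formatted text to buf, truncating rather than overrunning it. */
void
appendf(char *buf, size_t bufsize, const char *fmt, ...)
{
    size_t  used = strlen(buf);
    va_list args;

    if (used + 1 >= bufsize)
        return;                 /* no room left */
    va_start(args, fmt);
    vsnprintf(buf + used, bufsize - used, fmt, args);
    va_end(args);
}

/*
 * Build a query that returns FK rows with no matching PK row.
 * Identifiers are assumed to be already safe to embed (no quoting here).
 */
void
build_check_query(char *buf, size_t bufsize,
                  const char *fkrel, const char *pkrel,
                  const char *const *fkcols, const char *const *pkcols,
                  int nkeys, MatchType match)
{
    /* MATCH FULL ORs the NOT NULL tests; MATCH unspecified ANDs them. */
    const char *sep = (match == MATCH_FULL) ? " OR " : " AND ";
    int         i;

    assert(bufsize > 0 && nkeys > 0);
    buf[0] = '\0';

    appendf(buf, bufsize, "SELECT ");
    for (i = 0; i < nkeys; i++)
        appendf(buf, bufsize, "%sfk.%s", i ? ", " : "", fkcols[i]);

    appendf(buf, bufsize, " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
            fkrel, pkrel);
    for (i = 0; i < nkeys; i++)
        appendf(buf, bufsize, "%spk.%s=fk.%s",
                i ? " AND " : "", pkcols[i], fkcols[i]);

    /* Testing one PK column for null is enough to detect a join failure. */
    appendf(buf, bufsize, ") WHERE pk.%s IS NULL AND (", pkcols[0]);
    for (i = 0; i < nkeys; i++)
        appendf(buf, bufsize, "%sfk.%s IS NOT NULL", i ? sep : "", fkcols[i]);
    appendf(buf, bufsize, ")");
}
```

Called with relations fktab and pktab, FK columns a and b, PK columns x
and y, and MATCH_FULL, this yields a query whose WHERE clause is
"pk.x IS NULL AND (fk.a IS NOT NULL OR fk.b IS NOT NULL)".  Any row it
returns is either an unmatched key or, for MATCH FULL, a partially null
key, which is why the patch inspects the returned tuple for nulls before
reporting.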

> 
> ---------------------------(end of broadcast)---------------------------
> TIP 9: the planner will ignore your desire to choose an index scan if your
>       joining column's datatypes do not match

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  [EMAIL PROTECTED]               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073
