On Tuesday, November 19, 2024 9:42 PM Shlok Kyal <shlok.kyal....@gmail.com> 
wrote:
> I agree that we can remove the test. I debugged and found the test modified in
> above patch does not hit the condition added in commit adedf54.
> Also, according to me we cannot trigger the bug after the fix in this thread. 
> So, I
> think we can remove the testcase.
> 
> I have attached the latest patch with an updated commit message and also
> removed the testcase.

Thanks for updating the patch.

Out of curiosity, I tried to merge the pub_collist_contains_invalid_column()
and replident_has_unpublished_gen_col() functions into a single function to
evaluate if this approach might be better.

As shown in the attached diff, it reduces some code redundancy *but* also
introduces additional IF conditions and parameters in the new combined
function, which might complicate the logic a bit.

Personally, I think the existing V10 patch looks good and clear. I am just
sharing the diff for feedback in case others want to have a look.

Best Regards,
Hou zj
From 8640a4605f32f8072d093902da0af23dfa6d119b Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Wed, 20 Nov 2024 16:34:04 +0800
Subject: [PATCH v10] combine functions

---
 src/backend/commands/publicationcmds.c | 219 ++++++++++---------------
 src/backend/utils/cache/relcache.c     |  38 ++---
 src/include/commands/publicationcmds.h |   7 +-
 3 files changed, 105 insertions(+), 159 deletions(-)

diff --git a/src/backend/commands/publicationcmds.c 
b/src/backend/commands/publicationcmds.c
index 053877c524..0d5daf7626 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -336,21 +336,36 @@ pub_rf_contains_invalid_column(Oid pubid, Relation 
relation, List *ancestors,
 }
 
 /*
- * Check if all columns referenced in the REPLICA IDENTITY are covered by
- * the column list.
+ * Check for invalid columns in the publication table definition.
  *
- * Returns true if any replica identity column is not covered by column list.
+ * This function evaluates two conditions:
+ *
+ * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
+ *    by the column list. If any column is missing, *invalid_column_list is set
+ *    to true.
+ *
+ * 2. Ensures that the REPLICA IDENTITY does not contain unpublished generated
+ *    columns. If an unpublished generated column is found,
+ *    *unpublished_gen_col is set to true.
+ *
+ * Returns true if any of the above conditions are not met.
  */
 bool
-pub_collist_contains_invalid_column(Oid pubid, Relation relation, List 
*ancestors,
-                                                                       bool 
pubviaroot)
+pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
+                                                       bool pubviaroot, bool 
pubgencols,
+                                                       bool 
*invalid_column_list,
+                                                       bool 
*unpublished_gen_col)
 {
-       HeapTuple       tuple;
        Oid                     relid = RelationGetRelid(relation);
        Oid                     publish_as_relid = RelationGetRelid(relation);
-       bool            result = false;
-       Datum           datum;
-       bool            isnull;
+       Bitmapset  *idattrs;
+       Bitmapset  *columns = NULL;
+       TupleDesc       desc = RelationGetDescr(relation);
+       Publication *pub;
+       int                     x;
+
+       *invalid_column_list = false;
+       *unpublished_gen_col = false;
 
        /*
         * For a partition, if pubviaroot is true, find the topmost ancestor 
that
@@ -368,158 +383,90 @@ pub_collist_contains_invalid_column(Oid pubid, Relation 
relation, List *ancestor
                        publish_as_relid = relid;
        }
 
-       tuple = SearchSysCache2(PUBLICATIONRELMAP,
-                                                       
ObjectIdGetDatum(publish_as_relid),
-                                                       
ObjectIdGetDatum(pubid));
-
-       if (!HeapTupleIsValid(tuple))
-               return false;
-
-       datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
-                                                       
Anum_pg_publication_rel_prattrs,
-                                                       &isnull);
+       /* Fetch the column list */
+       pub = GetPublication(pubid);
+       check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
 
-       if (!isnull)
+       if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
        {
-               int                     x;
-               Bitmapset  *idattrs;
-               Bitmapset  *columns = NULL;
-
                /* With REPLICA IDENTITY FULL, no column list is allowed. */
-               if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
-                       result = true;
-
-               /* Transform the column list datum to a bitmapset. */
-               columns = pub_collist_to_bitmapset(NULL, datum, NULL);
-
-               /* Remember columns that are part of the REPLICA IDENTITY */
-               idattrs = RelationGetIndexAttrBitmap(relation,
-                                                                               
         INDEX_ATTR_BITMAP_IDENTITY_KEY);
+               *invalid_column_list = (columns != NULL);
 
                /*
-                * Attnums in the bitmap returned by RelationGetIndexAttrBitmap 
are
-                * offset (to handle system columns the usual way), while 
column list
-                * does not use offset, so we can't do bms_is_subset(). 
Instead, we
-                * have to loop over the idattrs and check all of them are in 
the
-                * list.
+                * REPLICA IDENTITY can be FULL only if there is no column list 
for
+                * publication. When REPLICA IDENTITY is FULL and the relation
+                * includes a generated column, but the 
publish_generated_columns
+                * option is set to false, this scenario is invalid.
                 */
-               x = -1;
-               while ((x = bms_next_member(idattrs, x)) >= 0)
-               {
-                       AttrNumber      attnum = (x + 
FirstLowInvalidHeapAttributeNumber);
-
-                       /*
-                        * If pubviaroot is true, we are validating the column 
list of the
-                        * parent table, but the bitmap contains the replica 
identity
-                        * information of the child table. The parent/child 
attnums may
-                        * not match, so translate them to the parent - get the 
attname
-                        * from the child, and look it up in the parent.
-                        */
-                       if (pubviaroot)
-                       {
-                               /* attribute name in the child table */
-                               char       *colname = get_attname(relid, 
attnum, false);
-
-                               /*
-                                * Determine the attnum for the attribute name 
in parent (we
-                                * are using the column list defined on the 
parent).
-                                */
-                               attnum = get_attnum(publish_as_relid, colname);
-                       }
-
-                       /* replica identity column, not covered by the column 
list */
-                       if (!bms_is_member(attnum, columns))
-                       {
-                               result = true;
-                               break;
-                       }
-               }
+               if (!pubgencols && relation->rd_att->constr &&
+                       relation->rd_att->constr->has_generated_stored)
+                       *unpublished_gen_col = true;
 
-               bms_free(idattrs);
-               bms_free(columns);
+               if (*unpublished_gen_col && *invalid_column_list)
+                       return true;
        }
 
-       ReleaseSysCache(tuple);
-
-       return result;
-}
-
-/*
- * Check if REPLICA IDENTITY consists of any unpublished generated column.
- *
- * Returns true if any replica identity column is an unpublished generated
- * column.
- */
-bool
-replident_has_unpublished_gen_col(Oid pubid, Relation relation, List 
*ancestors,
-                                                                 bool 
pubviaroot)
-{
-       Oid                     relid = RelationGetRelid(relation);
-       Oid                     publish_as_relid = RelationGetRelid(relation);
-       bool            result = false;
-       bool            found;
-       Publication *pub;
-
-       /* Return if the table does not contain any generated columns */
-       if (!relation->rd_att->constr ||
-               !relation->rd_att->constr->has_generated_stored)
-               return false;
+       /* Remember columns that are part of the REPLICA IDENTITY */
+       idattrs = RelationGetIndexAttrBitmap(relation,
+                                                                               
 INDEX_ATTR_BITMAP_IDENTITY_KEY);
 
        /*
-        * For a partition, if pubviaroot is true, find the topmost ancestor 
that
-        * is published via this publication as we need to use its column list 
for
-        * the changes.
-        *
-        * Note that even though the column list used is for an ancestor, the
-        * REPLICA IDENTITY used will be for the actual child table.
+        * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are 
offset
+        * (to handle system columns the usual way), while column list does not
+        * use offset, so we can't do bms_is_subset(). Instead, we have to loop
+        * over the idattrs and check all of them are in the list.
         */
-       if (pubviaroot && relation->rd_rel->relispartition)
+       x = -1;
+       while ((x = bms_next_member(idattrs, x)) >= 0)
        {
-               publish_as_relid = GetTopMostAncestorInPublication(pubid, 
ancestors, NULL);
+               AttrNumber      attnum = (x + 
FirstLowInvalidHeapAttributeNumber);
+               Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
 
-               if (!OidIsValid(publish_as_relid))
-                       publish_as_relid = relid;
-       }
+               Assert(!att->attisdropped);
 
-       pub = GetPublication(pubid);
-       found = check_and_fetch_column_list(pub, publish_as_relid, NULL, NULL);
+               /* Check if generated column is part of REPLICA IDENTITY */
+               *unpublished_gen_col |= att->attgenerated;
 
-       if (!found)
-       {
-               TupleDesc       desc = RelationGetDescr(relation);
-               Bitmapset  *idattrs;
-               int                     x;
+               if (columns == NULL)
+               {
+                       /* Break the loop if unpublished generated columns 
exist. */
+                       if (*unpublished_gen_col)
+                               break;
+
+                       /* Skip validating the column list since it is not 
defined */
+                       continue;
+               }
 
                /*
-                * REPLICA IDENTITY can be FULL only if there is no column list 
for
-                * publication. If REPLICA IDENTITY is set as FULL and relation 
has a
-                * generated column we should error out.
+                * If pubviaroot is true, we are validating the column list of 
the
+                * parent table, but the bitmap contains the replica identity
+                * information of the child table. The parent/child attnums may 
not
+                * match, so translate them to the parent - get the attname 
from the
+                * child, and look it up in the parent.
                 */
-               if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
-                       return true;
-
-               /* Remember columns that are part of the REPLICA IDENTITY */
-               idattrs = RelationGetIndexAttrBitmap(relation,
-                                                                               
         INDEX_ATTR_BITMAP_IDENTITY_KEY);
-
-               x = -1;
-               while ((x = bms_next_member(idattrs, x)) >= 0)
+               if (pubviaroot)
                {
-                       AttrNumber      attnum = (x + 
FirstLowInvalidHeapAttributeNumber);
-                       Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
+                       /* attribute name in the child table */
+                       char       *colname = get_attname(relid, attnum, false);
 
-                       /* Check if generated column is part of REPLICA 
IDENTITY */
-                       if (!att->attisdropped && att->attgenerated)
-                       {
-                               result = true;
-                               break;
-                       }
+                       /*
+                        * Determine the attnum for the attribute name in 
parent (we are
+                        * using the column list defined on the parent).
+                        */
+                       attnum = get_attnum(publish_as_relid, colname);
                }
 
-               bms_free(idattrs);
+               /* replica identity column, not covered by the column list */
+               *invalid_column_list |= !bms_is_member(attnum, columns);
+
+               if (*invalid_column_list && *unpublished_gen_col)
+                       break;
        }
 
-       return result;
+       bms_free(columns);
+       bms_free(idattrs);
+
+       return *invalid_column_list || *unpublished_gen_col;
 }
 
 /* check_functions_in_node callback */
diff --git a/src/backend/utils/cache/relcache.c 
b/src/backend/utils/cache/relcache.c
index be8f8eea8f..c1199596bf 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5783,6 +5783,8 @@ RelationBuildPublicationDesc(Relation relation, 
PublicationDesc *pubdesc)
                Oid                     pubid = lfirst_oid(lc);
                HeapTuple       tup;
                Form_pg_publication pubform;
+               bool            invalid_column_list;
+               bool            unpublished_gen_col;
 
                tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
 
@@ -5817,33 +5819,27 @@ RelationBuildPublicationDesc(Relation relation, 
PublicationDesc *pubdesc)
                /*
                 * Check if all columns are part of the REPLICA IDENTITY index 
or not.
                 *
-                * If the publication is FOR ALL TABLES then it means the table 
has no
-                * column list and we can skip the validation.
-                */
-               if (!pubform->puballtables &&
-                       (pubform->pubupdate || pubform->pubdelete) &&
-                       pub_collist_contains_invalid_column(pubid, relation, 
ancestors,
-                                                                               
                pubform->pubviaroot))
-               {
-                       if (pubform->pubupdate)
-                               pubdesc->cols_valid_for_update = false;
-                       if (pubform->pubdelete)
-                               pubdesc->cols_valid_for_delete = false;
-               }
-
-               /*
                 * Check if all generated columns included in the REPLICA 
IDENTITY are
                 * published.
                 */
-               if (!pubform->pubgencols &&
-                       (pubform->pubupdate || pubform->pubdelete) &&
-                       replident_has_unpublished_gen_col(pubid, relation, 
ancestors,
-                                                                               
          pubform->pubviaroot))
+               if ((pubform->pubupdate || pubform->pubdelete) &&
+                       pub_contains_invalid_column(pubid, relation, ancestors,
+                                                                               
pubform->pubviaroot,
+                                                                               
pubform->pubgencols,
+                                                                               
&invalid_column_list,
+                                                                               
&unpublished_gen_col))
                {
                        if (pubform->pubupdate)
-                               pubdesc->replident_valid_for_update = false;
+                       {
+                               pubdesc->cols_valid_for_update = 
!invalid_column_list;
+                               pubdesc->replident_valid_for_update = 
!unpublished_gen_col;
+                       }
+
                        if (pubform->pubdelete)
-                               pubdesc->replident_valid_for_delete = false;
+                       {
+                               pubdesc->cols_valid_for_delete = 
!invalid_column_list;
+                               pubdesc->replident_valid_for_delete = 
!unpublished_gen_col;
+                       }
                }
 
                ReleaseSysCache(tup);
diff --git a/src/include/commands/publicationcmds.h 
b/src/include/commands/publicationcmds.h
index b18e576b77..fbf78ea08e 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -33,8 +33,11 @@ extern void AlterPublicationOwner_oid(Oid subid, Oid 
newOwnerId);
 extern void InvalidatePublicationRels(List *relids);
 extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation,
                                                                                
   List *ancestors, bool pubviaroot);
-extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation,
-                                                                               
                List *ancestors, bool pubviaroot);
+extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
+                                                                               
List *ancestors, bool pubviaroot,
+                                                                               
bool pubgencols,
+                                                                               
bool *invalid_column_list,
+                                                                               
bool *unpublished_gen_col);
 extern bool replident_has_unpublished_gen_col(Oid pubid, Relation relation,
                                                                                
          List *ancestors, bool pubviaroot);
 
-- 
2.30.0.windows.2

Reply via email to