Hi Vignesh, I have reviewed your latest patchset:

v20240814-0001. No comments
v20240814-0002. No comments
v20240814-0003. No comments
v20240814-0004. See below
v20240814-0005. No comments

//////

v20240814-0004.

======
src/backend/commands/subscriptioncmds.c

CreateSubscription:
nit - XXX comments

AlterSubscription_refresh:
nit - unnecessary parens in ereport

AlterSubscription:
nit - unnecessary parens in ereport

fetch_sequence_list:
nit - unnecessary parens in ereport

======
.../replication/logical/sequencesync.c

1. fetch_remote_sequence_data

+ * Returns:
+ * - TRUE if there are discrepancies between the sequence parameters in
+ *   the publisher and subscriber.
+ * - FALSE if the parameters match.
+ */
+static bool
+fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid,
+    char *nspname, char *relname, int64 *log_cnt,
+    bool *is_called, XLogRecPtr *page_lsn,
+    int64 *last_value)

IMO it is more natural to return TRUE for good results and FALSE for
bad ones. (FYI, I have implemented this reversal in the nitpicks
attachment).

~

nit - swapped columns seqmin and seqmax in the SQL to fetch them in
the natural order
nit - unnecessary parens in ereport

~~~

copy_sequence:
nit - update function comment to document the output parameter
nit - Assert that *sequence_mismatch is false on entry to this function
nit - tweak wrapping and add \n in the SQL
nit - unnecessary parens in ereport

report_sequence_mismatch:
nit - modify function comment
nit - function name changed
/report_sequence_mismatch/report_mismatched_sequences/ (now plural
(and more like the other one)

append_mismatched_sequences:
nit - param name /rel/seqrel/

~~~

2. LogicalRepSyncSequences:
+ Relation sequence_rel;
+ XLogRecPtr sequence_lsn;
+ bool sequence_mismatch;

The 'sequence_mismatch' variable must be initialized false, otherwise
we cannot trust it gets assigned.

~

LogicalRepSyncSequences:
nit - unnecessary parens in ereport
nit - move the for-loop variable declaration
nit - remove a blank line

process_syncing_sequences_for_apply:
nit - variable declaration indent

======
Kind Regards,
Peter Smith.
Fujitsu Australia
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 9fff288..22115bd 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -726,10 +726,10 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
        recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
        /*
-        * XXX: If the subscription is for a sequence-only publication,
-        * creating this origin is unnecessary at this point. It can be created
-        * later during the ALTER SUBSCRIPTION ... REFRESH command, if the
-        * publication is updated to include tables or tables in schemas.
+        * XXX: If the subscription is for a sequence-only publication, creating
+        * this origin is unnecessary. It can be created later during the ALTER
+        * SUBSCRIPTION ... REFRESH command, if the publication is updated to
+        * include tables or tables in schemas.
         */
        ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, 
sizeof(originname));
        replorigin_create(originname);
@@ -800,9 +800,9 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
                         * export it.
                         *
                         * XXX: If the subscription is for a sequence-only 
publication,
-                        * creating this slot is not necessary at the moment. 
It can be
-                        * created during the ALTER SUBSCRIPTION ... REFRESH 
command if the
-                        * publication is updated to include tables or tables 
in schema.
+                        * creating this slot. It can be created later during 
the ALTER
+                        * SUBSCRIPTION ... REFRESH command, if the publication 
is updated
+                        * to include tables or tables in schema.
                         */
                        if (opts.create_slot)
                        {
@@ -1021,9 +1021,9 @@ AlterSubscription_refresh(Subscription *sub, bool 
copy_data,
                                                                                
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
                                                                                
InvalidXLogRecPtr, true);
                                ereport(DEBUG1,
-                                               (errmsg_internal("%s \"%s.%s\" 
added to subscription \"%s\"",
-                                                                               
 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
-                                                                               
 rv->schemaname, rv->relname, sub->name)));
+                                               errmsg_internal("%s \"%s.%s\" 
added to subscription \"%s\"",
+                                                                               
relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+                                                                               
rv->schemaname, rv->relname, sub->name));
                        }
                }
 
@@ -1125,11 +1125,11 @@ AlterSubscription_refresh(Subscription *sub, bool 
copy_data,
                                }
 
                                ereport(DEBUG1,
-                                               (errmsg_internal("%s \"%s.%s\" 
removed from subscription \"%s\"",
+                                               errmsg_internal("%s \"%s.%s\" 
removed from subscription \"%s\"",
                                                                                
 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
                                                                                
 get_namespace_name(get_rel_namespace(relid)),
                                                                                
 get_rel_name(relid),
-                                                                               
 sub->name)));
+                                                                               
 sub->name));
                        }
                }
 
@@ -1615,8 +1615,8 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                        {
                                if (!sub->enabled)
                                        ereport(ERROR,
-                                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                                        errmsg("ALTER 
SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled 
subscriptions")));
+                                                       
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                       errmsg("ALTER 
SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES is not allowed for disabled 
subscriptions"));
 
                                PreventInTransactionBlock(isTopLevel, "ALTER 
SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES");
 
@@ -2494,8 +2494,8 @@ fetch_sequence_list(WalReceiverConn *wrconn, char 
*subname, List *publications)
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
-                               (errmsg("could not receive list of sequences 
from the publisher: %s",
-                                               res->err)));
+                               errmsg("could not receive list of sequences 
from the publisher: %s",
+                                               res->err));
 
        /* Process sequences. */
        slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
diff --git a/src/backend/replication/logical/sequencesync.c 
b/src/backend/replication/logical/sequencesync.c
index 8211121..1f45564 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -82,9 +82,8 @@ List *sequence_states_not_ready = NIL;
  * - last_value: The last value of the sequence.
  *
  * Returns:
- * - TRUE if there are discrepancies between the sequence parameters in
- *   the publisher and subscriber.
- * - FALSE if the parameters match.
+ * - TRUE if parameters match for the local and remote sequences.
+ * - FALSE if parameters differ for the local and remote sequences.
  */
 static bool
 fetch_remote_sequence_data(WalReceiverConn *conn, Oid relid, Oid remoteid,
@@ -101,17 +100,17 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid 
relid, Oid remoteid,
        Oid                     seqtypid;
        int64           seqstart;
        int64           seqincrement;
-       int64           seqmax;
        int64           seqmin;
+       int64           seqmax;
        bool            seqcycle;
-       bool            seq_not_match = false;
+       bool            seq_params_match;
        HeapTuple       tup;
        Form_pg_sequence seqform;
 
        initStringInfo(&cmd);
        appendStringInfo(&cmd,
                                         "SELECT last_value, log_cnt, 
is_called, page_lsn,\n"
-                                        "seqtypid, seqstart, seqincrement, 
seqmax, seqmin, seqcycle\n"
+                                        "seqtypid, seqstart, seqincrement, 
seqmin, seqmax, seqcycle\n"
                                         "FROM pg_sequence_state(%d), 
pg_sequence where seqrelid = %d",
                                         remoteid, remoteid);
 
@@ -120,16 +119,16 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid 
relid, Oid remoteid,
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
-                               (errmsg("could not receive sequence list from 
the publisher: %s",
-                                               res->err)));
+                               errmsg("could not receive sequence list from 
the publisher: %s",
+                                          res->err));
 
        /* Process the sequence. */
        slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
        if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_OBJECT),
-                                errmsg("sequence \"%s.%s\" not found on 
publisher",
-                                               nspname, relname)));
+                               errcode(ERRCODE_UNDEFINED_OBJECT),
+                               errmsg("sequence \"%s.%s\" not found on 
publisher",
+                                               nspname, relname));
 
        *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
        Assert(!isnull);
@@ -152,10 +151,10 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid 
relid, Oid remoteid,
        seqincrement = DatumGetInt64(slot_getattr(slot, 7, &isnull));
        Assert(!isnull);
 
-       seqmax = DatumGetInt64(slot_getattr(slot, 8, &isnull));
+       seqmin = DatumGetInt64(slot_getattr(slot, 8, &isnull));
        Assert(!isnull);
 
-       seqmin = DatumGetInt64(slot_getattr(slot, 9, &isnull));
+       seqmax = DatumGetInt64(slot_getattr(slot, 9, &isnull));
        Assert(!isnull);
 
        seqcycle = DatumGetBool(slot_getattr(slot, 10, &isnull));
@@ -169,16 +168,17 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid 
relid, Oid remoteid,
 
        seqform = (Form_pg_sequence) GETSTRUCT(tup);
 
-       if (seqform->seqtypid != seqtypid || seqform->seqmin != seqmin ||
-               seqform->seqmax != seqmax || seqform->seqstart != seqstart ||
-               seqform->seqincrement != seqincrement || seqform->seqcycle != 
seqcycle)
-               seq_not_match = true;
+       seq_params_match = seqform->seqtypid == seqtypid &&
+               seqform->seqmin == seqmin && seqform->seqmax == seqmax &&
+               seqform->seqcycle == seqcycle &&
+               seqform->seqstart == seqstart &&
+               seqform->seqincrement == seqincrement;
 
        ReleaseSysCache(tup);
        ExecDropSingleTupleTableSlot(slot);
        walrcv_clear_result(res);
 
-       return seq_not_match;
+       return seq_params_match;
 }
 
 /*
@@ -187,6 +187,9 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid 
relid, Oid remoteid,
  * Fetch the sequence value from the publisher and set the subscriber sequence
  * with the same value. Caller is responsible for locking the local
  * relation.
+ *
+ * The output parameter 'sequence_mismatch' indicates if a local/remote
+ * sequence parameter mismatch was detected.
  */
 static XLogRecPtr
 copy_sequence(WalReceiverConn *conn, Relation rel,
@@ -207,14 +210,15 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
        char       *relname = RelationGetRelationName(rel);
        Oid                     relid = RelationGetRelid(rel);
 
+       Assert(!*sequence_mismatch);
+
        /* Fetch Oid. */
        initStringInfo(&cmd);
-       appendStringInfo(&cmd, "SELECT c.oid, c.relkind"
-                                        "  FROM pg_catalog.pg_class c"
-                                        "  INNER JOIN pg_catalog.pg_namespace 
n"
-                                        "        ON (c.relnamespace = n.oid)"
-                                        " WHERE n.nspname = %s"
-                                        "   AND c.relname = %s",
+       appendStringInfo(&cmd, "SELECT c.oid, c.relkind\n"
+                                        "FROM pg_catalog.pg_class c\n"
+                                        "INNER JOIN pg_catalog.pg_namespace 
n\n"
+                                        "  ON (c.relnamespace = n.oid)\n"
+                                        "WHERE n.nspname = %s AND c.relname = 
%s",
                                         quote_literal_cstr(nspname),
                                         quote_literal_cstr(relname));
 
@@ -222,16 +226,16 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
                                          lengthof(tableRow), tableRow);
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
-                               (errcode(ERRCODE_CONNECTION_FAILURE),
-                                errmsg("sequence \"%s.%s\" info could not be 
fetched from publisher: %s",
-                                               nspname, relname, res->err)));
+                               errcode(ERRCODE_CONNECTION_FAILURE),
+                               errmsg("sequence \"%s.%s\" info could not be 
fetched from publisher: %s",
+                                          nspname, relname, res->err));
 
        slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
        if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_OBJECT),
-                                errmsg("sequence \"%s.%s\" not found on 
publisher",
-                                               nspname, relname)));
+                               errcode(ERRCODE_UNDEFINED_OBJECT),
+                               errmsg("sequence \"%s.%s\" not found on 
publisher",
+                                          nspname, relname));
 
        remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
        Assert(!isnull);
@@ -242,7 +246,7 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
        ExecDropSingleTupleTableSlot(slot);
        walrcv_clear_result(res);
 
-       *sequence_mismatch = fetch_remote_sequence_data(conn, relid, remoteid,
+       *sequence_mismatch = !fetch_remote_sequence_data(conn, relid, remoteid,
                                                                                
                        nspname, relname,
                                                                                
                        &seq_log_cnt, &seq_is_called,
                                                                                
                        &seq_page_lsn, &seq_last_value);
@@ -255,12 +259,12 @@ copy_sequence(WalReceiverConn *conn, Relation rel,
 }
 
 /*
- * report_sequence_mismatch
+ * report_mismatched_sequences
  *
- * Records details of sequence mismatches as a warning.
+ * Report any sequence mismatches as a single warning log.
  */
 static void
-report_sequence_mismatch(StringInfo warning_sequences)
+report_mismatched_sequences(StringInfo warning_sequences)
 {
        if (warning_sequences->len)
        {
@@ -269,6 +273,7 @@ report_sequence_mismatch(StringInfo warning_sequences)
                                errmsg("parameters differ for the remote and 
local sequences (%s) for subscription \"%s\"",
                                        warning_sequences->data, 
MySubscription->name),
                                errhint("Alter/Re-create local sequences to 
have the same parameters as the remote sequences."));
+
                resetStringInfo(warning_sequences);
        }
 }
@@ -280,14 +285,14 @@ report_sequence_mismatch(StringInfo warning_sequences)
  * and subscriber to the warning_sequences string.
  */
 static void
-append_mismatched_sequences(StringInfo warning_sequences, Relation     rel)
+append_mismatched_sequences(StringInfo warning_sequences, Relation seqrel)
 {
        if (warning_sequences->len)
                appendStringInfoString(warning_sequences, ", ");
 
        appendStringInfo(warning_sequences, "\"%s.%s\"",
-                                        
get_namespace_name(RelationGetNamespace(rel)),
-                                        RelationGetRelationName(rel));
+                                        
get_namespace_name(RelationGetNamespace(seqrel)),
+                                        RelationGetRelationName(seqrel));
 }
 
 /*
@@ -355,15 +360,15 @@ LogicalRepSyncSequences(void)
                                           slotname, &err);
        if (LogRepWorkerWalRcvConn == NULL)
                ereport(ERROR,
-                               (errcode(ERRCODE_CONNECTION_FAILURE),
-                                errmsg("could not connect to the publisher: 
%s", err)));
+                               errcode(ERRCODE_CONNECTION_FAILURE),
+                               errmsg("could not connect to the publisher: 
%s", err));
 
        seq_count = list_length(sequences_not_synced);
        foreach_ptr(SubscriptionRelState, seqinfo, sequences_not_synced)
        {
                Relation        sequence_rel;
                XLogRecPtr      sequence_lsn;
-               bool            sequence_mismatch;
+               bool            sequence_mismatch = false;
 
                CHECK_FOR_INTERRUPTS();
 
@@ -422,7 +427,7 @@ LogicalRepSyncSequences(void)
                        if (sequence_mismatch)
                                append_mismatched_sequences(warning_sequences, 
sequence_rel);
 
-                       report_sequence_mismatch(warning_sequences);
+                       report_mismatched_sequences(warning_sequences);
                        PG_RE_THROW();
                }
                PG_END_TRY();
@@ -444,11 +449,9 @@ LogicalRepSyncSequences(void)
                if (((curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) ||
                        curr_seq == seq_count)
                {
-                       /* Obtain the starting index of the current batch. */
-                       int                     i = (curr_seq - 1) - ((curr_seq 
- 1) % MAX_SEQUENCES_SYNC_PER_BATCH);
-
                        /* LOG all the sequences synchronized during current 
batch. */
-                       for (; i < curr_seq; i++)
+                       for (int i = (curr_seq - 1) - ((curr_seq - 1) % 
MAX_SEQUENCES_SYNC_PER_BATCH);
+                                i < curr_seq; i++)
                        {
                                SubscriptionRelState *done_seq;
 
@@ -459,7 +462,7 @@ LogicalRepSyncSequences(void)
                                                           
get_subscription_name(subid, false), get_rel_name(done_seq->relid)));
                        }
 
-                       report_sequence_mismatch(warning_sequences);
+                       report_mismatched_sequences(warning_sequences);
 
                        ereport(LOG,
                                        errmsg("logical replication 
synchronized %d of %d sequences for subscription \"%s\" ",
@@ -469,7 +472,6 @@ LogicalRepSyncSequences(void)
                        CommitTransactionCommand();
                        start_txn = true;
                }
-
        }
 
        list_free_deep(sequences_not_synced);
@@ -554,7 +556,7 @@ process_syncing_sequences_for_apply(void)
        foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready)
        {
                LogicalRepWorker *syncworker;
-               int                     nsyncworkers;
+               int                               nsyncworkers;
 
                if (!started_tx)
                {

Reply via email to