Hi Vignesh,

Here are my review comments for your latest 0730_2* patches.

Patch v20240730_2-0001 looks good to me.

Patch v20240730_2-0002 looks good to me.

My comments for the v20240730_2-0003 patch are below:

//////////

GENERAL

1. Inconsistent terms

I've noticed there are many variations of how the sequence sync worker is known:
- "sequencesync worker"
- "sequence sync worker"
- "sequence-sync worker"
- "sequence synchronization worker"
- more?

We must settle on some standardized name.

AFAICT we generally use "table synchronization worker" in the docs,
and "tablesync worker" in the code and comments.  IMO, we should do
the same for sequences -- e.g. "sequence synchronization worker"
in the docs, and "sequencesync worker" in the code and comments.

======
doc/src/sgml/catalogs.sgml

nitpick - the links should jump directly to REFRESH PUBLICATION or
REFRESH PUBLICATION SEQUENCES. Currently they go to the top of the
ALTER SUBSCRIPTION page which is not as useful.

======
src/backend/commands/sequence.c

do_setval:
nitpick - minor wording in the function header
nitpick - change some param names to more closely resemble the fields
they get assigned to (/logcnt/log_cnt/, /iscalled/is_called/)

~

2.
  seq->is_called = iscalled;
- seq->log_cnt = 0;
+ seq->log_cnt = (logcnt == SEQ_LOG_CNT_INVALID) ? 0: logcnt;

The logic here for SEQ_LOG_CNT_INVALID seemed strange. Why not just
#define SEQ_LOG_CNT_INVALID as 0 in the first place if that is what
you will assign for invalid? Then you won't need to do anything here
except seq->log_cnt = log_cnt;
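
To illustrate, here is a minimal standalone sketch (plain C, not the
actual sequence.c code). It assumes that no caller needs to tell
"invalid" apart from a genuine log_cnt of 0. The PATCHED_/PROPOSED_
macro names and both function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical names: the patch defines SEQ_LOG_CNT_INVALID as (-1). */
#define PATCHED_SEQ_LOG_CNT_INVALID   (-1)
#define PROPOSED_SEQ_LOG_CNT_INVALID  0

/* What the patch does now: translate the sentinel into 0 on assignment. */
int64_t
log_cnt_as_patched(int64_t logcnt)
{
    return (logcnt == PATCHED_SEQ_LOG_CNT_INVALID) ? 0 : logcnt;
}

/* With a sentinel of 0, the value can be assigned unchanged. */
int64_t
log_cnt_as_proposed(int64_t log_cnt)
{
    assert(log_cnt >= 0);       /* assumption: log_cnt is never negative */
    return log_cnt;
}
```

Under that assumption, a caller passing the sentinel ends up with
log_cnt = 0 either way, and real values pass through unchanged.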

======
src/backend/catalog/pg_subscription.c

HasSubscriptionRelations:
nitpick - I think the comment "If even a single tuple exists..." is
not quite accurate. e.g. It also has to be the right kind of tuple.

~~

GetSubscriptionRelations:
nitpick - Give more description in the function header about the other
parameters.
nitpick - I felt that a better name for 'all_relations' is all_states.
Because in my mind *all relations* sounds more like when both
'all_tables' and 'all_sequences' are true.
nitpick - IMO add an Assert to be sure something is being fetched.
Assert(get_tables || get_sequences);
nitpick - Rephrase the "skip the tables" and "skip the sequences"
comments to be more aligned with the code condition.

~

3.
- if (not_ready)
+ /* Get the relations that are not in ready state */
+ if (get_tables && !all_relations)
  ScanKeyInit(&skey[nkeys++],
  Anum_pg_subscription_rel_srsubstate,
  BTEqualStrategyNumber, F_CHARNE,
  CharGetDatum(SUBREL_STATE_READY));
+ /* Get the sequences that are in init state */
+ else if (get_sequences && !all_relations)
+ ScanKeyInit(&skey[nkeys++],
+ Anum_pg_subscription_rel_srsubstate,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(SUBREL_STATE_INIT));

This is quite tricky, using multiple flags (get_tables and
get_sequences) in such a way. It might even be a bug -- e.g. Is the
'else' keyword correct? Otherwise, when both get_tables and
get_sequences are true, and all_relations is false, then the sequence
part wouldn't even get executed (???).
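
The concern can be shown with a small standalone model of just the
if / else-if chain (not the real ScanKeyInit code). The enum and the
function name are hypothetical; only the three boolean parameters
come from the patch.

```c
#include <stdbool.h>

/* Hypothetical labels for which srsubstate scan key gets added. */
typedef enum
{
    FILTER_NONE,        /* no state key: fetch relations in any state */
    FILTER_NOT_READY,   /* srsubstate <> SUBREL_STATE_READY */
    FILTER_INIT_ONLY    /* srsubstate = SUBREL_STATE_INIT */
} StateFilter;

/* Mirrors the branch structure of the patched code. */
StateFilter
choose_state_filter(bool get_tables, bool get_sequences, bool all_relations)
{
    if (get_tables && !all_relations)
        return FILTER_NOT_READY;
    else if (get_sequences && !all_relations)
        return FILTER_INIT_ONLY;

    return FILTER_NONE;
}
```

In this model, choose_state_filter(true, true, false) returns
FILTER_NOT_READY, so the "init" key for sequences is never added when
both flags are set.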

======
src/backend/commands/subscriptioncmds.c

CreateSubscription:
nitpick - let's move the 'tables' declaration to be beside the
'sequences' var for consistency. (in passing move other vars too)
nitpick - it's not strictly required for the patch, but let's change
the 'tables' loop to be consistent with the new sequences loop.

~~~

4. AlterSubscription_refresh

My first impression (from the function comment) is that these function
parameters are a bit awkward. For example,
- It says:  If 'copy_data' parameter is true, the function will set
the state to "init"; otherwise, it will set the state to "ready".
- It also says: "If 'all_relations' is true, mark all objects with
"init" state..."
Those statements seem to clash. e.g. if copy_data is false but
all_relations is true, then what (???)

~

nitpick - tweak function comment wording.
nitpick - introduce a 'relkind' variable to avoid multiple calls of
get_rel_relkind(relid)
nitpick - use an existing 'relkind' variable instead of calling
get_rel_relkind(relid);
nitpick - add another comment about skipping (for dropping tablesync slots)

~

5.
+ /*
+ * If all the relations should be re-synchronized, then set the
+ * state to init for re-synchronization. This is currently
+ * supported only for sequences.
+ */
+ else if (all_relations)
+ {
+ ereport(DEBUG1,
+ (errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
  get_namespace_name(get_rel_namespace(relid)),
  get_rel_name(relid),
  sub->name)));
+ UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
+    InvalidXLogRecPtr);

(This is a continuation of my doubts regarding 'all_relations' in the
previous review comment #4 above)

Here are some more questions about it:

~

5a. Why is this an 'else' of the !bsearch? It needs more explanation
of what this case means.

~

5b. Along with more description, it might be better to reverse the
!bsearch condition, so this ('else') code is not so distantly
separated from the condition.

~

5c. Saying "only supported for sequences" seems strange: e.g. what
would it even mean to "re-synchronize" tables? They would all have to
be truncated first -- so if re-sync for tables has no meaning maybe
the parameter is misnamed and should just be 'resync_all_sequences' or
similar? In any case, an Assert here might be good.
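
A rough standalone sketch of the kind of Assert I mean is below. It
is only a model: the helper name and the 'resync_all_sequences'
parameter are hypothetical, and where the Assert really belongs in
AlterSubscription_refresh is for you to decide. The two character
constants mirror the existing PostgreSQL definitions.

```c
#include <assert.h>
#include <stdbool.h>

#define RELKIND_SEQUENCE   'S'
#define SUBREL_STATE_INIT  'i'

/*
 * Hypothetical helper: return the state an already-subscribed relation
 * should have after a refresh.
 */
char
state_after_refresh(char relkind, char current_state,
                    bool resync_all_sequences)
{
    if (!resync_all_sequences)
        return current_state;

    /* Re-synchronization is only meaningful for sequences. */
    assert(relkind == RELKIND_SEQUENCE);

    return SUBREL_STATE_INIT;
}
```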

======
src/backend/replication/logical/launcher.c

logicalrep_worker_find:

nitpick - I feel the function comment "We are only interested in..."
is now redundant since you are passing the exact worker type you want.
nitpick - I added an Assert for the types you are expecting to look for
nitpick - The comment "Search for attached worker..." is stale now
because there are more search criteria
nitpick - IMO the "Skip parallel apply workers." code is no longer
needed now that you are matching the worker type.

~~~

6. logicalrep_worker_launch

  * - must be valid worker type
  * - tablesync workers are only ones to have relid
  * - parallel apply worker is the only kind of subworker
+ * - sequencesync workers will not have relid
  */
  Assert(wtype != WORKERTYPE_UNKNOWN);
  Assert(is_tablesync_worker == OidIsValid(relid));
  Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert(!is_sequencesync_worker || !OidIsValid(relid));

On further reflection, is that added comment and added Assert even
needed? I think they can be removed because saying "tablesync workers
are only ones to have relid" seems to already cover what we needed to
say/assert.
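
As a sanity check of that reasoning, here is a small standalone
sketch that enumerates every combination of worker kind and relid
validity. The enum and function names are hypothetical. It assumes a
worker has exactly one type, so a sequencesync worker is never also a
tablesync worker.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the WORKERTYPE_* values. */
typedef enum
{
    WT_APPLY,
    WT_PARALLEL_APPLY,
    WT_TABLESYNC,
    WT_SEQUENCESYNC
} WorkerKind;

/*
 * Return true if, in every case where the existing Assert holds, the
 * newly added Assert holds too (i.e. the new one adds nothing).
 */
bool
added_assert_is_redundant(void)
{
    for (int wt = WT_APPLY; wt <= WT_SEQUENCESYNC; wt++)
    {
        for (int v = 0; v <= 1; v++)
        {
            bool relid_valid = (v == 1);
            bool is_tablesync = (wt == WT_TABLESYNC);
            bool is_sequencesync = (wt == WT_SEQUENCESYNC);

            /* Assert(is_tablesync_worker == OidIsValid(relid)) */
            bool existing_ok = (is_tablesync == relid_valid);

            /* Assert(!is_sequencesync_worker || !OidIsValid(relid)) */
            bool added_ok = (!is_sequencesync || !relid_valid);

            if (existing_ok && !added_ok)
                return false;
        }
    }
    return true;
}
```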

~~~

logicalrep_worker_stop:
nitpick - /type/wtype/ for readability

~~~

7.
/*
 * Count the number of registered (not necessarily running) sync workers
 * for a subscription.
 */
int
logicalrep_sync_worker_count(Oid subid)

~

I thought this function should count the sequencesync worker as well.

======
.../replication/logical/sequencesync.c

fetch_remote_sequence_data:
nitpick - tweaked function comment
nitpick - /value/last_value/ for readability

~

8.
+ *lsn = DatumGetInt64(slot_getattr(slot, 4, &isnull));
+ Assert(!isnull);

Should that be DatumGetUInt64?

~~~

copy_sequence:
nitpick - tweak function header.
nitpick - renamed the sequence vars for consistency, and declared them
all together.

======
src/backend/replication/logical/tablesync.c

9.
 void
 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 {
- table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
+ relation_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
 }

I assume you changed the 'table_states_validity' name because this is
no longer exclusively for tables. So, should the function name also be
similarly changed?

~~~

process_syncing_sequences_for_apply:
nitpick - tweaked the function comment
nitpick - cannot just say "if there is not one already." A sequence
sync worker might not even be needed.
nitpick - added blank line for readability

~

10.
+ if (syncworker)
+ {
+ /* Now safe to release the LWLock */
+ LWLockRelease(LogicalRepWorkerLock);
+ break;
+ }
+ else
+ {

This 'else' can be removed if you wish to pull back all the indentation.

~~~

11.
process_syncing_tables(XLogRecPtr current_lsn)

Is the function name still OK given that it is now also syncing sequences?

~~~

FetchTableStates:
nitpick - Reworded some of the function comment
nitpick - Function comment is stale because it is still referring to
the function parameter which this patch removed.
nitpick - tweak a comment

======
src/include/commands/sequence.h

12.
+#define SEQ_LOG_CNT_INVALID (-1)

See a previous review comment (#2 above) where I wondered why not use
value 0 for this.

~~~

13.
 extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
+extern void do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt);
 extern void ResetSequenceCaches(void);

do_setval() was an OK function name when it was static, but as an
exposed API it seems like a terrible name. IMO rename it to something
like 'SetSequence' to match the other API functions nearby.

~

nitpick - same change to the parameter names as suggested for the
implementation.

======
Kind Regards,
Peter Smith.
Fujitsu Australia
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 22b2a93..16c427e 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8110,9 +8110,10 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
   <para>
    This catalog only contains tables and sequences known to the subscription
    after running either
-   <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>
-   or <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... REFRESH
-   PUBLICATION</command></link> or <link linkend="sql-altersubscription">
+   <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link> or
+  <link linkend="sql-altersubscription-params-refresh-publication">
+   <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link> or
+   <link linkend="sql-altersubscription-params-refresh-publication-sequences">
    <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES</command></link>.
   </para>
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index bc6d18b..a1ee74b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -490,7 +490,10 @@ HasSubscriptionRelations(Oid subid)
 
                subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
 
-               /* If even a single tuple exists then the subscription has tables. */
+               /*
+                * Skip sequence tuples. If even a single table tuple
+                * exists then the subscription has tables.
+                */
                if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
                {
                        has_subrels = true;
@@ -508,17 +511,21 @@ HasSubscriptionRelations(Oid subid)
 /*
  * Get the relations for the subscription.
  *
- * all_relations:
- * If returning sequences, if all_relations=true get all sequences,
- * otherwise only get sequences that are in 'init' state.
- * If returning tables, if all_relation=true get all tables, otherwise
+ * get_tables: get relations for tables of the subscription.
+ *
+ * get_sequences: get relations for sequences of the subscription.
+ *
+ * all_states:
+ * If getting tables, if all_states is true get all tables, otherwise
  * only get tables that have not reached 'READY' state.
+ * If getting sequences, if all_states is true get all sequences,
+ * otherwise only get sequences that are in 'init' state.
  *
  * The returned list is palloc'ed in the current memory context.
  */
 List *
 GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
-                                                bool all_relations)
+                                                bool all_states)
 {
        List       *res = NIL;
        Relation        rel;
@@ -527,6 +534,9 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
        ScanKeyData skey[2];
        SysScanDesc scan;
 
+       /* One or both of 'get_tables' and 'get_sequences' must be true. */
+       Assert(get_tables || get_sequences);
+
        rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
        ScanKeyInit(&skey[nkeys++],
@@ -535,13 +545,13 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
                                ObjectIdGetDatum(subid));
 
        /* Get the relations that are not in ready state */
-       if (get_tables && !all_relations)
+       if (get_tables && !all_states)
                ScanKeyInit(&skey[nkeys++],
                                        Anum_pg_subscription_rel_srsubstate,
                                        BTEqualStrategyNumber, F_CHARNE,
                                        CharGetDatum(SUBREL_STATE_READY));
        /* Get the sequences that are in init state */
-       else if (get_sequences && !all_relations)
+       else if (get_sequences && !all_states)
                ScanKeyInit(&skey[nkeys++],
                                        Anum_pg_subscription_rel_srsubstate,
                                        BTEqualStrategyNumber, F_CHAREQ,
@@ -561,11 +571,11 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences,
                subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
                subreltype = get_rel_relkind(subrel->srrelid);
 
-               /* If only tables were requested, skip the sequences */
+               /* Skip sequences if they were not requested */
                if (subreltype == RELKIND_SEQUENCE && !get_sequences)
                        continue;
 
-               /* If only sequences were requested, skip the tables */
+               /* Skip tables if they were not requested */
                if (subreltype != RELKIND_SEQUENCE && !get_tables)
                        continue;
 
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index e8bd53c..2e63925 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -942,11 +942,11 @@ lastval(PG_FUNCTION_ARGS)
  * it is the only way to clear the is_called flag in an existing
  * sequence.
  *
- * logcnt is currently used only by sequence syncworker to set the log_cnt for
- * sequences while synchronization of sequence values from the publisher.
+ * log_cnt is currently used only by the sequence syncworker to set the
+ * log_cnt for sequences while synchronizing values from the publisher.
  */
 void
-do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt)
+do_setval(Oid relid, int64 next, bool is_called, int64 log_cnt)
 {
        SeqTable        elm;
        Relation        seqrel;
@@ -997,7 +997,7 @@ do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt)
                                                (long long) minv, (long long) maxv)));
 
        /* Set the currval() state only if iscalled = true */
-       if (iscalled)
+       if (is_called)
        {
                elm->last = next;               /* last returned number */
                elm->last_valid = true;
@@ -1014,8 +1014,8 @@ do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt)
        START_CRIT_SECTION();
 
        seq->last_value = next;         /* last fetched number */
-       seq->is_called = iscalled;
-       seq->log_cnt = (logcnt == SEQ_LOG_CNT_INVALID) ? 0: logcnt;
+       seq->is_called = is_called;
+       seq->log_cnt = (log_cnt == SEQ_LOG_CNT_INVALID) ? 0: log_cnt;
 
        MarkBufferDirty(buf);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 984f72d..1c01a2b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -735,9 +735,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        {
                char       *err;
                WalReceiverConn *wrconn;
-               List       *tables;
-               ListCell   *lc;
-               char            table_state;
                bool            must_use_password;
 
                /* Try to connect to the publisher. */
@@ -752,7 +749,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
                PG_TRY();
                {
+                       List       *tables;
                        List       *sequences;
+                       char            table_state;
 
                        check_publications(wrconn, publications);
                        check_publications_origin(wrconn, publications, opts.copy_data,
@@ -769,9 +768,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                         * info.
                         */
                        tables = fetch_table_list(wrconn, publications);
-                       foreach(lc, tables)
+                       foreach_ptr(RangeVar, rv, tables)
                        {
-                               RangeVar   *rv = (RangeVar *) lfirst(lc);
                                Oid                     relid;
 
                                relid = RangeVarGetRelid(rv, AccessShareLock, false);
@@ -884,9 +882,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
  * sequences that have been added or removed since the last subscription
  * creation or publication refresh.
  *
- * If 'all_relations' is true, it will mark all objects with "init" state
- * for re-synchronization; otherwise, only the newly added tables and
- * sequences will be updated based on the copy_data parameter.
+ * If 'all_relations' is true, mark all objects with "init" state
+ * for re-synchronization; otherwise, only update the newly added tables and
+ * sequences based on the copy_data parameter.
  */
 static void
 AlterSubscription_refresh(Subscription *sub, bool copy_data,
@@ -984,12 +982,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                {
                        RangeVar   *rv = (RangeVar *) lfirst(lc);
                        Oid                     relid;
+                       char            relkind;
 
                        relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
                        /* Check for supported relkind. */
-                       CheckSubscriptionRelkind(get_rel_relkind(relid),
-                                                                        rv->schemaname, rv->relname);
+                       relkind = get_rel_relkind(relid);
+                       CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname);
 
                        pubrel_local_oids[off++] = relid;
 
@@ -1001,7 +1000,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                                                                                InvalidXLogRecPtr, true);
                                ereport(DEBUG1,
                                                (errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
-                                                                                get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table",
+                                                                                relkind == RELKIND_SEQUENCE ? "sequence" : "table",
                                                                                 rv->schemaname, rv->relname, sub->name)));
                        }
                }
@@ -1086,7 +1085,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
                                ereport(DEBUG1,
                                                (errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"",
-                                                                                get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table",
+                                                                                relkind == RELKIND_SEQUENCE ? "sequence" : "table",
                                                                                 get_namespace_name(get_rel_namespace(relid)),
                                                                                 get_rel_name(relid),
                                                                                 sub->name)));
@@ -1116,6 +1115,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                 */
                for (off = 0; off < remove_rel_len; off++)
                {
+                       /* Skip relations belonging to sequences. */
                        if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE)
                                continue;
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 04d76e7..5da5529 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -236,30 +236,27 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 /*
  * Walks the workers array and searches for one that matches given
  * subscription id, relid and type.
- *
- * We are only interested in the leader apply worker, table sync worker, or
- * sequence sync worker.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType type,
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
                                           bool only_running)
 {
        int                     i;
        LogicalRepWorker *res = NULL;
 
+       Assert(wtype == WORKERTYPE_TABLESYNC ||
+                  wtype == WORKERTYPE_SEQUENCESYNC ||
+                  wtype == WORKERTYPE_APPLY);
+
        Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
-       /* Search for attached worker for a given subscription id. */
+       /* Search for the attached worker matching the specified criteria. */
        for (i = 0; i < max_logical_replication_workers; i++)
        {
                LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-               /* Skip parallel apply workers. */
-               if (isParallelApplyWorker(w))
-                       continue;
-
                if (w->in_use && w->subid == subid && w->relid == relid &&
-                       w->type == type && (!only_running || w->proc))
+                       w->type == wtype && (!only_running || w->proc))
                {
                        res = w;
                        break;
@@ -626,13 +623,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType type)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
 {
        LogicalRepWorker *worker;
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-       worker = logicalrep_worker_find(subid, relid, type, false);
+       worker = logicalrep_worker_find(subid, relid, wtype, false);
 
        if (worker)
        {
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index fc36bf9..9aef45a 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -32,9 +32,11 @@
 /*
  * fetch_remote_sequence_data
  *
- * Retrieve the last_value, log_cnt, page_lsn and is_called of the sequence
- * from the remote node. The last_value will be returned directly, while
- * log_cnt, is_called and page_lsn will be provided through the output
+ * Retrieve sequence data (last_value, log_cnt, page_lsn and is_called)
+ * from the remote node.
+ *
+ * The sequence last_value will be returned directly, while
+ * log_cnt, is_called and page_lsn will be returned via the output
  * parameters log_cnt, is_called and lsn, respectively.
  */
 static int64
@@ -46,7 +48,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname,
        StringInfoData cmd;
        TupleTableSlot *slot;
        Oid                     tableRow[4] = {INT8OID, INT8OID, BOOLOID, LSNOID};
-       int64           value = (Datum) 0;
+       int64           last_value = (Datum) 0;
        bool            isnull;
 
        initStringInfo(&cmd);
@@ -70,7 +72,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname,
                                 errmsg("sequence \"%s.%s\" not found on publisher",
                                                nspname, relname)));
 
-       value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+       last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
        Assert(!isnull);
 
        *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
@@ -86,23 +88,24 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname,
 
        walrcv_clear_result(res);
 
-       return value;
+       return last_value;
 }
 
 /*
  * Copy existing data of a sequence from publisher.
  *
  * Fetch the sequence value from the publisher and set the subscriber sequence
- * with the retrieved value. Caller is responsible for locking the local
+ * with the same value. Caller is responsible for locking the local
  * relation.
  */
 static XLogRecPtr
 copy_sequence(WalReceiverConn *conn, Relation rel)
 {
        StringInfoData cmd;
-       int64           sequence_value;
-       int64           log_cnt;
-       XLogRecPtr      lsn = InvalidXLogRecPtr;
+       int64           seq_last_value;
+       int64           seq_log_cnt;
+       bool            seq_is_called;
+       XLogRecPtr      seq_lsn = InvalidXLogRecPtr;
        WalRcvExecResult *res;
        Oid                     tableRow[] = {OIDOID, CHAROID};
        TupleTableSlot *slot;
@@ -111,7 +114,6 @@ copy_sequence(WalReceiverConn *conn, Relation rel)
        bool            isnull;
        char       *nspname = get_namespace_name(RelationGetNamespace(rel));
        char       *relname = RelationGetRelationName(rel);
-       bool            is_called;
 
        /* Fetch Oid. */
        initStringInfo(&cmd);
@@ -148,14 +150,14 @@ copy_sequence(WalReceiverConn *conn, Relation rel)
        ExecDropSingleTupleTableSlot(slot);
        walrcv_clear_result(res);
 
-       sequence_value = fetch_remote_sequence_data(conn, remoteid, nspname,
-                                                                                               relname, &log_cnt, &is_called,
-                                                                                               &lsn);
+       seq_last_value = fetch_remote_sequence_data(conn, remoteid, nspname,
+                                                                                               relname, &seq_log_cnt, &seq_is_called,
+                                                                                               &seq_lsn);
 
-       do_setval(RelationGetRelid(rel), sequence_value, is_called, log_cnt);
+       do_setval(RelationGetRelid(rel), seq_last_value, seq_is_called, seq_log_cnt);
 
        /* return the LSN when the sequence state was set */
-       return lsn;
+       return seq_lsn;
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3e162b9..6e7ed8e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -680,7 +680,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
  * apply process (currently, all that have state SUBREL_STATE_INIT) and manage
  * synchronization for them.
  *
- * If there is a sequence synchronization worker running already, no need to
+ * If a sequence synchronization worker is running already, there is no need to
  * start a new one; the existing sequence sync worker will synchronize all the
  * sequences. If there are still any sequences to be synced after the sequence
  * sync worker exited, then a new sequence sync worker can be started in the
@@ -697,7 +697,7 @@ process_syncing_sequences_for_apply()
        Assert(!IsTransactionState());
 
        /*
-        * Start sequence sync worker if there is not one already.
+        * Start the sequence sync worker if needed, and there is not one already.
         */
        foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready)
        {
@@ -753,6 +753,7 @@ process_syncing_sequences_for_apply()
                                                                                           now, wal_retrieve_retry_interval))
                                {
                                        MyLogicalRepWorker->sequencesync_failure_time = 0;
+
                                        logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC,
                                                                                         MyLogicalRepWorker->dbid,
                                                                                         MySubscription->oid,
@@ -1689,16 +1690,14 @@ copy_table_done:
 /*
  * Common code to fetch the up-to-date sync state info into the static lists.
  *
- * Copy tables that are not ready into table_states_not_ready and sequences
- * that are not ready into sequence_states_not_ready. The pg_subscription_rel
- * table is shared between sequences and tables. Because changes to either
- * sequences or relations can affect the validity of relation states, we update
- * both table_states_not_ready and sequence_states_not_ready simultaneously
- * to ensure consistency, rather than updating them separately. Returns true if
- * subscription has 1 or more tables, else false.
+ * Copy tables that are not READY state into table_states_not_ready, and sequences
+ * that have INIT state into sequence_states_not_ready. The pg_subscription_rel
+ * catalog is shared by tables and sequences. Changes to either sequences or
+ * tables can affect the validity of relation states, so we update both
+ * table_states_not_ready and sequence_states_not_ready simultaneously
+ * to ensure consistency.
  *
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * Returns true if subscription has 1 or more tables, else false.
  */
 static bool
 FetchTableStates(void)
@@ -1728,7 +1727,7 @@ FetchTableStates(void)
                }
 
                /*
-                * Fetch the tables that are in non-ready state and the sequences that
+                * Fetch tables that are in non-ready state, and sequences that
                 * are in init state.
                 */
                rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 71d8c76..b81f496 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -62,7 +62,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
 extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
 extern void DeleteSequenceTuple(Oid relid);
 extern void ResetSequence(Oid seq_relid);
-extern void do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt);
+extern void do_setval(Oid relid, int64 next, bool is_called, int64 log_cnt);
 extern void ResetSequenceCaches(void);
 
 extern void seq_redo(XLogReaderState *record);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8a12ecb..6b201d6 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -242,7 +242,7 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
-                                                                                               LogicalRepWorkerType type,
+                                                                                               LogicalRepWorkerType wtype,
                                                                                                bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running,
                                                                         bool acquire_lock);
