On Mon, Nov 3, 2025 at 12:35 AM vignesh C <[email protected]> wrote:
>
Some minor comments on 0001.
1.
+ /*
+ * Acquire LogicalRepWorkerLock in LW_EXCLUSIVE mode to block the apply
+ * worker (holding LW_SHARED) from reading or updating
+ * last_seqsync_start_time. See ProcessSyncingSequencesForApply().
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
Is it required to have an LW_EXCLUSIVE lock here? In the function
ProcessSyncingSequencesForApply(), the apply worker accesses/updates
last_seqsync_start_time only after it has ensured that the sequence sync
worker has exited. I have made changes related to this in the attached
patch to show what I have in mind.
2.
+ /*
+ * Worker needs to process sequences across transaction boundary, so
+ * allocate them under long-lived context.
+ */
+ oldctx = MemoryContextSwitchTo(TopMemoryContext);
+
+ seq = palloc0_object(LogicalRepSequenceInfo);
…
...
+ /*
+ * Allocate in a long-lived memory context, since these
+ * errors will be reported after the transaction commits.
+ */
+ oldctx = MemoryContextSwitchTo(TopMemoryContext);
+ mismatched_seqs = lappend_int(mismatched_seqs, seqidx);
At the above and other places in the sequence sync worker, we don't need
to use TopMemoryContext; rather, we can use ApplyContext, which is allocated via
SequenceSyncWorkerMain()->SetupApplyOrSyncWorker()->InitializeLogRepWorker().
3.
ProcessSyncingTablesForApply(current_lsn);
+ ProcessSyncingSequencesForApply();
I am not sure if the function name ProcessSyncingSequencesForApply is
appropriate. For tables, we do some work for concurrently running
tablesync workers and launch new ones as well, but for sequences, we don't
do any work for sequences that are already being synced. How about
ProcessSequencesForSync()?
4.
+ /* Should never happen. */
+ elog(ERROR, "Sequence synchronization worker not expected to
process relations");
The first letter of the ERROR message should be small. How about:
"sequence synchronization worker is not expected to process
relations"? I have made this change in the attached.
5.
@@ -5580,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos)
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ !am_tablesync_worker());
Why this change?
6.
@@ -264,6 +267,8 @@ extern bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid userid, Oid relid,
dsm_handle subworker_dsm,
bool retain_dead_tuples);
+extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
+ Oid relid, TimestampTz *last_start_time);
extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
Oid relid);
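All the other functions in this group, except the newly added one, are
from launcher.c, whereas launch_sync_worker() is defined in syncutils.c.
So, shouldn't its declaration be placed with the other syncutils.c
functions? It should be after the InvalidateSyncingRelStates() declaration.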
Apart from the above, please find attached a top-up patch to improve
comments and some other cosmetic stuff. The 0001 patch looks good to
me apart from the above minor points.
--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/launcher.c
b/src/backend/replication/logical/launcher.c
index 2a1d4e03fe2..ff9bc02a3df 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -872,11 +872,11 @@ logicalrep_reset_seqsync_start_time(void)
LogicalRepWorker *worker;
/*
- * Acquire LogicalRepWorkerLock in LW_EXCLUSIVE mode to block the apply
- * worker (holding LW_SHARED) from reading or updating
- * last_seqsync_start_time. See ProcessSyncingSequencesForApply().
+ * The apply worker can't access last_seqsync_start_time concurrently,
so
+ * it is okay to use SHARED lock here. See
+ * ProcessSyncingSequencesForApply().
*/
- LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(WORKERTYPE_APPLY,
MyLogicalRepWorker->subid, InvalidOid,
diff --git a/src/backend/replication/logical/sequencesync.c
b/src/backend/replication/logical/sequencesync.c
index 4bf70abcbaf..7f8afc1fec8 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -127,6 +127,10 @@ ProcessSyncingSequencesForApply(void)
nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
LWLockRelease(LogicalRepWorkerLock);
+ /*
+ * It is okay to read/update last_seqsync_start_time here in apply
worker
+ * as we have already ensured that sync worker doesn't exist.
+ */
launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
&MyLogicalRepWorker->last_seqsync_start_time);
}
@@ -419,7 +423,7 @@ copy_sequences(WalReceiverConn *conn)
* - On each node, a background worker acquires a lock on a
sequence
* as part of a sync operation.
*
- * - Concurrently, a user transaction attempts to alter the
same
+ * - Concurrently, a user transaction attempts to alter the same
* sequence, waiting on the background worker's lock.
*
* - Meanwhile, a query from the other node tries to access
metadata
@@ -485,8 +489,9 @@ copy_sequences(WalReceiverConn *conn)
case COPYSEQ_MISMATCH:
/*
- * Allocate in a long-lived memory
context, since these
- * errors will be reported after the
transaction commits.
+ * Remember mismatched sequences in a
long-lived memory
+ * context, since these will be used
after the transaction
+ * commits.
*/
oldctx =
MemoryContextSwitchTo(TopMemoryContext);
mismatched_seqs =
lappend_int(mismatched_seqs, seqidx);
@@ -496,8 +501,9 @@ copy_sequences(WalReceiverConn *conn)
case COPYSEQ_INSUFFICIENT_PERM:
/*
- * Allocate in a long-lived memory
context, since these
- * errors will be reported after the
transaction commits.
+ * Remember the sequences with
insufficient privileges in a
+ * long-lived memory context, since
these will be used after
+ * the transaction commits.
*/
oldctx =
MemoryContextSwitchTo(TopMemoryContext);
insuffperm_seqs =
lappend_int(insuffperm_seqs, seqidx);
@@ -573,7 +579,7 @@ copy_sequences(WalReceiverConn *conn)
}
/*
- * Determines which sequences require synchronization and initiates their
+ * Identifies sequences that require synchronization and initiates the
* synchronization process.
*/
static void
diff --git a/src/backend/replication/logical/syncutils.c
b/src/backend/replication/logical/syncutils.c
index f8b1a3d4827..53530bb39b6 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -107,8 +107,8 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32
hashvalue)
}
/*
- * Attempt to launch a sync worker (sequence or table) if there is a sync
- * worker slot available and the retry interval has elapsed.
+ * Attempt to launch a sync worker for one or more sequences or a table, if
+ * a worker slot is available and the retry interval has elapsed.
*
* wtype: sync worker type.
* nsyncworkers: Number of currently running sync workers for the subscription.
@@ -179,7 +179,7 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
case WORKERTYPE_SEQUENCESYNC:
/* Should never happen. */
- elog(ERROR, "Sequence synchronization worker not
expected to process relations");
+ elog(ERROR, "sequence synchronization worker is not
expected to process relations");
break;
case WORKERTYPE_UNKNOWN:
diff --git a/src/backend/replication/logical/worker.c
b/src/backend/replication/logical/worker.c
index 8026a007ec3..bc8b5a5cb69 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -704,7 +704,7 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
case WORKERTYPE_SEQUENCESYNC:
/* Should never happen. */
- elog(ERROR, "Sequence synchronization worker not
expected to apply changes");
+ elog(ERROR, "sequence synchronization worker is not
expected to apply changes");
break;
case WORKERTYPE_UNKNOWN:
diff --git a/src/include/replication/worker_internal.h
b/src/include/replication/worker_internal.h
index 32ef365f4a6..87fb211d040 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -267,8 +267,6 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType
wtype,
Oid
userid, Oid relid,
dsm_handle subworker_dsm,
bool
retain_dead_tuples);
-extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
- Oid relid,
TimestampTz *last_start_time);
extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
@@ -292,6 +290,8 @@ extern void ProcessSyncingSequencesForApply(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32
hashvalue);
+extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
+ Oid relid,
TimestampTz* last_start_time);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
extern void FetchRelationStates(bool *has_pending_subtables,
bool
*has_pending_sequences, bool *started_tx);