Hi,

While reading the code of logicalrep_worker_launch(), I had two questions:
(1) When the sync worker limit per subscription is reached, logicalrep_worker_launch() runs garbage collection to try to free up slots before checking the limit again. That makes sense. But should we do the same when the parallel apply worker limit is reached? Currently, if the parallel apply worker limit has been reached but the sync worker limit has not, and an unused worker slot is found, garbage collection doesn't run. Would it make sense to also run garbage collection in that case?

(2) If garbage collection removes at least one worker, logicalrep_worker_launch() scans all worker slots again to look for a free one. But since we know at least one slot was freed, this retry seems unnecessary: we could simply reuse the freed slot directly. Is that correct?

The attached patch addresses both points. Since logicalrep_worker_launch() isn't a performance-critical path, this might not be a high-priority change. But if my understanding is correct, I'm a bit tempted to apply it as a refactoring.

Regards,

-- Fujii Masao Advanced Computing Technology Center Research and Development Headquarters NTT DATA CORPORATION
From 3e541cbaacfde2c212dd95c7d53329399511a9ec Mon Sep 17 00:00:00 2001 From: Fujii Masao <fu...@postgresql.org> Date: Sat, 26 Apr 2025 00:06:40 +0900 Subject: [PATCH v1] Refactor logicalrep_worker_launch(). --- src/backend/replication/logical/launcher.c | 64 +++++++++++----------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..e2af6ccbdeb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); -static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_worker_count(Oid subid, int *nsync, int *npa); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); @@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); -retry: /* Find unused worker slot. */ for (i = 0; i < max_logical_replication_workers; i++) { @@ -350,7 +349,7 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); now = GetCurrentTimestamp(); @@ -359,7 +358,8 @@ retry: * reason we do this is because if some worker failed to start up and its * parent has crashed while waiting, the in_use state was never cleared. 
*/ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription || + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) { bool did_cleanup = false; @@ -381,11 +381,17 @@ retry: logicalrep_worker_cleanup(w); did_cleanup = true; + + if (worker == NULL) + { + worker = w; + slot = i; + } } } if (did_cleanup) - goto retry; + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); } /* @@ -399,8 +405,6 @@ retry: return false; } - nparallelapplyworkers = logicalrep_pa_worker_count(subid); - /* * Return false if the number of parallel apply workers reached the limit * per subscription. @@ -844,48 +848,42 @@ logicalrep_worker_onexit(int code, Datum arg) int logicalrep_sync_worker_count(Oid subid) { - int i; int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (isTablesyncWorker(w) && w->subid == subid) - res++; - } - + logicalrep_worker_count(subid, &res, NULL); return res; } /* - * Count the number of registered (but not necessarily running) parallel apply - * workers for a subscription. + * Count the number of registered (but not necessarily running) sync workers + * and parallel apply workers for a subscription. */ -static int -logicalrep_pa_worker_count(Oid subid) +static void +logicalrep_worker_count(Oid subid, int *nsync, int *npa) { - int i; - int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + if (nsync != NULL) + *nsync = 0; + if (npa != NULL) + *npa = 0; + /* - * Scan all attached parallel apply workers, only counting those which - * have the given subscription id. + * Scan all attached sync and parallel apply workers, only counting those + * which have the given subscription id. 
*/ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->subid == subid) - res++; + if (w->subid == subid) + { + if (nsync != NULL && isTablesyncWorker(w)) + (*nsync)++; + if (npa != NULL && isParallelApplyWorker(w)) + (*npa)++; + } } - - return res; } /* -- 2.49.0