Hi,

While reading the code of logicalrep_worker_launch(), I had two questions:

(1)
When the sync worker limit per subscription is reached, 
logicalrep_worker_launch()
runs garbage collection to try to free up slots before checking the limit again.
That makes sense.

But should we do the same when the parallel apply worker limit is reached?
Currently, if we've hit the parallel apply worker limit but not the sync worker 
limit
and we find an unused worker slot, garbage collection doesn't run. Would it
make sense to also run garbage collection in that case?

(2)
If garbage collection removes at least one worker, logicalrep_worker_launch()
scans all worker slots again to look for a free one. But since we know at least 
one
slot was freed, this retry might be unnecessary. We could just reuse the freed
slot directly. Is that correct?


The attached patch addresses both points. Since logicalrep_worker_launch()
isn't a performance-critical path, this might not be a high-priority change.
But if my understanding is correct, I'm a bit tempted to apply it as a 
refactoring.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
From 3e541cbaacfde2c212dd95c7d53329399511a9ec Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Sat, 26 Apr 2025 00:06:40 +0900
Subject: [PATCH v1] Refactor logicalrep_worker_launch().

---
 src/backend/replication/logical/launcher.c | 64 +++++++++++-----------
 1 file changed, 31 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 10677da56b2..e2af6ccbdeb 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int     logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *npa);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
         */
        LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-retry:
        /* Find unused worker slot. */
        for (i = 0; i < max_logical_replication_workers; i++)
        {
@@ -350,7 +349,7 @@ retry:
                }
        }
 
-       nsyncworkers = logicalrep_sync_worker_count(subid);
+       logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
        now = GetCurrentTimestamp();
 
@@ -359,7 +358,8 @@ retry:
         * reason we do this is because if some worker failed to start up and 
its
         * parent has crashed while waiting, the in_use state was never cleared.
         */
-       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription 
||
+               nparallelapplyworkers >= 
max_parallel_apply_workers_per_subscription)
        {
                bool            did_cleanup = false;
 
@@ -381,11 +381,17 @@ retry:
 
                                logicalrep_worker_cleanup(w);
                                did_cleanup = true;
+
+                               if (worker == NULL)
+                               {
+                                       worker = w;
+                                       slot = i;
+                               }
                        }
                }
 
                if (did_cleanup)
-                       goto retry;
+                       logicalrep_worker_count(subid, &nsyncworkers, 
&nparallelapplyworkers);
        }
 
        /*
@@ -399,8 +405,6 @@ retry:
                return false;
        }
 
-       nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
        /*
         * Return false if the number of parallel apply workers reached the 
limit
         * per subscription.
@@ -844,48 +848,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-       int                     i;
        int                     res = 0;
 
-       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-       /* Search for attached worker for a given subscription id. */
-       for (i = 0; i < max_logical_replication_workers; i++)
-       {
-               LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-               if (isTablesyncWorker(w) && w->subid == subid)
-                       res++;
-       }
-
+       logicalrep_worker_count(subid, &res, NULL);
        return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *npa)
 {
-       int                     i;
-       int                     res = 0;
-
        Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+       if (nsync != NULL)
+               *nsync = 0;
+       if (npa != NULL)
+               *npa = 0;
+
        /*
-        * Scan all attached parallel apply workers, only counting those which
-        * have the given subscription id.
+        * Scan all attached sync and parallel apply workers, only counting 
those
+        * which have the given subscription id.
         */
-       for (i = 0; i < max_logical_replication_workers; i++)
+       for (int i = 0; i < max_logical_replication_workers; i++)
        {
                LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-               if (isParallelApplyWorker(w) && w->subid == subid)
-                       res++;
+               if (w->subid == subid)
+               {
+                       if (nsync != NULL && isTablesyncWorker(w))
+                               (*nsync)++;
+                       if (npa != NULL && isParallelApplyWorker(w))
+                               (*npa)++;
+               }
        }
-
-       return res;
 }
 
 /*
-- 
2.49.0

Reply via email to