On Wed, 4 Sept 2024 at 08:32, Kyotaro Horiguchi <horikyota....@gmail.com> wrote:
>
> At Tue, 3 Sep 2024 09:10:07 +0530, vignesh C <vignes...@gmail.com> wrote in
> > The attached v2 version patch has the changes for the same.
>
> Sorry for jumping in at this point. I've just reviewed the latest
> patch (v2), and the frequent Own/Disown-Latch operations caught my
> attention. Additionally, handling multiple concurrently operating
> trigger sources with nested latch waits seems bug-prone, which I’d
> prefer to avoid from both a readability and safety perspective.
>
> With that in mind, I’d like to suggest an alternative approach. I may
> not be fully aware of all the previous discussions, so apologies if
> this idea has already been considered and dismissed.
>
> Currently, WaitForReplicationWorkerAttach() and
> logicalrep_worker_stop_internal() wait on a latch after verifying the
> desired state. This ensures that even if there are spurious or missed
> wakeups, they won't cause issues. In contrast, ApplyLauncherMain()
> enters a latch wait without checking the desired state
> first. Consequently, if another process sets the latch to wake up the
> main loop while the former two functions are waiting, that wakeup
> could be missed. If my understanding is correct, the problem lies in
> ApplyLauncherMain() not checking the expected state before beginning
> to wait on the latch. There is no issue with waiting if the state
> hasn't been satisfied yet.
>
> So, I propose that ApplyLauncherMain() should check the condition that
> triggers a main loop wakeup before calling WaitLatch(). To do this, we
> could add a flag in LogicalRepCtxStruct to signal that the main loop
> has immediate tasks to handle. ApplyLauncherWakeup() would set this
> flag before sending SIGUSR1. In turn, ApplyLauncherMain() would check
> this flag before calling WaitLatch() and skip the WaitLatch() call if
> the flag is set.
>
> I think this approach could solve the issue without adding
> complexity. What do you think?

I agree that this approach is simpler than the other one. How about
something like the attached patch to handle it?
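
For the archives, the idiom in a nutshell (a minimal sketch;
shared->work_pending and do_pending_work() are hypothetical
placeholders, not code from the patch):

    /* Wakeup side: publish the request before signalling the launcher. */
    shared->work_pending = true;
    kill(launcher_pid, SIGUSR1);

    /* Launcher main loop. */
    for (;;)
    {
        shared->work_pending = false;   /* claim any pending wakeups */

        do_pending_work();              /* may consume the latch internally */

        /*
         * A wakeup that arrived while we were busy has set the flag
         * again, so rescan immediately instead of sleeping on a latch
         * that was already reset.
         */
        if (shared->work_pending)
            continue;

        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         wal_retrieve_retry_interval,
                         WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
        ResetLatch(MyLatch);
    }

Since a spurious rescan is harmless, the flag only needs to be a hint,
which keeps the change small compared to nesting latch waits.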

Regards,
Vignesh
From 3efa4ea7dff937f60d16664ec4e3a79f798a72ab Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 4 Sep 2024 16:39:07 +0530
Subject: [PATCH v3] Fix missed detection of concurrent subscription creation.

The current implementation uses a single latch for multiple purposes:
a) waiting for worker processes to attach, b) handling worker process
exits, and c) noticing subscription creation. This overlap can lose a
wakeup in concurrent scenarios, such as when the launcher is waiting
for a newly started apply worker to attach while a new subscription
creation request arrives at the same time.

This commit addresses the issue by adding a launcher_reload_sub flag
to LogicalRepCtxStruct. Although the latch is reset once the worker
attaches, the launcher can still detect the concurrent operation via
this flag. The flag is set when a subscription is created or modified,
as well as when an apply worker exits. The launcher checks the flag in
its main loop and, if it is set, skips the latch wait, reloads the
subscription list, and starts any missing apply workers.
---
 src/backend/replication/logical/launcher.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e5fdca8bbf..92f1253b39 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -62,6 +62,9 @@ typedef struct LogicalRepCtxStruct
 	dsa_handle	last_start_dsa;
 	dshash_table_handle last_start_dsh;
 
+	/* Flag to reload the subscription list and start workers if required */
+	bool		launcher_reload_sub;
+
 	/* Background workers. */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -1118,7 +1121,10 @@ static void
 ApplyLauncherWakeup(void)
 {
 	if (LogicalRepCtx->launcher_pid != 0)
+	{
+		LogicalRepCtx->launcher_reload_sub = true;
 		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
+	}
 }
 
 /*
@@ -1164,6 +1170,8 @@ ApplyLauncherMain(Datum main_arg)
 									   ALLOCSET_DEFAULT_SIZES);
 		oldctx = MemoryContextSwitchTo(subctx);
 
+		LogicalRepCtx->launcher_reload_sub = false;
+
 		/* Start any missing workers for enabled subscriptions. */
 		sublist = get_subscription_list();
 		foreach(lc, sublist)
@@ -1220,6 +1228,9 @@ ApplyLauncherMain(Datum main_arg)
 		/* Clean the temporary memory. */
 		MemoryContextDelete(subctx);
 
+		if (LogicalRepCtx->launcher_reload_sub)
+			continue;
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-- 
2.34.1
