On Fri, 5 Jul 2024 at 18:38, Heikki Linnakangas <hlinn...@iki.fi> wrote:
>
> On 05/07/2024 14:07, vignesh C wrote:
> > On Thu, 4 Jul 2024 at 16:52, Heikki Linnakangas <hlinn...@iki.fi> wrote:
> >>
> >> I don't quite understand the problem we're trying to fix:
> >>
> >>> Currently the launcher's latch is used for the following: a) worker
> >>> process attach b) worker process exit and c) subscription creation.
> >>> Since this same latch is used for multiple cases, the launcher process
> >>> is not able to handle concurrent scenarios like: a) Launcher started a
> >>> new apply worker and waiting for apply worker to attach and b) create
> >>> subscription sub2 sending launcher wake up signal. In this scenario,
> >>> both of them will set latch of the launcher process, the launcher
> >>> process is not able to identify that both operations have occurred 1)
> >>> worker is attached 2) subscription is created and apply worker should
> >>> be started. As a result the apply worker does not get started for the
> >>> new subscription created immediately and gets started after the
> >>> timeout of 180 seconds.
> >>
> >> I don't see how we could miss a notification. Yes, both events will set
> >> the same latch. Why is that a problem?
> >
> > The launcher process waits for the apply worker to attach in the
> > WaitForReplicationWorkerAttach function. The launcher's latch can be
> > set concurrently by another CREATE SUBSCRIPTION and by the apply
> > worker attaching. The launcher then detects that the latch is set while
> > waiting in WaitForReplicationWorkerAttach, resets the latch,
> > proceeds to the main loop, and waits for DEFAULT_NAPTIME_PER_CYCLE (as
> > the latch has already been reset). Further details are provided below.
> >
> >> The loop will see that the new
> >> worker has attached, and that the new subscription has been created, and
> >> process both events. Right?
> >
> > Since the latch is reset at WaitForReplicationWorkerAttach, it skips
> > processing the create subscription event.
> >
> > In more detail:
> > Consider two concurrent CREATE SUBSCRIPTION commands: first
> > CREATE SUBSCRIPTION sub1, followed immediately by
> > CREATE SUBSCRIPTION sub2.
> > Occasionally, the following sequence of events unfolds:
> > After the first CREATE SUBSCRIPTION command (sub1), the backend will
> > set the launcher's latch because of ApplyLauncherWakeupAtCommit.
> > Subsequently, the launcher process will reset the latch and identify
> > the addition of a new subscription in the pg_subscription list. The
> > launcher process will proceed to request the postmaster to start the
> > apply worker background process (sub1) and request the postmaster to
> > notify the launcher once the apply worker (sub1) has been started.
> > The launcher will then wait for the apply worker (sub1) to attach in the
> > WaitForReplicationWorkerAttach function.
> > Meanwhile, the second CREATE SUBSCRIPTION command (sub2), which was
> > executed concurrently, also sets the launcher's latch (because of
> > ApplyLauncherWakeupAtCommit).
> > While the launcher remains in the
> > WaitForReplicationWorkerAttach function waiting for the apply worker of
> > sub1 to start, the postmaster will start the apply worker for
> > subscription sub1 and send a SIGUSR1 signal to the launcher process
> > via ReportBackgroundWorkerPID. Upon receiving this signal, the
> > launcher process will then set its latch.
> >
> > Now the launcher's latch has been set by two concurrent events: the
> > start of the apply worker for sub1 and the execution of the CREATE
> > SUBSCRIPTION sub2 command.
> >
> > At this juncture, the launcher, which had been awaiting the attachment
> > of the apply worker, detects that the latch is set and proceeds to
> > reset it. The reset operation of the latch occurs within the following
> > section of code in WaitForReplicationWorkerAttach:
> > ...
> > rc = WaitLatch(MyLatch,
> >                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
> >                10L, WAIT_EVENT_BGWORKER_STARTUP);
> >
> > if (rc & WL_LATCH_SET)
> > {
> >     ResetLatch(MyLatch);
> >     CHECK_FOR_INTERRUPTS();
> > }
> > ...
> >
> > After the latch is reset here, the wakeup intended to start the apply
> > worker for subscription sub2 is lost. The launcher returns to the
> > ApplyLauncherMain function, where it waits for
> > DEFAULT_NAPTIME_PER_CYCLE (180 seconds) before processing the create
> > subscription request (i.e., creating a new apply worker for sub2).
> > The issue arises from the latch being reset in
> > WaitForReplicationWorkerAttach, which can occasionally delay the
> > synchronization of table data for the subscription.
>
> Ok, I see it now. Thanks for the explanation. So the problem isn't using
> the same latch for different purposes per se. It's that we're trying to
> use it in a nested fashion, resetting it in the inner loop.
>
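
To make the nested-reset problem concrete, here is a small standalone sketch in plain C. It is not PostgreSQL code: ToyLatch, toy_set, toy_reset and wakeup_survives_inner_wait are made-up names, and the latch is modelled as a bare flag with no signals or blocking. It only shows why resetting a shared latch in the inner wait discards the outer loop's wakeup, and why a separate latch for the attach notification avoids that.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a latch: a single flag. Hypothetical, not the real Latch. */
typedef struct ToyLatch
{
	bool		is_set;
} ToyLatch;

static void
toy_set(ToyLatch *latch)
{
	assert(latch != NULL);
	latch->is_set = true;
}

static void
toy_reset(ToyLatch *latch)
{
	assert(latch != NULL);
	latch->is_set = false;
}

/*
 * Model the window in which two events arrive while the launcher is in its
 * inner wait: the worker for sub1 attaches (sets attach_latch) and
 * CREATE SUBSCRIPTION sub2 commits (sets main_latch).
 *
 * Returns true if the main loop still sees a pending wakeup afterwards.
 */
static bool
wakeup_survives_inner_wait(ToyLatch *main_latch, ToyLatch *attach_latch)
{
	toy_set(attach_latch);		/* worker for sub1 has started */
	toy_set(main_latch);		/* CREATE SUBSCRIPTION sub2 committed */

	/* Inner wait: consume the attach notification. */
	if (attach_latch->is_set)
		toy_reset(attach_latch);

	/* Back in the main loop: is a wakeup still pending? */
	return main_latch->is_set;
}
```

Passing the same ToyLatch for both arguments models the current code and returns false (the sub2 wakeup is gone); passing two distinct latches returns true.
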
> Looking at the proposed patch more closely:
>
> > @@ -221,13 +224,13 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
> >                * We need timeout because we generally don't get notified via latch
> >                * about the worker attach.  But we don't expect to have to wait long.
> >                */
> > -             rc = WaitLatch(MyLatch,
> > +             rc = WaitLatch(&worker->launcherWakeupLatch,
> >                                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
> >                                          10L, WAIT_EVENT_BGWORKER_STARTUP);
> >
> >               if (rc & WL_LATCH_SET)
> >               {
> > -                     ResetLatch(MyLatch);
> > +                     ResetLatch(&worker->launcherWakeupLatch);
> >                       CHECK_FOR_INTERRUPTS();
> >               }
> >       }
>
> The comment here is now outdated, right? The patch adds an explicit
> SetLatch() call to ParallelApplyWorkerMain(), to notify the launcher
> about the attachment.

I have removed this outdated comment.

> Is the launcherWakeupLatch field protected by some lock, to protect
> which process owns it? OwnLatch() is called in
> logicalrep_worker_stop_internal() without holding a lock for example. Is
> there a guarantee that only one process can do that at the same time?

I have added a spinlock (wakeupmutex) so that only one process at a
time can own the latch.

> What happens if a process never calls DisownLatch(), e.g. because it
> bailed out with an error while waiting for the worker to attach or stop?

This will not happen now, as OwnLatch() is called just before
WaitLatch() and DisownLatch() right after it, rather than holding
ownership for longer.
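
As a rough illustration of that pairing, here is a standalone sketch in plain C. It is not the patch code: ToyLatch, toy_own, toy_disown and toy_wait_once are hypothetical names, the blocking wait is replaced by a simple flag check, and an error raised inside the wait itself is not modelled. The point is only that ownership is taken and released within one function, on both the "latch set" and the "timeout" path.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy latch with an owner; owner_pid == 0 means "not owned". Hypothetical. */
typedef struct ToyLatch
{
	bool		is_set;
	int			owner_pid;
} ToyLatch;

static void
toy_own(ToyLatch *latch, int pid)
{
	assert(latch->owner_pid == 0);	/* nobody else may own it */
	latch->owner_pid = pid;
}

static void
toy_disown(ToyLatch *latch, int pid)
{
	assert(latch->owner_pid == pid);	/* only the owner may release */
	latch->owner_pid = 0;
}

/*
 * Own the latch only for the duration of one wait and release it on every
 * path.  Returns true if the latch was set, false on "timeout".
 */
static bool
toy_wait_once(ToyLatch *latch, int pid)
{
	bool		was_set;

	toy_own(latch, pid);

	/* Stand-in for a blocking wait with a timeout. */
	was_set = latch->is_set;
	if (was_set)
		latch->is_set = false;

	toy_disown(latch, pid);
	return was_set;
}
```

After toy_wait_once returns, owner_pid is 0 again whether or not the latch was set, so a second process can take ownership for its own wait.
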

The attached v2 patch includes these changes.

Regards,
Vignesh
From 89b7feb30cb70898595366c0203d420c9502c7b6 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 29 May 2024 14:55:49 +0530
Subject: [PATCH v2] Improved latch handling between the logical replication
 launcher and worker processes.

The current implementation uses a single latch for multiple purposes:
a) attaching worker processes, b) handling worker process exits, and
c) creating subscriptions. This overlap causes issues in concurrent scenarios,
such as when the launcher starts a new apply worker and is waiting for it to
attach, while simultaneously a new subscription creation request arrives.

This commit fixes the problem by introducing a new wakeupLatch. This latch
will be set once the logical replication processes (apply worker,
parallel apply worker, or tablesync worker) are started. Consequently,
bgw_notify_pid is no longer required, as wakeupLatch will handle notifications
when a logical replication worker process attaches and will signal the
launcher or apply worker upon process exit from logicalrep_worker_onexit.
---
 .../replication/logical/applyparallelworker.c |  1 +
 src/backend/replication/logical/launcher.c    | 93 +++++++++++--------
 src/backend/replication/logical/worker.c      |  1 +
 src/include/replication/worker_internal.h     |  7 ++
 4 files changed, 62 insertions(+), 40 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index e7f7d4c5e4..2fb4d63ebc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -912,6 +912,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * the uninitialized memory queue.
 	 */
 	logicalrep_worker_attach(worker_slot);
+	SetLatch(&MyLogicalRepWorker->wakeupLatch);
 
 	/*
 	 * Register the shutdown callback after we are attached to the worker
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index c566d50a07..dea41b7ea1 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -171,6 +171,35 @@ get_subscription_list(void)
 	return res;
 }
 
+/*
+ * Wait for wakeupLatch to be set or a timeout.
+ */
+static void
+WaitForWakeupLatch(LogicalRepWorker *worker, uint32 wait_event_info)
+{
+	int			rc;
+
+	SpinLockAcquire(&worker->wakeupmutex);
+	OwnLatch(&worker->wakeupLatch);
+
+	rc = WaitLatch(&worker->wakeupLatch,
+					WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					1000L, wait_event_info);
+
+	if (rc & WL_LATCH_SET)
+	{
+		ResetLatch(&worker->wakeupLatch);
+		DisownLatch(&worker->wakeupLatch);
+		SpinLockRelease(&worker->wakeupmutex);
+		CHECK_FOR_INTERRUPTS();
+	}
+	else
+	{
+		DisownLatch(&worker->wakeupLatch);
+		SpinLockRelease(&worker->wakeupmutex);
+	}
+}
+
 /*
  * Wait for a background worker to start up and attach to the shmem context.
  *
@@ -185,7 +214,6 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 							   BackgroundWorkerHandle *handle)
 {
 	BgwHandleStatus status;
-	int			rc;
 
 	for (;;)
 	{
@@ -217,19 +245,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 			return false;
 		}
 
-		/*
-		 * We need timeout because we generally don't get notified via latch
-		 * about the worker attach.  But we don't expect to have to wait long.
-		 */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-					   10L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
+		WaitForWakeupLatch(worker, WAIT_EVENT_BGWORKER_STARTUP);
 	}
 }
 
@@ -456,6 +472,12 @@ retry:
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
 
+	SpinLockAcquire(&worker->wakeupmutex);
+	OwnLatch(&worker->wakeupLatch);
+	ResetLatch(&worker->wakeupLatch);
+	DisownLatch(&worker->wakeupLatch);
+	SpinLockRelease(&worker->wakeupmutex);
+
 	/* Before releasing lock, remember generation for future identification. */
 	generation = worker->generation;
 
@@ -503,7 +525,7 @@ retry:
 	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
-	bgw.bgw_notify_pid = MyProcPid;
+	bgw.bgw_notify_pid = 0;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
@@ -548,20 +570,9 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
 	 */
 	while (worker->in_use && !worker->proc)
 	{
-		int			rc;
-
 		LWLockRelease(LogicalRepWorkerLock);
 
-		/* Wait a bit --- we don't expect to have to wait long. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-					   10L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
+		WaitForWakeupLatch(worker, WAIT_EVENT_BGWORKER_STARTUP);
 
 		/* Recheck worker status. */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@@ -585,24 +596,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
 	/* ... and wait for it to die. */
 	for (;;)
 	{
-		int			rc;
-
 		/* is it gone? */
 		if (!worker->proc || worker->generation != generation)
 			break;
 
 		LWLockRelease(LogicalRepWorkerLock);
 
-		/* Wait a bit --- we don't expect to have to wait long. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
+		WaitForWakeupLatch(worker, WAIT_EVENT_BGWORKER_SHUTDOWN);
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 	}
@@ -823,6 +823,8 @@ logicalrep_launcher_onexit(int code, Datum arg)
 static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
+	LogicalRepWorkerType type = MyLogicalRepWorker->type;
+
 	/* Disconnect gracefully from the remote side. */
 	if (LogRepWorkerWalRcvConn)
 		walrcv_disconnect(LogRepWorkerWalRcvConn);
@@ -843,7 +845,16 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (!InitializingApplyWorker)
 		LockReleaseAll(DEFAULT_LOCKMETHOD, true);
 
-	ApplyLauncherWakeup();
+	SetLatch(&MyLogicalRepWorker->wakeupLatch);
+
+	/*
+	 * Since we don't set bgw_notify_pid, send an exit signal to the parent
+	 * process.
+	 */
+	if (type == WORKERTYPE_APPLY)
+		ApplyLauncherWakeup();
+	else
+		logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 }
 
 /*
@@ -982,6 +993,8 @@ ApplyLauncherShmemInit(void)
 
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
+			InitSharedLatch(&worker->wakeupLatch);
+			SpinLockInit(&worker->wakeupmutex);
 		}
 	}
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0fb577d328..15cfe5693e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4737,6 +4737,7 @@ SetupApplyOrSyncWorker(int worker_slot)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
 	pqsignal(SIGTERM, die);
 	BackgroundWorkerUnblockSignals();
+	SetLatch(&MyLogicalRepWorker->wakeupLatch);
 
 	/*
 	 * We don't currently need any ResourceOwner in a walreceiver process, but
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9646261d7e..61f63dc1a3 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -77,6 +77,13 @@ typedef struct LogicalRepWorker
 	 */
 	FileSet    *stream_fileset;
 
+	/*
+	 * Used to wake up the launcher/apply worker after the logical replication
+	 * process is attached/exited.
+	 */
+	Latch		wakeupLatch;
+	slock_t		wakeupmutex;
+
 	/*
 	 * PID of leader apply worker if this slot is used for a parallel apply
 	 * worker, InvalidPid otherwise.
-- 
2.34.1
