On Fri, 10 May 2024 at 07:39, Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Dear Vignesh,
>
> Thanks for raising idea!
>
> > a) Introduce a new latch to handle worker attach and exit.
>
> Just to confirm - there are three wait events for launchers, so I feel we may 
> be
> able to create latches per wait event. Is there a reason to introduce
> "a" latch?

One latch is enough; we can use the new latch for both worker start
and worker exit. The existing latch can continue to be used for other
purposes.  Something like the attached patch.

Regards,
Vignesh
From 072cfefd717a0c7c496a5ca66e9239ccb808ece6 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 29 May 2024 14:55:49 +0530
Subject: [PATCH v1] Improving the latch handling between logical replication
 launcher and the worker processes.

Currently the launcher's latch is used for the following: a) worker
process attach, b) worker process exit, and c) subscription creation.
Since this same latch is used for multiple cases, the launcher process
is not able to handle concurrent scenarios such as: a) the launcher
has started a new apply worker and is waiting for that worker to
attach, while b) a CREATE SUBSCRIPTION command sends the launcher a
wakeup signal. Fix this by using a separate latch for worker start
and exit.
---
 .../replication/logical/applyparallelworker.c |  1 +
 src/backend/replication/logical/launcher.c    | 19 ++++++++++++++-----
 src/backend/replication/logical/worker.c      |  1 +
 src/include/replication/worker_internal.h     |  6 ++++++
 4 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index e7f7d4c5e4..62c3791ceb 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -912,6 +912,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * the uninitialized memory queue.
 	 */
 	logicalrep_worker_attach(worker_slot);
+	SetLatch(&MyLogicalRepWorker->launcherWakeupLatch);
 
 	/*
 	 * Register the shutdown callback after we are attached to the worker
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 27c3a91fb7..74bfdb9b6e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -187,6 +187,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 	BgwHandleStatus status;
 	int			rc;
 
+	OwnLatch(&worker->launcherWakeupLatch);
 	for (;;)
 	{
 		pid_t		pid;
@@ -199,6 +200,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		if (!worker->in_use || worker->proc)
 		{
 			LWLockRelease(LogicalRepWorkerLock);
+			DisownLatch(&worker->launcherWakeupLatch);
 			return worker->in_use;
 		}
 
@@ -214,6 +216,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 			if (generation == worker->generation)
 				logicalrep_worker_cleanup(worker);
 			LWLockRelease(LogicalRepWorkerLock);
+			DisownLatch(&worker->launcherWakeupLatch);
 			return false;
 		}
 
@@ -221,13 +224,13 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		 * We need timeout because we generally don't get notified via latch
 		 * about the worker attach.  But we don't expect to have to wait long.
 		 */
-		rc = WaitLatch(MyLatch,
+		rc = WaitLatch(&worker->launcherWakeupLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 					   10L, WAIT_EVENT_BGWORKER_STARTUP);
 
 		if (rc & WL_LATCH_SET)
 		{
-			ResetLatch(MyLatch);
+			ResetLatch(&worker->launcherWakeupLatch);
 			CHECK_FOR_INTERRUPTS();
 		}
 	}
@@ -573,6 +576,8 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
 			break;
 	}
 
+	OwnLatch(&worker->launcherWakeupLatch);
+
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, signo);
 
@@ -583,18 +588,21 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
 
 		/* is it gone? */
 		if (!worker->proc || worker->generation != generation)
+		{
+			DisownLatch(&worker->launcherWakeupLatch);
 			break;
+		}
 
 		LWLockRelease(LogicalRepWorkerLock);
 
 		/* Wait a bit --- we don't expect to have to wait long. */
-		rc = WaitLatch(MyLatch,
+		rc = WaitLatch(&worker->launcherWakeupLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
 
 		if (rc & WL_LATCH_SET)
 		{
-			ResetLatch(MyLatch);
+			ResetLatch(&worker->launcherWakeupLatch);
 			CHECK_FOR_INTERRUPTS();
 		}
 
@@ -837,7 +845,7 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (!InitializingApplyWorker)
 		LockReleaseAll(DEFAULT_LOCKMETHOD, true);
 
-	ApplyLauncherWakeup();
+	SetLatch(&MyLogicalRepWorker->launcherWakeupLatch);
 }
 
 /*
@@ -976,6 +984,7 @@ ApplyLauncherShmemInit(void)
 
 			memset(worker, 0, sizeof(LogicalRepWorker));
 			SpinLockInit(&worker->relmutex);
+			InitSharedLatch(&worker->launcherWakeupLatch);
 		}
 	}
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..33b1336136 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4652,6 +4652,7 @@ SetupApplyOrSyncWorker(int worker_slot)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
 	pqsignal(SIGTERM, die);
 	BackgroundWorkerUnblockSignals();
+	SetLatch(&MyLogicalRepWorker->launcherWakeupLatch);
 
 	/*
 	 * We don't currently need any ResourceOwner in a walreceiver process, but
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 515aefd519..7cc70a1b9e 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -77,6 +77,12 @@ typedef struct LogicalRepWorker
 	 */
 	FileSet    *stream_fileset;
 
+	/*
+	 * launcherWakeupLatch is used to wake up the launcher process when a
+	 * worker process attaches or exits.
+	 */
+	Latch		launcherWakeupLatch;
+
 	/*
 	 * PID of leader apply worker if this slot is used for a parallel apply
 	 * worker, InvalidPid otherwise.
-- 
2.34.1

Reply via email to