I've been annoyed for a while because, while a parallel check-world
run usually takes a bit over a minute on my machine, sometimes it
takes between three and four minutes.  I was finally able to
track down what is happening, and it's this: sometimes one or
another of the src/test/subscription tests takes an extra three
minutes because the logical replication launcher is sleeping
instead of launching the next task.  It eventually reaches its
hard-wired maximum wait of DEFAULT_NAPTIME_PER_CYCLE (3min),
wakes up and notices it has something to do, and then we're
on our merry way again.  I'm not sure how often this is a problem
in the real world, but it happens often enough to be annoying
during development.

There are two distinct bugs involved:

1. WaitForReplicationWorkerAttach sometimes has to clear a process
latch event so that it can keep waiting for the worker to launch.
It neglects to set the latch again, allowing ApplyLauncherMain
to miss events.

2. ApplyLauncherMain ignores a failure return from
logicalrep_worker_launch, which is bad because (unless it has
another worker launch pending) it will then sleep for
DEFAULT_NAPTIME_PER_CYCLE before reconsidering.  What it ought to do
is try again after wal_retrieve_retry_interval.  This situation can
arise in resource-exhaustion cases (cf. the early exits in
logicalrep_worker_launch), but what's more likely in the regression
tests is that the worker stops due to some error condition before
WaitForReplicationWorkerAttach sees it attached, which is then duly
reported as a failure.

It's possible to make the test slowness extremely reproducible
with this change, which widens the race condition window for
both problems:

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 1c3c051403d..724e82bcdc1 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -214,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                 */
                rc = WaitLatch(MyLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-                                          10L, WAIT_EVENT_BGWORKER_STARTUP);
+                                          1000L, WAIT_EVENT_BGWORKER_STARTUP);
 
                if (rc & WL_LATCH_SET)
                {

I don't recommend that as a permanent change, but it's helpful
for testing the attached patch.

In the attached, I made two other non-cosmetic changes:

3. In WaitForReplicationWorkerAttach, capture worker->in_use
before not after releasing LogicalRepWorkerLock.  Maybe there
is a reason why that's not a dangerous race condition, but
it sure is un-obvious to me.

4. In process_syncing_tables_for_apply (the other caller of
logicalrep_worker_launch), it seems okay to ignore the
result of logicalrep_worker_launch, but I think it should
fill hentry->last_start_time before not after the call.
Otherwise we might be changing a hashtable entry that's
no longer relevant to this worker.  I'm not sure exactly
where the failed worker will be cleaned-up-after, but
it could very easily be out of the system entirely before
logicalrep_worker_launch returns.
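
As a generic illustration of point 3, the pattern is simply to copy the
shared field into a local while the lock is still held and to return
the local afterwards.  This is a minimal sketch using a pthread mutex
in place of LogicalRepWorkerLock; ToySlot, toy_lock and
toy_check_attached are hypothetical names, not anything in the tree.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical shared slot; "proc" is nonzero once a worker attached. */
typedef struct ToySlot
{
	bool		in_use;
	int			proc;
} ToySlot;

/* Hypothetical lock protecting every ToySlot. */
static pthread_mutex_t toy_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Sets *done to true if the worker has either died or attached, and
 * returns whether the slot was in use at that moment.  The value is
 * captured before the unlock, so a concurrent change to slot->in_use
 * after we release the lock cannot alter what we report.
 */
static bool
toy_check_attached(ToySlot *slot, bool *done)
{
	bool		result = false;

	assert(slot != NULL && done != NULL);

	pthread_mutex_lock(&toy_lock);
	*done = (!slot->in_use || slot->proc != 0);
	if (*done)
		result = slot->in_use;	/* read while still holding the lock */
	pthread_mutex_unlock(&toy_lock);

	return result;
}
```

The unsafe variant would be "unlock, then return slot->in_use", which
reads the shared field with no lock held.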

Barring objections, I plan to apply and back-patch this.

                        regards, tom lane

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 1c3c051403d..14d8efbd25b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -175,12 +175,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 							   uint16 generation,
 							   BackgroundWorkerHandle *handle)
 {
-	BgwHandleStatus status;
-	int			rc;
+	bool		result = false;
+	bool		dropped_latch = false;
 
 	for (;;)
 	{
+		BgwHandleStatus status;
 		pid_t		pid;
+		int			rc;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -189,8 +191,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		/* Worker either died or has started. Return false if died. */
 		if (!worker->in_use || worker->proc)
 		{
+			result = worker->in_use;
 			LWLockRelease(LogicalRepWorkerLock);
-			return worker->in_use;
+			break;
 		}
 
 		LWLockRelease(LogicalRepWorkerLock);
@@ -205,7 +208,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 			if (generation == worker->generation)
 				logicalrep_worker_cleanup(worker);
 			LWLockRelease(LogicalRepWorkerLock);
-			return false;
+			break;				/* result is already false */
 		}
 
 		/*
@@ -220,8 +223,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		{
 			ResetLatch(MyLatch);
 			CHECK_FOR_INTERRUPTS();
+			dropped_latch = true;
 		}
 	}
+
+	/*
+	 * If we had to clear a latch event in order to wait, be sure to restore
+	 * it before exiting.  Otherwise caller may miss events.
+	 */
+	if (dropped_latch)
+		SetLatch(MyLatch);
+
+	return result;
 }
 
 /*
@@ -1194,10 +1207,21 @@ ApplyLauncherMain(Datum main_arg)
 				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
 			{
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(WORKERTYPE_APPLY,
-										 sub->dbid, sub->oid, sub->name,
-										 sub->owner, InvalidOid,
-										 DSM_HANDLE_INVALID);
+				if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
+											  sub->dbid, sub->oid, sub->name,
+											  sub->owner, InvalidOid,
+											  DSM_HANDLE_INVALID))
+				{
+					/*
+					 * We get here either if we failed to launch a worker
+					 * (perhaps for resource-exhaustion reasons) or if we
+					 * launched one but it immediately quit.  Either way, it
+					 * seems appropriate to try again after
+					 * wal_retrieve_retry_interval.
+					 */
+					wait_time = Min(wait_time,
+									wal_retrieve_retry_interval);
+				}
 			}
 			else
 			{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..b679156e3d8 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -603,6 +603,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
+						/*
+						 * Set the last_start_time even if we fail to start
+						 * the worker, so that we won't retry until
+						 * wal_retrieve_retry_interval has elapsed.
+						 */
+						hentry->last_start_time = now;
 						logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
 												 MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
@@ -610,7 +616,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 MyLogicalRepWorker->userid,
 												 rstate->relid,
 												 DSM_HANDLE_INVALID);
-						hentry->last_start_time = now;
 					}
 				}
 			}
