On Wed, 2025-09-03 at 11:55 -0400, Andres Freund wrote:
> I think the regression is not due to anything inherent to worker,
> but due to pressure on AioWorkerSubmissionQueueLock - at least
> that's what I'm seeing on an older two-socket machine. It's
> possible the bottleneck is different on a newer machine (my newer
> workstation is busy on another benchmark rn).

I believe what's happening is that parallelism of the IO completion
work (e.g. checksum verification) is reduced. In worker mode, the
completion work happens in the io workers (of which there are 3),
while in sync mode it happens in the backends (of which there are
32).

There may be lock contention too, but I don't think that's the primary
issue.
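
A minimal sketch of that comparison (io_method is restart-only;
io_workers=3 is the default, and the rest is whatever your setup
uses):

# completions happen in the io workers (io_workers=3 by default)
psql -c "ALTER SYSTEM SET io_method = 'worker'"
pg_ctl -D "$PGDATA" restart
# ... run the benchmark, note the tps ...

# completions happen in the issuing backends (32 of them here)
psql -c "ALTER SYSTEM SET io_method = 'sync'"
pg_ctl -D "$PGDATA" restart
# ... run the benchmark again and compare ...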

I attached a test patch for illustration. It simplifies the code
inside the LWLock to enqueue/dequeue only, and it reduces the wakeups
by issuing pseudo-random wakeups only when enqueuing. Reducing the
wakeups should cut the number of signals generated without hurting my
case, because the workers are never idle; and reducing the
instructions executed while holding the LWLock should reduce lock
contention. But the patch barely makes a difference: still around
24 tps.

What *does* make a difference is changing io_worker_queue_size. A
lower value of 16 effectively starves the workers of work to do, and
I get a speedup to about 28 tps. A higher value of 512 gives the
workers more chance to issue the IOs -- and more responsibility to
complete them -- and it drops to 17 tps. Furthermore, while the test
is running, the io workers are constantly at 100% CPU (mostly
verifying checksums) and the backends are at 50% (20% when
io_worker_queue_size=512).

As an aside, I'm building with meson using
-Dc_args="-msse4.2 -Wtype-limits -Werror=missing-braces". But I
notice that the meson build doesn't seem to use -funroll-loops or
-ftree-vectorize when building checksum.c. Is that intentional? If
not, perhaps slower checksum calculations explain my results.
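
For anyone who wants to double-check the flags, a rough sketch (the
"build" directory name is just my convention):

# dump the compile command actually used for checksum.c
ninja -C build -t commands | grep checksum.c

# blunt test: force the flags build-wide (plus whatever c_args you
# already use)
meson configure build -Dc_args="-msse4.2 -funroll-loops -ftree-vectorize"
ninja -C build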

Regards,
        Jeff Davis

diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index b5ac073a910..25afa382932 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -48,10 +48,6 @@
 #include "utils/wait_event.h"
 
 
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
-
 typedef struct PgAioWorkerSubmissionQueue
 {
 	uint32		size;
@@ -69,10 +65,11 @@ typedef struct PgAioWorkerSlot
 
 typedef struct PgAioWorkerControl
 {
-	uint64		idle_worker_mask;
+	int dummy;
 	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
 } PgAioWorkerControl;
 
+static int	nextWakeupWorker = 0;
 
 static size_t pgaio_worker_shmem_size(void);
 static void pgaio_worker_shmem_init(bool first_time);
@@ -94,7 +91,7 @@ const IoMethodOps pgaio_worker_ops = {
 int			io_workers = 3;
 
 
-static int	io_worker_queue_size = 64;
+static int	io_worker_queue_size = 512;
 static int	MyIoWorkerId;
 static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
 static PgAioWorkerControl *io_worker_control;
@@ -152,7 +149,6 @@ pgaio_worker_shmem_init(bool first_time)
 						&found);
 	if (!found)
 	{
-		io_worker_control->idle_worker_mask = 0;
 		for (int i = 0; i < MAX_IO_WORKERS; ++i)
 		{
 			io_worker_control->workers[i].latch = NULL;
@@ -162,17 +158,20 @@ pgaio_worker_shmem_init(bool first_time)
 }
 
 static int
-pgaio_worker_choose_idle(void)
+pgaio_worker_choose(void)
 {
-	int			worker;
+	int worker;
 
-	if (io_worker_control->idle_worker_mask == 0)
-		return -1;
+	worker = nextWakeupWorker;
+	nextWakeupWorker = (nextWakeupWorker + 1) % io_workers;
 
-	/* Find the lowest bit position, and clear it. */
-	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
-	Assert(io_worker_control->workers[worker].in_use);
+	if (MyBackendType == B_IO_WORKER && nextWakeupWorker == MyIoWorkerId)
+	{
+		if (io_workers > 1)
+			nextWakeupWorker = (nextWakeupWorker + 1) % io_workers;
+		else
+			return -1;
+	}
 
 	return worker;
 }
@@ -245,8 +244,6 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
 	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
 	int			nsync = 0;
-	Latch	   *wakeup = NULL;
-	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
@@ -261,25 +258,20 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 			 * we can to workers, to maximize concurrency.
 			 */
 			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
 		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (num_staged_ios > nsync)
+	{
+		int			worker = pgaio_worker_choose();
 
-		if (wakeup == NULL)
+		if (worker >= 0)
 		{
-			/* Choose an idle worker to wake up if we haven't already. */
-			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
-
-			pgaio_debug_io(DEBUG4, staged_ios[i],
-						   "choosing worker %d",
-						   worker);
+			Latch	   *wakeup = io_worker_control->workers[worker].latch;
+			SetLatch(wakeup);
 		}
 	}
-	LWLockRelease(AioWorkerSubmissionQueueLock);
-
-	if (wakeup)
-		SetLatch(wakeup);
 
 	/* Run whatever is left synchronously. */
 	if (nsync > 0)
@@ -317,7 +309,6 @@ pgaio_worker_die(int code, Datum arg)
 	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
 	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
 
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
 	io_worker_control->workers[MyIoWorkerId].in_use = false;
 	io_worker_control->workers[MyIoWorkerId].latch = NULL;
 	LWLockRelease(AioWorkerSubmissionQueueLock);
@@ -354,7 +345,6 @@ pgaio_worker_register(void)
 	if (MyIoWorkerId == -1)
 		elog(ERROR, "couldn't find a free worker slot");
 
-	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
 	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
 	LWLockRelease(AioWorkerSubmissionQueueLock);
 
@@ -458,10 +448,6 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 	while (!ShutdownRequestPending)
 	{
 		uint32		io_index;
-		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
-		int			nlatches = 0;
-		int			nwakeups = 0;
-		int			worker;
 
 		/*
 		 * Try to get a job to do.
@@ -470,36 +456,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		 * to ensure that we don't see an outdated data in the handle.
 		 */
 		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-		if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
-		{
-			/*
-			 * Nothing to do.  Mark self idle.
-			 *
-			 * XXX: Invent some kind of back pressure to reduce useless
-			 * wakeups?
-			 */
-			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
-		}
-		else
-		{
-			/* Got one.  Clear idle flag. */
-			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
-
-			/* See if we can wake up some peers. */
-			nwakeups = Min(pgaio_worker_submission_queue_depth(),
-						   IO_WORKER_WAKEUP_FANOUT);
-			for (int i = 0; i < nwakeups; ++i)
-			{
-				if ((worker = pgaio_worker_choose_idle()) < 0)
-					break;
-				latches[nlatches++] = io_worker_control->workers[worker].latch;
-			}
-		}
+		io_index = pgaio_worker_submission_queue_consume();
 		LWLockRelease(AioWorkerSubmissionQueueLock);
 
-		for (int i = 0; i < nlatches; ++i)
-			SetLatch(latches[i]);
-
 		if (io_index != -1)
 		{
 			PgAioHandle *ioh = NULL;
