On Wed, 2025-09-03 at 11:55 -0400, Andres Freund wrote:
> I think the regression is not due to anything inherent to worker, but
> due to pressure on AioWorkerSubmissionQueueLock - at least that's what
> I'm seeing on a older two socket machine. It's possible the bottleneck
> is different on a newer machine (my newer workstation is busy on
> another benchmark rn).
I believe what's happening is that parallelism of the IO completion work
(e.g. checksum verification) is reduced. In worker mode, the completion
work is happening on the io workers (of which there are 3); while in sync
mode the completion work is happening in the backends (of which there are
32). There may be lock contention too, but I don't think that's the
primary issue.

I attached a test patch for illustration. It simplifies the code inside
the LWLock to enqueue/dequeue only, and simplifies and reduces the
wakeups by doing pseudo-random wakeups only when enqueuing. Reducing the
wakeups should reduce the number of signals generated without hurting my
case, because the workers are never idle. And reducing the instructions
while holding the LWLock should reduce lock contention.

But the patch barely makes a difference: still around 24tps. What *does*
make a difference is changing io_worker_queue_size. A lower value of 16
effectively starves the workers of work to do, and I get a speedup to
about 28tps. A higher value of 512 gives the workers more chance to issue
the IOs -- and more responsibility to complete them -- and it drops to
17tps.

Furthermore, while the test is running, the io workers are constantly at
100% (mostly verifying checksums) and the backends are at 50% (20% when
io_worker_queue_size=512).

As an aside, I'm building with meson using
-Dc_args="-msse4.2 -Wtype-limits -Werror=missing-braces". But I notice
that the meson build doesn't seem to use -funroll-loops or
-ftree-vectorize when building checksums.c. Is that intentional? If not,
perhaps slower checksum calculations explain my results.

Regards,
	Jeff Davis
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index b5ac073a910..25afa382932 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -48,10 +48,6 @@
 #include "utils/wait_event.h"
 
 
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
-
 typedef struct PgAioWorkerSubmissionQueue
 {
 	uint32		size;
@@ -69,10 +65,11 @@ typedef struct PgAioWorkerSlot
 
 typedef struct PgAioWorkerControl
 {
-	uint64		idle_worker_mask;
+	int			dummy;
 	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
 } PgAioWorkerControl;
 
+static int	nextWakeupWorker = 0;
 
 static size_t pgaio_worker_shmem_size(void);
 static void pgaio_worker_shmem_init(bool first_time);
@@ -94,7 +91,7 @@ const IoMethodOps pgaio_worker_ops = {
 
 int			io_workers = 3;
-static int	io_worker_queue_size = 64;
+static int	io_worker_queue_size = 512;
 
 static int	MyIoWorkerId;
 static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
 static PgAioWorkerControl *io_worker_control;
@@ -152,7 +149,6 @@ pgaio_worker_shmem_init(bool first_time)
 						&found);
 	if (!found)
 	{
-		io_worker_control->idle_worker_mask = 0;
 		for (int i = 0; i < MAX_IO_WORKERS; ++i)
 		{
 			io_worker_control->workers[i].latch = NULL;
@@ -162,17 +158,20 @@ pgaio_worker_shmem_init(bool first_time)
 }
 
 static int
-pgaio_worker_choose_idle(void)
+pgaio_worker_choose(void)
 {
-	int			worker;
+	int			worker;
 
-	if (io_worker_control->idle_worker_mask == 0)
-		return -1;
+	worker = nextWakeupWorker;
+	nextWakeupWorker = (nextWakeupWorker + 1) % io_workers;
 
-	/* Find the lowest bit position, and clear it. */
-	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
-	Assert(io_worker_control->workers[worker].in_use);
+	if (MyBackendType == B_IO_WORKER && nextWakeupWorker == MyIoWorkerId)
+	{
+		if (io_workers > 1)
+			nextWakeupWorker = (nextWakeupWorker + 1) % io_workers;
+		else
+			return -1;
+	}
 
 	return worker;
 }
@@ -245,8 +244,6 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
 	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
 	int			nsync = 0;
-	Latch	   *wakeup = NULL;
-	int			worker;
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
@@ -261,25 +258,20 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 			 * we can to workers, to maximize concurrency.
 			 */
 			synchronous_ios[nsync++] = staged_ios[i];
-			continue;
 		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (num_staged_ios > nsync)
+	{
+		int			worker = pgaio_worker_choose();
 
-		if (wakeup == NULL)
+		if (worker >= 0)
 		{
-			/* Choose an idle worker to wake up if we haven't already. */
-			worker = pgaio_worker_choose_idle();
-			if (worker >= 0)
-				wakeup = io_worker_control->workers[worker].latch;
-
-			pgaio_debug_io(DEBUG4, staged_ios[i],
-						   "choosing worker %d",
-						   worker);
+			Latch	   *wakeup = io_worker_control->workers[worker].latch;
+			SetLatch(wakeup);
 		}
 	}
-	LWLockRelease(AioWorkerSubmissionQueueLock);
-
-	if (wakeup)
-		SetLatch(wakeup);
 
 	/* Run whatever is left synchronously. */
 	if (nsync > 0)
@@ -317,7 +309,6 @@ pgaio_worker_die(int code, Datum arg)
 	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
 	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
 
-	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
 	io_worker_control->workers[MyIoWorkerId].in_use = false;
 	io_worker_control->workers[MyIoWorkerId].latch = NULL;
 	LWLockRelease(AioWorkerSubmissionQueueLock);
@@ -354,7 +345,6 @@ pgaio_worker_register(void)
 	if (MyIoWorkerId == -1)
 		elog(ERROR, "couldn't find a free worker slot");
 
-	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
 	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
 	LWLockRelease(AioWorkerSubmissionQueueLock);
 
@@ -458,10 +448,6 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 	while (!ShutdownRequestPending)
 	{
 		uint32		io_index;
-		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
-		int			nlatches = 0;
-		int			nwakeups = 0;
-		int			worker;
 
 		/*
 		 * Try to get a job to do.
@@ -470,36 +456,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		 * to ensure that we don't see an outdated data in the handle.
 		 */
 		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-		if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
-		{
-			/*
-			 * Nothing to do. Mark self idle.
-			 *
-			 * XXX: Invent some kind of back pressure to reduce useless
-			 * wakeups?
-			 */
-			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
-		}
-		else
-		{
-			/* Got one. Clear idle flag. */
-			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
-
-			/* See if we can wake up some peers. */
-			nwakeups = Min(pgaio_worker_submission_queue_depth(),
-						   IO_WORKER_WAKEUP_FANOUT);
-			for (int i = 0; i < nwakeups; ++i)
-			{
-				if ((worker = pgaio_worker_choose_idle()) < 0)
-					break;
-				latches[nlatches++] = io_worker_control->workers[worker].latch;
-			}
-		}
+		io_index = pgaio_worker_submission_queue_consume();
 		LWLockRelease(AioWorkerSubmissionQueueLock);
 
-		for (int i = 0; i < nlatches; ++i)
-			SetLatch(latches[i]);
-
 		if (io_index != -1)
 		{
 			PgAioHandle *ioh = NULL;