It's hard to know what value to choose for io_workers (default 3). If it's too low, io_method=worker's small submission queue overflows and it silently falls back to synchronous IO. If it's too high, it generates a lot of pointless wakeups and scheduling overhead; that might or might not be considered an independent problem, but having a correctly sized pool certainly mitigates it. Here's a patch to replace that GUC with:
io_min_workers=1, io_max_workers=8, io_worker_idle_timeout=60s, io_worker_launch_interval=500ms. The patch grows the pool when a backlog is detected (better ideas for this logic are welcome), and lets idle workers time out. IO jobs were already concentrated into the lowest numbered workers, partly because that seemed to have marginally better latency than anything else tried so far, due to latch collapsing with lucky timing, and partly in anticipation of this change. The patch also reduces bogus wakeups somewhat by being a bit more cautious about fanout. That could probably be improved a lot more and needs more research: it's quite tricky to figure out how to suppress wakeups without throwing potential concurrency away. The first couple of patches are independent of this topic, and might be potential cleanups/fixes for master/v18. The last is a simple latency test. Ideas, testing, flames etc. are welcome.
From 1dbba36f67df5d3d34a990613d6d68d15caf1b17 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 29 Mar 2025 13:25:27 +1300 Subject: [PATCH 1/5] aio: Regularize io_method=worker naming conventions. method_worker.c didn't keep up with the pattern of PgAioXXX for type names in the pgaio module. Add the missing "Pg" prefix used else where. Likewise for pgaio_choose_idle_worker() which alone failed to use a pgaio_worker_XXX() name refecting its submodule. Rename. Standardize on parameter names num_staged_ios, staged_ios for the internal submission function. Rename the array of handle IDs in PgAioSubmissionQueue to sqes, since that's a term of art seen in many of these types of systems. --- src/backend/storage/aio/method_worker.c | 54 ++++++++++++------------- src/tools/pgindent/typedefs.list | 6 +-- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 8ad17ec1ef7..ba5bc5e44ba 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -51,26 +51,26 @@ #define IO_WORKER_WAKEUP_FANOUT 2 -typedef struct AioWorkerSubmissionQueue +typedef struct PgAioWorkerSubmissionQueue { uint32 size; uint32 mask; uint32 head; uint32 tail; - uint32 ios[FLEXIBLE_ARRAY_MEMBER]; -} AioWorkerSubmissionQueue; + uint32 sqes[FLEXIBLE_ARRAY_MEMBER]; +} PgAioWorkerSubmissionQueue; -typedef struct AioWorkerSlot +typedef struct PgAioWorkerSlot { Latch *latch; bool in_use; -} AioWorkerSlot; +} PgAioWorkerSlot; -typedef struct AioWorkerControl +typedef struct PgAioWorkerControl { uint64 idle_worker_mask; - AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; -} AioWorkerControl; + PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; +} PgAioWorkerControl; static size_t pgaio_worker_shmem_size(void); @@ -95,8 +95,8 @@ int io_workers = 3; static int io_worker_queue_size = 64; static int MyIoWorkerId; -static AioWorkerSubmissionQueue 
*io_worker_submission_queue; -static AioWorkerControl *io_worker_control; +static PgAioWorkerSubmissionQueue *io_worker_submission_queue; +static PgAioWorkerControl *io_worker_control; static size_t @@ -105,15 +105,15 @@ pgaio_worker_queue_shmem_size(int *queue_size) /* Round size up to next power of two so we can make a mask. */ *queue_size = pg_nextpower2_32(io_worker_queue_size); - return offsetof(AioWorkerSubmissionQueue, ios) + + return offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(uint32) * *queue_size; } static size_t pgaio_worker_control_shmem_size(void) { - return offsetof(AioWorkerControl, workers) + - sizeof(AioWorkerSlot) * MAX_IO_WORKERS; + return offsetof(PgAioWorkerControl, workers) + + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS; } static size_t @@ -161,7 +161,7 @@ pgaio_worker_shmem_init(bool first_time) } static int -pgaio_choose_idle_worker(void) +pgaio_worker_choose_idle(void) { int worker; @@ -178,7 +178,7 @@ pgaio_choose_idle_worker(void) static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh) { - AioWorkerSubmissionQueue *queue; + PgAioWorkerSubmissionQueue *queue; uint32 new_head; queue = io_worker_submission_queue; @@ -190,7 +190,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh) return false; /* full */ } - queue->ios[queue->head] = pgaio_io_get_id(ioh); + queue->sqes[queue->head] = pgaio_io_get_id(ioh); queue->head = new_head; return true; @@ -199,14 +199,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh) static uint32 pgaio_worker_submission_queue_consume(void) { - AioWorkerSubmissionQueue *queue; + PgAioWorkerSubmissionQueue *queue; uint32 result; queue = io_worker_submission_queue; if (queue->tail == queue->head) return UINT32_MAX; /* empty */ - result = queue->ios[queue->tail]; + result = queue->sqes[queue->tail]; queue->tail = (queue->tail + 1) & (queue->size - 1); return result; @@ -239,37 +239,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh) } static void 
-pgaio_worker_submit_internal(int nios, PgAioHandle *ios[]) +pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) { PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE]; int nsync = 0; Latch *wakeup = NULL; int worker; - Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE); + Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); - for (int i = 0; i < nios; ++i) + for (int i = 0; i < num_staged_ios; ++i) { - Assert(!pgaio_worker_needs_synchronous_execution(ios[i])); - if (!pgaio_worker_submission_queue_insert(ios[i])) + Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i])); + if (!pgaio_worker_submission_queue_insert(staged_ios[i])) { /* * We'll do it synchronously, but only after we've sent as many as * we can to workers, to maximize concurrency. */ - synchronous_ios[nsync++] = ios[i]; + synchronous_ios[nsync++] = staged_ios[i]; continue; } if (wakeup == NULL) { /* Choose an idle worker to wake up if we haven't already. 
*/ - worker = pgaio_choose_idle_worker(); + worker = pgaio_worker_choose_idle(); if (worker >= 0) wakeup = io_worker_control->workers[worker].latch; - pgaio_debug_io(DEBUG4, ios[i], + pgaio_debug_io(DEBUG4, staged_ios[i], "choosing worker %d", worker); } @@ -482,7 +482,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) IO_WORKER_WAKEUP_FANOUT); for (int i = 0; i < nwakeups; ++i) { - if ((worker = pgaio_choose_idle_worker()) < 0) + if ((worker = pgaio_worker_choose_idle()) < 0) break; latches[nlatches++] = io_worker_control->workers[worker].latch; } diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d16bc208654..9946cfcec41 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -55,9 +55,6 @@ AggStrategy AggTransInfo Aggref AggregateInstrumentation -AioWorkerControl -AioWorkerSlot -AioWorkerSubmissionQueue AlenState Alias AllocBlock @@ -2175,6 +2172,9 @@ PgAioTargetID PgAioTargetInfo PgAioUringContext PgAioWaitRef +PgAioWorkerControl +PgAioWorkerSlot +PgAioWorkerSubmissionQueue PgArchData PgBackendGSSStatus PgBackendSSLStatus -- 2.39.5
From 99c9a303d37d7e2232d3c28ee091aed82fe5b8eb Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 11 Apr 2025 23:10:10 +1200 Subject: [PATCH 2/5] aio: Remove IO worker ID references from postmaster.c. An ancient ancestor of this code had the postmaster assign IDs to IO workers. Now it tracks them in an unordered array, and it might be confusing to readers that it refers to their indexes as IDs in various places. Fix. --- src/backend/postmaster/postmaster.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 17fed96fe20..0e8623dea18 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -4337,15 +4337,15 @@ maybe_start_bgworkers(void) static bool maybe_reap_io_worker(int pid) { - for (int id = 0; id < MAX_IO_WORKERS; ++id) + for (int i = 0; i < MAX_IO_WORKERS; ++i) { - if (io_worker_children[id] && - io_worker_children[id]->pid == pid) + if (io_worker_children[i] && + io_worker_children[i]->pid == pid) { - ReleasePostmasterChildSlot(io_worker_children[id]); + ReleasePostmasterChildSlot(io_worker_children[i]); --io_worker_count; - io_worker_children[id] = NULL; + io_worker_children[i] = NULL; return true; } } @@ -4389,22 +4389,22 @@ maybe_adjust_io_workers(void) while (io_worker_count < io_workers) { PMChild *child; - int id; + int i; /* find unused entry in io_worker_children array */ - for (id = 0; id < MAX_IO_WORKERS; ++id) + for (i = 0; i < MAX_IO_WORKERS; ++i) { - if (io_worker_children[id] == NULL) + if (io_worker_children[i] == NULL) break; } - if (id == MAX_IO_WORKERS) - elog(ERROR, "could not find a free IO worker ID"); + if (i == MAX_IO_WORKERS) + elog(ERROR, "could not find a free IO worker slot"); /* Try to launch one. 
*/ child = StartChildProcess(B_IO_WORKER); if (child != NULL) { - io_worker_children[id] = child; + io_worker_children[i] = child; ++io_worker_count; } else @@ -4415,11 +4415,11 @@ maybe_adjust_io_workers(void) if (io_worker_count > io_workers) { /* ask the IO worker in the highest slot to exit */ - for (int id = MAX_IO_WORKERS - 1; id >= 0; --id) + for (int i = MAX_IO_WORKERS - 1; i >= 0; --i) { - if (io_worker_children[id] != NULL) + if (io_worker_children[i] != NULL) { - kill(io_worker_children[id]->pid, SIGUSR2); + kill(io_worker_children[i]->pid, SIGUSR2); break; } } -- 2.39.5
From a90a692725eedd692f934bf3ed56a2e3a7f3fc2c Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 11 Apr 2025 21:17:26 +1200 Subject: [PATCH 3/5] aio: Try repeatedly to give batched IOs to workers. Previously, if the first of a batch of IOs didn't fit in a batch we'd run all of them synchronously. Andres rightly pointed out that we should really try again between synchronous IOs, since the workers might have made progress. Suggested-by: Andres Freund <and...@anarazel.de> --- src/backend/storage/aio/method_worker.c | 30 ++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index ba5bc5e44ba..c20d6d0f18b 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -280,12 +280,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) SetLatch(wakeup); /* Run whatever is left synchronously. */ - if (nsync > 0) + for (int i = 0; i < nsync; ++i) { - for (int i = 0; i < nsync; ++i) + wakeup = NULL; + + /* + * Between synchronous IO operations, try again to enqueue as many as + * we can. + */ + if (i > 0) { - pgaio_io_perform_synchronously(synchronous_ios[i]); + wakeup = NULL; + + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + while (i < nsync && + pgaio_worker_submission_queue_insert(synchronous_ios[i])) + { + if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0) + wakeup = io_worker_control->workers[worker].latch; + i++; + } + LWLockRelease(AioWorkerSubmissionQueueLock); + + if (wakeup) + SetLatch(wakeup); + + if (i == nsync) + break; } + + pgaio_io_perform_synchronously(synchronous_ios[i]); } } -- 2.39.5
From 02325442bea440e65b5f3817c3fb8bd4681bbd25 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 22 Mar 2025 00:36:49 +1300 Subject: [PATCH 4/5] aio: Adjust IO worker pool size automatically. Replace the simple io_workers setting with: io_min_workers=1 io_max_workers=8 io_worker_idle_timeout=60s io_worker_launch_interval=500ms The pool is automatically sized within the configured range according to demand. XXX WIP --- doc/src/sgml/config.sgml | 70 ++- src/backend/postmaster/postmaster.c | 64 ++- src/backend/storage/aio/method_worker.c | 450 ++++++++++++++---- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 46 +- src/backend/utils/misc/postgresql.conf.sample | 5 +- src/include/storage/io_worker.h | 9 +- src/include/storage/lwlocklist.h | 1 + src/include/storage/pmsignal.h | 1 + src/test/modules/test_aio/t/002_io_workers.pl | 15 +- 10 files changed, 541 insertions(+), 121 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index c1674c22cb2..9f2e7ae6785 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2769,16 +2769,76 @@ include_dir 'conf.d' </listitem> </varlistentry> - <varlistentry id="guc-io-workers" xreflabel="io_workers"> - <term><varname>io_workers</varname> (<type>int</type>) + <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers"> + <term><varname>io_min_workers</varname> (<type>int</type>) <indexterm> - <primary><varname>io_workers</varname> configuration parameter</primary> + <primary><varname>io_min_workers</varname> configuration parameter</primary> </indexterm> </term> <listitem> <para> - Selects the number of I/O worker processes to use. The default is - 3. This parameter can only be set in the + Sets the minimum number of I/O worker processes to use. The default is + 1. This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command + line. 
+ </para> + <para> + Only has an effect if <xref linkend="guc-io-method"/> is set to + <literal>worker</literal>. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers"> + <term><varname>io_max_workers</varname> (<type>int</type>) + <indexterm> + <primary><varname>io_max_workers</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the maximum number of I/O worker processes to use. The default is + 8. This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command + line. + </para> + <para> + Only has an effect if <xref linkend="guc-io-method"/> is set to + <literal>worker</literal>. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout"> + <term><varname>io_worker_idle_timeout</varname> (<type>int</type>) + <indexterm> + <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the time after which idle I/O worker processes will exit, reducing the + maximum size of the I/O worker pool towards the minimum. The default + is 1 minute. + This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command + line. + </para> + <para> + Only has an effect if <xref linkend="guc-io-method"/> is set to + <literal>worker</literal>. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval"> + <term><varname>io_worker_launch_interval</varname> (<type>int</type>) + <indexterm> + <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the minimum time between launching new I/O workers. This can be used to avoid + sudden bursts of new I/O workers. The default is 500ms. 
+ This parameter can only be set in the <filename>postgresql.conf</filename> file or on the server command line. </para> diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 0e8623dea18..b3f68897194 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -408,6 +408,7 @@ static DNSServiceRef bonjour_sdref = NULL; #endif /* State for IO worker management. */ +static TimestampTz io_worker_launch_delay_until = 0; static int io_worker_count = 0; static PMChild *io_worker_children[MAX_IO_WORKERS]; @@ -1569,6 +1570,15 @@ DetermineSleepTime(void) if (StartWorkerNeeded) return 0; + /* If we need a new IO worker, defer until launch delay expires. */ + if (pgaio_worker_test_new_worker_needed() && + io_worker_count < io_max_workers) + { + if (io_worker_launch_delay_until == 0) + return 0; + next_wakeup = io_worker_launch_delay_until; + } + if (HaveCrashedWorker) { dlist_mutable_iter iter; @@ -3750,6 +3760,15 @@ process_pm_pmsignal(void) StartWorkerNeeded = true; } + /* Process IO worker start requests. */ + if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE)) + { + /* + * No local flag, as the state is exposed through pgaio_worker_*() + * functions. This signal is received on potentially actionable level + * changes, so that maybe_adjust_io_workers() will run. + */ + } /* Process background worker state changes. */ if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE)) { @@ -4355,8 +4374,9 @@ maybe_reap_io_worker(int pid) /* * Start or stop IO workers, to close the gap between the number of running * workers and the number of configured workers. 
Used to respond to change of - * the io_workers GUC (by increasing and decreasing the number of workers), as - * well as workers terminating in response to errors (by starting + * the io_{min,max}_workers GUCs (by increasing and decreasing the number of + * workers) and requests to start a new one due to submission queue backlog, + * as well as workers terminating in response to errors (by starting * "replacement" workers). */ static void @@ -4385,8 +4405,16 @@ maybe_adjust_io_workers(void) Assert(pmState < PM_WAIT_IO_WORKERS); - /* Not enough running? */ - while (io_worker_count < io_workers) + /* Cancel the launch delay when it expires to minimize clock access. */ + if (io_worker_launch_delay_until != 0 && + io_worker_launch_delay_until <= GetCurrentTimestamp()) + io_worker_launch_delay_until = 0; + + /* Not enough workers running? */ + while (io_worker_launch_delay_until == 0 && + io_worker_count < io_max_workers && + ((io_worker_count < io_min_workers || + pgaio_worker_clear_new_worker_needed()))) { PMChild *child; int i; @@ -4400,6 +4428,16 @@ maybe_adjust_io_workers(void) if (i == MAX_IO_WORKERS) elog(ERROR, "could not find a free IO worker slot"); + /* + * Apply launch delay even for failures to avoid retrying too fast on + * fork() failure, but not while we're still building the minimum pool + * size. + */ + if (io_worker_count >= io_min_workers) + io_worker_launch_delay_until = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + io_worker_launch_interval); + /* Try to launch one. */ child = StartChildProcess(B_IO_WORKER); if (child != NULL) @@ -4411,19 +4449,11 @@ maybe_adjust_io_workers(void) break; /* try again next time */ } - /* Too many running? 
*/ - if (io_worker_count > io_workers) - { - /* ask the IO worker in the highest slot to exit */ - for (int i = MAX_IO_WORKERS - 1; i >= 0; --i) - { - if (io_worker_children[i] != NULL) - { - kill(io_worker_children[i]->pid, SIGUSR2); - break; - } - } - } + /* + * If there are too many running because io_max_workers changed, that will + * be handled by the IO workers themselves so they can shut down in + * preferred order. + */ } diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index c20d6d0f18b..78817bb4196 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -11,9 +11,10 @@ * infrastructure for reopening the file, and must processed synchronously by * the client code when submitted. * - * So that the submitter can make just one system call when submitting a batch - * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This - * could be improved by using futexes instead of latches to wake N waiters. + * When a batch of IOs is submitted, the lowest numbered idle worker is woken + * up. If it sees more work in the queue it wakes a peer to help, and so on + * in a chain. When a backlog is detected, the pool size is increased. When + * the highest numbered worker times out after a period of inactivity. * * This method of AIO is available in all builds on all operating systems, and * is the default. @@ -40,16 +41,16 @@ #include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/pmsignal.h" #include "storage/proc.h" #include "tcop/tcopprot.h" #include "utils/memdebug.h" #include "utils/ps_status.h" #include "utils/wait_event.h" - -/* How many workers should each worker wake up if needed? */ -#define IO_WORKER_WAKEUP_FANOUT 2 - +/* Saturation for stats counters used to estimate wakeup:work ratio. 
*/ +#define PGAIO_WORKER_STATS_MAX 64 typedef struct PgAioWorkerSubmissionQueue { @@ -62,17 +63,25 @@ typedef struct PgAioWorkerSubmissionQueue typedef struct PgAioWorkerSlot { - Latch *latch; - bool in_use; + ProcNumber proc_number; } PgAioWorkerSlot; typedef struct PgAioWorkerControl { + /* Seen by postmaster */ + volatile bool new_worker_needed; + + /* Potected by AioWorkerSubmissionQueueLock. */ uint64 idle_worker_mask; + + /* Protected by AioWorkerControlLock. */ + uint64 worker_set; + int nworkers; + + /* Protected by AioWorkerControlLock. */ PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; } PgAioWorkerControl; - static size_t pgaio_worker_shmem_size(void); static void pgaio_worker_shmem_init(bool first_time); @@ -90,11 +99,14 @@ const IoMethodOps pgaio_worker_ops = { /* GUCs */ -int io_workers = 3; +int io_min_workers = 1; +int io_max_workers = 8; +int io_worker_idle_timeout = 60000; +int io_worker_launch_interval = 500; static int io_worker_queue_size = 64; -static int MyIoWorkerId; +static int MyIoWorkerId = -1; static PgAioWorkerSubmissionQueue *io_worker_submission_queue; static PgAioWorkerControl *io_worker_control; @@ -151,36 +163,171 @@ pgaio_worker_shmem_init(bool first_time) &found); if (!found) { - io_worker_control->idle_worker_mask = 0; + io_worker_control->new_worker_needed = false; + io_worker_control->worker_set = 0; for (int i = 0; i < MAX_IO_WORKERS; ++i) - { - io_worker_control->workers[i].latch = NULL; - io_worker_control->workers[i].in_use = false; - } + io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER; + } +} + +static void +pgaio_worker_consider_new_worker(uint32 queue_depth) +{ + /* + * This is called from sites that don't hold AioWorkerControlLock, but it + * changes infrequently and an up to date value is not required for this + * heuristic purpose. 
+ */ + if (!io_worker_control->new_worker_needed && + queue_depth >= io_worker_control->nworkers) + { + io_worker_control->new_worker_needed = true; + SendPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE); } } +/* + * Called by a worker when the queue is empty, to try to prevent a delayed + * reaction to a brief burst. This races against the postmaster acting on the + * old value if it was recently set to true, but that's OK, the ordering would + * be indeterminate anyway even if we could use locks in the postmaster. + */ +static void +pgaio_worker_cancel_new_worker(void) +{ + io_worker_control->new_worker_needed = false; +} + +/* + * Called by the postmaster to check if a new worker is needed. + */ +bool +pgaio_worker_test_new_worker_needed(void) +{ + return io_worker_control->new_worker_needed; +} + +/* + * Called by the postmaster to check if a new worker is needed when it's ready + * to launch one, and clear the flag. + */ +bool +pgaio_worker_clear_new_worker_needed(void) +{ + bool result; + + result = io_worker_control->new_worker_needed; + if (result) + io_worker_control->new_worker_needed = false; + + return result; +} + +static uint64 +pgaio_worker_mask(int worker) +{ + return UINT64_C(1) << worker; +} + +static void +pgaio_worker_add(uint64 *set, int worker) +{ + *set |= pgaio_worker_mask(worker); +} + +static void +pgaio_worker_remove(uint64 *set, int worker) +{ + *set &= ~pgaio_worker_mask(worker); +} + +#ifdef USE_ASSERT_CHECKING +static bool +pgaio_worker_in(uint64 set, int worker) +{ + return (set & pgaio_worker_mask(worker)) != 0; +} +#endif + +static uint64 +pgaio_worker_highest(uint64 set) +{ + return pg_leftmost_one_pos64(set); +} + +static uint64 +pgaio_worker_lowest(uint64 set) +{ + return pg_rightmost_one_pos64(set); +} + +static int +pgaio_worker_pop(uint64 *set) +{ + int worker; + + Assert(set != 0); + worker = pgaio_worker_lowest(*set); + pgaio_worker_remove(set, worker); + return worker; +} + static int pgaio_worker_choose_idle(void) { + uint64 
idle_worker_mask; int worker; - if (io_worker_control->idle_worker_mask == 0) + Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE)); + + /* + * Workers only wake higher numbered workers, to try to encourage an + * ordering of wakeup:work ratios, reducing spurious wakeups in lower + * numbered workers. + */ + idle_worker_mask = io_worker_control->idle_worker_mask; + if (MyIoWorkerId != -1) + idle_worker_mask &= ~(pgaio_worker_mask(MyIoWorkerId) - 1); + + if (idle_worker_mask == 0) return -1; /* Find the lowest bit position, and clear it. */ - worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask); - io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker); + worker = pgaio_worker_lowest(idle_worker_mask); + pgaio_worker_remove(&io_worker_control->idle_worker_mask, worker); return worker; } +/* + * Try to wake a worker by setting its latch, to tell it there are IOs to + * process in the submission queue. + */ +static void +pgaio_worker_wake(int worker) +{ + ProcNumber proc_number; + + /* + * If the selected worker is concurrently exiting, then pgaio_worker_die() + * had not yet removed it as of when we saw it in idle_worker_mask. That's + * OK, because it will wake all remaining workers to close wakeup-vs-exit + * races: *someone* will see the queued IO. If there are no workers + * running, the postmaster will start a new one. 
+ */ + proc_number = io_worker_control->workers[worker].proc_number; + if (proc_number != INVALID_PROC_NUMBER) + SetLatch(&GetPGProcByNumber(proc_number)->procLatch); +} + static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh) { PgAioWorkerSubmissionQueue *queue; uint32 new_head; + Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE)); + queue = io_worker_submission_queue; new_head = (queue->head + 1) & (queue->size - 1); if (new_head == queue->tail) @@ -202,6 +349,8 @@ pgaio_worker_submission_queue_consume(void) PgAioWorkerSubmissionQueue *queue; uint32 result; + Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE)); + queue = io_worker_submission_queue; if (queue->tail == queue->head) return UINT32_MAX; /* empty */ @@ -218,6 +367,8 @@ pgaio_worker_submission_queue_depth(void) uint32 head; uint32 tail; + Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE)); + head = io_worker_submission_queue->head; tail = io_worker_submission_queue->tail; @@ -242,9 +393,9 @@ static void pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) { PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE]; + uint32 queue_depth; + int worker = -1; int nsync = 0; - Latch *wakeup = NULL; - int worker; Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); @@ -259,51 +410,48 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) * we can to workers, to maximize concurrency. */ synchronous_ios[nsync++] = staged_ios[i]; - continue; } - - if (wakeup == NULL) + else if (worker == -1) { /* Choose an idle worker to wake up if we haven't already. 
*/ worker = pgaio_worker_choose_idle(); - if (worker >= 0) - wakeup = io_worker_control->workers[worker].latch; pgaio_debug_io(DEBUG4, staged_ios[i], "choosing worker %d", worker); } } + queue_depth = pgaio_worker_submission_queue_depth(); LWLockRelease(AioWorkerSubmissionQueueLock); - if (wakeup) - SetLatch(wakeup); + if (worker != -1) + pgaio_worker_wake(worker); + else + pgaio_worker_consider_new_worker(queue_depth); /* Run whatever is left synchronously. */ for (int i = 0; i < nsync; ++i) { - wakeup = NULL; - /* * Between synchronous IO operations, try again to enqueue as many as * we can. */ if (i > 0) { - wakeup = NULL; + worker = -1; LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); while (i < nsync && pgaio_worker_submission_queue_insert(synchronous_ios[i])) { - if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0) - wakeup = io_worker_control->workers[worker].latch; + if (worker == -1) + worker = pgaio_worker_choose_idle(); i++; } LWLockRelease(AioWorkerSubmissionQueueLock); - if (wakeup) - SetLatch(wakeup); + if (worker != -1) + pgaio_worker_wake(worker); if (i == nsync) break; @@ -335,13 +483,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) static void pgaio_worker_die(int code, Datum arg) { - LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); - Assert(io_worker_control->workers[MyIoWorkerId].in_use); - Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch); + uint64 notify_set; - io_worker_control->workers[MyIoWorkerId].in_use = false; - io_worker_control->workers[MyIoWorkerId].latch = NULL; + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + pgaio_worker_remove(&io_worker_control->idle_worker_mask, MyIoWorkerId); LWLockRelease(AioWorkerSubmissionQueueLock); + + LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE); + Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber); + io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER; + 
Assert(pgaio_worker_in(io_worker_control->worker_set, MyIoWorkerId)); + pgaio_worker_remove(&io_worker_control->worker_set, MyIoWorkerId); + notify_set = io_worker_control->worker_set; + Assert(io_worker_control->nworkers > 0); + io_worker_control->nworkers--; + Assert(pg_popcount64(io_worker_control->worker_set) == + io_worker_control->nworkers); + LWLockRelease(AioWorkerControlLock); + + /* Notify other workers on pool change. */ + while (notify_set != 0) + pgaio_worker_wake(pgaio_worker_pop(&notify_set)); } /* @@ -351,33 +513,37 @@ pgaio_worker_die(int code, Datum arg) static void pgaio_worker_register(void) { - MyIoWorkerId = -1; + uint64 worker_set_inverted; + uint64 old_worker_set; - /* - * XXX: This could do with more fine-grained locking. But it's also not - * very common for the number of workers to change at the moment... - */ - LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + MyIoWorkerId = -1; - for (int i = 0; i < MAX_IO_WORKERS; ++i) + LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE); + worker_set_inverted = ~io_worker_control->worker_set; + if (worker_set_inverted != 0) { - if (!io_worker_control->workers[i].in_use) - { - Assert(io_worker_control->workers[i].latch == NULL); - io_worker_control->workers[i].in_use = true; - MyIoWorkerId = i; - break; - } - else - Assert(io_worker_control->workers[i].latch != NULL); + MyIoWorkerId = pgaio_worker_lowest(worker_set_inverted); + if (MyIoWorkerId >= MAX_IO_WORKERS) + MyIoWorkerId = -1; } - if (MyIoWorkerId == -1) elog(ERROR, "couldn't find a free worker slot"); - io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId); - io_worker_control->workers[MyIoWorkerId].latch = MyLatch; - LWLockRelease(AioWorkerSubmissionQueueLock); + Assert(io_worker_control->workers[MyIoWorkerId].proc_number == + INVALID_PROC_NUMBER); + io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber; + + old_worker_set = io_worker_control->worker_set; + Assert(!pgaio_worker_in(old_worker_set, 
MyIoWorkerId)); + pgaio_worker_add(&io_worker_control->worker_set, MyIoWorkerId); + io_worker_control->nworkers++; + Assert(pg_popcount64(io_worker_control->worker_set) == + io_worker_control->nworkers); + LWLockRelease(AioWorkerControlLock); + + /* Notify other workers on pool change. */ + while (old_worker_set != 0) + pgaio_worker_wake(pgaio_worker_pop(&old_worker_set)); on_shmem_exit(pgaio_worker_die, 0); } @@ -403,14 +569,47 @@ pgaio_worker_error_callback(void *arg) errcontext("I/O worker executing I/O on behalf of process %d", owner_pid); } +/* + * Check if this backend is allowed to time out, and thus should use a + * non-infinite sleep time. Only the highest-numbered worker is allowed to + * time out, and only if the pool is above io_min_workers. Serializing + * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting + * io_min_workers. + * + * The result is only instantaneously true and may be temporarily inconsistent + * in different workers around transitions, but all workers are woken up on + * pool size or GUC changes making the result eventually consistent. + */ +static bool +pgaio_worker_can_timeout(void) +{ + uint64 worker_set; + + /* Serialize against pool sized changes. 
*/ + LWLockAcquire(AioWorkerControlLock, LW_SHARED); + worker_set = io_worker_control->worker_set; + LWLockRelease(AioWorkerControlLock); + + if (MyIoWorkerId != pgaio_worker_highest(worker_set)) + return false; + if (MyIoWorkerId < io_min_workers) + return false; + + return true; +} + void IoWorkerMain(const void *startup_data, size_t startup_data_len) { sigjmp_buf local_sigjmp_buf; + TimestampTz idle_timeout_abs = 0; + int timeout_guc_used = 0; PgAioHandle *volatile error_ioh = NULL; ErrorContextCallback errcallback = {0}; volatile int error_errno = 0; char cmd[128]; + int ios = 0; + int wakeups = 0; MyBackendType = B_IO_WORKER; AuxiliaryProcessMainCommon(); @@ -479,47 +678,53 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) while (!ShutdownRequestPending) { uint32 io_index; - Latch *latches[IO_WORKER_WAKEUP_FANOUT]; - int nlatches = 0; - int nwakeups = 0; - int worker; + uint32 queue_depth; + int worker = -1; /* Try to get a job to do. */ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); - if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX) + io_index = pgaio_worker_submission_queue_consume(); + queue_depth = pgaio_worker_submission_queue_depth(); + if (io_index == UINT32_MAX) { - /* - * Nothing to do. Mark self idle. - * - * XXX: Invent some kind of back pressure to reduce useless - * wakeups? - */ - io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId); + /* Nothing to do. Mark self idle. */ + pgaio_worker_add(&io_worker_control->idle_worker_mask, + MyIoWorkerId); } else { /* Got one. Clear idle flag. */ - io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId); + pgaio_worker_remove(&io_worker_control->idle_worker_mask, + MyIoWorkerId); - /* See if we can wake up some peers. 
*/ - nwakeups = Min(pgaio_worker_submission_queue_depth(), - IO_WORKER_WAKEUP_FANOUT); - for (int i = 0; i < nwakeups; ++i) - { - if ((worker = pgaio_worker_choose_idle()) < 0) - break; - latches[nlatches++] = io_worker_control->workers[worker].latch; - } + /* + * See if we should wake up a peer. Only do this if this worker + * is not experiencing spurious wakeups itself, to end a chain of + * wasted scheduling. + */ + if (queue_depth > 0 && wakeups <= ios) + worker = pgaio_worker_choose_idle(); } LWLockRelease(AioWorkerSubmissionQueueLock); - for (int i = 0; i < nlatches; ++i) - SetLatch(latches[i]); + /* Propagate wakeups. */ + if (worker != -1) + pgaio_worker_wake(worker); + else if (wakeups <= ios) + pgaio_worker_consider_new_worker(queue_depth); if (io_index != UINT32_MAX) { PgAioHandle *ioh = NULL; + /* Cancel timeout and update wakeup:work ratio. */ + idle_timeout_abs = 0; + if (++ios == PGAIO_WORKER_STATS_MAX) + { + ios /= 2; + wakeups /= 2; + } + ioh = &pgaio_ctl->io_handles[io_index]; error_ioh = ioh; errcallback.arg = ioh; @@ -585,12 +790,83 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) } else { - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, - WAIT_EVENT_IO_WORKER_MAIN); + int timeout_ms; + + /* Cancel new worker if pending. */ + pgaio_worker_cancel_new_worker(); + + /* Compute the remaining allowed idle time. */ + if (io_worker_idle_timeout == -1) + { + /* Never time out. */ + timeout_ms = -1; + } + else + { + TimestampTz now = GetCurrentTimestamp(); + + /* If the GUC changes, reset timer. */ + if (idle_timeout_abs != 0 && + io_worker_idle_timeout != timeout_guc_used) + idle_timeout_abs = 0; + + /* On first sleep, compute absolute timeout. 
*/ + if (idle_timeout_abs == 0) + { + idle_timeout_abs = + TimestampTzPlusMilliseconds(now, + io_worker_idle_timeout); + timeout_guc_used = io_worker_idle_timeout; + } + + /* + * All workers maintain the absolute timeout value, but only + * the highest worker can actually time out and only if + * io_min_workers is exceeded. All others wait only for + * explicit wakeups caused by queue insertion, wakeup + * propagation, change of pool size (possibly making them + * highest), or GUC reload. + */ + if (pgaio_worker_can_timeout()) + timeout_ms = + TimestampDifferenceMilliseconds(now, + idle_timeout_abs); + else + timeout_ms = -1; + } + + if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, + timeout_ms, + WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT) + { + /* WL_TIMEOUT */ + if (pgaio_worker_can_timeout()) + if (GetCurrentTimestamp() >= idle_timeout_abs) + break; + } + else + { + /* WL_LATCH_SET */ + if (++wakeups == PGAIO_WORKER_STATS_MAX) + { + ios /= 2; + wakeups /= 2; + } + } ResetLatch(MyLatch); } CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + /* If io_max_workers has been decreased, exit highest first. */ + if (MyIoWorkerId >= io_max_workers) + break; + } } error_context_stack = errcallback.previous; diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 930321905f1..067a3a1bb21 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -353,6 +353,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +AioWorkerControl "Waiting to update AIO worker information." 
# # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 60b12446a1c..bbb8855b12d 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3306,14 +3306,52 @@ struct config_int ConfigureNamesInt[] = }, { - {"io_workers", + {"io_max_workers", PGC_SIGHUP, RESOURCES_IO, - gettext_noop("Number of IO worker processes, for io_method=worker."), + gettext_noop("Maximum number of IO worker processes, for io_method=worker."), NULL, }, - &io_workers, - 3, 1, MAX_IO_WORKERS, + &io_max_workers, + 8, 1, MAX_IO_WORKERS, + NULL, NULL, NULL + }, + + { + {"io_min_workers", + PGC_SIGHUP, + RESOURCES_IO, + gettext_noop("Minimum number of IO worker processes, for io_method=worker."), + NULL, + }, + &io_min_workers, + 1, 1, MAX_IO_WORKERS, + NULL, NULL, NULL + }, + + { + {"io_worker_idle_timeout", + PGC_SIGHUP, + RESOURCES_IO, + gettext_noop("Maximum idle time before IO workers exit, for io_method=worker."), + NULL, + GUC_UNIT_MS + }, + &io_worker_idle_timeout, + 60 * 1000, -1, INT_MAX, + NULL, NULL, NULL + }, + + { + {"io_worker_launch_interval", + PGC_SIGHUP, + RESOURCES_IO, + gettext_noop("Maximum idle time between launching IO workers, for io_method=worker."), + NULL, + GUC_UNIT_MS + }, + &io_worker_launch_interval, + 500, 0, INT_MAX, NULL, NULL, NULL }, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 34826d01380..4370f673821 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -214,7 +214,10 @@ # can execute simultaneously # -1 sets based on shared_buffers # (change requires restart) -#io_workers = 3 # 1-32; +#io_min_workers = 1 # 1-32; +#io_max_workers = 8 # 1-32; +#io_worker_idle_timeout = 60s # -1 means never time out +#io_worker_launch_interval = 500ms # min 0ms # - Worker Processes - diff --git a/src/include/storage/io_worker.h 
b/src/include/storage/io_worker.h index 7bde7e89c8a..de9c80109e0 100644 --- a/src/include/storage/io_worker.h +++ b/src/include/storage/io_worker.h @@ -17,6 +17,13 @@ pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len); -extern PGDLLIMPORT int io_workers; +extern PGDLLIMPORT int io_min_workers; +extern PGDLLIMPORT int io_max_workers; +extern PGDLLIMPORT int io_worker_idle_timeout; +extern PGDLLIMPORT int io_worker_launch_interval; + +/* Interfaces visible to the postmaster. */ +extern bool pgaio_worker_test_new_worker_needed(void); +extern bool pgaio_worker_clear_new_worker_needed(void); #endif /* IO_WORKER_H */ diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146..c1801d08833 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, AioWorkerControl) diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h index 67fa9ac06e1..10a967f6739 100644 --- a/src/include/storage/pmsignal.h +++ b/src/include/storage/pmsignal.h @@ -38,6 +38,7 @@ typedef enum PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */ PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */ + PMSIGNAL_IO_WORKER_CHANGE, /* IO worker pool change */ PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */ PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */ PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */ diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl index af5fae15ea7..a0252857798 100644 --- a/src/test/modules/test_aio/t/002_io_workers.pl +++ b/src/test/modules/test_aio/t/002_io_workers.pl @@ -14,6 +14,9 @@ $node->init(); 
$node->append_conf( 'postgresql.conf', qq( io_method=worker +io_worker_idle_timeout=0ms +io_worker_launch_interval=0ms +io_max_workers=32 )); $node->start(); @@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic { my $node = shift; - my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers'); + my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers'); # Verify that worker count can't be set to 0 change_number_of_io_workers($node, 0, $prev_worker_count, 1); @@ -62,23 +65,23 @@ sub change_number_of_io_workers my ($result, $stdout, $stderr); ($result, $stdout, $stderr) = - $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count"); + $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count"); $node->safe_psql('postgres', 'SELECT pg_reload_conf()'); if ($expect_failure) { ok( $stderr =~ - /$worker_count is outside the valid range for parameter "io_workers"/, - "updating number of io_workers to $worker_count failed, as expected" + /$worker_count is outside the valid range for parameter "io_min_workers"/, + "updating number of io_min_workers to $worker_count failed, as expected" ); return $prev_worker_count; } else { - is( $node->safe_psql('postgres', 'SHOW io_workers'), + is( $node->safe_psql('postgres', 'SHOW io_min_workers'), $worker_count, - "updating number of io_workers from $prev_worker_count to $worker_count" + "updating number of io_min_workers from $prev_worker_count to $worker_count" ); check_io_worker_count($node, $worker_count); -- 2.39.5
From 43fea48f5f6e9b3301a0216f0402b2558862d632 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 5 Apr 2025 11:14:26 +1300 Subject: [PATCH 5/5] XXX read_buffer_loop select read_buffer_loop(n) with different values of n in each session to test latency of reading one block. --- src/test/modules/test_aio/test_aio--1.0.sql | 4 ++ src/test/modules/test_aio/test_aio.c | 59 +++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e..c37b38afcb0 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -106,3 +106,7 @@ AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_reopen_detach() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION read_buffer_loop(block int) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index 1d776010ef4..2654302a13c 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -18,6 +18,8 @@ #include "postgres.h" +#include <math.h> + #include "access/relation.h" #include "fmgr.h" #include "storage/aio.h" @@ -27,6 +29,7 @@ #include "storage/checksum.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/read_stream.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" @@ -806,3 +809,59 @@ inj_io_reopen_detach(PG_FUNCTION_ARGS) #endif PG_RETURN_VOID(); } + +static BlockNumber +zero_callback(ReadStream *stream, void *user_data, void *pbd) +{ + return *(BlockNumber *) user_data; +} + +PG_FUNCTION_INFO_V1(read_buffer_loop); +Datum +read_buffer_loop(PG_FUNCTION_ARGS) +{ + BlockNumber block = PG_GETARG_UINT32(0); + Relation rel; + ReadStream *stream; + Buffer buffer; + TimestampTz start; + + rel = relation_open(TypeRelationId, 
AccessShareLock); + stream = read_stream_begin_relation(0, NULL, rel, MAIN_FORKNUM, zero_callback, &block, 0); + for (int loop = 0; loop < 10; loop++) + { + double samples[25000]; + double avg = 0; + double sum = 0; + double var = 0; + double dev; + double stddev; + + for (int i = 0; i < lengthof(samples); ++i) + { + bool flushed; + + start = GetCurrentTimestamp(); + buffer = read_stream_next_buffer(stream, NULL); + samples[i] = GetCurrentTimestamp() - start; + sum += samples[i]; + + ReleaseBuffer(buffer); + read_stream_reset(stream); + EvictUnpinnedBuffer(buffer, &flushed); + } + avg = sum / lengthof(samples); + for (int i = 0; i < lengthof(samples); i++) + { + dev = samples[i] - avg; + var += dev * dev; + } + stddev = sqrt(var / lengthof(samples)); + + elog(NOTICE, "n = %zu, avg = %.1fus, stddev = %.1f", lengthof(samples), avg, stddev); + } + read_stream_end(stream); + relation_close(rel, AccessShareLock); + + PG_RETURN_VOID(); +} -- 2.39.5