Hi Tomas!
I've finally got time again to work on PostgreSQL.
On 03.11.2023 21:48, Tomas Vondra wrote:
> On 2/22/23 13:22, Tomas Vondra wrote:
>> ...
>>
>>>> No opinion on these options, but worth a try. Alternatively, we could
>>>> try the usual doubling approach - start with a low threshold (and set
>>>> the latch frequently), and then gradually increase it up to the 1/4.
>>>>
>>>> That should work both for queries expecting only few rows and those
>>>> producing a lot of data.
>>> I was thinking about this variant as well. One more alternative would be
>>> latching the leader once a worker has produced 1/Nth of the LIMIT where
>>> N is the number of workers. Both variants have the disadvantage that
>>> there are still corner cases where the latch is set too late; but it
>>> would for sure be much better than what we have today.
Or always latching when a LIMIT is present. When a LIMIT is present,
it's much more likely that the latency hurts than that it doesn't.
>>> I also did some profiling and - at least on my development laptop with 8
>>> physical cores - the original example, motivating the batching change is
>>> slower than when it's disabled by commenting out:
>>>
>>> if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
>>>
>>> SET parallel_tuple_cost TO 0;
>>> CREATE TABLE b (a int);
>>> INSERT INTO b SELECT generate_series(1, 200000000);
>>> ANALYZE b;
>>> EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;
>>>
>>> Gather (cost=1000.00..1200284.61 rows=200375424 width=4) (actual
>>> rows=200000000 loops=1)
>>> Workers Planned: 7
>>> Workers Launched: 7
>>> -> Parallel Seq Scan on b (cost=0.00..1199284.61 rows=28625061
>>> width=4) (actual rows=25000000 loops=8)
>>>
>>> Always latch: 19055 ms
>>> Batching: 19575 ms
>>>
>>> If I find some time, I'll play around a bit more and maybe propose a patch.
I've also remeasured the shared memory latching with and without the
1/4th check using the original example from [1]. Apart from the code
line mentioned by you, I also commented out the check on the consumer side:
if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
On my dev laptop (i9-13950HX) the runtimes are pretty much the same with
8 workers (16-17 seconds with some variance). It would be great to
understand when this truly helps, if at all, to see if we need some
smartness to latch the consumer or if we can just remove the 1/4th check.
If this turns out to be more involved we could also move this discussion
into a separate thread and have this thread focus on stopping the
parallel workers early, see below.
>>
>> OK. Once you have a WIP patch maybe share it and I'll try to do some
>> profiling too.
>>
>>>>> ...
>>>>>
>>>>> We would need something similar to CHECK_FOR_INTERRUPTS() which returns
>>>>> a NULL slot if a parallel worker is supposed to stop execution (we could
>>>>> e.g. check if the queue got detached). Or could we amend
>>>>> CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
>>>>> got detached?
>>>>>
>>>> That sounds reasonable, but I'm not very familiar the leader-worker
>>>> communication, so no opinion on how it should be done.
>>>
>>> I think an extra macro that needs to be called from dozens of places to
>>> check if parallel execution is supposed to end is the least preferred
>>> approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if
>>> we cannot actively signal the workers that they should stop.
>>>
>>
>> IMHO if this requires adding another macro to a bunch of ad hoc places
>> is rather inconvenient. It'd be much better to fix this in a localized
>> manner (especially as it seems related to a fairly specific place).
I've written up a draft patch that instructs workers to stop, once the
leader has gotten enough rows according to the LIMIT clause. I'm using
SendProcSignal() to inform the workers to take action and stop executing
ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see
a good way of doing this differently which is not much more intrusive.
The patch is incomplete (comments, support for Gather Merge, more
testing, etc.) but I'm mostly interested at this point if the overall
approach is deemed fine.
I first tried to use TerminateBackgroundWorker() but postmaster.c then
logs the worker termination and also some of the cleanup code needed for
proper instrumentation doesn't run any longer in the parallel workers.
With the patch applied, the query from the first mail of this thread
runs in a few milliseconds. That it still takes that long is because
forking, plan (de-)serialization and remaining initialization are fairly
heavy weight. With threads, the "fork" time would already much lower and
no (de-)serialization would be necessary. In the process-based
architecture it would be interesting to think about adding a parallel
worker pool.
>
> David, do you still plan to try fixing these issues? I have a feeling
> those issues may be fairly common but often undetected, or just brushed
> of as "slow query" (AFAICS it was only caught thanks to comparing
> timings before/after upgrade). Would be great to improve this.
I completely agree. And while they look like corner cases, if the
workload is diverse enough they will be encountered (both findings are
from the field). If it's then a query that runs frequently enough it
causes a real issue that is hard to be diagnosed by the DBA.
[1]
https://www.postgresql.org/message-id/flat/CAFiTN-tVXqn_OG7tHNeSkBbN%2BiiCZTiQ83uakax43y1sQb2OBA%40mail.gmail.com
--
David Geier
From 9553f1ec5885c9e2a22a0eafa4f4b489f13d22ae Mon Sep 17 00:00:00 2001
From: David Geier <geidav...@gmail.com>
Date: Tue, 2 Sep 2025 13:24:45 +0200
Subject: [PATCH] Parallel workers stop quicker
---
src/backend/executor/execParallel.c | 51 ++++++++++++++++++++++++----
src/backend/executor/nodeGather.c | 36 ++++++++++++++++++++
src/backend/storage/ipc/procsignal.c | 4 +++
src/backend/tcop/postgres.c | 4 +++
src/include/executor/execParallel.h | 5 +++
src/include/nodes/execnodes.h | 1 +
src/include/storage/procsignal.h | 1 +
7 files changed, 95 insertions(+), 7 deletions(-)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f098a5557cf..3b0993fd9d6 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1410,6 +1410,32 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
pwcxt);
}
+volatile sig_atomic_t ParallelStopPending = false;
+sigjmp_buf * parallel_sigjmp_buf = NULL;
+volatile bool got_stopped = false;
+
+void HandleParallelStop(void)
+{
+ InterruptPending = true;
+ ParallelStopPending = true;
+ SetLatch(MyLatch);
+}
+
+void ProcessParallelStop(void)
+{
+ ParallelStopPending = false;
+ got_stopped = true;
+
+ /*
+ * Only allow siglongjmp if we are executing the plan.
+ * Otherwise, we might jump back right after ExecutePlan() even
+ * though we are not yet executing the plan or we're already done.
+ */
+ if (parallel_sigjmp_buf != NULL)
+ siglongjmp(*parallel_sigjmp_buf, 1);
+}
+
+
/*
* Main entrypoint for parallel query worker processes.
*
@@ -1440,6 +1466,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
void *area_space;
dsa_area *area;
ParallelWorkerContext pwcxt;
+ sigjmp_buf local_sigjmp_buf;
/* Get fixed-size state. */
fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
@@ -1492,13 +1519,23 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
*/
InstrStartParallelQuery();
- /*
- * Run the plan. If we specified a tuple bound, be careful not to demand
- * more tuples than that.
- */
- ExecutorRun(queryDesc,
- ForwardScanDirection,
- fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
+ if (!got_stopped)
+ {
+ if (sigsetjmp(local_sigjmp_buf, 1) == 0)
+ {
+ parallel_sigjmp_buf = &local_sigjmp_buf;
+
+ /*
+ * Run the plan. If we specified a tuple bound, be careful not to demand
+ * more tuples than that.
+ */
+ ExecutorRun(queryDesc,
+ ForwardScanDirection,
+ fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
+ }
+ }
+
+ parallel_sigjmp_buf = NULL;
/* Shut down the executor */
ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index dc7d1830259..31745e86ced 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
+#include "storage/procsignal.h"
#include "utils/wait_event.h"
@@ -71,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate->need_to_scan_locally =
!node->single_copy && parallel_leader_participation;
gatherstate->tuples_needed = -1;
+ gatherstate->tuples_produced = 0;
/*
* Miscellaneous initialization
@@ -126,6 +128,36 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
return gatherstate;
}
+/* ----------------------------------------------------------------
+ * Workers only stop when they themselves reach the LIMIT.
+ * They don't stop if other workers in total produced already
+ * enough rows to reach the LIMIT. Hence, we need to stop them
+ * explicitly.
+ * ----------------------------------------------------------------
+ */
+static void
+StopWorkersIfLimitReached(GatherState *node)
+{
+ if (node->tuples_needed != -1 && node->tuples_produced == node->tuples_needed)
+ {
+ if (node->pei != NULL)
+ {
+ ParallelContext *pcxt = node->pei->pcxt;
+ int i;
+
+ if (pcxt->worker != NULL)
+ {
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ pid_t pid;
+ GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+ SendProcSignal(pid, PROCSIG_PARALLEL_STOP, INVALID_PROC_NUMBER);
+ }
+ }
+ }
+ }
+}
+
/* ----------------------------------------------------------------
* ExecGather(node)
*
@@ -212,6 +244,7 @@ ExecGather(PlanState *pstate)
/* Run plan locally if no workers or enabled and not single-copy. */
node->need_to_scan_locally = (node->nreaders == 0)
|| (!gather->single_copy && parallel_leader_participation);
+ node->tuples_produced = 0;
node->initialized = true;
}
@@ -230,6 +263,9 @@ ExecGather(PlanState *pstate)
if (TupIsNull(slot))
return NULL;
+ node->tuples_produced++;
+ StopWorkersIfLimitReached(node);
+
/* If no projection is required, we're done. */
if (node->ps.ps_ProjInfo == NULL)
return slot;
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 087821311cc..8f99ecebe2f 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -19,6 +19,7 @@
#include "access/parallel.h"
#include "commands/async.h"
+#include "executor/execParallel.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
@@ -694,6 +695,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
HandleParallelApplyMessageInterrupt();
+ if (CheckProcSignal(PROCSIG_PARALLEL_STOP))
+ HandleParallelStop();
+
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0cecd464902..5b320e52b94 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -39,6 +39,7 @@
#include "commands/event_trigger.h"
#include "commands/prepare.h"
#include "common/pg_prng.h"
+#include "executor/execParallel.h"
#include "jit/jit.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -3536,6 +3537,9 @@ ProcessInterrupts(void)
if (ParallelApplyMessagePending)
ProcessParallelApplyMessages();
+
+ if (ParallelStopPending)
+ ProcessParallelStop();
}
/*
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5e7106c397a..9e0be350694 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -37,6 +37,11 @@ typedef struct ParallelExecutorInfo
struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo;
+extern PGDLLIMPORT volatile sig_atomic_t ParallelStopPending;
+
+extern void HandleParallelStop(void);
+extern void ProcessParallelStop(void);
+
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, Bitmapset *sendParams, int nworkers,
int64 tuples_needed);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index de782014b2d..63962aebcd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2740,6 +2740,7 @@ typedef struct GatherState
bool initialized; /* workers launched? */
bool need_to_scan_locally; /* need to read from local plan? */
int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+ int64 tuples_produced; /* tuples already produced */
/* these fields are set up once: */
TupleTableSlot *funnel_slot;
struct ParallelExecutorInfo *pei;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..f7f4ee85154 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
PROCSIG_BARRIER, /* global barrier interrupt */
PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+ PROCSIG_PARALLEL_STOP, /* Instruct parallel worker to stop */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_FIRST,
--
2.43.0