On Mon, 2025-02-03 at 12:45 -0800, Jeff Davis wrote:
> * The patch allows forcing the partition fan-out to one. I could
> imagine forcing it to a specific value, is there a way to do that?
I hit "send" too quickly and this caused test failures in CI. Attaching v2. Changes: * a new injection point to force spilling at 1000 tuples so that the test is deterministic (required some minor refactoring in hash_agg_check_limits()) * added a branch to guard against a shift-by-32, which could not happen in the code before, because the number of partitions was a minimum of 4 * minor refactor of hash_agg_set_limits() to avoid an implicit assumption. This is not directly related, so I added it as a separate patch. Regards, Jeff Davis
From 513889206e048abc2493d2902787404f86ca45d7 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sun, 2 Feb 2025 05:26:58 -0800
Subject: [PATCH v2 1/2] Add injection points for hash aggregation.

---
 src/backend/executor/nodeAgg.c                | 62 ++++++++++++++---
 src/test/modules/injection_points/Makefile    |  2 +-
 .../injection_points/expected/hashagg.out     | 68 +++++++++++++++++++
 src/test/modules/injection_points/meson.build |  1 +
 .../modules/injection_points/sql/hashagg.sql  | 26 +++++++
 5 files changed, 150 insertions(+), 9 deletions(-)
 create mode 100644 src/test/modules/injection_points/expected/hashagg.out
 create mode 100644 src/test/modules/injection_points/sql/hashagg.sql

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 3005b5c0e3b..35cf18e5282 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -269,6 +269,7 @@
 #include "utils/datum.h"
 #include "utils/dynahash.h"
 #include "utils/expandeddatum.h"
+#include "utils/injection_point.h"
 #include "utils/logtape.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1489,6 +1490,14 @@ build_hash_tables(AggState *aggstate)
 										   perhash->aggnode->numGroups,
 										   memory);
 
+#ifdef USE_INJECTION_POINTS
+		if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
+		{
+			nbuckets = memory / sizeof(TupleHashEntryData);
+			INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
+		}
+#endif
+
 		build_hash_table(aggstate, setno, nbuckets);
 	}
 
@@ -1860,17 +1869,36 @@ hash_agg_check_limits(AggState *aggstate)
 													 true);
 	Size		hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
 														true);
+	bool		do_spill = false;
 
 	/*
-	 * Don't spill unless there's at least one group in the hash table so we
-	 * can be sure to make progress even in edge cases.
+	 * Spill if exceeding memory limit. Don't spill unless there's at least
+	 * one group in the hash table so we can be sure to make progress even in
+	 * edge cases.
 	 */
-	if (aggstate->hash_ngroups_current > 0 &&
-		(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
-		 ngroups > aggstate->hash_ngroups_limit))
+	if (ngroups > 0 && (meta_mem + hashkey_mem > aggstate->hash_mem_limit))
+		do_spill = true;
+
+	/*
+	 * Spill if exceeding ngroups limit. This is important if transition
+	 * values may grow, e.g. for ARRAY_AGG().
+	 */
+	if (ngroups > aggstate->hash_ngroups_limit)
+		do_spill = true;
+
+#ifdef USE_INJECTION_POINTS
+	if (ngroups > 1000)
 	{
-		hash_agg_enter_spill_mode(aggstate);
+		if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
+		{
+			do_spill = true;
+			INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
+		}
 	}
+#endif
+
+	if (do_spill)
+		hash_agg_enter_spill_mode(aggstate);
 }
 
 /*
@@ -1881,6 +1909,7 @@ hash_agg_check_limits(AggState *aggstate)
 static void
 hash_agg_enter_spill_mode(AggState *aggstate)
 {
+	INJECTION_POINT("hash-aggregate-enter-spill-mode");
 	aggstate->hash_spill_mode = true;
 	hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
 
@@ -2652,6 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
 	 */
 	hashagg_recompile_expressions(aggstate, true, true);
 
+	INJECTION_POINT("hash-aggregate-process-batch");
 	for (;;)
 	{
 		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
@@ -2900,6 +2930,15 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
 	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
 											 used_bits, &partition_bits);
 
+#ifdef USE_INJECTION_POINTS
+	if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
+	{
+		npartitions = 1;
+		partition_bits = 0;
+		INJECTION_POINT_CACHED("hash-aggregate-single-partition");
+	}
+#endif
+
 	spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
 	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
@@ -2908,7 +2947,10 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
 		spill->partitions[i] = LogicalTapeCreate(tapeset);
 
 	spill->shift = 32 - used_bits - partition_bits;
-	spill->mask = (npartitions - 1) << spill->shift;
+	if (spill->shift < 32)
+		spill->mask = (npartitions - 1) << spill->shift;
+	else
+		spill->mask = 0;
 	spill->npartitions = npartitions;
 
 	for (int i = 0; i < npartitions; i++)
@@ -2957,7 +2999,11 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 
 	tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
 
-	partition = (hash & spill->mask) >> spill->shift;
+	if (spill->shift < 32)
+		partition = (hash & spill->mask) >> spill->shift;
+	else
+		partition = 0;
+
 	spill->ntuples[partition]++;
 
 	/*
diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile
index 4f0161fd33a..e680991f8d4 100644
--- a/src/test/modules/injection_points/Makefile
+++ b/src/test/modules/injection_points/Makefile
@@ -11,7 +11,7 @@ EXTENSION = injection_points
 DATA = injection_points--1.0.sql
 PGFILEDESC = "injection_points - facility for injection points"
 
-REGRESS = injection_points reindex_conc
+REGRESS = injection_points hashagg reindex_conc
 REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
 
 ISOLATION = basic inplace syscache-update-pruned
diff --git a/src/test/modules/injection_points/expected/hashagg.out b/src/test/modules/injection_points/expected/hashagg.out
new file mode 100644
index 00000000000..cc4247af97d
--- /dev/null
+++ b/src/test/modules/injection_points/expected/hashagg.out
@@ -0,0 +1,68 @@
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+SELECT injection_points_set_local();
+ injection_points_set_local 
+----------------------------
+ 
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+ count 
+-------
+  5100
+(1 row)
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;
diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build
index 259045e5c2d..d61149712fd 100644
--- a/src/test/modules/injection_points/meson.build
+++ b/src/test/modules/injection_points/meson.build
@@ -35,6 +35,7 @@ tests += {
   'regress': {
     'sql': [
       'injection_points',
+      'hashagg',
       'reindex_conc',
     ],
     'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
diff --git a/src/test/modules/injection_points/sql/hashagg.sql b/src/test/modules/injection_points/sql/hashagg.sql
new file mode 100644
index 00000000000..51d814623fc
--- /dev/null
+++ b/src/test/modules/injection_points/sql/hashagg.sql
@@ -0,0 +1,26 @@
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+
+SELECT injection_points_set_local();
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;
-- 
2.34.1
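A note on the mask/shift arithmetic exercised by the patch above, for
anyone reading hashagg_spill_init() for the first time: the mask/shift
pair simply selects the top partition_bits of the 32-bit hash, below
any bits already consumed by earlier spill generations. A tiny
standalone sketch with assumed values (used_bits = 0, partition_bits =
2, and an arbitrary hash; none of these come from the patch):

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	int			used_bits = 0;		/* assumed: first spill generation */
	int			partition_bits = 2; /* assumed: 4 partitions */
	int			shift = 32 - used_bits - partition_bits;	/* 30 */
	uint32_t	mask = (uint32_t) (4 - 1) << shift; /* 0xC0000000 */
	uint32_t	hash = 0x9e3779b9;	/* arbitrary example hash */

	/* top two bits of this hash are binary 10, so partition 2 */
	printf("partition = %u\n", (unsigned) ((hash & mask) >> shift));
	return 0;
}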
From c1a619500a41dee654868efe480b0b8a7c67dc4f Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Mon, 3 Feb 2025 15:13:03 -0800
Subject: [PATCH v2 2/2] Minor refactor of hash_agg_set_limits().

Avoid implicit assumption that input_groups is greater than or equal to
one. The assumption appears to be true, but the code is easier to read
without it.

Also, a branch that avoided underflow while calculating a max was hard
to read; replace it with ssize_t types and a Max().
---
 src/backend/executor/nodeAgg.c | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 35cf18e5282..fb467ec9877 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1809,8 +1809,8 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 					int *num_partitions)
 {
 	int			npartitions;
-	Size		partition_mem;
-	Size		hash_mem_limit = get_hash_memory_limit();
+	ssize_t		partition_mem;
+	ssize_t		hash_mem_limit = get_hash_memory_limit();
 
 	/* if not expected to spill, use all of hash_mem */
 	if (input_groups * hashentrysize <= hash_mem_limit)
@@ -1818,7 +1818,7 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 		if (num_partitions != NULL)
 			*num_partitions = 0;
 		*mem_limit = hash_mem_limit;
-		*ngroups_limit = hash_mem_limit / hashentrysize;
+		*ngroups_limit = Max(*mem_limit / hashentrysize, 1);
 		return;
 	}
 
@@ -1841,17 +1841,11 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 	/*
 	 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
 	 * minimum number of partitions, so we aren't going to dramatically exceed
-	 * work mem anyway.
+	 * work mem anyway. Use ssize_t to avoid underflow during subtraction.
 	 */
-	if (hash_mem_limit > 4 * partition_mem)
-		*mem_limit = hash_mem_limit - partition_mem;
-	else
-		*mem_limit = hash_mem_limit * 0.75;
+	*mem_limit = Max(hash_mem_limit - partition_mem, hash_mem_limit * 0.75);
 
-	if (*mem_limit > hashentrysize)
-		*ngroups_limit = *mem_limit / hashentrysize;
-	else
-		*ngroups_limit = 1;
+	*ngroups_limit = Max(*mem_limit / hashentrysize, 1);
 }
 
 /*
-- 
2.34.1
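To see the underflow that the ssize_t change in this second patch
avoids, consider what happens when partition_mem exceeds
hash_mem_limit. A standalone sketch with made-up values; the Max()
macro here is written out locally so the example compiles on its own:

#include <stdio.h>
#include <sys/types.h>

#define Max(x, y)	((x) > (y) ? (x) : (y))

int
main(void)
{
	size_t		u_limit = 1000, u_part = 4000;	/* made-up sizes */
	ssize_t		s_limit = 1000, s_part = 4000;

	/* unsigned: 1000 - 4000 wraps to a huge value, which Max() keeps */
	printf("unsigned: %zu\n", Max(u_limit - u_part, u_limit * 3 / 4));

	/* signed: 1000 - 4000 is -3000, so Max() falls back to the 3/4 cap */
	printf("signed:   %zd\n", Max(s_limit - s_part, s_limit * 3 / 4));
	return 0;
}

With unsigned Size arithmetic the subtraction can never go negative, so
the old code needed the explicit hash_mem_limit > 4 * partition_mem
branch; with signed types the Max() expresses the same fallback
directly.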