On Mon, 2025-02-03 at 12:45 -0800, Jeff Davis wrote:
> * The patch allows forcing the partition fan-out to one. I could
> imagine forcing it to a specific value, is there a way to do that?

I hit "send" too quickly and this caused test failures in CI. Attaching
v2.

Changes:

  * a new injection point to force spilling at 1000 groups so that the
test is deterministic (this required some minor refactoring in
hash_agg_check_limits())
  * added a branch to guard against a shift by 32, which previously
could not happen because the number of partitions was at least 4 (see
the sketch after this list)
  * a minor refactor of hash_agg_set_limits() to avoid an implicit
assumption that input_groups is at least one. This is not directly
related, so I added it as a separate patch.
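
For the shift-by-32 item, here is a minimal standalone sketch of the
hazard (illustrative names only, not the actual nodeAgg.c code): in C,
shifting a 32-bit value by 32 or more is undefined behavior even when
the shifted value is zero, so the single-partition case needs its own
branch. UBSan and compilers will flag the unguarded form even though
the intended result would be zero.

#include <stdint.h>
#include <stdio.h>

/*
 * Illustrative only.  With a single partition, shift becomes
 * 32 - 0 - 0 = 32, and "(npartitions - 1) << 32" is undefined
 * behavior on a 32-bit type even though npartitions - 1 is 0.
 * The guard maps every hash to partition 0 instead.
 */
static uint32_t
partition_for(uint32_t hash, uint32_t npartitions,
			  int used_bits, int partition_bits)
{
	int			shift = 32 - used_bits - partition_bits;

	if (shift < 32)
		return (hash & ((npartitions - 1) << shift)) >> shift;
	return 0;					/* npartitions == 1 */
}

int
main(void)
{
	printf("%u\n", partition_for(0xdeadbeef, 1, 0, 0));	/* 0 */
	printf("%u\n", partition_for(0xdeadbeef, 4, 0, 2));	/* top 2 bits: 3 */
	return 0;
}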


Regards,
        Jeff Davis

From 513889206e048abc2493d2902787404f86ca45d7 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sun, 2 Feb 2025 05:26:58 -0800
Subject: [PATCH v2 1/2] Add injection points for hash aggregation.

---
 src/backend/executor/nodeAgg.c                | 62 ++++++++++++++---
 src/test/modules/injection_points/Makefile    |  2 +-
 .../injection_points/expected/hashagg.out     | 68 +++++++++++++++++++
 src/test/modules/injection_points/meson.build |  1 +
 .../modules/injection_points/sql/hashagg.sql  | 26 +++++++
 5 files changed, 150 insertions(+), 9 deletions(-)
 create mode 100644 src/test/modules/injection_points/expected/hashagg.out
 create mode 100644 src/test/modules/injection_points/sql/hashagg.sql

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 3005b5c0e3b..35cf18e5282 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -269,6 +269,7 @@
 #include "utils/datum.h"
 #include "utils/dynahash.h"
 #include "utils/expandeddatum.h"
+#include "utils/injection_point.h"
 #include "utils/logtape.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1489,6 +1490,14 @@ build_hash_tables(AggState *aggstate)
 										   perhash->aggnode->numGroups,
 										   memory);
 
+#ifdef USE_INJECTION_POINTS
+		if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
+		{
+			nbuckets = memory / sizeof(TupleHashEntryData);
+			INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
+		}
+#endif
+
 		build_hash_table(aggstate, setno, nbuckets);
 	}
 
@@ -1860,17 +1869,36 @@ hash_agg_check_limits(AggState *aggstate)
 													 true);
 	Size		hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
 														true);
+	bool		do_spill = false;
 
 	/*
-	 * Don't spill unless there's at least one group in the hash table so we
-	 * can be sure to make progress even in edge cases.
+	 * Spill if exceeding memory limit. Don't spill unless there's at least
+	 * one group in the hash table so we can be sure to make progress even in
+	 * edge cases.
 	 */
-	if (aggstate->hash_ngroups_current > 0 &&
-		(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
-		 ngroups > aggstate->hash_ngroups_limit))
+	if (ngroups > 0 && (meta_mem + hashkey_mem > aggstate->hash_mem_limit))
+		do_spill = true;
+
+	/*
+	 * Spill if exceeding ngroups limit. This is important if transition
+	 * values may grow, e.g. for ARRAY_AGG().
+	 */
+	if (ngroups > aggstate->hash_ngroups_limit)
+		do_spill = true;
+
+#ifdef USE_INJECTION_POINTS
+	if (ngroups > 1000)
 	{
-		hash_agg_enter_spill_mode(aggstate);
+		if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
+		{
+			do_spill = true;
+			INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
+		}
 	}
+#endif
+
+	if (do_spill)
+		hash_agg_enter_spill_mode(aggstate);
 }
 
 /*
@@ -1881,6 +1909,7 @@ hash_agg_check_limits(AggState *aggstate)
 static void
 hash_agg_enter_spill_mode(AggState *aggstate)
 {
+	INJECTION_POINT("hash-aggregate-enter-spill-mode");
 	aggstate->hash_spill_mode = true;
 	hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
 
@@ -2652,6 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
 	 */
 	hashagg_recompile_expressions(aggstate, true, true);
 
+	INJECTION_POINT("hash-aggregate-process-batch");
 	for (;;)
 	{
 		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
@@ -2900,6 +2930,15 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
 	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
 											 used_bits, &partition_bits);
 
+#ifdef USE_INJECTION_POINTS
+	if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
+	{
+		npartitions = 1;
+		partition_bits = 0;
+		INJECTION_POINT_CACHED("hash-aggregate-single-partition");
+	}
+#endif
+
 	spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
 	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
@@ -2908,7 +2947,10 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
 		spill->partitions[i] = LogicalTapeCreate(tapeset);
 
 	spill->shift = 32 - used_bits - partition_bits;
-	spill->mask = (npartitions - 1) << spill->shift;
+	if (spill->shift < 32)
+		spill->mask = (npartitions - 1) << spill->shift;
+	else
+		spill->mask = 0;
 	spill->npartitions = npartitions;
 
 	for (int i = 0; i < npartitions; i++)
@@ -2957,7 +2999,11 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 
 	tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
 
-	partition = (hash & spill->mask) >> spill->shift;
+	if (spill->shift < 32)
+		partition = (hash & spill->mask) >> spill->shift;
+	else
+		partition = 0;
+
 	spill->ntuples[partition]++;
 
 	/*
diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile
index 4f0161fd33a..e680991f8d4 100644
--- a/src/test/modules/injection_points/Makefile
+++ b/src/test/modules/injection_points/Makefile
@@ -11,7 +11,7 @@ EXTENSION = injection_points
 DATA = injection_points--1.0.sql
 PGFILEDESC = "injection_points - facility for injection points"
 
-REGRESS = injection_points reindex_conc
+REGRESS = injection_points hashagg reindex_conc
 REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
 
 ISOLATION = basic inplace syscache-update-pruned
diff --git a/src/test/modules/injection_points/expected/hashagg.out b/src/test/modules/injection_points/expected/hashagg.out
new file mode 100644
index 00000000000..cc4247af97d
--- /dev/null
+++ b/src/test/modules/injection_points/expected/hashagg.out
@@ -0,0 +1,68 @@
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+SELECT injection_points_set_local();
+ injection_points_set_local 
+----------------------------
+ 
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+ count 
+-------
+  5100
+(1 row)
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;
diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build
index 259045e5c2d..d61149712fd 100644
--- a/src/test/modules/injection_points/meson.build
+++ b/src/test/modules/injection_points/meson.build
@@ -35,6 +35,7 @@ tests += {
   'regress': {
     'sql': [
       'injection_points',
+      'hashagg',
       'reindex_conc',
     ],
     'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
diff --git a/src/test/modules/injection_points/sql/hashagg.sql b/src/test/modules/injection_points/sql/hashagg.sql
new file mode 100644
index 00000000000..51d814623fc
--- /dev/null
+++ b/src/test/modules/injection_points/sql/hashagg.sql
@@ -0,0 +1,26 @@
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+
+SELECT injection_points_set_local();
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;
-- 
2.34.1

From c1a619500a41dee654868efe480b0b8a7c67dc4f Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Mon, 3 Feb 2025 15:13:03 -0800
Subject: [PATCH v2 2/2] Minor refactor of hash_agg_set_limits().

Avoid the implicit assumption that input_groups is greater than or
equal to one. The assumption appears to hold, but the code is easier
to read without it.

Also, a branch that avoided unsigned underflow while calculating a
maximum was hard to read; replace it with ssize_t types and a Max().
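
To make the underflow concrete, a toy standalone sketch (made-up
numbers, not the real hash_agg_set_limits()): with unsigned Size
arithmetic, limit - partition_mem wraps around when partition_mem
exceeds the limit, which is why the old code needed a guarding branch;
with ssize_t the subtraction simply goes negative and Max() picks the
3/4 floor directly.

#include <stdio.h>
#include <sys/types.h>			/* ssize_t */

#define Max(x, y) ((x) > (y) ? (x) : (y))

int
main(void)
{
	size_t		limit_u = 1000, part_u = 1500;
	ssize_t		limit_s = 1000, part_s = 1500;

	/* unsigned subtraction wraps to a huge value */
	printf("%zu\n", limit_u - part_u);

	/* signed subtraction goes negative, so Max() picks the 3/4 floor */
	printf("%zd\n", Max(limit_s - part_s, (ssize_t) (limit_s * 0.75)));
	return 0;
}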
---
 src/backend/executor/nodeAgg.c | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 35cf18e5282..fb467ec9877 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1809,8 +1809,8 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 					int *num_partitions)
 {
 	int			npartitions;
-	Size		partition_mem;
-	Size		hash_mem_limit = get_hash_memory_limit();
+	ssize_t		partition_mem;
+	ssize_t		hash_mem_limit = get_hash_memory_limit();
 
 	/* if not expected to spill, use all of hash_mem */
 	if (input_groups * hashentrysize <= hash_mem_limit)
@@ -1818,7 +1818,7 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 		if (num_partitions != NULL)
 			*num_partitions = 0;
 		*mem_limit = hash_mem_limit;
-		*ngroups_limit = hash_mem_limit / hashentrysize;
+		*ngroups_limit = Max(*mem_limit / hashentrysize, 1);
 		return;
 	}
 
@@ -1841,17 +1841,11 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 	/*
 	 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
 	 * minimum number of partitions, so we aren't going to dramatically exceed
-	 * work mem anyway.
+	 * work mem anyway. Use ssize_t to avoid underflow during subtraction.
 	 */
-	if (hash_mem_limit > 4 * partition_mem)
-		*mem_limit = hash_mem_limit - partition_mem;
-	else
-		*mem_limit = hash_mem_limit * 0.75;
+	*mem_limit = Max(hash_mem_limit - partition_mem, hash_mem_limit * 0.75);
 
-	if (*mem_limit > hashentrysize)
-		*ngroups_limit = *mem_limit / hashentrysize;
-	else
-		*ngroups_limit = 1;
+	*ngroups_limit = Max(*mem_limit / hashentrysize, 1);
 }
 
 /*
-- 
2.34.1
