On Thu, 2020-03-12 at 16:01 -0500, Justin Pryzby wrote:
> I don't understand what's meant by "the chosen plan".
> Should it say, "at execution ..." instead of "execution time" ?

I removed that wording; hopefully it's more clear without it?

> Either remove "plan types" for consistency with
> enable_groupingsets_hash_disk,
> Or add it there.  Maybe it should say "when the memory usage would
> OTHERWISE BE
> expected to exceed.."

I added "plan types".

I don't think "otherwise be..." would quite work there. "Otherwise"
sounds to me like it's referring to another plan type (e.g.
Sort+GroupAgg), and that doesn't fit.

It's probably best to leave that level of detail out of the docs. I
think the main use case for enable_hashagg_disk is for users who
experience some plan changes and want the old behavior which favors
Sort when there are a lot of groups.

> +show_hashagg_info(AggState *aggstate, ExplainState *es)
> +{
> +     Agg             *agg       = (Agg *)aggstate->ss.ps.plan;
> +     long     memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
> 
> I see this partially duplicates my patch [0] to show memory stats for

...

> You added hash_mem_peak and hash_batches_used to struct AggState.
> In my 0001 patch, I added instrumentation to struct TupleHashTable

I replied in that thread and I'm not sure that tracking the memory in
the TupleHashTable is the right approach. The group keys and the
transition state data can't be estimated easily that way. Perhaps we
can do that if the THT owns the memory contexts (and can call
MemoryContextMemAllocated()), rather than using passed-in ones, but
that might require more discussion. (I'm open to that idea, by the
way.)

Also, my patch considers the buffer space, so would that be a third
memory number?

For now, I think I'll keep my reporting in its simpler form, and we can
change it later as we sort out these details. That leaves mine specific
to HashAgg, but we can always refactor it later.

I did change my code to put the metacontext in a child context of its
own so that I could call MemoryContextMemAllocated() on it to include
it in the memory total, and that will make reporting it separately
easier when we want to do so.
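
Roughly, the shape of it is the following (just a sketch, not the exact
patch code; the parent context and name match the patch, the call site
is condensed):

    /* hash table metadata gets its own context under the per-query context */
    aggstate->hash_metacxt = AllocSetContextCreate(estate->es_query_cxt,
                                                   "HashAgg meta context",
                                                   ALLOCSET_DEFAULT_SIZES);

    /* ... the TupleHashTable is built with hash_metacxt as its metacxt ... */

    /* total the metadata separately; "true" also counts child contexts */
    meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);

(That's also the number that hash_agg_check_limits() and
hash_agg_update_metrics() consult.)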

> +     if (from_tape)
> +             partition_mem += HASHAGG_READ_BUFFER_SIZE;
> +     partition_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
> 
> => That looks wrong ; should say += ?

Good catch! Fixed.
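
To spell out what the corrected accounting charges (numbers are only an
illustration, assuming the default 8kB BLCKSZ): one write buffer per
partition, plus one read buffer when refilling from a tape, e.g.:

    /* say, 32 partitions while refilling from a spilled batch */
    buffer_mem = 32 * HASHAGG_WRITE_BUFFER_SIZE;    /* 256kB */
    if (from_tape)
        buffer_mem += HASHAGG_READ_BUFFER_SIZE;     /* +8kB, 264kB total */

That buffer memory is then added to the metadata and hashcontext totals
when updating the peak.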

Regards,
        Jeff Davis

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3cac340f323..9f7f7736665 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4462,6 +4462,23 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-groupingsets-hash-disk" xreflabel="enable_groupingsets_hash_disk">
+      <term><varname>enable_groupingsets_hash_disk</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_groupingsets_hash_disk</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of hashed aggregation plan
+        types for grouping sets when the total size of the hash tables is
+        expected to exceed <varname>work_mem</varname>.  See <xref
+        linkend="queries-grouping-sets"/>.  The default is
+        <literal>off</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-hashagg" xreflabel="enable_hashagg">
       <term><varname>enable_hashagg</varname> (<type>boolean</type>)
       <indexterm>
@@ -4476,6 +4493,21 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-hashagg-disk" xreflabel="enable_hashagg_disk">
+      <term><varname>enable_hashagg_disk</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_hashagg_disk</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of hashed aggregation plan
+        types when the memory usage is expected to exceed
+        <varname>work_mem</varname>.  The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-hashjoin" xreflabel="enable_hashjoin">
       <term><varname>enable_hashjoin</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index d901dc4a50e..58141d8393c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -104,6 +104,7 @@ static void show_tablesample(TableSampleClause *tsc, PlanState *planstate,
 							 List *ancestors, ExplainState *es);
 static void show_sort_info(SortState *sortstate, ExplainState *es);
 static void show_hash_info(HashState *hashstate, ExplainState *es);
+static void show_hashagg_info(AggState *aggstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 								ExplainState *es);
 static void show_instrumentation_count(const char *qlabel, int which,
@@ -1882,6 +1883,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Agg:
 			show_agg_keys(castNode(AggState, planstate), ancestors, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
+			show_hashagg_info((AggState *) planstate, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
 										   planstate, es);
@@ -2769,6 +2771,41 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 	}
 }
 
+/*
+ * Show information on hash aggregate memory usage and batches.
+ */
+static void
+show_hashagg_info(AggState *aggstate, ExplainState *es)
+{
+	Agg		*agg	   = (Agg *)aggstate->ss.ps.plan;
+	long	 memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
+
+	Assert(IsA(aggstate, AggState));
+
+	if (agg->aggstrategy != AGG_HASHED &&
+		agg->aggstrategy != AGG_MIXED)
+		return;
+
+	if (es->costs && aggstate->hash_planned_partitions > 0)
+	{
+		ExplainPropertyInteger("Planned Partitions", NULL,
+							   aggstate->hash_planned_partitions, es);
+	}
+
+	if (!es->analyze)
+		return;
+
+	/* EXPLAIN ANALYZE */
+	ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
+	if (aggstate->hash_batches_used > 0)
+	{
+		ExplainPropertyInteger("Disk Usage", "kB",
+							   aggstate->hash_disk_used, es);
+		ExplainPropertyInteger("HashAgg Batches", NULL,
+							   aggstate->hash_batches_used, es);
+	}
+}
+
 /*
  * If it's EXPLAIN ANALYZE, show exact/lossy pages for a BitmapHeapScan node
  */
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 7aebb247d88..2d8efb7731a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -194,6 +194,29 @@
  *	  transition values.  hashcontext is the single context created to support
  *	  all hash tables.
  *
+ *	  Spilling To Disk
+ *
+ *	  When performing hash aggregation, if the hash table memory exceeds the
+ *	  limit (see hash_agg_check_limits()), we enter "spill mode". In spill
+ *	  mode, we advance the transition states only for groups already in the
+ *	  hash table. For tuples that would need to create new hash table
+ *	  entries (and initialize new transition states), we instead spill them to
+ *	  disk to be processed later. The tuples are spilled in a partitioned
+ *	  manner, so that subsequent batches are smaller and less likely to exceed
+ *	  work_mem (if a batch does exceed work_mem, it must be spilled
+ *	  recursively).
+ *
+ *	  Spilled data is written to logical tapes. These provide better control
+ *	  over memory usage, disk space, and the number of files than if we were
+ *	  to use a BufFile for each spill.
+ *
+ *	  Note that it's possible for transition states to start small but then
+ *	  grow very large; for instance in the case of ARRAY_AGG. In such cases,
+ *	  it's still possible to significantly exceed work_mem. We try to avoid
+ *	  this situation by estimating what will fit in the available memory, and
+ *	  imposing a limit on the number of groups separately from the amount of
+ *	  memory consumed.
+ *
  *    Transition / Combine function invocation:
  *
  *    For performance reasons transition functions, including combine
@@ -233,12 +256,101 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
+#include "utils/dynahash.h"
 #include "utils/expandeddatum.h"
+#include "utils/logtape.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/tuplesort.h"
 
+/*
+ * Control how many partitions are created when spilling HashAgg to
+ * disk.
+ *
+ * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
+ * partitions needed such that each partition will fit in memory. The factor
+ * is set higher than one because there's not a high cost to having a few too
+ * many partitions, and it makes it less likely that a partition will need to
+ * be spilled recursively. Another benefit of having more, smaller partitions
+ * is that small hash tables may perform better than large ones due to memory
+ * caching effects.
+ *
+ * We also specify a min and max number of partitions per spill. Too few might
+ * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
+ * many will result in lots of memory wasted buffering the spill files (which
+ * could instead be spent on a larger hash table).
+ *
+ * For reading from tapes, the buffer size must be a multiple of
+ * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
+ * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
+ * tape always uses a buffer of size BLCKSZ.
+ */
+#define HASHAGG_PARTITION_FACTOR 1.50
+#define HASHAGG_MIN_PARTITIONS 4
+#define HASHAGG_MAX_PARTITIONS 256
+#define HASHAGG_MIN_BUCKETS 256
+#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
+#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
+
+/*
+ * Track all tapes needed for a HashAgg that spills. We don't know the maximum
+ * number of tapes needed at the start of the algorithm (because it can
+ * recurse), so one tape set is allocated and extended as needed for new
+ * tapes. Once a particular tape has been fully read, rewind it for writing
+ * and put it in the free list.
+ *
+ * Tapes' buffers can take up substantial memory when many tapes are open at
+ * once. We only need one tape open at a time in read mode (using a buffer
+ * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ * requiring a buffer of size BLCKSZ) for each partition.
+ */
+typedef struct HashTapeInfo
+{
+	LogicalTapeSet	*tapeset;
+	int				 ntapes;
+	int				*freetapes;
+	int				 nfreetapes;
+	int				 freetapes_alloc;
+} HashTapeInfo;
+
+/*
+ * Represents partitioned spill data for a single hashtable. Contains the
+ * necessary information to route tuples to the correct partition, and to
+ * transform the spilled data into new batches.
+ *
+ * The high bits are used for partition selection (when recursing, we ignore
+ * the bits that have already been used for partition selection at an earlier
+ * level).
+ */
+typedef struct HashAggSpill
+{
+	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
+	int		 npartitions;		/* number of partitions */
+	int		*partitions;		/* spill partition tape numbers */
+	int64   *ntuples;			/* number of tuples in each partition */
+	uint32   mask;				/* mask to find partition from hash value */
+	int      shift;				/* after masking, shift by this amount */
+} HashAggSpill;
+
+/*
+ * Represents work to be done for one pass of hash aggregation (with only one
+ * grouping set).
+ *
+ * Also tracks the bits of the hash already used for partition selection by
+ * earlier iterations, so that this batch can use new bits. If all bits have
+ * already been used, no partitioning will be done (any spilled data will go
+ * to a single output tape).
+ */
+typedef struct HashAggBatch
+{
+	int				 setno;			/* grouping set */
+	int				 used_bits;		/* number of bits of hash already used */
+	LogicalTapeSet	*tapeset;		/* borrowed reference to tape set */
+	int				 input_tapenum;	/* input partition tape */
+	int64			 input_tuples;	/* number of tuples in this batch */
+} HashAggBatch;
+
 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
 static void initialize_phase(AggState *aggstate, int newphase);
 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
@@ -275,11 +387,43 @@ static Bitmapset *find_unaggregated_cols(AggState *aggstate);
 static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
 static void build_hash_tables(AggState *aggstate);
 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
+static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
+										  bool nullcheck);
+static long hash_choose_num_buckets(double hashentrysize,
+									long estimated_nbuckets,
+									Size memory);
+static int hash_choose_num_partitions(uint64 input_groups,
+									  double hashentrysize,
+									  int used_bits,
+									  int *log2_npartitions);
 static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash);
 static void lookup_hash_entries(AggState *aggstate);
 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
 static void agg_fill_hash_table(AggState *aggstate);
+static bool agg_refill_hash_table(AggState *aggstate);
 static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
+static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
+static void hash_agg_check_limits(AggState *aggstate);
+static void hash_agg_enter_spill_mode(AggState *aggstate);
+static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
+									int npartitions);
+static void hashagg_finish_initial_spills(AggState *aggstate);
+static void hashagg_reset_spill_state(AggState *aggstate);
+static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
+									   int input_tapenum, int setno,
+									   int64 input_tuples, int used_bits);
+static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
+static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+							   int used_bits, uint64 input_tuples,
+							   double hashentrysize);
+static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot,
+								uint32 hash);
+static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
+								 int setno);
+static void hashagg_tapeinfo_init(AggState *aggstate);
+static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
+									int ndest);
+static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
 									  AggState *aggstate, EState *estate,
@@ -1275,9 +1419,9 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
  * We have a separate hashtable and associated perhash data structure for each
  * grouping set for which we're doing hashing.
  *
- * The contents of the hash tables always live in the hashcontext's per-tuple
- * memory context (there is only one of these for all tables together, since
- * they are all reset at the same time).
+ * The hash tables and their contents always live in the hashcontext's
+ * per-tuple memory context (there is only one of these for all tables
+ * together, since they are all reset at the same time).
  */
 static void
 build_hash_tables(AggState *aggstate)
@@ -1287,14 +1431,27 @@ build_hash_tables(AggState *aggstate)
 	for (setno = 0; setno < aggstate->num_hashes; ++setno)
 	{
 		AggStatePerHash perhash = &aggstate->perhash[setno];
+		long			nbuckets;
+		Size			memory;
+
+		if (perhash->hashtable != NULL)
+		{
+			ResetTupleHashTable(perhash->hashtable);
+			continue;
+		}
 
 		Assert(perhash->aggnode->numGroups > 0);
 
-		if (perhash->hashtable)
-			ResetTupleHashTable(perhash->hashtable);
-		else
-			build_hash_table(aggstate, setno, perhash->aggnode->numGroups);
+		memory = aggstate->hash_mem_limit / aggstate->num_hashes;
+
+		/* choose reasonable number of buckets per hashtable */
+		nbuckets = hash_choose_num_buckets(
+			aggstate->hashentrysize, perhash->aggnode->numGroups, memory);
+
+		build_hash_table(aggstate, setno, nbuckets);
 	}
+
+	aggstate->hash_ngroups_current = 0;
 }
 
 /*
@@ -1304,7 +1461,7 @@ static void
 build_hash_table(AggState *aggstate, int setno, long nbuckets)
 {
 	AggStatePerHash perhash = &aggstate->perhash[setno];
-	MemoryContext	metacxt = aggstate->ss.ps.state->es_query_cxt;
+	MemoryContext	metacxt = aggstate->hash_metacxt;
 	MemoryContext	hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
 	MemoryContext	tmpcxt	= aggstate->tmpcontext->ecxt_per_tuple_memory;
 	Size            additionalsize;
@@ -1487,14 +1644,326 @@ hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace)
 		transitionSpace;
 }
 
+/*
+ * hashagg_recompile_expressions()
+ *
+ * Identifies the right phase, compiles the right expression given the
+ * arguments, and then sets phase->evaltrans to that expression.
+ *
+ * Different versions of the compiled expression are needed depending on
+ * whether hash aggregation has spilled or not, and whether it's reading from
+ * the outer plan or a tape. Before spilling to disk, the expression reads
+ * from the outer plan and does not need to perform a NULL check. After
+ * HashAgg begins to spill, new groups will not be created in the hash table,
+ * and the AggStatePerGroup array may be NULL; therefore we need to add a null
+ * pointer check to the expression. Then, when reading spilled data from a
+ * tape, we change the outer slot type to be a fixed minimal tuple slot.
+ *
+ * It would be wasteful to recompile every time, so cache the compiled
+ * expressions in the AggStatePerPhase, and reuse when appropriate.
+ */
+static void
+hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
+{
+	AggStatePerPhase		 phase;
+	int						 i = minslot ? 1 : 0;
+	int						 j = nullcheck ? 1 : 0;
+
+	Assert(aggstate->aggstrategy == AGG_HASHED ||
+		   aggstate->aggstrategy == AGG_MIXED);
+
+	if (aggstate->aggstrategy == AGG_HASHED)
+		phase = &aggstate->phases[0];
+	else /* AGG_MIXED */
+		phase = &aggstate->phases[1];
+
+	if (phase->evaltrans_cache[i][j] == NULL)
+	{
+		const TupleTableSlotOps *outerops	= aggstate->ss.ps.outerops;
+		bool					 outerfixed = aggstate->ss.ps.outeropsfixed;
+		bool					 dohash		= true;
+		bool					 dosort;
+
+		dosort = aggstate->aggstrategy == AGG_MIXED ? true : false;
+
+		/* temporarily change the outerops while compiling the expression */
+		if (minslot)
+		{
+			aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
+			aggstate->ss.ps.outeropsfixed = true;
+		}
+
+		phase->evaltrans_cache[i][j] = ExecBuildAggTrans(
+			aggstate, phase, dosort, dohash, nullcheck);
+
+		/* change back */
+		aggstate->ss.ps.outerops = outerops;
+		aggstate->ss.ps.outeropsfixed = outerfixed;
+	}
+
+	phase->evaltrans = phase->evaltrans_cache[i][j];
+}
+
+/*
+ * Set limits that trigger spilling to avoid exceeding work_mem. Consider the
+ * number of partitions we expect to create (if we do spill).
+ *
+ * There are two limits: a memory limit, and also an ngroups limit. The
+ * ngroups limit becomes important when we expect transition values to grow
+ * substantially larger than the initial value.
+ */
+void
+hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+					Size *mem_limit, uint64 *ngroups_limit,
+					int *num_partitions)
+{
+	int npartitions;
+	Size partition_mem;
+
+	/* if not expected to spill, use all of work_mem */
+	if (input_groups * hashentrysize < work_mem * 1024L)
+	{
+		*mem_limit = work_mem * 1024L;
+		*ngroups_limit = *mem_limit / hashentrysize;
+		return;
+	}
+
+	/*
+	 * Calculate expected memory requirements for spilling, which is the size
+	 * of the buffers needed for all the tapes that need to be open at
+	 * once. Then, subtract that from the memory available for holding hash
+	 * tables.
+	 */
+	npartitions = hash_choose_num_partitions(input_groups,
+											 hashentrysize,
+											 used_bits,
+											 NULL);
+	if (num_partitions != NULL)
+		*num_partitions = npartitions;
+
+	partition_mem =
+		HASHAGG_READ_BUFFER_SIZE +
+		HASHAGG_WRITE_BUFFER_SIZE * npartitions;
+
+	/*
+	 * Don't set the limit below 3/4 of work_mem. In that case, we are at the
+	 * minimum number of partitions, so we aren't going to dramatically exceed
+	 * work mem anyway.
+	 */
+	if (work_mem * 1024L > 4 * partition_mem)
+		*mem_limit = work_mem * 1024L - partition_mem;
+	else
+		*mem_limit = work_mem * 1024L * 0.75;
+
+	if (*mem_limit > hashentrysize)
+		*ngroups_limit = *mem_limit / hashentrysize;
+	else
+		*ngroups_limit = 1;
+}
+
+/*
+ * hash_agg_check_limits
+ *
+ * After adding a new group to the hash table, check whether we need to enter
+ * spill mode. Allocations may happen without adding new groups (for instance,
+ * if the transition state size grows), so this check is imperfect.
+ */
+static void
+hash_agg_check_limits(AggState *aggstate)
+{
+	uint64 ngroups = aggstate->hash_ngroups_current;
+	Size meta_mem = MemoryContextMemAllocated(
+		aggstate->hash_metacxt, true);
+	Size hash_mem = MemoryContextMemAllocated(
+		aggstate->hashcontext->ecxt_per_tuple_memory, true);
+
+	/*
+	 * Don't spill unless there's at least one group in the hash table so we
+	 * can be sure to make progress even in edge cases.
+	 */
+	if (aggstate->hash_ngroups_current > 0 &&
+		(meta_mem + hash_mem > aggstate->hash_mem_limit ||
+		 ngroups > aggstate->hash_ngroups_limit))
+	{
+		hash_agg_enter_spill_mode(aggstate);
+	}
+}
+
+/*
+ * Enter "spill mode", meaning that no new groups are added to any of the hash
+ * tables. Tuples that would create a new group are instead spilled, and
+ * processed later.
+ */
+static void
+hash_agg_enter_spill_mode(AggState *aggstate)
+{
+	aggstate->hash_spill_mode = true;
+	hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
+
+	if (!aggstate->hash_ever_spilled)
+	{
+		Assert(aggstate->hash_tapeinfo == NULL);
+		Assert(aggstate->hash_spills == NULL);
+
+		aggstate->hash_ever_spilled = true;
+
+		hashagg_tapeinfo_init(aggstate);
+
+		aggstate->hash_spills = palloc(
+			sizeof(HashAggSpill) * aggstate->num_hashes);
+
+		for (int setno = 0; setno < aggstate->num_hashes; setno++)
+		{
+			AggStatePerHash	 perhash = &aggstate->perhash[setno];
+			HashAggSpill	*spill	 = &aggstate->hash_spills[setno];
+
+			hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+							   perhash->aggnode->numGroups,
+							   aggstate->hashentrysize);
+		}
+	}
+}
+
+/*
+ * Update metrics after filling the hash table.
+ *
+ * If reading from the outer plan, from_tape should be false; if reading from
+ * a spill tape, from_tape should be true.
+ */
+static void
+hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
+{
+	Size	meta_mem;
+	Size	hash_mem;
+	Size	buffer_mem;
+	Size	total_mem;
+
+	if (aggstate->aggstrategy != AGG_MIXED &&
+		aggstate->aggstrategy != AGG_HASHED)
+		return;
+
+	/* memory for the hash table itself */
+	meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
+
+	/* memory for the group keys and transition states */
+	hash_mem = MemoryContextMemAllocated(
+		aggstate->hashcontext->ecxt_per_tuple_memory, true);
+
+	/* memory for read/write tape buffers, if spilled */
+	buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
+	if (from_tape)
+		buffer_mem += HASHAGG_READ_BUFFER_SIZE;
+
+	/* update peak mem */
+	total_mem = meta_mem + hash_mem + buffer_mem;
+	if (total_mem > aggstate->hash_mem_peak)
+		aggstate->hash_mem_peak = total_mem;
+
+	/* update disk usage */
+	if (aggstate->hash_tapeinfo != NULL)
+	{
+		uint64	disk_used = LogicalTapeSetBlocks(
+			aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+
+		if (aggstate->hash_disk_used < disk_used)
+			aggstate->hash_disk_used = disk_used;
+	}
+
+	/* update hashentrysize estimate based on contents */
+	if (aggstate->hash_ngroups_current > 0)
+	{
+		aggstate->hashentrysize =
+			hash_mem / (double)aggstate->hash_ngroups_current;
+	}
+}
+
+/*
+ * Choose a reasonable number of buckets for the initial hash table size.
+ */
+static long
+hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
+{
+	long	max_nbuckets;
+	long	nbuckets = ngroups;
+
+	max_nbuckets = memory / hashentrysize;
+
+	/*
+	 * Leave room for slop to avoid a case where the initial hash table size
+	 * exceeds the memory limit (though that may still happen in edge cases).
+	 */
+	max_nbuckets *= 0.75;
+
+	if (nbuckets > max_nbuckets)
+		nbuckets = max_nbuckets;
+	if (nbuckets < HASHAGG_MIN_BUCKETS)
+		nbuckets = HASHAGG_MIN_BUCKETS;
+	return nbuckets;
+}
+
+/*
+ * Determine the number of partitions to create when spilling, which will
+ * always be a power of two. If log2_npartitions is non-NULL, set
+ * *log2_npartitions to the log2() of the number of partitions.
+ */
+static int
+hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+						   int used_bits, int *log2_npartitions)
+{
+	Size	mem_wanted;
+	int		partition_limit;
+	int		npartitions;
+	int		partition_bits;
+
+	/*
+	 * Avoid creating so many partitions that the memory requirements of the
+	 * open partition files are greater than 1/4 of work_mem.
+	 */
+	partition_limit =
+		(work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
+		HASHAGG_WRITE_BUFFER_SIZE;
+
+	mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
+
+	/* make enough partitions so that each one is likely to fit in memory */
+	npartitions = 1 + (mem_wanted / (work_mem * 1024L));
+
+	if (npartitions > partition_limit)
+		npartitions = partition_limit;
+
+	if (npartitions < HASHAGG_MIN_PARTITIONS)
+		npartitions = HASHAGG_MIN_PARTITIONS;
+	if (npartitions > HASHAGG_MAX_PARTITIONS)
+		npartitions = HASHAGG_MAX_PARTITIONS;
+
+	/* ceil(log2(npartitions)) */
+	partition_bits = my_log2(npartitions);
+
+	/* make sure that we don't exhaust the hash bits */
+	if (partition_bits + used_bits >= 32)
+		partition_bits = 32 - used_bits;
+
+	if (log2_npartitions != NULL)
+		*log2_npartitions = partition_bits;
+
+	/* number of partitions will be a power of two */
+	npartitions = 1L << partition_bits;
+
+	return npartitions;
+}
+
 /*
  * Find or create a hashtable entry for the tuple group containing the current
  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
  * set (which the caller must have selected - note that initialize_aggregate
  * depends on this).
  *
- * When called, CurrentMemoryContext should be the per-query context. The
- * already-calculated hash value for the tuple must be specified.
+ * When called, CurrentMemoryContext should be the per-query context.
+ *
+ * If the hash table is at the memory limit, then only find existing hashtable
+ * entries; don't create new ones. If a tuple's group is not already present
+ * in the hash table for the current grouping set, return NULL and the caller
+ * will spill it to disk.
  */
 static AggStatePerGroup
 lookup_hash_entry(AggState *aggstate, uint32 hash)
@@ -1502,16 +1971,26 @@ lookup_hash_entry(AggState *aggstate, uint32 hash)
 	AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
 	TupleTableSlot *hashslot = perhash->hashslot;
 	TupleHashEntryData *entry;
-	bool		isnew;
+	bool			isnew = false;
+	bool		   *p_isnew;
+
+	/* if hash table already spilled, don't create new entries */
+	p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
 
 	/* find or create the hashtable entry using the filtered tuple */
-	entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, &isnew,
+	entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew,
 									 hash);
 
+	if (entry == NULL)
+		return NULL;
+
 	if (isnew)
 	{
-		AggStatePerGroup pergroup;
-		int			transno;
+		AggStatePerGroup	pergroup;
+		int					transno;
+
+		aggstate->hash_ngroups_current++;
+		hash_agg_check_limits(aggstate);
 
 		pergroup = (AggStatePerGroup)
 			MemoryContextAlloc(perhash->hashtable->tablecxt,
@@ -1539,23 +2018,48 @@ lookup_hash_entry(AggState *aggstate, uint32 hash)
  * returning an array of pergroup pointers suitable for advance_aggregates.
  *
  * Be aware that lookup_hash_entry can reset the tmpcontext.
+ *
+ * Some entries may be left NULL if we have reached the limit and have begun
+ * to spill. The same tuple will belong to different groups for each set, so
+ * may match a group already in memory for one set and match a group not in
+ * memory for another set. If we have begun to spill and a tuple doesn't match
+ * a group in memory for a particular set, it will be spilled.
+ *
+ * NB: It's possible to spill the same tuple for several different grouping
+ * sets. This may seem wasteful, but it's actually a trade-off: if we spill
+ * the tuple multiple times for multiple grouping sets, it can be partitioned
+ * for each grouping set, making the refilling of the hash table very
+ * efficient.
  */
 static void
 lookup_hash_entries(AggState *aggstate)
 {
-	int			numHashes = aggstate->num_hashes;
 	AggStatePerGroup *pergroup = aggstate->hash_pergroup;
 	int			setno;
 
-	for (setno = 0; setno < numHashes; setno++)
+	for (setno = 0; setno < aggstate->num_hashes; setno++)
 	{
-		AggStatePerHash perhash = &aggstate->perhash[setno];
+		AggStatePerHash	perhash = &aggstate->perhash[setno];
 		uint32			hash;
 
 		select_current_set(aggstate, setno, true);
 		prepare_hash_slot(aggstate);
 		hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);
 		pergroup[setno] = lookup_hash_entry(aggstate, hash);
+
+		/* check to see if we need to spill the tuple for this grouping set */
+		if (pergroup[setno] == NULL)
+		{
+			HashAggSpill	*spill	 = &aggstate->hash_spills[setno];
+			TupleTableSlot	*slot	 = aggstate->tmpcontext->ecxt_outertuple;
+
+			if (spill->partitions == NULL)
+				hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+								   perhash->aggnode->numGroups,
+								   aggstate->hashentrysize);
+
+			hashagg_spill_tuple(spill, slot, hash);
+		}
 	}
 }
 
@@ -1878,6 +2382,12 @@ agg_retrieve_direct(AggState *aggstate)
 					if (TupIsNull(outerslot))
 					{
 						/* no more outer-plan tuples available */
+
+						/* if we built hash tables, finalize any spills */
+						if (aggstate->aggstrategy == AGG_MIXED &&
+							aggstate->current_phase == 1)
+							hashagg_finish_initial_spills(aggstate);
+
 						if (hasGroupingSets)
 						{
 							aggstate->input_done = true;
@@ -1980,6 +2490,10 @@ agg_fill_hash_table(AggState *aggstate)
 		ResetExprContext(aggstate->tmpcontext);
 	}
 
+	/* finalize spills, if any */
+	hashagg_finish_initial_spills(aggstate);
+
+	aggstate->input_done = true;
 	aggstate->table_filled = true;
 	/* Initialize to walk the first hash table */
 	select_current_set(aggstate, 0, true);
@@ -1987,11 +2501,171 @@ agg_fill_hash_table(AggState *aggstate)
 						   &aggstate->perhash[0].hashiter);
 }
 
+/*
+ * If any data was spilled during hash aggregation, reset the hash table and
+ * reprocess one batch of spilled data. After reprocessing a batch, the hash
+ * table will again contain data, ready to be consumed by
+ * agg_retrieve_hash_table_in_memory().
+ *
+ * Should only be called after all in-memory hash table entries have been
+ * finalized and emitted.
+ *
+ * Return false when input is exhausted and there's no more work to be done;
+ * otherwise return true.
+ */
+static bool
+agg_refill_hash_table(AggState *aggstate)
+{
+	HashAggBatch	*batch;
+	HashAggSpill	 spill;
+	HashTapeInfo	*tapeinfo = aggstate->hash_tapeinfo;
+	uint64			 ngroups_estimate;
+
+	if (aggstate->hash_batches == NIL)
+		return false;
+
+	batch = linitial(aggstate->hash_batches);
+	aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
+
+	/*
+	 * Estimate the number of groups for this batch as the total number of
+	 * tuples in its input file. Although that's a worst case, it's not bad
+	 * here for two reasons: (1) overestimating is better than
+	 * underestimating; and (2) we've already scanned the relation once, so
+	 * it's likely that we've already finalized many of the common values.
+	 */
+	ngroups_estimate = batch->input_tuples;
+
+	hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+					   ngroups_estimate, aggstate->hashentrysize);
+
+	hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+						batch->used_bits, &aggstate->hash_mem_limit,
+						&aggstate->hash_ngroups_limit, NULL);
+
+	/* there could be residual pergroup pointers; clear them */
+	for (int setoff = 0;
+		 setoff < aggstate->maxsets + aggstate->num_hashes;
+		 setoff++)
+		aggstate->all_pergroups[setoff] = NULL;
+
+	/* free memory and reset hash tables */
+	ReScanExprContext(aggstate->hashcontext);
+	for (int setno = 0; setno < aggstate->num_hashes; setno++)
+		ResetTupleHashTable(aggstate->perhash[setno].hashtable);
+
+	aggstate->hash_ngroups_current = 0;
+
+	/*
+	 * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
+	 * happens in phase 0. So, we switch to phase 1 when processing a batch,
+	 * and back to phase 0 after the batch is done.
+	 */
+	Assert(aggstate->current_phase == 0);
+	if (aggstate->phase->aggstrategy == AGG_MIXED)
+	{
+		aggstate->current_phase = 1;
+		aggstate->phase = &aggstate->phases[aggstate->current_phase];
+	}
+
+	select_current_set(aggstate, batch->setno, true);
+
+	/*
+	 * Spilled tuples are always read back as MinimalTuples, which may be
+	 * different from the outer plan, so recompile the aggregate expressions.
+	 *
+	 * We still need the NULL check, because we are only processing one
+	 * grouping set at a time and the rest will be NULL.
+	 */
+	hashagg_recompile_expressions(aggstate, true, true);
+
+	LogicalTapeRewindForRead(tapeinfo->tapeset, batch->input_tapenum,
+							 HASHAGG_READ_BUFFER_SIZE);
+	for (;;) {
+		TupleTableSlot	*slot = aggstate->hash_spill_slot;
+		MinimalTuple	 tuple;
+		uint32			 hash;
+
+		CHECK_FOR_INTERRUPTS();
+
+		tuple = hashagg_batch_read(batch, &hash);
+		if (tuple == NULL)
+			break;
+
+		ExecStoreMinimalTuple(tuple, slot, true);
+		aggstate->tmpcontext->ecxt_outertuple = slot;
+
+		prepare_hash_slot(aggstate);
+		aggstate->hash_pergroup[batch->setno] = lookup_hash_entry(aggstate, hash);
+
+		/* if there's no memory for a new group, spill */
+		if (aggstate->hash_pergroup[batch->setno] == NULL)
+			hashagg_spill_tuple(&spill, slot, hash);
+
+		/* Advance the aggregates (or combine functions) */
+		advance_aggregates(aggstate);
+
+		/*
+		 * Reset per-input-tuple context after each tuple, but note that the
+		 * hash lookups do this too
+		 */
+		ResetExprContext(aggstate->tmpcontext);
+	}
+
+	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+
+	/* change back to phase 0 */
+	aggstate->current_phase = 0;
+	aggstate->phase = &aggstate->phases[aggstate->current_phase];
+
+	hash_agg_update_metrics(aggstate, true, spill.npartitions);
+	hashagg_spill_finish(aggstate, &spill, batch->setno);
+	aggstate->hash_spill_mode = false;
+
+	/* prepare to walk the first hash table */
+	select_current_set(aggstate, batch->setno, true);
+	ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
+						   &aggstate->perhash[batch->setno].hashiter);
+
+	pfree(batch);
+
+	return true;
+}
+
 /*
  * ExecAgg for hashed case: retrieving groups from hash table
+ *
+ * After exhausting in-memory tuples, also try refilling the hash table using
+ * previously-spilled tuples. Only returns NULL after all in-memory and
+ * spilled tuples are exhausted.
  */
 static TupleTableSlot *
 agg_retrieve_hash_table(AggState *aggstate)
+{
+	TupleTableSlot *result = NULL;
+
+	while (result == NULL)
+	{
+		result = agg_retrieve_hash_table_in_memory(aggstate);
+		if (result == NULL)
+		{
+			if (!agg_refill_hash_table(aggstate))
+			{
+				aggstate->agg_done = true;
+				break;
+			}
+		}
+	}
+
+	return result;
+}
+
+/*
+ * Retrieve the groups from the in-memory hash tables without considering any
+ * spilled tuples.
+ */
+static TupleTableSlot *
+agg_retrieve_hash_table_in_memory(AggState *aggstate)
 {
 	ExprContext *econtext;
 	AggStatePerAgg peragg;
@@ -2020,7 +2694,7 @@ agg_retrieve_hash_table(AggState *aggstate)
 	 * We loop retrieving groups until we find one satisfying
 	 * aggstate->ss.ps.qual
 	 */
-	while (!aggstate->agg_done)
+	for (;;)
 	{
 		TupleTableSlot *hashslot = perhash->hashslot;
 		int			i;
@@ -2051,8 +2725,6 @@ agg_retrieve_hash_table(AggState *aggstate)
 			}
 			else
 			{
-				/* No more hashtables, so done */
-				aggstate->agg_done = true;
 				return NULL;
 			}
 		}
@@ -2109,6 +2781,315 @@ agg_retrieve_hash_table(AggState *aggstate)
 	return NULL;
 }
 
+/*
+ * Initialize HashTapeInfo
+ */
+static void
+hashagg_tapeinfo_init(AggState *aggstate)
+{
+	HashTapeInfo	*tapeinfo	= palloc(sizeof(HashTapeInfo));
+	int				 init_tapes = 16;	/* expanded dynamically */
+
+	tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, NULL, NULL, -1);
+	tapeinfo->ntapes = init_tapes;
+	tapeinfo->nfreetapes = init_tapes;
+	tapeinfo->freetapes_alloc = init_tapes;
+	tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
+	for (int i = 0; i < init_tapes; i++)
+		tapeinfo->freetapes[i] = i;
+
+	aggstate->hash_tapeinfo = tapeinfo;
+}
+
+/*
+ * Assign unused tapes to spill partitions, extending the tape set if
+ * necessary.
+ */
+static void
+hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
+						int npartitions)
+{
+	int partidx = 0;
+
+	/* use free tapes if available */
+	while (partidx < npartitions && tapeinfo->nfreetapes > 0)
+		partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
+
+	if (partidx < npartitions)
+	{
+		LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
+
+		while (partidx < npartitions)
+			partitions[partidx++] = tapeinfo->ntapes++;
+	}
+}
+
+/*
+ * After a tape has already been written to and then read, this function
+ * rewinds it for writing and adds it to the free list.
+ */
+static void
+hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
+{
+	LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
+	if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
+	{
+		tapeinfo->freetapes_alloc <<= 1;
+		tapeinfo->freetapes = repalloc(
+			tapeinfo->freetapes, tapeinfo->freetapes_alloc * sizeof(int));
+	}
+	tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
+}
+
+/*
+ * hashagg_spill_init
+ *
+ * Called after we determined that spilling is necessary. Chooses the number
+ * of partitions to create, and initializes them.
+ */
+static void
+hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+				   uint64 input_groups, double hashentrysize)
+{
+	int		npartitions;
+	int     partition_bits;
+
+	npartitions = hash_choose_num_partitions(
+		input_groups, hashentrysize, used_bits, &partition_bits);
+
+	spill->partitions = palloc0(sizeof(int) * npartitions);
+	spill->ntuples = palloc0(sizeof(int64) * npartitions);
+
+	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+
+	spill->tapeset = tapeinfo->tapeset;
+	spill->shift = 32 - used_bits - partition_bits;
+	spill->mask = (npartitions - 1) << spill->shift;
+	spill->npartitions = npartitions;
+}
+
+/*
+ * hashagg_spill_tuple
+ *
+ * No room for new groups in the hash table. Save for later in the appropriate
+ * partition.
+ */
+static Size
+hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
+{
+	LogicalTapeSet		*tapeset = spill->tapeset;
+	int					 partition;
+	MinimalTuple		 tuple;
+	int					 tapenum;
+	int					 total_written = 0;
+	bool				 shouldFree;
+
+	Assert(spill->partitions != NULL);
+
+	/* XXX: may contain unnecessary attributes, should project */
+	tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+	partition = (hash & spill->mask) >> spill->shift;
+	spill->ntuples[partition]++;
+
+	tapenum = spill->partitions[partition];
+
+	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+	total_written += sizeof(uint32);
+
+	LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+	total_written += tuple->t_len;
+
+	if (shouldFree)
+		pfree(tuple);
+
+	return total_written;
+}
+
+/*
+ * hashagg_batch_new
+ *
+ * Construct a HashAggBatch item, which represents one iteration of HashAgg to
+ * be done.
+ */
+static HashAggBatch *
+hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+				  int64 input_tuples, int used_bits)
+{
+	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
+
+	batch->setno = setno;
+	batch->used_bits = used_bits;
+	batch->tapeset = tapeset;
+	batch->input_tapenum = tapenum;
+	batch->input_tuples = input_tuples;
+
+	return batch;
+}
+
+/*
+ * hashagg_batch_read
+ * 		read the next tuple from a batch's tape.  Return NULL if no more.
+ */
+static MinimalTuple
+hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
+{
+	LogicalTapeSet *tapeset = batch->tapeset;
+	int				tapenum = batch->input_tapenum;
+	MinimalTuple	tuple;
+	uint32			t_len;
+	size_t			nread;
+	uint32			hash;
+
+	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+	if (nread == 0)
+		return NULL;
+	if (nread != sizeof(uint32))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
+						tapenum, sizeof(uint32), nread)));
+	if (hashp != NULL)
+		*hashp = hash;
+
+	nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+	if (nread != sizeof(uint32))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
+						tapenum, sizeof(uint32), nread)));
+
+	tuple = (MinimalTuple) palloc(t_len);
+	tuple->t_len = t_len;
+
+	nread = LogicalTapeRead(tapeset, tapenum,
+							(void *)((char *)tuple + sizeof(uint32)),
+							t_len - sizeof(uint32));
+	if (nread != t_len - sizeof(uint32))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
+						tapenum, t_len - sizeof(uint32), nread)));
+
+	return tuple;
+}
+
+/*
+ * hashagg_finish_initial_spills
+ *
+ * After the initial pass over the input, some tuples may have been spilled
+ * to disk. If so, turn the spilled partitions into new batches that must
+ * later be executed.
+ */
+static void
+hashagg_finish_initial_spills(AggState *aggstate)
+{
+	int setno;
+	int total_npartitions = 0;
+
+	if (aggstate->hash_spills != NULL)
+	{
+		for (setno = 0; setno < aggstate->num_hashes; setno++)
+		{
+			HashAggSpill *spill = &aggstate->hash_spills[setno];
+			total_npartitions += spill->npartitions;
+			hashagg_spill_finish(aggstate, spill, setno);
+		}
+
+		/*
+		 * We're not processing tuples from the outer plan any more; only
+		 * processing batches of spilled tuples. The initial spill structures
+		 * are no longer needed.
+		 */
+		pfree(aggstate->hash_spills);
+		aggstate->hash_spills = NULL;
+	}
+
+	hash_agg_update_metrics(aggstate, false, total_npartitions);
+	aggstate->hash_spill_mode = false;
+}
+
+/*
+ * hashagg_spill_finish
+ *
+ * Transform spill partitions into new batches.
+ */
+static void
+hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
+{
+	int i;
+	int used_bits = 32 - spill->shift;
+
+	if (spill->npartitions == 0)
+		return;	/* didn't spill */
+
+	for (i = 0; i < spill->npartitions; i++)
+	{
+		int				 tapenum = spill->partitions[i];
+		HashAggBatch    *new_batch;
+
+		/* if the partition is empty, don't create a new batch of work */
+		if (spill->ntuples[i] == 0)
+			continue;
+
+		new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
+									  tapenum, setno, spill->ntuples[i],
+									  used_bits);
+		aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
+		aggstate->hash_batches_used++;
+	}
+
+	pfree(spill->ntuples);
+	pfree(spill->partitions);
+}
+
+/*
+ * Free resources related to a spilled HashAgg.
+ */
+static void
+hashagg_reset_spill_state(AggState *aggstate)
+{
+	ListCell *lc;
+
+	/* free spills from initial pass */
+	if (aggstate->hash_spills != NULL)
+	{
+		int setno;
+
+		for (setno = 0; setno < aggstate->num_hashes; setno++)
+		{
+			HashAggSpill *spill = &aggstate->hash_spills[setno];
+			if (spill->ntuples != NULL)
+				pfree(spill->ntuples);
+			if (spill->partitions != NULL)
+				pfree(spill->partitions);
+		}
+		pfree(aggstate->hash_spills);
+		aggstate->hash_spills = NULL;
+	}
+
+	/* free batches */
+	foreach(lc, aggstate->hash_batches)
+	{
+		HashAggBatch *batch = (HashAggBatch*) lfirst(lc);
+		pfree(batch);
+	}
+	list_free(aggstate->hash_batches);
+	aggstate->hash_batches = NIL;
+
+	/* close tape set */
+	if (aggstate->hash_tapeinfo != NULL)
+	{
+		HashTapeInfo	*tapeinfo = aggstate->hash_tapeinfo;
+
+		LogicalTapeSetClose(tapeinfo->tapeset);
+		pfree(tapeinfo->freetapes);
+		pfree(tapeinfo);
+		aggstate->hash_tapeinfo = NULL;
+	}
+}
+
+
 /* -----------------
  * ExecInitAgg
  *
@@ -2518,9 +3499,36 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	 */
 	if (use_hashing)
 	{
+		Plan   *outerplan = outerPlan(node);
+		uint64	totalGroups = 0;
+		int 	i;
+
+		aggstate->hash_metacxt = AllocSetContextCreate(
+			aggstate->ss.ps.state->es_query_cxt,
+			"HashAgg meta context",
+			ALLOCSET_DEFAULT_SIZES);
+		aggstate->hash_spill_slot = ExecInitExtraTupleSlot(
+			estate, scanDesc, &TTSOpsMinimalTuple);
+
 		/* this is an array of pointers, not structures */
 		aggstate->hash_pergroup = pergroups;
 
+		aggstate->hashentrysize = hash_agg_entry_size(
+			aggstate->numtrans, outerplan->plan_width, node->transitionSpace);
+
+		/*
+		 * Consider all of the grouping sets together when setting the limits
+		 * and estimating the number of partitions. This can be inaccurate
+		 * when there is more than one grouping set, but should still be
+		 * reasonable.
+		 */
+		for (i = 0; i < aggstate->num_hashes; i++)
+			totalGroups += aggstate->perhash[i].aggnode->numGroups;
+
+		hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
+							&aggstate->hash_mem_limit,
+							&aggstate->hash_ngroups_limit,
+							&aggstate->hash_planned_partitions);
 		find_hash_columns(aggstate);
 		build_hash_tables(aggstate);
 		aggstate->table_filled = false;
@@ -2931,6 +3939,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
 											 false);
 
+		/* cache compiled expression for outer slot without NULL check */
+		phase->evaltrans_cache[0][0] = phase->evaltrans;
 	}
 
 	return aggstate;
@@ -3424,6 +4434,14 @@ ExecEndAgg(AggState *node)
 	if (node->sort_out)
 		tuplesort_end(node->sort_out);
 
+	hashagg_reset_spill_state(node);
+
+	if (node->hash_metacxt != NULL)
+	{
+		MemoryContextDelete(node->hash_metacxt);
+		node->hash_metacxt = NULL;
+	}
+
 	for (transno = 0; transno < node->numtrans; transno++)
 	{
 		AggStatePerTrans pertrans = &node->pertrans[transno];
@@ -3479,12 +4497,13 @@ ExecReScanAgg(AggState *node)
 			return;
 
 		/*
-		 * If we do have the hash table, and the subplan does not have any
-		 * parameter changes, and none of our own parameter changes affect
-		 * input expressions of the aggregated functions, then we can just
-		 * rescan the existing hash table; no need to build it again.
+		 * If we do have the hash table, and it never spilled, and the subplan
+		 * does not have any parameter changes, and none of our own parameter
+		 * changes affect input expressions of the aggregated functions, then
+		 * we can just rescan the existing hash table; no need to build it
+		 * again.
 		 */
-		if (outerPlan->chgParam == NULL &&
+		if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
 			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
 		{
 			ResetTupleHashIterator(node->perhash[0].hashtable,
@@ -3541,11 +4560,19 @@ ExecReScanAgg(AggState *node)
 	 */
 	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
 	{
+		hashagg_reset_spill_state(node);
+
+		node->hash_ever_spilled = false;
+		node->hash_spill_mode = false;
+		node->hash_ngroups_current = 0;
+
 		ReScanExprContext(node->hashcontext);
 		/* Rebuild an empty hash table */
 		build_hash_tables(node);
 		node->table_filled = false;
 		/* iterator will be reset when the table is filled */
+
+		hashagg_recompile_expressions(node, false, false);
 	}
 
 	if (node->aggstrategy != AGG_HASHED)
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index b5a0033721f..8cf694b61dc 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -77,6 +77,7 @@
 #include "access/htup_details.h"
 #include "access/tsmapi.h"
 #include "executor/executor.h"
+#include "executor/nodeAgg.h"
 #include "executor/nodeHash.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -128,6 +129,8 @@ bool		enable_bitmapscan = true;
 bool		enable_tidscan = true;
 bool		enable_sort = true;
 bool		enable_hashagg = true;
+bool		enable_hashagg_disk = true;
+bool		enable_groupingsets_hash_disk = false;
 bool		enable_nestloop = true;
 bool		enable_material = true;
 bool		enable_mergejoin = true;
@@ -2153,7 +2156,7 @@ cost_agg(Path *path, PlannerInfo *root,
 		 int numGroupCols, double numGroups,
 		 List *quals,
 		 Cost input_startup_cost, Cost input_total_cost,
-		 double input_tuples)
+		 double input_tuples, double input_width)
 {
 	double		output_tuples;
 	Cost		startup_cost;
@@ -2228,14 +2231,79 @@ cost_agg(Path *path, PlannerInfo *root,
 			startup_cost += disable_cost;
 		startup_cost += aggcosts->transCost.startup;
 		startup_cost += aggcosts->transCost.per_tuple * input_tuples;
+		/* cost of computing hash value */
 		startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples;
 		startup_cost += aggcosts->finalCost.startup;
+
 		total_cost = startup_cost;
 		total_cost += aggcosts->finalCost.per_tuple * numGroups;
+		/* cost of retrieving from hash table */
 		total_cost += cpu_tuple_cost * numGroups;
 		output_tuples = numGroups;
 	}
 
+	/*
+	 * Add the disk costs of hash aggregation that spills to disk.
+	 *
+	 * Groups that go into the hash table stay in memory until finalized,
+	 * so spilling and reprocessing tuples doesn't incur additional
+	 * invocations of transCost or finalCost. Furthermore, the computed
+	 * hash value is stored with the spilled tuples, so we don't incur
+	 * extra invocations of the hash function.
+	 *
+	 * Hash Agg begins returning tuples after the first batch is
+	 * complete. Accrue writes (spilled tuples) to startup_cost and to
+	 * total_cost; accrue reads only to total_cost.
+	 */
+	if (aggstrategy == AGG_HASHED || aggstrategy == AGG_MIXED)
+	{
+		double	pages_written = 0.0;
+		double	pages_read	  = 0.0;
+		double	hashentrysize;
+		double	nbatches;
+		Size	mem_limit;
+		uint64	ngroups_limit;
+		int		num_partitions;
+
+
+		/*
+		 * Estimate number of batches based on the computed limits. If less
+		 * than or equal to one, all groups are expected to fit in memory;
+		 * otherwise we expect to spill.
+		 */
+		hashentrysize = hash_agg_entry_size(
+			aggcosts->numAggs, input_width, aggcosts->transitionSpace);
+		hash_agg_set_limits(hashentrysize, numGroups, 0, &mem_limit,
+							&ngroups_limit, &num_partitions);
+
+		nbatches = Max( (numGroups * hashentrysize) / mem_limit,
+						numGroups / ngroups_limit );
+
+		/*
+		 * Estimate number of pages read and written. For each level of
+		 * recursion, a tuple must be written and then later read.
+		 */
+		if (nbatches > 1.0)
+		{
+			double depth;
+			double pages;
+
+			pages = relation_byte_size(input_tuples, input_width) / BLCKSZ;
+
+			/*
+			 * The number of partitions can change at different levels of
+			 * recursion; but for the purposes of this calculation assume it
+			 * stays constant.
+			 */
+			depth = ceil( log(nbatches - 1) / log(num_partitions) );
+			pages_written = pages_read = pages * depth;
+		}
+
+		startup_cost += pages_written * random_page_cost;
+		total_cost += pages_written * random_page_cost;
+		total_cost += pages_read * seq_page_cost;
+	}
+
 	/*
 	 * If there are quals (HAVING quals), account for their cost and
 	 * selectivity.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index b44efd6314c..eb25c2f4707 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -4258,11 +4258,12 @@ consider_groupingsets_paths(PlannerInfo *root,
 											  dNumGroups - exclude_groups);
 
 		/*
-		 * gd->rollups is empty if we have only unsortable columns to work
-		 * with.  Override work_mem in that case; otherwise, we'll rely on the
-		 * sorted-input case to generate usable mixed paths.
+		 * If we have sortable columns to work with (gd->rollups is non-empty)
+		 * and enable_groupingsets_hash_disk is disabled, don't generate
+		 * hash-based paths that will exceed work_mem.
 		 */
-		if (hashsize > work_mem * 1024L && gd->rollups)
+		if (!enable_groupingsets_hash_disk &&
+			hashsize > work_mem * 1024L && gd->rollups)
 			return;				/* nope, won't fit */
 
 		/*
@@ -6528,7 +6529,8 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 			 * were unable to sort above, then we'd better generate a Path, so
 			 * that we at least have one.
 			 */
-			if (hashaggtablesize < work_mem * 1024L ||
+			if (enable_hashagg_disk ||
+				hashaggtablesize < work_mem * 1024L ||
 				grouped_rel->pathlist == NIL)
 			{
 				/*
@@ -6561,7 +6563,8 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 														  agg_final_costs,
 														  dNumGroups);
 
-			if (hashaggtablesize < work_mem * 1024L)
+			if (enable_hashagg_disk ||
+				hashaggtablesize < work_mem * 1024L)
 				add_path(grouped_rel, (Path *)
 						 create_agg_path(root,
 										 grouped_rel,
@@ -6830,7 +6833,7 @@ create_partial_grouping_paths(PlannerInfo *root,
 		 * Tentatively produce a partial HashAgg Path, depending on if it
 		 * looks as if the hash table will fit in work_mem.
 		 */
-		if (hashaggtablesize < work_mem * 1024L &&
+		if ((enable_hashagg_disk || hashaggtablesize < work_mem * 1024L) &&
 			cheapest_total_path != NULL)
 		{
 			add_path(partially_grouped_rel, (Path *)
@@ -6857,7 +6860,7 @@ create_partial_grouping_paths(PlannerInfo *root,
 									   dNumPartialPartialGroups);
 
 		/* Do the same for partial paths. */
-		if (hashaggtablesize < work_mem * 1024L &&
+		if ((enable_hashagg_disk || hashaggtablesize < work_mem * 1024L) &&
 			cheapest_partial_path != NULL)
 		{
 			add_partial_path(partially_grouped_rel, (Path *)
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 1a23e18970d..951aed80e7a 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -1072,7 +1072,7 @@ choose_hashed_setop(PlannerInfo *root, List *groupClauses,
 			 numGroupCols, dNumGroups,
 			 NIL,
 			 input_path->startup_cost, input_path->total_cost,
-			 input_path->rows);
+			 input_path->rows, input_path->pathtarget->width);
 
 	/*
 	 * Now for the sorted case.  Note that the input is *always* unsorted,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index d9ce5162116..8ba8122ee2f 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1704,7 +1704,8 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 					 NIL,
 					 subpath->startup_cost,
 					 subpath->total_cost,
-					 rel->rows);
+					 rel->rows,
+					 subpath->pathtarget->width);
 	}
 
 	if (sjinfo->semi_can_btree && sjinfo->semi_can_hash)
@@ -2958,7 +2959,7 @@ create_agg_path(PlannerInfo *root,
 			 list_length(groupClause), numGroups,
 			 qual,
 			 subpath->startup_cost, subpath->total_cost,
-			 subpath->rows);
+			 subpath->rows, subpath->pathtarget->width);
 
 	/* add tlist eval cost for each output row */
 	pathnode->path.startup_cost += target->cost.startup;
@@ -3069,7 +3070,8 @@ create_groupingsets_path(PlannerInfo *root,
 					 having_qual,
 					 subpath->startup_cost,
 					 subpath->total_cost,
-					 subpath->rows);
+					 subpath->rows,
+					 subpath->pathtarget->width);
 			is_first = false;
 			if (!rollup->is_hashed)
 				is_first_sort = false;
@@ -3092,7 +3094,8 @@ create_groupingsets_path(PlannerInfo *root,
 						 rollup->numGroups,
 						 having_qual,
 						 0.0, 0.0,
-						 subpath->rows);
+						 subpath->rows,
+						 subpath->pathtarget->width);
 				if (!rollup->is_hashed)
 					is_first_sort = false;
 			}
@@ -3117,7 +3120,8 @@ create_groupingsets_path(PlannerInfo *root,
 						 having_qual,
 						 sort_path.startup_cost,
 						 sort_path.total_cost,
-						 sort_path.rows);
+						 sort_path.rows,
+						 subpath->pathtarget->width);
 			}
 
 			pathnode->path.total_cost += agg_path.total_cost;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4c6d6486623..64da8882082 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -999,6 +999,26 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_hashagg_disk", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of hashed aggregation plans that are expected to exceed work_mem."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_hashagg_disk,
+		true,
+		NULL, NULL, NULL
+	},
+	{
+		{"enable_groupingsets_hash_disk", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of hashed aggregation plans for groupingsets when the total size of the hash tables is expected to exceed work_mem."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_groupingsets_hash_disk,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_material", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of materialization."),
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 264916f9a92..a5b8a004d1e 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -280,6 +280,11 @@ typedef struct AggStatePerPhaseData
 	Sort	   *sortnode;		/* Sort node for input ordering for phase */
 
 	ExprState  *evaltrans;		/* evaluation of transition functions  */
+
+	/* cached variants of the compiled expression */
+	ExprState  *evaltrans_cache
+				[2]		/* 0: outerops; 1: TTSOpsMinimalTuple */
+				[2];	/* 0: no NULL check; 1: with NULL check */
 }			AggStatePerPhaseData;
 
 /*
@@ -311,5 +316,8 @@ extern void ExecReScanAgg(AggState *node);
 
 extern Size hash_agg_entry_size(int numAggs, Size tupleWidth,
 								Size transitionSpace);
+extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
+								int used_bits, Size *mem_limit,
+								uint64 *ngroups_limit, int *num_partitions);
 
 #endif							/* NODEAGG_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index cd3ddf781f1..3d27d50f090 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2079,12 +2079,32 @@ typedef struct AggState
 	/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
 	bool		table_filled;	/* hash table filled yet? */
 	int			num_hashes;
+	MemoryContext	hash_metacxt;	/* memory for hash table itself */
+	struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
+	struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
+										 exists only during first pass */
+	TupleTableSlot *hash_spill_slot; /* slot for reading from spill files */
+	List	   *hash_batches;	/* hash batches remaining to be processed */
+	bool		hash_ever_spilled;	/* ever spilled during this execution? */
+	bool		hash_spill_mode;	/* we hit a limit during the current batch
+									   and we must not create new groups */
+	Size		hash_mem_limit;	/* limit before spilling hash table */
+	uint64		hash_ngroups_limit;	/* limit before spilling hash table */
+	int			hash_planned_partitions; /* number of partitions planned
+											for first pass */
+	double		hashentrysize;	/* estimate revised during execution */
+	Size		hash_mem_peak;	/* peak hash table memory usage */
+	uint64		hash_ngroups_current;	/* number of groups currently in
+										   memory in all hash tables */
+	uint64		hash_disk_used; /* kB of disk space used */
+	int			hash_batches_used;	/* batches used during entire execution */
+
 	AggStatePerHash perhash;	/* array of per-hashtable data */
 	AggStatePerGroup *hash_pergroup;	/* grouping set indexed array of
 										 * per-group pointers */
 
 	/* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 49
 	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, than
 										 * ->hash_pergroup */
 	ProjectionInfo *combinedproj;	/* projection machinery */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index cb012ba1980..735ba096503 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -54,6 +54,8 @@ extern PGDLLIMPORT bool enable_bitmapscan;
 extern PGDLLIMPORT bool enable_tidscan;
 extern PGDLLIMPORT bool enable_sort;
 extern PGDLLIMPORT bool enable_hashagg;
+extern PGDLLIMPORT bool enable_hashagg_disk;
+extern PGDLLIMPORT bool enable_groupingsets_hash_disk;
 extern PGDLLIMPORT bool enable_nestloop;
 extern PGDLLIMPORT bool enable_material;
 extern PGDLLIMPORT bool enable_mergejoin;
@@ -114,7 +116,7 @@ extern void cost_agg(Path *path, PlannerInfo *root,
 					 int numGroupCols, double numGroups,
 					 List *quals,
 					 Cost input_startup_cost, Cost input_total_cost,
-					 double input_tuples);
+					 double input_tuples, double input_width);
 extern void cost_windowagg(Path *path, PlannerInfo *root,
 						   List *windowFuncs, int numPartCols, int numOrderCols,
 						   Cost input_startup_cost, Cost input_total_cost,
diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out
index f457b5b150f..0073072a368 100644
--- a/src/test/regress/expected/aggregates.out
+++ b/src/test/regress/expected/aggregates.out
@@ -2357,3 +2357,187 @@ explain (costs off)
                ->  Seq Scan on onek
 (8 rows)
 
+--
+-- Hash Aggregation Spill tests
+--
+set enable_sort=false;
+set work_mem='64kB';
+select unique1, count(*), sum(twothousand) from tenk1
+group by unique1
+having sum(fivethous) > 4975
+order by sum(twothousand);
+ unique1 | count | sum  
+---------+-------+------
+    4976 |     1 |  976
+    4977 |     1 |  977
+    4978 |     1 |  978
+    4979 |     1 |  979
+    4980 |     1 |  980
+    4981 |     1 |  981
+    4982 |     1 |  982
+    4983 |     1 |  983
+    4984 |     1 |  984
+    4985 |     1 |  985
+    4986 |     1 |  986
+    4987 |     1 |  987
+    4988 |     1 |  988
+    4989 |     1 |  989
+    4990 |     1 |  990
+    4991 |     1 |  991
+    4992 |     1 |  992
+    4993 |     1 |  993
+    4994 |     1 |  994
+    4995 |     1 |  995
+    4996 |     1 |  996
+    4997 |     1 |  997
+    4998 |     1 |  998
+    4999 |     1 |  999
+    9976 |     1 | 1976
+    9977 |     1 | 1977
+    9978 |     1 | 1978
+    9979 |     1 | 1979
+    9980 |     1 | 1980
+    9981 |     1 | 1981
+    9982 |     1 | 1982
+    9983 |     1 | 1983
+    9984 |     1 | 1984
+    9985 |     1 | 1985
+    9986 |     1 | 1986
+    9987 |     1 | 1987
+    9988 |     1 | 1988
+    9989 |     1 | 1989
+    9990 |     1 | 1990
+    9991 |     1 | 1991
+    9992 |     1 | 1992
+    9993 |     1 | 1993
+    9994 |     1 | 1994
+    9995 |     1 | 1995
+    9996 |     1 | 1996
+    9997 |     1 | 1997
+    9998 |     1 | 1998
+    9999 |     1 | 1999
+(48 rows)
+
+set work_mem to default;
+set enable_sort to default;
+--
+-- Compare results between plans using sorting and plans using hash
+-- aggregation. Force spilling in both cases by setting work_mem low.
+--
+set work_mem='64kB';
+-- Produce results with sorting.
+set enable_hashagg = false;
+set jit_above_cost = 0;
+explain (costs off)
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+                   QUERY PLAN                   
+------------------------------------------------
+ GroupAggregate
+   Group Key: ((g % 100000))
+   ->  Sort
+         Sort Key: ((g % 100000))
+         ->  Function Scan on generate_series g
+(5 rows)
+
+create table agg_group_1 as
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+create table agg_group_2 as
+select * from
+  (values (100), (300), (500)) as r(a),
+  lateral (
+    select (g/2)::numeric as c1,
+           array_agg(g::numeric) as c2,
+	   count(*) as c3
+    from generate_series(0, 1999) g
+    where g < r.a
+    group by g/2) as s;
+set jit_above_cost to default;
+create table agg_group_3 as
+select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+create table agg_group_4 as
+select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+-- Produce results with hash aggregation
+set enable_hashagg = true;
+set enable_sort = false;
+set jit_above_cost = 0;
+explain (costs off)
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+                QUERY PLAN                
+------------------------------------------
+ HashAggregate
+   Group Key: (g % 100000)
+   ->  Function Scan on generate_series g
+(3 rows)
+
+create table agg_hash_1 as
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+create table agg_hash_2 as
+select * from
+  (values (100), (300), (500)) as r(a),
+  lateral (
+    select (g/2)::numeric as c1,
+           array_agg(g::numeric) as c2,
+	   count(*) as c3
+    from generate_series(0, 1999) g
+    where g < r.a
+    group by g/2) as s;
+set jit_above_cost to default;
+create table agg_hash_3 as
+select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+create table agg_hash_4 as
+select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+set enable_sort = true;
+set work_mem to default;
+-- Compare group aggregation results to hash aggregation results
+(select * from agg_hash_1 except select * from agg_group_1)
+  union all
+(select * from agg_group_1 except select * from agg_hash_1);
+ c1 | c2 | c3 
+----+----+----
+(0 rows)
+
+(select * from agg_hash_2 except select * from agg_group_2)
+  union all
+(select * from agg_group_2 except select * from agg_hash_2);
+ a | c1 | c2 | c3 
+---+----+----+----
+(0 rows)
+
+(select * from agg_hash_3 except select * from agg_group_3)
+  union all
+(select * from agg_group_3 except select * from agg_hash_3);
+ c1 | c2 | c3 
+----+----+----
+(0 rows)
+
+(select * from agg_hash_4 except select * from agg_group_4)
+  union all
+(select * from agg_group_4 except select * from agg_hash_4);
+ c1 | c2 | c3 
+----+----+----
+(0 rows)
+
+drop table agg_group_1;
+drop table agg_group_2;
+drop table agg_group_3;
+drop table agg_group_4;
+drop table agg_hash_1;
+drop table agg_hash_2;
+drop table agg_hash_3;
+drop table agg_hash_4;
diff --git a/src/test/regress/expected/groupingsets.out b/src/test/regress/expected/groupingsets.out
index c1f802c88a7..dbe5140b558 100644
--- a/src/test/regress/expected/groupingsets.out
+++ b/src/test/regress/expected/groupingsets.out
@@ -1633,4 +1633,126 @@ select v||'a', case when grouping(v||'a') = 1 then 1 else 0 end, count(*)
           |    1 |     2
 (4 rows)
 
+--
+-- Compare results between plans using sorting and plans using hash
+-- aggregation. Force spilling in both cases by setting work_mem low
+-- and turning on enable_groupingsets_hash_disk.
+--
+SET enable_groupingsets_hash_disk = true;
+SET work_mem='64kB';
+-- Produce results with sorting.
+set enable_hashagg = false;
+set jit_above_cost = 0;
+explain (costs off)
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+                          QUERY PLAN                           
+---------------------------------------------------------------
+ GroupAggregate
+   Group Key: ((g.g % 1000)), ((g.g % 100)), ((g.g % 10))
+   Group Key: ((g.g % 1000)), ((g.g % 100))
+   Group Key: ((g.g % 1000))
+   Group Key: ()
+   Sort Key: ((g.g % 100)), ((g.g % 10))
+     Group Key: ((g.g % 100)), ((g.g % 10))
+     Group Key: ((g.g % 100))
+   Sort Key: ((g.g % 10)), ((g.g % 1000))
+     Group Key: ((g.g % 10)), ((g.g % 1000))
+     Group Key: ((g.g % 10))
+   ->  Sort
+         Sort Key: ((g.g % 1000)), ((g.g % 100)), ((g.g % 10))
+         ->  Function Scan on generate_series g
+(14 rows)
+
+create table gs_group_1 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+set jit_above_cost to default;
+create table gs_group_2 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g/20 as g1000, g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by cube (g1000,g100,g10);
+create table gs_group_3 as
+select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from
+  (select g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by grouping sets (g100,g10);
+-- Produce results with hash aggregation.
+set enable_hashagg = true;
+set enable_sort = false;
+set work_mem='64kB';
+set jit_above_cost = 0;
+explain (costs off)
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+                    QUERY PLAN                     
+---------------------------------------------------
+ MixedAggregate
+   Hash Key: (g.g % 1000), (g.g % 100), (g.g % 10)
+   Hash Key: (g.g % 1000), (g.g % 100)
+   Hash Key: (g.g % 1000)
+   Hash Key: (g.g % 100), (g.g % 10)
+   Hash Key: (g.g % 100)
+   Hash Key: (g.g % 10), (g.g % 1000)
+   Hash Key: (g.g % 10)
+   Group Key: ()
+   ->  Function Scan on generate_series g
+(10 rows)
+
+create table gs_hash_1 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+set jit_above_cost to default;
+create table gs_hash_2 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g/20 as g1000, g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by cube (g1000,g100,g10);
+create table gs_hash_3 as
+select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from
+  (select g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by grouping sets (g100,g10);
+set enable_sort = true;
+set work_mem to default;
+-- Compare results
+(select * from gs_hash_1 except select * from gs_group_1)
+  union all
+(select * from gs_group_1 except select * from gs_hash_1);
+ g1000 | g100 | g10 | sum | count | max 
+-------+------+-----+-----+-------+-----
+(0 rows)
+
+(select * from gs_hash_2 except select * from gs_group_2)
+  union all
+(select * from gs_group_2 except select * from gs_hash_2);
+ g1000 | g100 | g10 | sum | count | max 
+-------+------+-----+-----+-------+-----
+(0 rows)
+
+(select g100,g10,unnest(a),c,m from gs_hash_3 except
+  select g100,g10,unnest(a),c,m from gs_group_3)
+    union all
+(select g100,g10,unnest(a),c,m from gs_group_3 except
+  select g100,g10,unnest(a),c,m from gs_hash_3);
+ g100 | g10 | unnest | c | m 
+------+-----+--------+---+---
+(0 rows)
+
+drop table gs_group_1;
+drop table gs_group_2;
+drop table gs_group_3;
+drop table gs_hash_1;
+drop table gs_hash_2;
+drop table gs_hash_3;
+SET enable_groupingsets_hash_disk TO DEFAULT;
 -- end
diff --git a/src/test/regress/expected/select_distinct.out b/src/test/regress/expected/select_distinct.out
index f3696c6d1de..11c6f50fbfa 100644
--- a/src/test/regress/expected/select_distinct.out
+++ b/src/test/regress/expected/select_distinct.out
@@ -148,6 +148,68 @@ SELECT count(*) FROM
      4
 (1 row)
 
+--
+-- Compare results between plans using sorting and plans using hash
+-- aggregation. Force spilling in both cases by setting work_mem low.
+--
+SET work_mem='64kB';
+-- Produce results with sorting.
+SET enable_hashagg=FALSE;
+SET jit_above_cost=0;
+EXPLAIN (costs off)
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+                   QUERY PLAN                   
+------------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: ((g % 1000))
+         ->  Function Scan on generate_series g
+(4 rows)
+
+CREATE TABLE distinct_group_1 AS
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+SET jit_above_cost TO DEFAULT;
+CREATE TABLE distinct_group_2 AS
+SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g;
+SET enable_hashagg=TRUE;
+-- Produce results with hash aggregation.
+SET enable_sort=FALSE;
+SET jit_above_cost=0;
+EXPLAIN (costs off)
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+                QUERY PLAN                
+------------------------------------------
+ HashAggregate
+   Group Key: (g % 1000)
+   ->  Function Scan on generate_series g
+(3 rows)
+
+CREATE TABLE distinct_hash_1 AS
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+SET jit_above_cost TO DEFAULT;
+CREATE TABLE distinct_hash_2 AS
+SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g;
+SET enable_sort=TRUE;
+SET work_mem TO DEFAULT;
+-- Compare results
+(SELECT * FROM distinct_hash_1 EXCEPT SELECT * FROM distinct_group_1)
+  UNION ALL
+(SELECT * FROM distinct_group_1 EXCEPT SELECT * FROM distinct_hash_1);
+ ?column? 
+----------
+(0 rows)
+
+(SELECT * FROM distinct_hash_2 EXCEPT SELECT * FROM distinct_group_2)
+  UNION ALL
+(SELECT * FROM distinct_group_2 EXCEPT SELECT * FROM distinct_hash_2);
+ text 
+------
+(0 rows)
+
+DROP TABLE distinct_hash_1;
+DROP TABLE distinct_hash_2;
+DROP TABLE distinct_group_1;
+DROP TABLE distinct_group_2;
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index a1c90eb9057..715842b87af 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -74,7 +74,9 @@ select name, setting from pg_settings where name like 'enable%';
 --------------------------------+---------
  enable_bitmapscan              | on
  enable_gathermerge             | on
+ enable_groupingsets_hash_disk  | off
  enable_hashagg                 | on
+ enable_hashagg_disk            | on
  enable_hashjoin                | on
  enable_indexonlyscan           | on
  enable_indexscan               | on
@@ -89,7 +91,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(17 rows)
+(19 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql
index 3e593f2d615..02578330a6f 100644
--- a/src/test/regress/sql/aggregates.sql
+++ b/src/test/regress/sql/aggregates.sql
@@ -1032,3 +1032,134 @@ select v||'a', case when v||'a' = 'aa' then 1 else 0 end, count(*)
 explain (costs off)
   select 1 from tenk1
    where (hundred, thousand) in (select twothousand, twothousand from onek);
+
+--
+-- Hash Aggregation Spill tests
+--
+
+set enable_sort=false;
+set work_mem='64kB';
+
+select unique1, count(*), sum(twothousand) from tenk1
+group by unique1
+having sum(fivethous) > 4975
+order by sum(twothousand);
+
+set work_mem to default;
+set enable_sort to default;
+
+--
+-- Compare results between plans using sorting and plans using hash
+-- aggregation. Force spilling in both cases by setting work_mem low.
+--
+
+set work_mem='64kB';
+
+-- Produce results with sorting.
+
+set enable_hashagg = false;
+
+set jit_above_cost = 0;
+
+explain (costs off)
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+
+create table agg_group_1 as
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+
+create table agg_group_2 as
+select * from
+  (values (100), (300), (500)) as r(a),
+  lateral (
+    select (g/2)::numeric as c1,
+           array_agg(g::numeric) as c2,
+	   count(*) as c3
+    from generate_series(0, 1999) g
+    where g < r.a
+    group by g/2) as s;
+
+set jit_above_cost to default;
+
+create table agg_group_3 as
+select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+
+create table agg_group_4 as
+select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+
+-- Produce results with hash aggregation
+
+set enable_hashagg = true;
+set enable_sort = false;
+
+set jit_above_cost = 0;
+
+explain (costs off)
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+
+create table agg_hash_1 as
+select g%100000 as c1, sum(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 199999) g
+  group by g%100000;
+
+create table agg_hash_2 as
+select * from
+  (values (100), (300), (500)) as r(a),
+  lateral (
+    select (g/2)::numeric as c1,
+           array_agg(g::numeric) as c2,
+	   count(*) as c3
+    from generate_series(0, 1999) g
+    where g < r.a
+    group by g/2) as s;
+
+set jit_above_cost to default;
+
+create table agg_hash_3 as
+select (g/2)::numeric as c1, sum(7::int4) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+
+create table agg_hash_4 as
+select (g/2)::numeric as c1, array_agg(g::numeric) as c2, count(*) as c3
+  from generate_series(0, 1999) g
+  group by g/2;
+
+set enable_sort = true;
+set work_mem to default;
+
+-- Compare group aggregation results to hash aggregation results
+
+(select * from agg_hash_1 except select * from agg_group_1)
+  union all
+(select * from agg_group_1 except select * from agg_hash_1);
+
+(select * from agg_hash_2 except select * from agg_group_2)
+  union all
+(select * from agg_group_2 except select * from agg_hash_2);
+
+(select * from agg_hash_3 except select * from agg_group_3)
+  union all
+(select * from agg_group_3 except select * from agg_hash_3);
+
+(select * from agg_hash_4 except select * from agg_group_4)
+  union all
+(select * from agg_group_4 except select * from agg_hash_4);
+
+drop table agg_group_1;
+drop table agg_group_2;
+drop table agg_group_3;
+drop table agg_group_4;
+drop table agg_hash_1;
+drop table agg_hash_2;
+drop table agg_hash_3;
+drop table agg_hash_4;
diff --git a/src/test/regress/sql/groupingsets.sql b/src/test/regress/sql/groupingsets.sql
index 95ac3fb52f6..478f49ecab5 100644
--- a/src/test/regress/sql/groupingsets.sql
+++ b/src/test/regress/sql/groupingsets.sql
@@ -441,4 +441,107 @@ select v||'a', case when grouping(v||'a') = 1 then 1 else 0 end, count(*)
   from unnest(array[1,1], array['a','b']) u(i,v)
  group by rollup(i, v||'a') order by 1,3;
 
+--
+-- Compare results between plans using sorting and plans using hash
+-- aggregation. Force spilling in both cases by setting work_mem low
+-- and turning on enable_groupingsets_hash_disk.
+--
+
+SET enable_groupingsets_hash_disk = true;
+SET work_mem='64kB';
+
+-- Produce results with sorting.
+
+set enable_hashagg = false;
+
+set jit_above_cost = 0;
+
+explain (costs off)
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+
+create table gs_group_1 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+
+set jit_above_cost to default;
+
+create table gs_group_2 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g/20 as g1000, g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by cube (g1000,g100,g10);
+
+create table gs_group_3 as
+select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from
+  (select g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by grouping sets (g100,g10);
+
+-- Produce results with hash aggregation.
+
+set enable_hashagg = true;
+set enable_sort = false;
+set work_mem='64kB';
+
+set jit_above_cost = 0;
+
+explain (costs off)
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+
+create table gs_hash_1 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g%1000 as g1000, g%100 as g100, g%10 as g10, g
+   from generate_series(0,199999) g) s
+group by cube (g1000,g100,g10);
+
+set jit_above_cost to default;
+
+create table gs_hash_2 as
+select g1000, g100, g10, sum(g::numeric), count(*), max(g::text) from
+  (select g/20 as g1000, g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by cube (g1000,g100,g10);
+
+create table gs_hash_3 as
+select g100, g10, array_agg(g) as a, count(*) as c, max(g::text) as m from
+  (select g/200 as g100, g/2000 as g10, g
+   from generate_series(0,19999) g) s
+group by grouping sets (g100,g10);
+
+set enable_sort = true;
+set work_mem to default;
+
+-- Compare results
+
+(select * from gs_hash_1 except select * from gs_group_1)
+  union all
+(select * from gs_group_1 except select * from gs_hash_1);
+
+(select * from gs_hash_2 except select * from gs_group_2)
+  union all
+(select * from gs_group_2 except select * from gs_hash_2);
+
+(select g100,g10,unnest(a),c,m from gs_hash_3 except
+  select g100,g10,unnest(a),c,m from gs_group_3)
+    union all
+(select g100,g10,unnest(a),c,m from gs_group_3 except
+  select g100,g10,unnest(a),c,m from gs_hash_3);
+
+drop table gs_group_1;
+drop table gs_group_2;
+drop table gs_group_3;
+drop table gs_hash_1;
+drop table gs_hash_2;
+drop table gs_hash_3;
+
+SET enable_groupingsets_hash_disk TO DEFAULT;
+
 -- end
diff --git a/src/test/regress/sql/select_distinct.sql b/src/test/regress/sql/select_distinct.sql
index a605e86449e..33102744ebf 100644
--- a/src/test/regress/sql/select_distinct.sql
+++ b/src/test/regress/sql/select_distinct.sql
@@ -45,6 +45,68 @@ SELECT count(*) FROM
 SELECT count(*) FROM
   (SELECT DISTINCT two, four, two FROM tenk1) ss;
 
+--
+-- Compare results between plans using sorting and plans using hash
+-- aggregation. Force spilling in both cases by setting work_mem low.
+--
+
+SET work_mem='64kB';
+
+-- Produce results with sorting.
+
+SET enable_hashagg=FALSE;
+
+SET jit_above_cost=0;
+
+EXPLAIN (costs off)
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+
+CREATE TABLE distinct_group_1 AS
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+
+SET jit_above_cost TO DEFAULT;
+
+CREATE TABLE distinct_group_2 AS
+SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g;
+
+SET enable_hashagg=TRUE;
+
+-- Produce results with hash aggregation.
+
+SET enable_sort=FALSE;
+
+SET jit_above_cost=0;
+
+EXPLAIN (costs off)
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+
+CREATE TABLE distinct_hash_1 AS
+SELECT DISTINCT g%1000 FROM generate_series(0,9999) g;
+
+SET jit_above_cost TO DEFAULT;
+
+CREATE TABLE distinct_hash_2 AS
+SELECT DISTINCT (g%1000)::text FROM generate_series(0,9999) g;
+
+SET enable_sort=TRUE;
+
+SET work_mem TO DEFAULT;
+
+-- Compare results
+
+(SELECT * FROM distinct_hash_1 EXCEPT SELECT * FROM distinct_group_1)
+  UNION ALL
+(SELECT * FROM distinct_group_1 EXCEPT SELECT * FROM distinct_hash_1);
+
+(SELECT * FROM distinct_hash_2 EXCEPT SELECT * FROM distinct_group_2)
+  UNION ALL
+(SELECT * FROM distinct_group_2 EXCEPT SELECT * FROM distinct_hash_2);
+
+DROP TABLE distinct_hash_1;
+DROP TABLE distinct_hash_2;
+DROP TABLE distinct_group_1;
+DROP TABLE distinct_group_2;
+
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.

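A quick way to see the effect of the new setting, for anyone trying the patch
(illustrative only, not part of the patch; the plan actually chosen depends on
statistics, costing, and the work_mem value):

-- With enable_hashagg_disk on (the default), HashAggregate may be chosen even
-- when the estimated hash table exceeds work_mem; it spills to disk at
-- execution time.
SET work_mem = '64kB';
EXPLAIN (COSTS OFF)
SELECT g % 100000 AS c1, count(*) AS c3
  FROM generate_series(0, 199999) g
  GROUP BY g % 100000;

-- Turning it off restores the old behavior: HashAggregate is only considered
-- when the estimated hash table fits in work_mem (or no other path exists),
-- so a Sort + GroupAggregate plan is favored here instead.
SET enable_hashagg_disk = off;
EXPLAIN (COSTS OFF)
SELECT g % 100000 AS c1, count(*) AS c3
  FROM generate_series(0, 199999) g
  GROUP BY g % 100000;

RESET enable_hashagg_disk;
RESET work_mem;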