On Mon, 2019-07-01 at 12:13 -0700, Jeff Davis wrote:
> This is for design review. I have a patch (WIP) for Approach 1, and if
> this discussion starts to converge on that approach I will polish and
> post it.
WIP patch attached (based on 9a81c9fa); targeting September CF.
Not intended for detailed review yet, but it seems to work in enough
cases (including grouping sets and JIT) to be a good proof-of-concept
for the algorithm and its complexity.
Initial performance numbers put it at 2X slower than sort for grouping
10M distinct integers. There are quite a few optimizations I haven't
tried yet and quite a few tunables I haven't tuned yet, so hopefully I
can close the gap a bit for the small-groups case.
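
For reference, the queries I'm measuring look roughly like the following
(the table, data, and work_mem value here are just an illustration of the
workload shape, not the exact benchmark):

  create table t (i int);
  insert into t select generate_series(1, 10000000);  -- ~10M distinct keys
  set work_mem = '4MB';  -- small enough that the hash table can't fit
  select i, count(*) from t group by i;
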
I will offer more details soon when I have more confidence in the
numbers.
It does not attempt to spill ARRAY_AGG at all yet.
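
If you want to poke at the spill behavior, something like this works
against a patched build (the GUC names and EXPLAIN fields match what's in
the patch below; the exact output format is still subject to change):

  set enable_hashagg_spill = on;   -- planner may pick HashAgg even when it
                                   -- isn't expected to fit in work_mem
  set hashagg_mem_overflow = off;  -- spill at execution time rather than
                                   -- exceeding work_mem
  explain analyze select i, count(*) from t group by i;
  -- the HashAgg node reports "Memory Usage", plus "Batches" and
  -- "Disk Usage" when it spilled
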
Regards,
Jeff Davis
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 84341a30e5..9f978e5a90 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1702,6 +1702,23 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
+ <varlistentry id="guc-hashagg-mem-overflow" xreflabel="hashagg_mem_overflow">
+ <term><varname>hashagg_mem_overflow</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>hashagg_mem_overflow</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ If hash aggregation exceeds <varname>work_mem</varname> at query
+ execution time, and <varname>hashagg_mem_overflow</varname> is set
+ to <literal>on</literal>, the executor will continue consuming more memory
+ rather than performing disk-based hash aggregation. The default
+ is <literal>off</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
<term><varname>max_stack_depth</varname> (<type>integer</type>)
<indexterm>
@@ -4354,6 +4371,24 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
+ <varlistentry id="guc-enable-hashagg-spill" xreflabel="enable_hashagg_spill">
+ <term><varname>enable_hashagg_spill</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_hashagg_spill</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables or disables the query planner's use of hashed aggregation plan
+ types when the memory usage is expected to
+ exceed <varname>work_mem</varname>. This only affects the planner
+ choice; actual behavior at execution time is dictated by
+ <xref linkend="guc-hashagg-mem-overflow"/>. The default
+ is <literal>on</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-enable-hashjoin" xreflabel="enable_hashjoin">
<term><varname>enable_hashjoin</varname> (<type>boolean</type>)
<indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 92969636b7..6d6481a75f 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -102,6 +102,7 @@ static void show_tablesample(TableSampleClause *tsc, PlanState *planstate,
List *ancestors, ExplainState *es);
static void show_sort_info(SortState *sortstate, ExplainState *es);
static void show_hash_info(HashState *hashstate, ExplainState *es);
+static void show_hashagg_info(AggState *aggstate, ExplainState *es);
static void show_tidbitmap_info(BitmapHeapScanState *planstate,
ExplainState *es);
static void show_instrumentation_count(const char *qlabel, int which,
@@ -1826,6 +1827,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
case T_Agg:
show_agg_keys(castNode(AggState, planstate), ancestors, es);
show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
+ show_hashagg_info((AggState *) planstate, es);
if (plan->qual)
show_instrumentation_count("Rows Removed by Filter", 1,
planstate, es);
@@ -2715,6 +2717,55 @@ show_hash_info(HashState *hashstate, ExplainState *es)
}
}
+/*
+ * Show information on hash aggregate memory usage and batches
+ */
+static void
+show_hashagg_info(AggState *aggstate, ExplainState *es)
+{
+ Agg *agg = (Agg *) aggstate->ss.ps.plan;
+ long memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
+ long diskKb = (aggstate->hash_disk_used + 1023) / 1024;
+
+ Assert(IsA(aggstate, AggState));
+
+ if (agg->aggstrategy != AGG_HASHED &&
+ agg->aggstrategy != AGG_MIXED)
+ return;
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfo(es->str, "Memory Usage: %ldkB", memPeakKb);
+
+ if (aggstate->hash_batches_used > 0)
+ {
+ appendStringInfo(es->str, " Batches: %d Disk Usage: %ldkB",
+ aggstate->hash_batches_used, diskKb);
+ }
+
+ appendStringInfoChar(es->str, '\n');
+ }
+ else
+ {
+ ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
+ if (aggstate->hash_batches_used > 0)
+ {
+ ExplainPropertyInteger("HashAgg Batches", NULL,
+ aggstate->hash_batches_used, es);
+ ExplainPropertyInteger("Disk Usage", "kB", diskKb, es);
+ }
+ }
+}
+
/*
* If it's EXPLAIN ANALYZE, show exact/lossy pages for a BitmapHeapScan node
*/
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 66a67c72b2..19e1127627 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -1570,7 +1570,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
[op->d.agg_init_trans.transno];
/* If transValue has not yet been initialized, do so now. */
- if (pergroup->noTransValue)
+ if (pergroup != NULL && pergroup->noTransValue)
{
AggStatePerTrans pertrans = op->d.agg_init_trans.pertrans;
@@ -1597,7 +1597,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
[op->d.agg_strict_trans_check.setoff]
[op->d.agg_strict_trans_check.transno];
- if (unlikely(pergroup->transValueIsNull))
+ if (pergroup == NULL ||
+ unlikely(pergroup->transValueIsNull))
EEO_JUMP(op->d.agg_strict_trans_check.jumpnull);
EEO_NEXT();
@@ -1624,6 +1625,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
[op->d.agg_trans.setoff]
[op->d.agg_trans.transno];
+ if (pergroup == NULL)
+ EEO_NEXT();
+
Assert(pertrans->transtypeByVal);
fcinfo = pertrans->transfn_fcinfo;
@@ -1675,6 +1679,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
[op->d.agg_trans.setoff]
[op->d.agg_trans.transno];
+ if (pergroup == NULL)
+ EEO_NEXT();
+
Assert(!pertrans->transtypeByVal);
fcinfo = pertrans->transfn_fcinfo;
diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index 14ee8db3f9..91714664d6 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -25,7 +25,6 @@
#include "utils/hashutils.h"
#include "utils/memutils.h"
-static uint32 TupleHashTableHash(struct tuplehash_hash *tb, const MinimalTuple tuple);
static int TupleHashTableMatch(struct tuplehash_hash *tb, const MinimalTuple tuple1, const MinimalTuple tuple2);
/*
@@ -371,17 +370,12 @@ FindTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
/*
* Compute the hash value for a tuple
*
- * The passed-in key is a pointer to TupleHashEntryData. In an actual hash
- * table entry, the firstTuple field points to a tuple (in MinimalTuple
- * format). LookupTupleHashEntry sets up a dummy TupleHashEntryData with a
- * NULL firstTuple field --- that cues us to look at the inputslot instead.
- * This convention avoids the need to materialize virtual input tuples unless
- * they actually need to get copied into the table.
+ * If tuple is NULL, use the input slot instead.
*
* Also, the caller must select an appropriate memory context for running
* the hash functions. (dynahash.c doesn't change CurrentMemoryContext.)
*/
-static uint32
+uint32
TupleHashTableHash(struct tuplehash_hash *tb, const MinimalTuple tuple)
{
TupleHashTable hashtable = (TupleHashTable) tb->private_data;
@@ -402,9 +396,6 @@ TupleHashTableHash(struct tuplehash_hash *tb, const MinimalTuple tuple)
{
/*
* Process a tuple already stored in the table.
- *
- * (this case never actually occurs due to the way simplehash.h is
- * used, as the hash-value is stored in the entries)
*/
slot = hashtable->tableslot;
ExecStoreMinimalTuple(tuple, slot, false);
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 6b8ef40599..1548de220e 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -229,14 +229,40 @@
#include "optimizer/optimizer.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
+#include "storage/buffile.h"
#include "utils/acl.h"
#include "utils/builtins.h"
+#include "utils/dynahash.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
#include "utils/datum.h"
+/*
+ * Represents partitioned spill data for a single hashtable.
+ */
+typedef struct HashAggSpill
+{
+ int n_partitions; /* number of output partitions */
+ int partition_bits; /* number of bits for partition mask;
+ equal to log2(n_partitions) */
+ BufFile **partitions; /* output partition files */
+ int64 *ntuples; /* number of tuples in each partition */
+} HashAggSpill;
+
+/*
+ * Represents work to be done for one pass of hash aggregation. Initially,
+ * only the input fields are set. If spilled to disk, also set the spill data.
+ */
+typedef struct HashAggBatch
+{
+ BufFile *input_file; /* input partition */
+ int input_bits; /* number of bits for input partition mask */
+ int64 input_groups; /* estimated number of input groups */
+ int setno; /* grouping set */
+ HashAggSpill spill; /* spill output */
+} HashAggBatch;
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
@@ -272,11 +298,24 @@ static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
-static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
+static AggStatePerGroup lookup_hash_entry(AggState *aggstate);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
+static bool agg_refill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
+static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
+static Size hash_spill_tuple(TupleHashTable hashtable, HashAggSpill *spill,
+ int input_bits, TupleTableSlot *slot);
+static void hash_spill_tuples(AggState *aggstate, TupleTableSlot *slot);
+static MinimalTuple hash_read_spilled(BufFile *file);
+static HashAggBatch *hash_batch_new(BufFile *input_file, int setno,
+ int64 input_groups, int input_bits);
+static void hash_finish_initial_spills(AggState *aggstate);
+static void hash_spill_finish(AggState *aggstate, HashAggSpill *spill,
+ int setno, int input_bits);
+static void hash_reset_spill(HashAggSpill *spill);
+static void hash_reset_spills(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
@@ -1269,6 +1308,10 @@ build_hash_table(AggState *aggstate)
Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
+ /*
+ * TODO: work harder to find a good nGroups for each hash table. We don't
+ * want the hash table itself to fill up work_mem with no room for
+ * out-of-line transition values. Also, we need to consider that there are
+ * multiple hash tables for grouping sets.
+ */
additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
for (i = 0; i < aggstate->num_hashes; ++i)
@@ -1294,6 +1337,15 @@ build_hash_table(AggState *aggstate)
tmpmem,
DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
}
+
+ /*
+ * Set initial size to be that of an empty hash table. This ensures that
+ * at least one entry can be added before it exceeds work_mem; otherwise
+ * the algorithm might not make progress.
+ */
+ aggstate->hash_mem_init = MemoryContextMemAllocated(
+ aggstate->hashcontext->ecxt_per_tuple_memory, true);
+ aggstate->hash_mem_current = aggstate->hash_mem_init;
}
/*
@@ -1462,14 +1514,14 @@ hash_agg_entry_size(int numAggs)
*
* When called, CurrentMemoryContext should be the per-query context.
*/
-static TupleHashEntryData *
+static AggStatePerGroup
lookup_hash_entry(AggState *aggstate)
{
TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
TupleTableSlot *hashslot = perhash->hashslot;
TupleHashEntryData *entry;
- bool isnew;
+ bool isnew = false;
int i;
/* transfer just the needed columns into hashslot */
@@ -1486,12 +1538,26 @@ lookup_hash_entry(AggState *aggstate)
ExecStoreVirtualTuple(hashslot);
/* find or create the hashtable entry using the filtered tuple */
- entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
+ if (aggstate->hash_can_spill &&
+ aggstate->hash_mem_current > work_mem * 1024L &&
+ aggstate->hash_mem_current > aggstate->hash_mem_init)
+ entry = LookupTupleHashEntry(perhash->hashtable, hashslot, NULL);
+ else
+ entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
+
+ if (entry == NULL)
+ return NULL;
if (isnew)
{
- AggStatePerGroup pergroup;
- int transno;
+ AggStatePerGroup pergroup;
+ int transno;
+
+ aggstate->hash_mem_current = MemoryContextMemAllocated(
+ aggstate->hashcontext->ecxt_per_tuple_memory, true);
+
+ if (aggstate->hash_mem_current > aggstate->hash_mem_peak)
+ aggstate->hash_mem_peak = aggstate->hash_mem_current;
pergroup = (AggStatePerGroup)
MemoryContextAlloc(perhash->hashtable->tablecxt,
@@ -1511,7 +1577,7 @@ lookup_hash_entry(AggState *aggstate)
}
}
- return entry;
+ return entry->additional;
}
/*
@@ -1519,6 +1585,8 @@ lookup_hash_entry(AggState *aggstate)
* returning an array of pergroup pointers suitable for advance_aggregates.
*
* Be aware that lookup_hash_entry can reset the tmpcontext.
+ *
+ * If a hash table has exceeded its memory limit, the corresponding pergroup
+ * pointer is left NULL; the caller is responsible for spilling such tuples.
*/
static void
lookup_hash_entries(AggState *aggstate)
@@ -1530,7 +1598,7 @@ lookup_hash_entries(AggState *aggstate)
for (setno = 0; setno < numHashes; setno++)
{
select_current_set(aggstate, setno, true);
- pergroup[setno] = lookup_hash_entry(aggstate)->additional;
+ pergroup[setno] = lookup_hash_entry(aggstate);
}
}
@@ -1841,6 +1909,8 @@ agg_retrieve_direct(AggState *aggstate)
aggstate->current_phase == 1)
{
lookup_hash_entries(aggstate);
+ hash_spill_tuples(
+ aggstate, aggstate->tmpcontext->ecxt_outertuple);
}
/* Advance the aggregates (or combine functions) */
@@ -1852,6 +1922,10 @@ agg_retrieve_direct(AggState *aggstate)
outerslot = fetch_input_tuple(aggstate);
if (TupIsNull(outerslot))
{
+ if (aggstate->aggstrategy == AGG_MIXED &&
+ aggstate->current_phase == 1)
+ hash_finish_initial_spills(aggstate);
+
/* no more outer-plan tuples available */
if (hasGroupingSets)
{
@@ -1944,6 +2018,7 @@ agg_fill_hash_table(AggState *aggstate)
/* Find or build hashtable entries */
lookup_hash_entries(aggstate);
+ hash_spill_tuples(aggstate, outerslot);
/* Advance the aggregates (or combine functions) */
advance_aggregates(aggstate);
@@ -1955,6 +2030,8 @@ agg_fill_hash_table(AggState *aggstate)
ResetExprContext(aggstate->tmpcontext);
}
+ hash_finish_initial_spills(aggstate);
+
aggstate->table_filled = true;
/* Initialize to walk the first hash table */
select_current_set(aggstate, 0, true);
@@ -1962,11 +2039,125 @@ agg_fill_hash_table(AggState *aggstate)
&aggstate->perhash[0].hashiter);
}
+/*
+ * If any data was spilled during hash aggregation, reset the hash table and
+ * reprocess one batch of spilled data. After reprocessing a batch, the hash
+ * table will again contain data, ready to be consumed by
+ * agg_retrieve_hash_table_in_memory().
+ *
+ * Should only be called after all in-memory hash table entries have been
+ * consumed.
+ *
+ * Return false when input is exhausted and there's no more work to be done;
+ * otherwise return true.
+ */
+static bool
+agg_refill_hash_table(AggState *aggstate)
+{
+ HashAggBatch *batch;
+
+ if (aggstate->hash_batches == NIL)
+ return false;
+
+ /* free memory */
+ ReScanExprContext(aggstate->hashcontext);
+ /* Rebuild an empty hash table */
+ build_hash_table(aggstate);
+
+ batch = linitial(aggstate->hash_batches);
+ aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
+
+ Assert(aggstate->current_phase == 0);
+
+ if (aggstate->phase->aggstrategy == AGG_MIXED)
+ {
+ aggstate->current_phase = 1;
+ aggstate->phase = &aggstate->phases[aggstate->current_phase];
+ }
+
+ for (;;)
+ {
+ TupleTableSlot *slot = aggstate->hash_spill_slot;
+ MinimalTuple tuple;
+
+ CHECK_FOR_INTERRUPTS();
+
+ tuple = hash_read_spilled(batch->input_file);
+ if (tuple == NULL)
+ break;
+
+ /*
+ * TODO: Should we re-compile the expressions to use a minimal tuple
+ * slot so that we don't have to create the virtual tuple here? If we
+ * project the tuple before writing, then perhaps this is not
+ * important.
+ */
+ ExecForceStoreMinimalTuple(tuple, slot, true);
+ aggstate->tmpcontext->ecxt_outertuple = slot;
+
+ /* Find or build hashtable entries */
+ memset(aggstate->hash_pergroup, 0,
+ sizeof(AggStatePerGroup) * aggstate->num_hashes);
+ select_current_set(aggstate, batch->setno, true);
+ aggstate->hash_pergroup[batch->setno] = lookup_hash_entry(aggstate);
+ if (aggstate->hash_pergroup[batch->setno] == NULL)
+ aggstate->hash_disk_used += hash_spill_tuple(
+ aggstate->perhash[batch->setno].hashtable,
+ &batch->spill, batch->input_bits, slot);
+
+ /* Advance the aggregates (or combine functions) */
+ advance_aggregates(aggstate);
+
+ /*
+ * Reset per-input-tuple context after each tuple, but note that the
+ * hash lookups do this too
+ */
+ ResetExprContext(aggstate->tmpcontext);
+ }
+
+ BufFileClose(batch->input_file);
+
+ aggstate->current_phase = 0;
+ aggstate->phase = &aggstate->phases[aggstate->current_phase];
+
+ hash_spill_finish(aggstate, &batch->spill, batch->setno,
+ batch->input_bits);
+
+ pfree(batch);
+
+ /* Initialize to walk the first hash table */
+ select_current_set(aggstate, 0, true);
+ ResetTupleHashIterator(aggstate->perhash[0].hashtable,
+ &aggstate->perhash[0].hashiter);
+
+ return true;
+}
+
/*
* ExecAgg for hashed case: retrieving groups from hash table
*/
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
+{
+ TupleTableSlot *result = NULL;
+
+ while (result == NULL)
+ {
+ result = agg_retrieve_hash_table_in_memory(aggstate);
+ if (result == NULL)
+ {
+ if (!agg_refill_hash_table(aggstate))
+ {
+ aggstate->agg_done = true;
+ break;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * ExecAgg for hashed case: retrieve groups from the in-memory hash tables,
+ * ignoring any data that has been spilled to disk.
+ */
+static TupleTableSlot *
+agg_retrieve_hash_table_in_memory(AggState *aggstate)
{
ExprContext *econtext;
AggStatePerAgg peragg;
@@ -1995,7 +2186,7 @@ agg_retrieve_hash_table(AggState *aggstate)
* We loop retrieving groups until we find one satisfying
* aggstate->ss.ps.qual
*/
- while (!aggstate->agg_done)
+ for (;;)
{
TupleTableSlot *hashslot = perhash->hashslot;
int i;
@@ -2026,8 +2217,6 @@ agg_retrieve_hash_table(AggState *aggstate)
}
else
{
- /* No more hashtables, so done */
- aggstate->agg_done = true;
return NULL;
}
}
@@ -2084,6 +2273,296 @@ agg_retrieve_hash_table(AggState *aggstate)
return NULL;
}
+/*
+ * hash_spill_tuple
+ *
+ * Not enough memory to add tuple as new entry in hash table. Save for later
+ * in the appropriate partition.
+ */
+static Size
+hash_spill_tuple(TupleHashTable hashtable, HashAggSpill *spill,
+ int input_bits, TupleTableSlot *slot)
+{
+ int partition;
+ MinimalTuple tuple;
+ BufFile *file;
+ Size written;
+ uint32 hashvalue;
+ bool shouldFree;
+
+ /* initialize output partitions */
+ if (spill->partitions == NULL)
+ {
+ int npartitions;
+ int partition_bits;
+
+ /* TODO: be smarter about choosing the number of partitions */
+ npartitions = 32;
+
+ partition_bits = my_log2(npartitions);
+
+ /* make sure that we don't exhaust the hash bits */
+ if (partition_bits + input_bits >= 32)
+ partition_bits = 32 - input_bits;
+
+ /* number of partitions will be a power of two */
+ npartitions = 1L << partition_bits;
+
+ spill->partition_bits = partition_bits;
+ spill->n_partitions = npartitions;
+ spill->partitions = palloc0(sizeof(BufFile *) * npartitions);
+ spill->ntuples = palloc0(sizeof(int64) * npartitions);
+ }
+
+ /*
+ * TODO: should we project only needed attributes from the tuple before
+ * writing it?
+ */
+ tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+ /*
+ * TODO: should we store the hash along with the tuple to avoid
+ * calculating the hash value multiple times?
+ */
+ hashvalue = TupleHashTableHash(hashtable->hashtab, tuple);
+
+ if (spill->partition_bits == 0)
+ partition = 0;
+ else
+ partition = (hashvalue << input_bits) >>
+ (32 - spill->partition_bits);
+
+ spill->ntuples[partition]++;
+
+ /*
+ * TODO: use logtape.c instead?
+ */
+ if (spill->partitions[partition] == NULL)
+ spill->partitions[partition] = BufFileCreateTemp(false);
+ file = spill->partitions[partition];
+
+ written = BufFileWrite(file, (void *) tuple, tuple->t_len);
+ if (written != tuple->t_len)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to HashAgg temporary file: %m")));
+
+ if (shouldFree)
+ pfree(tuple);
+
+ return written;
+}
+
+/*
+ * hash_spill_tuples
+ *
+ * For each grouping set whose hash table rejected the tuple because of the
+ * memory limit (leaving a NULL pergroup), spill the tuple to that hash
+ * table's partitions.
+ */
+static void
+hash_spill_tuples(AggState *aggstate, TupleTableSlot *slot)
+{
+ int setno;
+
+ for (setno = 0; setno < aggstate->num_hashes; setno++)
+ {
+ AggStatePerGroup pergroup = aggstate->hash_pergroup[setno];
+ AggStatePerHash perhash = &aggstate->perhash[setno];
+ HashAggSpill *spill;
+
+ if (pergroup == NULL)
+ {
+ if (aggstate->hash_spills == NULL)
+ aggstate->hash_spills = palloc0(
+ sizeof(HashAggSpill) * aggstate->num_hashes);
+ aggstate->hash_spilled = true;
+
+ spill = &aggstate->hash_spills[setno];
+
+ aggstate->hash_disk_used += hash_spill_tuple(
+ perhash->hashtable, spill, 0, slot);
+ }
+ }
+}
+
+/*
+ * hash_read_spilled
+ * Read the next tuple from a spill file. Return NULL if no more.
+ */
+static MinimalTuple
+hash_read_spilled(BufFile *file)
+{
+ MinimalTuple tuple;
+ uint32 t_len;
+ size_t nread;
+
+ nread = BufFileRead(file, &t_len, sizeof(t_len));
+ if (nread == 0)
+ return NULL;
+ if (nread != sizeof(uint32))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from HashAgg temporary file: %m")));
+
+ tuple = (MinimalTuple) palloc(t_len);
+ tuple->t_len = t_len;
+
+ nread = BufFileRead(file, (void *)((char *)tuple + sizeof(uint32)),
+ t_len - sizeof(uint32));
+ if (nread != t_len - sizeof(uint32))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from HashAgg temporary file: %m")));
+
+ return tuple;
+}
+
+/*
+ * hash_batch_new
+ *
+ * Construct a HashAggBatch item, which represents one iteration of HashAgg to
+ * be done. Should be called in the aggregate's memory context.
+ */
+static HashAggBatch *
+hash_batch_new(BufFile *input_file, int setno, int64 input_groups,
+ int input_bits)
+{
+ HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
+
+ batch->input_file = input_file;
+ batch->input_bits = input_bits;
+ batch->input_groups = input_groups;
+ batch->setno = setno;
+
+ /* batch->spill will be set only after spilling this batch */
+
+ return batch;
+}
+
+/*
+ * hash_finish_initial_spills
+ *
+ * After the initial pass over the input has completed, it may have spilled
+ * tuples to disk. If so, turn the spilled partitions into new batches that
+ * must be processed later.
+ */
+static void
+hash_finish_initial_spills(AggState *aggstate)
+{
+ int setno;
+
+ if (aggstate->hash_spills == NULL)
+ return;
+
+ for (setno = 0; setno < aggstate->num_hashes; setno++)
+ hash_spill_finish(aggstate, &aggstate->hash_spills[setno], setno, 0);
+
+ pfree(aggstate->hash_spills);
+ aggstate->hash_spills = NULL;
+}
+
+/*
+ * hash_spill_finish
+ *
+ * Transform the partitions of a HashAggSpill into new HashAggBatch entries
+ * (rewinding each spill file for reading) and add them to the list of
+ * batches to be processed.
+ */
+static void
+hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_bits)
+{
+ int i;
+
+ if (spill->n_partitions == 0)
+ return; /* didn't spill */
+
+ for (i = 0; i < spill->n_partitions; i++)
+ {
+ BufFile *file = spill->partitions[i];
+ MemoryContext oldContext;
+ HashAggBatch *new_batch;
+ int64 input_ngroups;
+
+ /* partition is empty */
+ if (file == NULL)
+ continue;
+
+ /* rewind file for reading */
+ if (BufFileSeek(file, 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind HashAgg temporary file: %m")));
+
+ /*
+ * Estimate the number of input groups for this new work item as the
+ * total number of tuples in its input file. Although that's a worst
+ * case, it's not bad here for two reasons: (1) overestimating is
+ * better than underestimating; and (2) we've already scanned the
+ * relation once, so it's likely that we've already finalized many of
+ * the common values.
+ */
+ input_ngroups = spill->ntuples[i];
+
+ oldContext = MemoryContextSwitchTo(aggstate->ss.ps.state->es_query_cxt);
+ new_batch = hash_batch_new(file, setno, input_ngroups,
+ spill->partition_bits + input_bits);
+ aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
+ aggstate->hash_batches_used++;
+ MemoryContextSwitchTo(oldContext);
+ }
+
+ pfree(spill->ntuples);
+ pfree(spill->partitions);
+}
+
+/*
+ * Clear a HashAggSpill, free its memory, and close its files.
+ */
+static void
+hash_reset_spill(HashAggSpill *spill)
+{
+ int i;
+ for (i = 0; i < spill->n_partitions; i++)
+ {
+ BufFile *file = spill->partitions[i];
+
+ if (file != NULL)
+ BufFileClose(file);
+ }
+ if (spill->ntuples != NULL)
+ pfree(spill->ntuples);
+ if (spill->partitions != NULL)
+ pfree(spill->partitions);
+}
+
+/*
+ * Find and reset all active HashAggSpills, and release any batches that
+ * remain to be processed.
+ */
+static void
+hash_reset_spills(AggState *aggstate)
+{
+ ListCell *lc;
+
+ if (aggstate->hash_spills != NULL)
+ {
+ int setno;
+
+ for (setno = 0; setno < aggstate->num_hashes; setno++)
+ hash_reset_spill(&aggstate->hash_spills[setno]);
+
+ pfree(aggstate->hash_spills);
+ aggstate->hash_spills = NULL;
+ }
+
+ foreach(lc, aggstate->hash_batches)
+ {
+ HashAggBatch *batch = (HashAggBatch*) lfirst(lc);
+ hash_reset_spill(&batch->spill);
+ pfree(batch);
+ }
+ list_free(aggstate->hash_batches);
+ aggstate->hash_batches = NIL;
+}
+
+
/* -----------------
* ExecInitAgg
*
@@ -2213,6 +2692,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
{
ExecAssignExprContext(estate, &aggstate->ss.ps);
aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
+
+ /* will set to false if there are aggs with transtype == INTERNALOID */
+ if (!hashagg_mem_overflow)
+ aggstate->hash_can_spill = true;
}
ExecAssignExprContext(estate, &aggstate->ss.ps);
@@ -2238,6 +2721,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
&TTSOpsMinimalTuple);
+ if (use_hashing)
+ aggstate->hash_spill_slot = ExecInitExtraTupleSlot(estate, scanDesc,
+ &TTSOpsVirtual);
+
/*
* Initialize result type, slot and projection.
*/
@@ -2661,6 +3148,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
elog(ERROR, "deserialfunc not provided for deserialization aggregation");
deserialfn_oid = aggform->aggdeserialfn;
}
+
+ aggstate->hash_can_spill = false;
}
/* Check that aggregate owner has permission to call component fns */
@@ -3368,6 +3857,8 @@ ExecEndAgg(AggState *node)
if (node->sort_out)
tuplesort_end(node->sort_out);
+ hash_reset_spills(node);
+
for (transno = 0; transno < node->numtrans; transno++)
{
AggStatePerTrans pertrans = &node->pertrans[transno];
@@ -3423,12 +3914,13 @@ ExecReScanAgg(AggState *node)
return;
/*
- * If we do have the hash table, and the subplan does not have any
- * parameter changes, and none of our own parameter changes affect
- * input expressions of the aggregated functions, then we can just
- * rescan the existing hash table; no need to build it again.
+ * If we do have the hash table, and it never spilled, and the subplan
+ * does not have any parameter changes, and none of our own parameter
+ * changes affect input expressions of the aggregated functions, then
+ * we can just rescan the existing hash table; no need to build it
+ * again.
*/
- if (outerPlan->chgParam == NULL &&
+ if (outerPlan->chgParam == NULL && !node->hash_spilled &&
!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
{
ResetTupleHashIterator(node->perhash[0].hashtable,
@@ -3485,6 +3977,16 @@ ExecReScanAgg(AggState *node)
*/
if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
{
+ hash_reset_spills(node);
+
+ /* reset stats */
+ node->hash_spilled = false;
+ node->hash_mem_init = 0;
+ node->hash_mem_peak = 0;
+ node->hash_mem_current = 0;
+ node->hash_disk_used = 0;
+ node->hash_batches_used = 0;
+
ReScanExprContext(node->hashcontext);
/* Rebuild an empty hash table */
build_hash_table(node);
diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c
index 30133634c7..6dc175eabf 100644
--- a/src/backend/jit/llvm/llvmjit_expr.c
+++ b/src/backend/jit/llvm/llvmjit_expr.c
@@ -2100,6 +2100,7 @@ llvm_compile_expr(ExprState *state)
LLVMValueRef v_notransvalue;
+ LLVMBasicBlockRef b_check_notransvalue;
LLVMBasicBlockRef b_init;
aggstate = op->d.agg_init_trans.aggstate;
@@ -2126,6 +2127,19 @@ llvm_compile_expr(ExprState *state)
l_load_gep1(b, v_allpergroupsp, v_setoff, ""),
&v_transno, 1, "");
+ b_check_notransvalue = l_bb_before_v(
+ opblocks[i + 1], "op.%d.check_notransvalue", i);
+
+ LLVMBuildCondBr(b,
+ LLVMBuildICmp(b, LLVMIntEQ,
+ LLVMBuildPtrToInt(b, v_pergroupp,
+ TypeSizeT, ""),
+ l_sizet_const(0), ""),
+ opblocks[i + 1],
+ b_check_notransvalue);
+
+ LLVMPositionBuilderAtEnd(b, b_check_notransvalue);
+
v_notransvalue =
l_load_struct_gep(b, v_pergroupp,
FIELDNO_AGGSTATEPERGROUPDATA_NOTRANSVALUE,
@@ -2193,6 +2207,8 @@ llvm_compile_expr(ExprState *state)
LLVMValueRef v_transnull;
LLVMValueRef v_pergroupp;
+ LLVMBasicBlockRef b_check_transnull;
+
int jumpnull = op->d.agg_strict_trans_check.jumpnull;
aggstate = op->d.agg_strict_trans_check.aggstate;
@@ -2216,6 +2232,19 @@ llvm_compile_expr(ExprState *state)
l_load_gep1(b, v_allpergroupsp, v_setoff, ""),
&v_transno, 1, "");
+ b_check_transnull = l_bb_before_v(opblocks[i + 1],
+ "op.%d.check_transnull", i);
+
+ LLVMBuildCondBr(b,
+ LLVMBuildICmp(b, LLVMIntEQ,
+ LLVMBuildPtrToInt(b, v_pergroupp,
+ TypeSizeT, ""),
+ l_sizet_const(0), ""),
+ opblocks[jumpnull],
+ b_check_transnull);
+
+ LLVMPositionBuilderAtEnd(b, b_check_transnull);
+
v_transnull =
l_load_struct_gep(b, v_pergroupp,
FIELDNO_AGGSTATEPERGROUPDATA_TRANSVALUEISNULL,
@@ -2263,6 +2292,8 @@ llvm_compile_expr(ExprState *state)
LLVMValueRef v_tmpcontext;
LLVMValueRef v_oldcontext;
+ LLVMBasicBlockRef b_advance_transval;
+
aggstate = op->d.agg_trans.aggstate;
pertrans = op->d.agg_trans.pertrans;
@@ -2289,6 +2320,19 @@ llvm_compile_expr(ExprState *state)
l_load_gep1(b, v_allpergroupsp, v_setoff, ""),
&v_transno, 1, "");
+ b_advance_transval = l_bb_before_v(opblocks[i + 1],
+ "op.%d.advance_transval", i);
+
+ LLVMBuildCondBr(b,
+ LLVMBuildICmp(b, LLVMIntEQ,
+ LLVMBuildPtrToInt(b, v_pergroupp,
+ TypeSizeT, ""),
+ l_sizet_const(0), ""),
+ opblocks[i + 1],
+ b_advance_transval);
+
+ LLVMPositionBuilderAtEnd(b, b_advance_transval);
+
v_fcinfo = l_ptr_const(fcinfo,
l_ptr(StructFunctionCallInfoData));
v_aggcontext = l_ptr_const(op->d.agg_trans.aggcontext,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index a2a9b1f7be..3cfc299947 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -128,6 +128,7 @@ bool enable_bitmapscan = true;
bool enable_tidscan = true;
bool enable_sort = true;
bool enable_hashagg = true;
+bool enable_hashagg_spill = true;
bool enable_nestloop = true;
bool enable_material = true;
bool enable_mergejoin = true;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index cb897cc7f4..7dc0855461 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -4250,8 +4250,8 @@ consider_groupingsets_paths(PlannerInfo *root,
* with. Override work_mem in that case; otherwise, we'll rely on the
* sorted-input case to generate usable mixed paths.
*/
- if (hashsize > work_mem * 1024L && gd->rollups)
- return; /* nope, won't fit */
+ if (!enable_hashagg_spill && hashsize > work_mem * 1024L && gd->rollups)
+ return; /* nope, won't fit */
/*
* We need to burst the existing rollups list into individual grouping
@@ -6522,7 +6522,8 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
* were unable to sort above, then we'd better generate a Path, so
* that we at least have one.
*/
- if (hashaggtablesize < work_mem * 1024L ||
+ if (enable_hashagg_spill ||
+ hashaggtablesize < work_mem * 1024L ||
grouped_rel->pathlist == NIL)
{
/*
@@ -6555,7 +6556,8 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
agg_final_costs,
dNumGroups);
- if (hashaggtablesize < work_mem * 1024L)
+ if (enable_hashagg_spill ||
+ hashaggtablesize < work_mem * 1024L)
add_path(grouped_rel, (Path *)
create_agg_path(root,
grouped_rel,
@@ -6824,7 +6826,7 @@ create_partial_grouping_paths(PlannerInfo *root,
* Tentatively produce a partial HashAgg Path, depending on if it
* looks as if the hash table will fit in work_mem.
*/
- if (hashaggtablesize < work_mem * 1024L &&
+ if ((enable_hashagg_spill || hashaggtablesize < work_mem * 1024L) &&
cheapest_total_path != NULL)
{
add_path(partially_grouped_rel, (Path *)
@@ -6851,7 +6853,7 @@ create_partial_grouping_paths(PlannerInfo *root,
dNumPartialPartialGroups);
/* Do the same for partial paths. */
- if (hashaggtablesize < work_mem * 1024L &&
+ if ((enable_hashagg_spill || hashaggtablesize < work_mem * 1024L) &&
cheapest_partial_path != NULL)
{
add_partial_path(partially_grouped_rel, (Path *)
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 3bf96de256..b0cb1d7e6b 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,6 +120,7 @@ bool enableFsync = true;
bool allowSystemTableMods = false;
int work_mem = 1024;
int maintenance_work_mem = 16384;
+bool hashagg_mem_overflow = false;
int max_parallel_maintenance_workers = 2;
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 1208eb9a68..90883c7efd 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -952,6 +952,26 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"enable_hashagg_spill", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of hashed aggregation plans that are expected to exceed work_mem."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_hashagg_spill,
+ true,
+ NULL, NULL, NULL
+ },
+ {
+ {"hashagg_mem_overflow", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables hashed aggregation to overflow work_mem at execution time."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &hashagg_mem_overflow,
+ false,
+ NULL, NULL, NULL
+ },
{
{"enable_material", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of materialization."),
diff --git a/src/backend/utils/mmgr/aset.c b/src/backend/utils/mmgr/aset.c
index 6e4a343439..46e12c359e 100644
--- a/src/backend/utils/mmgr/aset.c
+++ b/src/backend/utils/mmgr/aset.c
@@ -458,6 +458,8 @@ AllocSetContextCreateInternal(MemoryContext parent,
parent,
name);
+ ((MemoryContext) set)->mem_allocated = set->keeper->endptr -
+ ((char *) set->keeper) + MAXALIGN(sizeof(AllocSetContext));
+
return (MemoryContext) set;
}
}
@@ -546,6 +548,8 @@ AllocSetContextCreateInternal(MemoryContext parent,
parent,
name);
+ ((MemoryContext) set)->mem_allocated = set->keeper->endptr -
+ ((char *) set->keeper) + MAXALIGN(sizeof(AllocSetContext));
+
return (MemoryContext) set;
}
@@ -604,6 +608,8 @@ AllocSetReset(MemoryContext context)
else
{
/* Normal case, release the block */
+ context->mem_allocated -= block->endptr - ((char*) block);
+
#ifdef CLOBBER_FREED_MEMORY
wipe_mem(block, block->freeptr - ((char *) block));
#endif
@@ -688,11 +694,16 @@ AllocSetDelete(MemoryContext context)
#endif
if (block != set->keeper)
+ {
+ context->mem_allocated -= block->endptr - ((char *) block);
free(block);
+ }
block = next;
}
+ Assert(context->mem_allocated == 0);
+
/* Finally, free the context header, including the keeper block */
free(set);
}
@@ -733,6 +744,9 @@ AllocSetAlloc(MemoryContext context, Size size)
block = (AllocBlock) malloc(blksize);
if (block == NULL)
return NULL;
+
+ context->mem_allocated += blksize;
+
block->aset = set;
block->freeptr = block->endptr = ((char *) block) + blksize;
@@ -928,6 +942,8 @@ AllocSetAlloc(MemoryContext context, Size size)
if (block == NULL)
return NULL;
+ context->mem_allocated += blksize;
+
block->aset = set;
block->freeptr = ((char *) block) + ALLOC_BLOCKHDRSZ;
block->endptr = ((char *) block) + blksize;
@@ -1028,6 +1044,9 @@ AllocSetFree(MemoryContext context, void *pointer)
set->blocks = block->next;
if (block->next)
block->next->prev = block->prev;
+
+ context->mem_allocated -= block->endptr - ((char*) block);
+
#ifdef CLOBBER_FREED_MEMORY
wipe_mem(block, block->freeptr - ((char *) block));
#endif
@@ -1144,6 +1163,7 @@ AllocSetRealloc(MemoryContext context, void *pointer, Size size)
AllocBlock block = (AllocBlock) (((char *) chunk) - ALLOC_BLOCKHDRSZ);
Size chksize;
Size blksize;
+ Size oldblksize;
/*
* Try to verify that we have a sane block pointer: it should
@@ -1159,6 +1179,8 @@ AllocSetRealloc(MemoryContext context, void *pointer, Size size)
/* Do the realloc */
chksize = MAXALIGN(size);
blksize = chksize + ALLOC_BLOCKHDRSZ + ALLOC_CHUNKHDRSZ;
+ oldblksize = block->endptr - ((char *)block);
+
block = (AllocBlock) realloc(block, blksize);
if (block == NULL)
{
@@ -1166,6 +1188,9 @@ AllocSetRealloc(MemoryContext context, void *pointer, Size size)
VALGRIND_MAKE_MEM_NOACCESS(chunk, ALLOCCHUNK_PRIVATE_LEN);
return NULL;
}
+
+ context->mem_allocated += blksize - oldblksize;
+
block->freeptr = block->endptr = ((char *) block) + blksize;
/* Update pointers since block has likely been moved */
@@ -1383,6 +1408,7 @@ AllocSetCheck(MemoryContext context)
const char *name = set->header.name;
AllocBlock prevblock;
AllocBlock block;
+ int64 total_allocated = 0;
for (prevblock = NULL, block = set->blocks;
block != NULL;
@@ -1393,6 +1419,10 @@ AllocSetCheck(MemoryContext context)
long blk_data = 0;
long nchunks = 0;
+ total_allocated += block->endptr - ((char *)block);
+ if (set->keeper == block)
+ total_allocated += MAXALIGN(sizeof(AllocSetContext));
+
/*
* Empty block - empty can be keeper-block only
*/
@@ -1479,6 +1509,8 @@ AllocSetCheck(MemoryContext context)
elog(WARNING, "problem in alloc set %s: found inconsistent memory block %p",
name, block);
}
+
+ Assert(total_allocated == context->mem_allocated);
}
#endif /* MEMORY_CONTEXT_CHECKING */
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index b07be12236..27417af548 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -462,6 +462,29 @@ MemoryContextIsEmpty(MemoryContext context)
return context->methods->is_empty(context);
}
+/*
+ * Find the memory allocated to blocks for this memory context. If recurse is
+ * true, also include children.
+ */
+int64
+MemoryContextMemAllocated(MemoryContext context, bool recurse)
+{
+ int64 total = context->mem_allocated;
+
+ AssertArg(MemoryContextIsValid(context));
+
+ if (recurse)
+ {
+ MemoryContext child;
+
+ for (child = context->firstchild;
+ child != NULL;
+ child = child->nextchild)
+ total += MemoryContextMemAllocated(child, true);
+ }
+
+ return total;
+}
+
/*
* MemoryContextStats
* Print statistics about the named context and all its descendants.
@@ -736,6 +759,7 @@ MemoryContextCreate(MemoryContext node,
node->methods = methods;
node->parent = parent;
node->firstchild = NULL;
+ node->mem_allocated = 0;
node->prevchild = NULL;
node->name = name;
node->ident = NULL;
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index d056fd6151..265e5ffe62 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -143,6 +143,8 @@ extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable,
TupleTableSlot *slot,
ExprState *eqcomp,
FmgrInfo *hashfunctions);
+extern uint32 TupleHashTableHash(struct tuplehash_hash *tb,
+ const MinimalTuple tuple);
extern void ResetTupleHashTable(TupleHashTable hashtable);
/*
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 61a24c2e3c..be9ae1028d 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -244,6 +244,7 @@ extern bool enableFsync;
extern PGDLLIMPORT bool allowSystemTableMods;
extern PGDLLIMPORT int work_mem;
extern PGDLLIMPORT int maintenance_work_mem;
+extern PGDLLIMPORT bool hashagg_mem_overflow;
extern PGDLLIMPORT int max_parallel_maintenance_workers;
extern int VacuumCostPageHit;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 99b9fa414f..dd7378d4ca 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2022,13 +2022,25 @@ typedef struct AggState
HeapTuple grp_firstTuple; /* copy of first tuple of current group */
/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
bool table_filled; /* hash table filled yet? */
- int num_hashes;
+ int num_hashes; /* number of hash tables active at once */
+ bool hash_can_spill; /* is spilling to disk allowed for this hash agg? */
+ bool hash_spilled; /* any hash table ever spilled? */
+ struct HashAggSpill *hash_spills; /* HashAggSpill for each hash table,
+ exists only during first pass if spilled */
+ TupleTableSlot *hash_spill_slot; /* slot for reading from spill files */
+ uint64 hash_mem_init; /* initial hash table memory usage */
+ uint64 hash_mem_peak; /* peak hash table memory usage */
+ uint64 hash_mem_current; /* current hash table memory usage */
+ uint64 hash_disk_used; /* bytes of disk space used */
+ int hash_batches_used; /* batches used during entire execution */
+ List *hash_batches; /* hash batches remaining to be processed */
+
AggStatePerHash perhash; /* array of per-hashtable data */
AggStatePerGroup *hash_pergroup; /* grouping set indexed array of
* per-group pointers */
/* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 44
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
* ->hash_pergroup */
ProjectionInfo *combinedproj; /* projection machinery */
diff --git a/src/include/nodes/memnodes.h b/src/include/nodes/memnodes.h
index dbae98d3d9..df0ae3625c 100644
--- a/src/include/nodes/memnodes.h
+++ b/src/include/nodes/memnodes.h
@@ -79,6 +79,7 @@ typedef struct MemoryContextData
/* these two fields are placed here to minimize alignment wastage: */
bool isReset; /* T = no space alloced since last reset */
bool allowInCritSection; /* allow palloc in critical section */
+ int64 mem_allocated; /* track memory allocated for this context */
const MemoryContextMethods *methods; /* virtual function table */
MemoryContext parent; /* NULL if no parent (toplevel context) */
MemoryContext firstchild; /* head of linked list of children */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 9b6bdbc518..78e24be7b6 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -54,6 +54,7 @@ extern PGDLLIMPORT bool enable_bitmapscan;
extern PGDLLIMPORT bool enable_tidscan;
extern PGDLLIMPORT bool enable_sort;
extern PGDLLIMPORT bool enable_hashagg;
+extern PGDLLIMPORT bool enable_hashagg_spill;
extern PGDLLIMPORT bool enable_nestloop;
extern PGDLLIMPORT bool enable_material;
extern PGDLLIMPORT bool enable_mergejoin;
diff --git a/src/include/utils/memutils.h b/src/include/utils/memutils.h
index ffe6de536e..6a837bc990 100644
--- a/src/include/utils/memutils.h
+++ b/src/include/utils/memutils.h
@@ -82,6 +82,7 @@ extern void MemoryContextSetParent(MemoryContext context,
extern Size GetMemoryChunkSpace(void *pointer);
extern MemoryContext MemoryContextGetParent(MemoryContext context);
extern bool MemoryContextIsEmpty(MemoryContext context);
+extern int64 MemoryContextMemAllocated(MemoryContext context, bool recurse);
extern void MemoryContextStats(MemoryContext context);
extern void MemoryContextStatsDetail(MemoryContext context, int max_children);
extern void MemoryContextAllowInCriticalSection(MemoryContext context,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index a1c90eb905..c40bf6c16e 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -75,6 +75,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
+ enable_hashagg_spill | on
enable_hashjoin | on
enable_indexonlyscan | on
enable_indexscan | on
@@ -89,7 +90,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(17 rows)
+(18 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail