Hi,

Hash Aggs and SetOps are currently not spilled to disk. If the planner's estimate on the number of entries is badly off, you might run out of memory at execution time, if all the entries don't fit in memory.

For HashAggs, this was discussed in depth a couple of years ago at [1]. SetOps have the same issue, but fixing that is simpler, as you don't need to handle arbitrary aggregate transition values and functions.

So a while back, I started hacking on spilling SetOps, with the idea that the code to deal with that could later be reused to deal with HashAggs, too. I didn't get very far, but I'm posting this in this very unfinished form to show what I've got, because I had a chat on this with Jeff Davis and some others last week.

The logtape.c interface would be very useful for this. When you start spilling, you want to create many spill files, so that when reloaded, each file will fit comfortably in memory. With logtape.c, you can have many logical tapes, without the overhead of real files. Furthermore, if you need to re-spill because you a spill file grows too big in the first pass, logtape.c allows reusing the space "on-the-fly". The only problem with the current logtape interface is that it requires specifying the number of "tapes" upfront, when the tapeset is created. However, I was planning to change that, anyway [2].

[1] https://www.postgresql.org/message-id/1407706010.6623.16.camel%40jeff-desktop

[2] https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi

- Heikki

>From 1513a777ca1aa1df57f054ed8d15db9a734adf91 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 3 May 2018 09:44:13 +0300
Subject: [PATCH 1/2] Optimize memory usage in SetOp executor node.

---
 src/backend/executor/nodeSetOp.c | 68 +++++++++++++++++++++++++++++++---------
 src/include/nodes/execnodes.h    |  4 ++-
 2 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 3fa4a5fcc6..8dd017b2ef 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -51,6 +51,8 @@
 #include "utils/memutils.h"
 
 
+#define NUM_PERGROUPS_PER_ALLOC 100
+
 /*
  * SetOpStatePerGroupData - per-group working state
  *
@@ -60,11 +62,19 @@
  * In SETOP_SORTED mode, we need only one of these structs, and it's kept in
  * the plan state node.  In SETOP_HASHED mode, the hash table contains one
  * of these for each tuple group.
+ *
+ * Unused per-group structs are kept in a linked list, in
+ * SetOpState.free_pergroups.  In that case, 'next' points to the next struct
+ * in the free-list.
  */
-typedef struct SetOpStatePerGroupData
+typedef union SetOpStatePerGroupData
 {
-	long		numLeft;		/* number of left-input dups in group */
-	long		numRight;		/* number of right-input dups in group */
+	struct
+	{
+		uint64		numLeft;		/* number of left-input dups in group */
+		uint64		numRight;		/* number of right-input dups in group */
+	} data;
+	SetOpStatePerGroup next;		/* next unused entry in the free list */
 }			SetOpStatePerGroupData;
 
 
@@ -79,7 +89,7 @@ static TupleTableSlot *setop_retrieve_hash_table(SetOpState *setopstate);
 static inline void
 initialize_counts(SetOpStatePerGroup pergroup)
 {
-	pergroup->numLeft = pergroup->numRight = 0;
+	pergroup->data.numLeft = pergroup->data.numRight = 0;
 }
 
 /*
@@ -89,9 +99,39 @@ static inline void
 advance_counts(SetOpStatePerGroup pergroup, int flag)
 {
 	if (flag)
-		pergroup->numRight++;
+		pergroup->data.numRight++;
 	else
-		pergroup->numLeft++;
+		pergroup->data.numLeft++;
+}
+
+/*
+ * Allocate a new per-group struct.
+ *
+ * To save on memory and palloc() call overhead, we allocate the per-group
+ * structs in batches.
+ */
+static SetOpStatePerGroup
+alloc_pergroup(SetOpState *setopstate)
+{
+	SetOpStatePerGroup pergroup;
+
+	if (!setopstate->free_pergroups)
+	{
+		int			i;
+
+		setopstate->free_pergroups =
+			MemoryContextAlloc(setopstate->hashtable->tablecxt,
+							   NUM_PERGROUPS_PER_ALLOC * sizeof(SetOpStatePerGroupData));
+
+		for (i = 0; i < NUM_PERGROUPS_PER_ALLOC - 1; i++)
+			setopstate->free_pergroups[i].next = &setopstate->free_pergroups[i + 1];
+		setopstate->free_pergroups[NUM_PERGROUPS_PER_ALLOC - 1].next = NULL;
+	}
+
+	pergroup = setopstate->free_pergroups;
+	setopstate->free_pergroups = pergroup->next;
+
+	return pergroup;
 }
 
 /*
@@ -152,26 +192,26 @@ set_output_count(SetOpState *setopstate, SetOpStatePerGroup pergroup)
 	switch (plannode->cmd)
 	{
 		case SETOPCMD_INTERSECT:
-			if (pergroup->numLeft > 0 && pergroup->numRight > 0)
+			if (pergroup->data.numLeft > 0 && pergroup->data.numRight > 0)
 				setopstate->numOutput = 1;
 			else
 				setopstate->numOutput = 0;
 			break;
 		case SETOPCMD_INTERSECT_ALL:
 			setopstate->numOutput =
-				(pergroup->numLeft < pergroup->numRight) ?
-				pergroup->numLeft : pergroup->numRight;
+				(pergroup->data.numLeft < pergroup->data.numRight) ?
+				pergroup->data.numLeft : pergroup->data.numRight;
 			break;
 		case SETOPCMD_EXCEPT:
-			if (pergroup->numLeft > 0 && pergroup->numRight == 0)
+			if (pergroup->data.numLeft > 0 && pergroup->data.numRight == 0)
 				setopstate->numOutput = 1;
 			else
 				setopstate->numOutput = 0;
 			break;
 		case SETOPCMD_EXCEPT_ALL:
 			setopstate->numOutput =
-				(pergroup->numLeft < pergroup->numRight) ?
-				0 : (pergroup->numLeft - pergroup->numRight);
+				(pergroup->data.numLeft < pergroup->data.numRight) ?
+				0 : (pergroup->data.numLeft - pergroup->data.numRight);
 			break;
 		default:
 			elog(ERROR, "unrecognized set op: %d", (int) plannode->cmd);
@@ -385,9 +425,7 @@ setop_fill_hash_table(SetOpState *setopstate)
 			/* If new tuple group, initialize counts */
 			if (isnew)
 			{
-				entry->additional = (SetOpStatePerGroup)
-					MemoryContextAlloc(setopstate->hashtable->tablecxt,
-									   sizeof(SetOpStatePerGroupData));
+				entry->additional = alloc_pergroup(setopstate);
 				initialize_counts((SetOpStatePerGroup) entry->additional);
 			}
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index da7f52cab0..a62d299e67 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2147,7 +2147,7 @@ typedef struct HashState
  * ----------------
  */
 /* this struct is private in nodeSetOp.c: */
-typedef struct SetOpStatePerGroupData *SetOpStatePerGroup;
+typedef union SetOpStatePerGroupData *SetOpStatePerGroup;
 
 typedef struct SetOpState
 {
@@ -2165,6 +2165,8 @@ typedef struct SetOpState
 	MemoryContext tableContext; /* memory context containing hash table */
 	bool		table_filled;	/* hash table filled yet? */
 	TupleHashIterator hashiter; /* for iterating through hash table */
+
+	SetOpStatePerGroup free_pergroups; /* list of free per-group structs */
 } SetOpState;
 
 /* ----------------
-- 
2.11.0


>From c0afa4986b246fd7b10bd9432caa100bce592760 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 25 Apr 2018 15:40:14 +0300
Subject: [PATCH 2/2] Allow SetOps to spill.

---
 src/backend/executor/Makefile        |   2 +-
 src/backend/executor/execGrouping.c  |  37 +++++
 src/backend/executor/execHashSpill.c | 206 +++++++++++++++++++++++++++
 src/backend/executor/nodeSetOp.c     | 263 ++++++++++++++++++++++++++++++++++-
 src/include/executor/executor.h      |  11 ++
 src/include/lib/simplehash.h         |  81 ++++++-----
 src/include/nodes/execnodes.h        |  18 ++-
 7 files changed, 576 insertions(+), 42 deletions(-)
 create mode 100644 src/backend/executor/execHashSpill.c

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895fa5..12593dea81 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
-       execGrouping.o execIndexing.o execJunk.o \
+       execGrouping.o execHashSpill.o execIndexing.o execJunk.o \
        execMain.o execParallel.o execPartition.o execProcnode.o \
        execReplication.o execScan.o execSRF.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index c4d0e04058..e02ab8df87 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -180,6 +180,7 @@ BuildTupleHashTable(PlanState *parent,
 	hashtable->inputslot = NULL;
 	hashtable->in_hash_funcs = NULL;
 	hashtable->cur_eq_func = NULL;
+	hashtable->usedMem = sizeof(TupleHashTableData);
 
 	/*
 	 * If parallelism is in use, even if the master backend is performing the
@@ -195,6 +196,7 @@ BuildTupleHashTable(PlanState *parent,
 		hashtable->hash_iv = 0;
 
 	hashtable->hashtab = tuplehash_create(tablecxt, nbuckets, hashtable);
+	hashtable->usedMem += nbuckets * sizeof(TupleHashEntryData);
 
 	oldcontext = MemoryContextSwitchTo(hashtable->tablecxt);
 
@@ -266,6 +268,7 @@ LookupTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
 			MemoryContextSwitchTo(hashtable->tablecxt);
 			/* Copy the first tuple into the table context */
 			entry->firstTuple = ExecCopySlotMinimalTuple(slot);
+			hashtable->usedMem += GetMemoryChunkSpace(entry->firstTuple);
 		}
 	}
 	else
@@ -313,6 +316,40 @@ FindTupleHashEntry(TupleHashTable hashtable, TupleTableSlot *slot,
 }
 
 /*
+ * Choose a victim tuple to remove from hash table, and return it.
+ *
+ * The caller should remove the entry from the hash table, pfree(firstTuple),
+ * and 'additional'.
+ */
+TupleHashEntryData *
+SpillTupleHashTable(TupleHashTable hashtable)
+{
+	TupleHashEntry entry;
+
+	if (hashtable->hashtab->members == 0)
+	{
+		/* hash table is empty. Nothing to spill */
+		return NULL;
+	}
+
+	if (!hashtable->spill_iter_inited)
+	{
+		tuplehash_start_iterate(hashtable->hashtab, &hashtable->spill_iter);
+		hashtable->spill_iter_inited = true;
+	}
+
+	entry = tuplehash_iterate(hashtable->hashtab, &hashtable->spill_iter);
+	if (!entry)
+	{
+		/* reached end, restart */
+		tuplehash_start_iterate(hashtable->hashtab, &hashtable->spill_iter);
+		entry = tuplehash_iterate(hashtable->hashtab, &hashtable->spill_iter);
+	}
+
+	return entry;
+}
+
+/*
  * Compute the hash value for a tuple
  *
  * The passed-in key is a pointer to TupleHashEntryData.  In an actual hash
diff --git a/src/backend/executor/execHashSpill.c b/src/backend/executor/execHashSpill.c
new file mode 100644
index 0000000000..88e1ccec94
--- /dev/null
+++ b/src/backend/executor/execHashSpill.c
@@ -0,0 +1,206 @@
+/*
+ *
+ *
+ * TODO:
+ *
+ * - need a smarter strategy to choose spill victim. Must avoid infinite looping,
+ *   where the same tuples get spilled again and again.
+ */
+
+#include "postgres.h"
+
+#include "executor/executor.h"
+#include "utils/hashutils.h"
+#include "utils/memutils.h"
+#include "storage/buffile.h"
+
+/*
+ * Spill Set
+ *
+ */
+
+#define FANOUT_SHIFT 4
+#define FANOUT 16
+#define LEVELS 8		/* 32 bits, 4 bits per level */
+#define HASH_HIGH_MASK 0xf0000000
+
+typedef struct SubSpillSet SubSpillSet;
+
+/*
+ * Spill files form a radix tree, based on the hash key. Whenever a sub-spillset
+ * grows too large, it is split. The entries that had already been written to
+ * the file for that sub-spillset are kept in the old file, but any new entries
+ * are written to the child nodes instead.
+ *
+ * XXX: I think that doesn't do the right thing with ordered aggregates, where
+ * we have to take care to feed the input rows to the aggregate in the input
+ * order.
+ */
+struct SubSpillSet
+{
+	BufFile	   *file;
+	int			num_children;
+
+	SubSpillSet   *children[FANOUT];
+};
+
+struct HashSpillSet
+{
+	SubSpillSet root;
+	uint64		max_size;
+};
+
+/*
+ * When processing input:
+ *
+ * 1. If has table is full, choose victim.
+ * 2. Call GetSpillFile() on the victim
+ * 3. Dump the entry to the file returned by GetSpillFile .
+ * 4. Remove entry from hash table.
+ */
+
+/*
+ * To finalize:
+ *
+ * 1. Dump remaining entries from in-memory hash table, where !firstbatch
+ * 2. Return remaining entries from in-memory hash table (with firstbatch==true)
+ *
+ * 3. Call ReadNextSpillFile(). Read entries from the file until it's empty.
+ * 4. Load each entry to in-memory hash table.
+ *
+ */
+
+HashSpillSet *
+CreateHashSpillSet(int64 target_file_size)
+{
+	HashSpillSet *sp;
+
+	sp = palloc0(sizeof(HashSpillSet));
+	sp->max_size = target_file_size;
+
+	return sp;
+}
+
+static SubSpillSet *
+CreateSubSpillSet(void)
+{
+	SubSpillSet *subsp;
+
+	subsp = palloc0(sizeof(SubSpillSet));
+	subsp->file = BufFileCreateTemp(false);
+	subsp->num_children = 0;
+
+	return subsp;
+}
+
+BufFile *
+GetSpillFile(HashSpillSet *sp, uint32 hash)
+{
+	uint32		level;
+	SubSpillSet *subsp;
+	uint32		hash_shifted;
+
+	level = 0;
+	subsp = &sp->root;
+	hash_shifted = hash;
+	for (;;)
+	{
+		uint32		childno;
+
+		childno = (hash_shifted & HASH_HIGH_MASK) >> (32 - FANOUT_SHIFT);
+
+		if (!subsp->children[childno])
+		{
+			subsp->num_children++;
+			subsp = subsp->children[childno] = CreateSubSpillSet();
+			break;
+		}
+
+		subsp = subsp->children[childno];
+		if (subsp->num_children == 0)
+		{
+			/*
+			 * We found the correct sub-spillset that this tuple belongs to.
+			 *
+			 * But is it too full? If so, continue to recurse into it, to create
+			 * a new sub-spillsets at lower level.
+			 */
+			if (level == LEVELS - 1 || BufFileSize(subsp->file) < sp->max_size)
+				break;
+		}
+		Assert(level < LEVELS - 1);
+
+		level++;
+		hash_shifted <<= FANOUT_SHIFT;
+	}
+
+	return subsp->file;
+}
+
+static BufFile *
+OpenNextSpillFileRecurse(HashSpillSet *sp, SubSpillSet *thissp, bool *respill)
+{
+	BufFile	   *file = NULL;
+	int			i;
+
+	if (thissp->file)
+	{
+		file = thissp->file;
+		thissp->file = NULL;
+		*respill = (thissp->num_children > 0);
+		return file;
+	}
+
+	for (i = 0; i < FANOUT; i++)
+	{
+		SubSpillSet *subsp;
+
+		if (thissp->children[i])
+		{
+			subsp = thissp->children[i];
+
+			file = OpenNextSpillFileRecurse(sp, subsp, respill);
+
+			if (!subsp->file && subsp->num_children == 0)
+			{
+				/* This leaf entry is no longer needed. Prune it. */
+				pfree(thissp->children[i]);
+				thissp->children[i] = NULL;
+				thissp->num_children--;
+			}
+			return file;
+		}
+	}
+	return NULL;
+}
+
+/*
+ * Open the next spill file to process.
+ *
+ * The spill files form a tree. This returns the spill files in an order,
+ * so that a parent is always returned before its children.
+ */
+BufFile *
+OpenNextSpillFile(HashSpillSet *sp, bool *respill)
+{
+	/* Scan the radix tree for next batch. */
+	BufFile *file;
+	file = OpenNextSpillFileRecurse(sp, &sp->root, respill);
+	if (file)
+	{
+		BufFileSeek(file, 0, 0, SEEK_SET);
+		/* XXX check error */
+	}
+	return file;
+}
+void
+CloseHashSpillSet(HashSpillSet *sp)
+{
+	BufFile	   *file;
+	bool		respill;
+
+	while((file = OpenNextSpillFile(sp, &respill)) != NULL)
+		BufFileClose(file);
+
+	pfree(sp);
+}
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 8dd017b2ef..4f241d6104 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -22,6 +22,7 @@
  * scan the hashtable and generate the correct output using those counts.
  * We can avoid making hashtable entries for any tuples appearing only in the
  * second input relation, since they cannot result in any output.
+ * (XXX: except that we do, if the hash table has been spilled)
  *
  * This node type is not used for UNION or UNION ALL, since those can be
  * implemented more cheaply (there's no need for the junk attribute to
@@ -82,6 +83,7 @@ static TupleTableSlot *setop_retrieve_direct(SetOpState *setopstate);
 static void setop_fill_hash_table(SetOpState *setopstate);
 static TupleTableSlot *setop_retrieve_hash_table(SetOpState *setopstate);
 
+static void setop_spill_entry(SetOpState *setopstate, TupleHashEntry entry, bool in_hashtab);
 
 /*
  * Initialize state for a new group of input values.
@@ -134,6 +136,13 @@ alloc_pergroup(SetOpState *setopstate)
 	return pergroup;
 }
 
+static void
+free_pergroup(SetOpState *setopstate, SetOpStatePerGroup pergroup)
+{
+	pergroup->next = setopstate->free_pergroups;
+	setopstate->free_pergroups = pergroup;
+}
+
 /*
  * Fetch the "flag" column from an input tuple.
  * This is an integer column with value 0 for left side, 1 for right side.
@@ -177,6 +186,7 @@ build_hash_table(SetOpState *setopstate)
 												setopstate->tableContext,
 												econtext->ecxt_per_tuple_memory,
 												false);
+	setopstate->hashtable->allowedMem = work_mem * 1024L;
 }
 
 /*
@@ -371,6 +381,188 @@ setop_retrieve_direct(SetOpState *setopstate)
 	return NULL;
 }
 
+static void
+BufFileWriteNoError(BufFile *file, void *ptr, size_t size)
+{
+	if (BufFileWrite(file, ptr, size) != size)
+		elog(ERROR, "io error"); /* XXX ereport */
+}
+
+/* Callback to dump the 'additional' data */
+static void
+setop_spill_entries(SetOpState *setopstate)
+{
+	if (setopstate->hashtable->usedMem < setopstate->hashtable->allowedMem)
+		return;
+
+	if (!setopstate->spillset)
+	{
+		setopstate->spillset = CreateHashSpillSet(setopstate->hashtable->allowedMem / 2);
+
+		setopstate->spillslot = MakeSingleTupleTableSlot(CreateTupleDescCopy(ExecGetResultType(outerPlanState(setopstate))));
+	}
+
+	while (setopstate->hashtable->usedMem > setopstate->hashtable->allowedMem &&
+		setopstate->hashtable->hashtab->members > 1)
+	{
+		TupleHashEntry entry;
+#if 0
+		elog(NOTICE, "spilling: used %ld allowed %ld tuples %d",
+			 setopstate->hashtable->usedMem, setopstate->hashtable->allowedMem,
+			setopstate->hashtable->hashtab->members);
+#endif
+		entry = SpillTupleHashTable(setopstate->hashtable);
+		if (!entry)
+			break;
+
+		setop_spill_entry(setopstate, entry, true);
+	}
+}
+
+static void
+setop_spill_entry(SetOpState *setopstate, TupleHashEntry entry, bool in_hashtab)
+{
+	BufFile	   *file;
+	MinimalTuple tup;
+
+	file = GetSpillFile(setopstate->spillset, entry->hash);
+
+	BufFileWriteNoError(file, &entry->hash, sizeof(uint32));
+	BufFileWriteNoError(file, &entry->firstTuple->t_len, sizeof(uint32));
+	BufFileWriteNoError(file, entry->firstTuple, entry->firstTuple->t_len);
+	BufFileWriteNoError(file, entry->additional, sizeof(SetOpStatePerGroupData));
+
+	if (in_hashtab)
+	{
+		/* Free the entry. */
+		setopstate->hashtable->usedMem -= sizeof(SetOpStatePerGroupData);
+		setopstate->hashtable->usedMem -= GetMemoryChunkSpace(entry->firstTuple);
+		free_pergroup(setopstate, entry->additional);
+		tup = entry->firstTuple;
+		tuplehash_delete_elem(setopstate->hashtable->hashtab, entry, entry->hash);
+		pfree(tup);
+
+		if (!setopstate->spilled)
+		{
+			setopstate->spilled = true;
+
+			elog(NOTICE, "spilling: used %ld allowed %ld tuples %d",
+				 setopstate->hashtable->usedMem, setopstate->hashtable->allowedMem,
+				 setopstate->hashtable->hashtab->members);
+		}
+	}
+}
+
+static void
+BufFileReadExact(BufFile *file, void *ptr, size_t size)
+{
+	if (BufFileRead(file, ptr, size) != size)
+		elog(ERROR, "error reading file");
+}
+
+/*
+ * Reload next batch into memory.
+ *
+ * May cause further spilling. Returns false if there were no more batches.
+ */
+static bool
+setop_reload_batch(SetOpState *setopstate)
+{
+	uint32		loaded_hash;
+	uint32		tuplen;
+	MinimalTuple loaded_firstTuple;
+	SetOpStatePerGroup pergroup;
+	SetOpStatePerGroupData loaded_pergroup;
+	int			readlen;
+	BufFile	   *file;
+	TupleHashEntry entry;
+	bool		isnew;
+	bool		respill;
+
+	Assert(setopstate->hashtable->hashtab->members == 0);
+	setopstate->spilled = false;
+
+	file = OpenNextSpillFile(setopstate->spillset, &respill);
+	if (!file)
+		return false;
+
+	setopstate->spilled = false;
+
+	for (;;)
+	{
+		readlen = BufFileRead(file, &loaded_hash, sizeof(uint32));
+		if (readlen == 0)
+		{
+			//elog(NOTICE, "reloaded batch %p with %d tuples", file, setopstate->hashtable->hashtab->members);
+			BufFileClose(file);
+			break;
+		}
+		if (readlen != sizeof(uint32))
+			elog(ERROR, "error reading file");
+		BufFileReadExact(file, &tuplen, sizeof(uint32));
+		loaded_firstTuple = palloc(tuplen);
+		BufFileReadExact(file, loaded_firstTuple, tuplen);
+		Assert(loaded_firstTuple->t_len == tuplen);
+		BufFileReadExact(file, &loaded_pergroup, sizeof(SetOpStatePerGroupData));
+
+		ExecStoreMinimalTuple(loaded_firstTuple, setopstate->spillslot, true);
+
+		if (setopstate->hashtable->usedMem < setopstate->hashtable->allowedMem ||
+			setopstate->hashtable->hashtab->members < 1)
+		{
+			entry = LookupTupleHashEntry(setopstate->hashtable, setopstate->spillslot,
+										 &isnew);
+
+			if (isnew)
+			{
+				entry->additional = alloc_pergroup(setopstate);
+				setopstate->hashtable->usedMem += sizeof(SetOpStatePerGroupData);
+				initialize_counts((SetOpStatePerGroup) entry->additional);
+				if (setopstate->spilled || respill)
+					entry->firstbatch = false;
+				else
+					entry->firstbatch = true;
+			}
+
+			pergroup = (SetOpStatePerGroup) entry->additional;
+			pergroup->data.numLeft += loaded_pergroup.data.numLeft;
+			pergroup->data.numRight += loaded_pergroup.data.numRight;
+		}
+		else
+		{
+			/*
+			 * We are already over the limit. Check if it happens to be in the
+			 * hash table, but if not, re-spill it immediately. This is
+			 * important, so that we process entries in LIFO order. Otherwise,
+			 * if there are enough entries with the same hash value to not fit
+			 * in memory at the time, we might loop and not make any progress.
+			 */
+			entry = LookupTupleHashEntry(setopstate->hashtable,
+										 setopstate->spillslot,
+										 NULL);
+			if (entry)
+			{
+				pergroup = (SetOpStatePerGroup) entry->additional;
+				pergroup->data.numLeft += loaded_pergroup.data.numLeft;
+				pergroup->data.numRight += loaded_pergroup.data.numRight;
+			}
+			else
+			{
+				TupleHashEntryData loaded_entry;
+
+				loaded_entry.hash = loaded_hash;
+				loaded_entry.firstTuple = loaded_firstTuple;
+				loaded_entry.additional = &loaded_pergroup;
+
+				setop_spill_entry(setopstate, &loaded_entry, false);
+			}
+		}
+		ExecClearTuple(setopstate->spillslot);
+	}
+
+	return true;
+}
+
 /*
  * ExecSetOp for hashed case: phase 1, read input and build hash table
  */
@@ -413,6 +605,9 @@ setop_fill_hash_table(SetOpState *setopstate)
 		/* Identify whether it's left or right input */
 		flag = fetch_tuple_flag(setopstate, outerslot);
 
+		/* If we're out of memory, spill some tuples from the hash table. */
+		setop_spill_entries(setopstate);
+
 		if (flag == firstFlag)
 		{
 			/* (still) in first input relation */
@@ -426,7 +621,12 @@ setop_fill_hash_table(SetOpState *setopstate)
 			if (isnew)
 			{
 				entry->additional = alloc_pergroup(setopstate);
+				setopstate->hashtable->usedMem += sizeof(SetOpStatePerGroupData);
 				initialize_counts((SetOpStatePerGroup) entry->additional);
+				if (setopstate->spilled)
+					entry->firstbatch = false;
+				else
+					entry->firstbatch = true;
 			}
 
 			/* Advance the counts */
@@ -438,12 +638,30 @@ setop_fill_hash_table(SetOpState *setopstate)
 			in_first_rel = false;
 
 			/* For tuples not seen previously, do not make hashtable entry */
+			/* If the hash table has already been spilled, we must make entries
+			 * for everything, because we might have spilled an entry for this
+			 * key already.
+			 */
+			isnew = false;
 			entry = LookupTupleHashEntry(setopstate->hashtable, outerslot,
-										 NULL);
+										 setopstate->spilled ? &isnew : NULL);
 
 			/* Advance the counts if entry is already present */
 			if (entry)
+			{
+				/* If new tuple group, initialize counts */
+				if (isnew)
+				{
+					entry->additional = alloc_pergroup(setopstate);
+					setopstate->hashtable->usedMem += sizeof(SetOpStatePerGroupData);
+					initialize_counts((SetOpStatePerGroup) entry->additional);
+					if (setopstate->spilled)
+						entry->firstbatch = false;
+					else
+						entry->firstbatch = true;
+				}
 				advance_counts((SetOpStatePerGroup) entry->additional, flag);
+			}
 		}
 
 		/* Must reset expression context after each hashtable lookup */
@@ -453,6 +671,8 @@ setop_fill_hash_table(SetOpState *setopstate)
 	setopstate->table_filled = true;
 	/* Initialize to walk the hash table */
 	ResetTupleHashIterator(setopstate->hashtable, &setopstate->hashiter);
+
+	elog(NOTICE, "table filled");
 }
 
 /*
@@ -474,31 +694,63 @@ setop_retrieve_hash_table(SetOpState *setopstate)
 	 */
 	while (!setopstate->setop_done)
 	{
+		MinimalTuple tuple;
+
 		CHECK_FOR_INTERRUPTS();
 
 		/*
 		 * Find the next entry in the hash table
 		 */
 		entry = ScanTupleHashTable(setopstate->hashtable, &setopstate->hashiter);
+
 		if (entry == NULL)
 		{
-			/* No more entries in hashtable, so done */
+			/* No more entries in hashtable. But do we have spill files to process?  */
+			if (setopstate->spillset)
+			{
+				if (setop_reload_batch(setopstate))
+				{
+					ResetTupleHashIterator(setopstate->hashtable, &setopstate->hashiter);
+					continue;
+				}
+			}
 			setopstate->setop_done = true;
 			return NULL;
 		}
 
+		if (!entry->firstbatch)
+		{
+			/*
+			 * re-spill this entry, because we might have spilled some earlier entries
+			 * with this key already.
+			 */
+			setop_spill_entry(setopstate, entry, true);
+			continue;
+		}
+
 		/*
 		 * See if we should emit any copies of this tuple, and if so return
 		 * the first copy.
 		 */
 		set_output_count(setopstate, (SetOpStatePerGroup) entry->additional);
 
+		tuple = entry->firstTuple;
+
+		free_pergroup(setopstate, entry->additional);
+		setopstate->hashtable->usedMem -= sizeof(SetOpStatePerGroupData);
+		tuplehash_delete_elem(setopstate->hashtable->hashtab, entry, entry->hash);
+		setopstate->hashtable->usedMem -= GetMemoryChunkSpace(tuple);
+
 		if (setopstate->numOutput > 0)
 		{
 			setopstate->numOutput--;
-			return ExecStoreMinimalTuple(entry->firstTuple,
+			return ExecStoreMinimalTuple(tuple,
 										 resultTupleSlot,
-										 false);
+										 true);
+		}
+		else
+		{
+			pfree(tuple);
 		}
 	}
 
@@ -619,6 +871,9 @@ ExecEndSetOp(SetOpState *node)
 	/* clean up tuple table */
 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
+	if (node->spillset)
+		CloseHashSpillSet(node->spillset);
+
 	/* free subsidiary stuff including hashtable */
 	if (node->tableContext)
 		MemoryContextDelete(node->tableContext);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index a7ea3c7d10..0f972bf47b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -17,6 +17,7 @@
 #include "executor/execdesc.h"
 #include "nodes/parsenodes.h"
 #include "utils/memutils.h"
+#include "storage/buffile.h"
 
 
 /*
@@ -136,6 +137,16 @@ extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable,
 				   TupleTableSlot *slot,
 				   ExprState *eqcomp,
 				   FmgrInfo *hashfunctions);
+extern TupleHashEntry SpillTupleHashTable(TupleHashTable hashtable);
+
+/*
+ * prototypes from functions in execHashSpill.c
+ */
+extern HashSpillSet *CreateHashSpillSet(int64 target_file_size);
+extern BufFile *GetSpillFile(HashSpillSet *sp, uint32 hash);
+extern BufFile *OpenNextSpillFile(HashSpillSet *sp, bool *respill);
+extern void CloseHashSpillSet(HashSpillSet *sp);
+
 
 /*
  * prototypes from functions in execJunk.c
diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h
index 5273d49460..5785c578b0 100644
--- a/src/include/lib/simplehash.h
+++ b/src/include/lib/simplehash.h
@@ -74,6 +74,7 @@
 #define SH_DESTROY SH_MAKE_NAME(destroy)
 #define SH_INSERT SH_MAKE_NAME(insert)
 #define SH_DELETE SH_MAKE_NAME(delete)
+#define SH_DELETE_ELEM SH_MAKE_NAME(delete_elem)
 #define SH_LOOKUP SH_MAKE_NAME(lookup)
 #define SH_GROW SH_MAKE_NAME(grow)
 #define SH_START_ITERATE SH_MAKE_NAME(start_iterate)
@@ -144,6 +145,7 @@ SH_SCOPE void SH_GROW(SH_TYPE * tb, uint32 newsize);
 SH_SCOPE	SH_ELEMENT_TYPE *SH_INSERT(SH_TYPE * tb, SH_KEY_TYPE key, bool *found);
 SH_SCOPE	SH_ELEMENT_TYPE *SH_LOOKUP(SH_TYPE * tb, SH_KEY_TYPE key);
 SH_SCOPE bool SH_DELETE(SH_TYPE * tb, SH_KEY_TYPE key);
+SH_SCOPE void SH_DELETE_ELEM(SH_TYPE * tb, SH_ELEMENT_TYPE *elem, uint32 hash);
 SH_SCOPE void SH_START_ITERATE(SH_TYPE * tb, SH_ITERATOR * iter);
 SH_SCOPE void SH_START_ITERATE_AT(SH_TYPE * tb, SH_ITERATOR * iter, uint32 at);
 SH_SCOPE	SH_ELEMENT_TYPE *SH_ITERATE(SH_TYPE * tb, SH_ITERATOR * iter);
@@ -697,54 +699,61 @@ SH_DELETE(SH_TYPE * tb, SH_KEY_TYPE key)
 		if (entry->status == SH_STATUS_IN_USE &&
 			SH_COMPARE_KEYS(tb, hash, key, entry))
 		{
-			SH_ELEMENT_TYPE *lastentry = entry;
+			SH_DELETE_ELEM(tb, entry, hash);
+			return true;
+		}
 
-			tb->members--;
+		/* TODO: return false; if distance too big */
 
-			/*
-			 * Backward shift following elements till either an empty element
-			 * or an element at its optimal position is encountered.
-			 *
-			 * While that sounds expensive, the average chain length is short,
-			 * and deletions would otherwise require tombstones.
-			 */
-			while (true)
-			{
-				SH_ELEMENT_TYPE *curentry;
-				uint32		curhash;
-				uint32		curoptimal;
+		curelem = SH_NEXT(tb, curelem, startelem);
+	}
+}
 
-				curelem = SH_NEXT(tb, curelem, startelem);
-				curentry = &tb->data[curelem];
+SH_SCOPE void
+SH_DELETE_ELEM(SH_TYPE * tb, SH_ELEMENT_TYPE *entry, uint32 hash)
+{
+	SH_ELEMENT_TYPE *lastentry = entry;
+	uint32		startelem = SH_INITIAL_BUCKET(tb, hash);
+	uint32		curelem = entry - &tb->data[0];
 
-				if (curentry->status != SH_STATUS_IN_USE)
-				{
-					lastentry->status = SH_STATUS_EMPTY;
-					break;
-				}
+	tb->members--;
 
-				curhash = SH_ENTRY_HASH(tb, curentry);
-				curoptimal = SH_INITIAL_BUCKET(tb, curhash);
+	/*
+	 * Backward shift following elements till either an empty element
+	 * or an element at its optimal position is encountered.
+	 *
+	 * While that sounds expensive, the average chain length is short,
+	 * and deletions would otherwise require tombstones.
+	 */
+	while (true)
+	{
+		SH_ELEMENT_TYPE *curentry;
+		uint32		curhash;
+		uint32		curoptimal;
 
-				/* current is at optimal position, done */
-				if (curoptimal == curelem)
-				{
-					lastentry->status = SH_STATUS_EMPTY;
-					break;
-				}
+		curelem = SH_NEXT(tb, curelem, startelem);
+		curentry = &tb->data[curelem];
 
-				/* shift */
-				memcpy(lastentry, curentry, sizeof(SH_ELEMENT_TYPE));
+		if (curentry->status != SH_STATUS_IN_USE)
+		{
+			lastentry->status = SH_STATUS_EMPTY;
+			break;
+		}
 
-				lastentry = curentry;
-			}
+		curhash = SH_ENTRY_HASH(tb, curentry);
+		curoptimal = SH_INITIAL_BUCKET(tb, curhash);
 
-			return true;
+		/* current is at optimal position, done */
+		if (curoptimal == curelem)
+		{
+			lastentry->status = SH_STATUS_EMPTY;
+			break;
 		}
 
-		/* TODO: return false; if distance too big */
+		/* shift */
+		memcpy(lastentry, curentry, sizeof(SH_ELEMENT_TYPE));
 
-		curelem = SH_NEXT(tb, curelem, startelem);
+		lastentry = curentry;
 	}
 }
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a62d299e67..58c8036003 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -650,7 +650,8 @@ typedef struct TupleHashEntryData
 {
 	MinimalTuple firstTuple;	/* copy of first tuple in this group */
 	void	   *additional;		/* user data */
-	uint32		status;			/* hash status */
+	bool		firstbatch;
+	uint8		status;			/* hash status */
 	uint32		hash;			/* hash value (cached) */
 } TupleHashEntryData;
 
@@ -679,6 +680,13 @@ typedef struct TupleHashTableData
 	ExprState  *cur_eq_func;	/* comparator for for input vs. table */
 	uint32		hash_iv;		/* hash-function IV */
 	ExprContext *exprcontext;	/* expression context */
+
+	int64		usedMem;
+	int64		allowedMem;
+
+	/* batches */
+	tuplehash_iterator spill_iter;
+	bool		spill_iter_inited;
 }			TupleHashTableData;
 
 typedef tuplehash_iterator TupleHashIterator;
@@ -697,6 +705,9 @@ typedef tuplehash_iterator TupleHashIterator;
 #define ScanTupleHashTable(htable, iter) \
 	tuplehash_iterate(htable->hashtab, iter)
 
+/* Support for spilling hash tables */
+typedef struct HashSpillSet HashSpillSet; /* private to execHashSpill.c */
+
 
 /* ----------------------------------------------------------------
  *				 Expression State Nodes
@@ -2167,6 +2178,11 @@ typedef struct SetOpState
 	TupleHashIterator hashiter; /* for iterating through hash table */
 
 	SetOpStatePerGroup free_pergroups; /* list of free per-group structs */
+
+	HashSpillSet *spillset;
+	bool		spilled;		/* has the current batch spilled? */
+	TupleTableSlot *spillslot;
+
 } SetOpState;
 
 /* ----------------
-- 
2.11.0


Reply via email to