Hi, Jeff

I tried to use the logical tape APIs for hash agg spilling, based on
your 1220 version.

It turns out that it doesn't make much of a performance difference with
the default 8K block size (that might be my patch's problem), but a lot of
disk space (not I/O) is saved because I force the respilling to use the
same LogicalTapeSet.

Logtape APIs with default block size 8K:
```
postgres=# EXPLAIN ANALYZE SELECT avg(g) FROM generate_series(0,5000000) g 
GROUP BY g;
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=75000.02..75002.52 rows=200 width=36) (actual 
time=7701.706..24473.002 rows=5000001 loops=1)
   Group Key: g
   Memory Usage: 4096kB  Batches: 516  Disk: 116921kB
   ->  Function Scan on generate_series g  (cost=0.00..50000.01 rows=5000001 
width=4) (actual time=1611.829..3253.150 rows=5000001 loops=1)
 Planning Time: 0.194 ms
 Execution Time: 25129.239 ms
(6 rows)
```

Bare BufFile APIs:
```
postgres=# EXPLAIN ANALYZE SELECT avg(g) FROM generate_series(0,5000000) g 
GROUP BY g;
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=75000.02..75002.52 rows=200 width=36) (actual 
time=7339.835..24472.466 rows=5000001 loops=1)
   Group Key: g
   Memory Usage: 4096kB  Batches: 516  Disk: 232773kB
   ->  Function Scan on generate_series g  (cost=0.00..50000.01 rows=5000001 
width=4) (actual time=1580.057..3128.749 rows=5000001 loops=1)
 Planning Time: 0.769 ms
 Execution Time: 26696.502 ms
(6 rows)
```

Even so, I'm not sure which API is better, because we should avoid
respilling as much as we can in the planner, and hash join uses
the bare BufFile APIs.

Attached is my hacky and probably not robust diff for your reference.

-- 
Adam Lee
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index f1989b10ea..8c743d7561 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -247,6 +247,7 @@
 #include "utils/datum.h"
 #include "utils/dynahash.h"
 #include "utils/expandeddatum.h"
+#include "utils/logtape.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
@@ -288,8 +289,9 @@ typedef struct HashAggSpill
        int       n_partitions;         /* number of output partitions */
        int       partition_bits;       /* number of bits for partition mask
                                                                   
log2(n_partitions) parent partition bits */
-       BufFile **partitions;           /* output partition files */
+       int      *partitions;           /* output logtape numbers */
        int64    *ntuples;                      /* number of tuples in each 
partition */
+       LogicalTapeSet *lts;
 } HashAggSpill;
 
 /*
@@ -298,11 +300,12 @@ typedef struct HashAggSpill
  */
 typedef struct HashAggBatch
 {
-       BufFile *input_file;            /* input partition */
+       int      input_tape;            /* input partition */
        int      input_bits;            /* number of bits for input partition 
mask */
        int64    input_tuples;          /* number of tuples in this batch */
        int              setno;                         /* grouping set */
        HashAggSpill spill;                     /* spill output */
+       LogicalTapeSet *lts;
 } HashAggBatch;
 
 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
@@ -359,9 +362,8 @@ static void hash_spill_init(HashAggSpill *spill, int 
input_bits,
                                                        uint64 input_tuples, 
double hashentrysize);
 static Size hash_spill_tuple(HashAggSpill *spill, int input_bits,
                                                         TupleTableSlot *slot, 
uint32 hash);
-static MinimalTuple hash_read_spilled(BufFile *file, uint32 *hashp);
-static HashAggBatch *hash_batch_new(BufFile *input_file, int setno,
-                                                                       int64 
input_tuples, int input_bits);
+static MinimalTuple hash_read_spilled(LogicalTapeSet *lts, int tapenum, uint32 
*hashp);
+static HashAggBatch *hash_batch_new(LogicalTapeSet *lts, int tapenum, int 
setno, int64 input_tuples, int input_bits);
 static void hash_finish_initial_spills(AggState *aggstate);
 static void hash_spill_finish(AggState *aggstate, HashAggSpill *spill,
                                                          int setno, int 
input_bits);
@@ -2462,7 +2464,7 @@ agg_refill_hash_table(AggState *aggstate)
 
                CHECK_FOR_INTERRUPTS();
 
-               tuple = hash_read_spilled(batch->input_file, &hash);
+               tuple = hash_read_spilled(batch->lts, batch->input_tape, &hash);
                if (tuple == NULL)
                        break;
 
@@ -2490,8 +2492,8 @@ agg_refill_hash_table(AggState *aggstate)
                                                                
batch->input_tuples, aggstate->hashentrysize);
                        }
 
-                       aggstate->hash_disk_used += hash_spill_tuple(
-                               &batch->spill, batch->input_bits, slot, hash);
+                       //aggstate->hash_disk_used +=
+                       hash_spill_tuple(&batch->spill, batch->input_bits, 
slot, hash);
                }
 
                /* Advance the aggregates (or combine functions) */
@@ -2504,8 +2506,6 @@ agg_refill_hash_table(AggState *aggstate)
                ResetExprContext(aggstate->tmpcontext);
        }
 
-       BufFileClose(batch->input_file);
-
        aggstate->current_phase = 0;
        aggstate->phase = &aggstate->phases[aggstate->current_phase];
 
@@ -2690,6 +2690,9 @@ hash_spill_init(HashAggSpill *spill, int input_bits, 
uint64 input_groups,
 {
        int     npartitions;
        int     partition_bits;
+       int     i;
+       int     j;
+       int     old_npartitions;
 
        npartitions = hash_choose_num_spill_partitions(input_groups,
                                                                                
                   hashentrysize);
@@ -2702,10 +2705,33 @@ hash_spill_init(HashAggSpill *spill, int input_bits, 
uint64 input_groups,
        /* number of partitions will be a power of two */
        npartitions = 1L << partition_bits;
 
-       spill->partition_bits = partition_bits;
-       spill->n_partitions = npartitions;
-       spill->partitions = palloc0(sizeof(BufFile *) * npartitions);
-       spill->ntuples = palloc0(sizeof(int64) * npartitions);
+       if (spill->lts == NULL)
+       {
+               spill->partition_bits = partition_bits;
+               spill->n_partitions   = npartitions;
+               spill->partitions     = palloc0(sizeof(int) * npartitions);
+               for (i = 0; i < spill->n_partitions; ++i)
+               {
+                       spill->partitions[i] = i;
+               }
+               spill->ntuples        = palloc0(sizeof(int64) * 
spill->n_partitions);
+               spill->lts            = LogicalTapeSetCreate(npartitions, NULL, 
NULL, 0); // TODO: worker is 0?
+       }
+       else // respill
+       {
+               old_npartitions = LogicalTapeGetNTapes(spill->lts);
+               spill->partition_bits = my_log2(npartitions);
+               spill->n_partitions   = (1L << spill->partition_bits);
+               spill->partitions     = palloc0(sizeof(int) * npartitions);
+               j = old_npartitions;
+               for (i = 0; i < spill->n_partitions; ++i)
+               {
+                       spill->partitions[i] = j;
+                       j++;
+               }
+               spill->ntuples        = palloc0(sizeof(int64) * 
spill->n_partitions);
+               spill->lts            = LogicalTapeSetExtend(spill->lts, 
spill->n_partitions);
+       }
 }
 
 /*
@@ -2720,8 +2746,6 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, 
TupleTableSlot *slot,
 {
        int                                      partition;
        MinimalTuple             tuple;
-       BufFile                         *file;
-       int                                      written;
        int                                      total_written = 0;
        bool                             shouldFree;
 
@@ -2743,23 +2767,11 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, 
TupleTableSlot *slot,
 
        spill->ntuples[partition]++;
 
-       if (spill->partitions[partition] == NULL)
-               spill->partitions[partition] = BufFileCreateTemp(false);
-       file = spill->partitions[partition];
-
-       written = BufFileWrite(file, (void *) &hash, sizeof(uint32));
-       if (written != sizeof(uint32))
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not write to HashAgg temporary 
file: %m")));
-       total_written += written;
+       LogicalTapeWrite(spill->lts, spill->partitions[partition], (void *) 
&hash, sizeof(uint32));
+       total_written += sizeof(uint32);
 
-       written = BufFileWrite(file, (void *) tuple, tuple->t_len);
-       if (written != tuple->t_len)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not write to HashAgg temporary 
file: %m")));
-       total_written += written;
+       LogicalTapeWrite(spill->lts, spill->partitions[partition], (void *) 
tuple, tuple->t_len);
+       total_written += tuple->t_len;
 
        if (shouldFree)
                pfree(tuple);
@@ -2772,38 +2784,37 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, 
TupleTableSlot *slot,
  *             read the next tuple from a batch file.  Return NULL if no more.
  */
 static MinimalTuple
-hash_read_spilled(BufFile *file, uint32 *hashp)
+hash_read_spilled(LogicalTapeSet *lts, int tapenum, uint32 *hashp)
 {
        MinimalTuple    tuple;
        uint32                  t_len;
        size_t                  nread;
        uint32                  hash;
 
-       nread = BufFileRead(file, &hash, sizeof(uint32));
+       nread = LogicalTapeRead(lts, tapenum, &hash, sizeof(uint32));
        if (nread == 0)
                return NULL;
        if (nread != sizeof(uint32))
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not read from HashAgg temporary 
file: %m")));
+                                errmsg("could not read the hash from HashAgg 
spilled tape: %m")));
        if (hashp != NULL)
                *hashp = hash;
 
-       nread = BufFileRead(file, &t_len, sizeof(t_len));
+       nread = LogicalTapeRead(lts, tapenum, &t_len, sizeof(t_len));
        if (nread != sizeof(uint32))
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not read from HashAgg temporary 
file: %m")));
+                                errmsg("could not read the t_len from HashAgg 
spilled tape: %m")));
 
        tuple = (MinimalTuple) palloc(t_len);
        tuple->t_len = t_len;
 
-       nread = BufFileRead(file, (void *)((char *)tuple + sizeof(uint32)),
-                                               t_len - sizeof(uint32));
+       nread = LogicalTapeRead(lts, tapenum, (void *)((char *)tuple + 
sizeof(uint32)), t_len - sizeof(uint32));
        if (nread != t_len - sizeof(uint32))
                ereport(ERROR,
                                (errcode_for_file_access(),
-                                errmsg("could not read from HashAgg temporary 
file: %m")));
+                                errmsg("could not read the data from HashAgg 
spilled tape: %m")));
 
        return tuple;
 }
@@ -2815,15 +2826,17 @@ hash_read_spilled(BufFile *file, uint32 *hashp)
  * be done. Should be called in the aggregate's memory context.
  */
 static HashAggBatch *
-hash_batch_new(BufFile *input_file, int setno, int64 input_tuples,
+hash_batch_new(LogicalTapeSet *lts, int tapenum, int setno, int64 input_tuples,
                           int input_bits)
 {
        HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
-       batch->input_file = input_file;
+       batch->input_tape = tapenum;
        batch->input_bits = input_bits;
        batch->input_tuples = input_tuples;
        batch->setno = setno;
+       batch->lts = lts;
+       batch->spill.lts = lts; // share same logical tape set
 
        /* batch->spill will be set only after spilling this batch */
 
@@ -2860,7 +2873,7 @@ hash_finish_initial_spills(AggState *aggstate)
 /*
  * hash_spill_finish
  *
- * Transform spill files into new batches.
+ * Transform spill files into new batches. // XXX so the partitions are empty 
and ready to be reused
  */
 static void
 hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int 
input_bits)
@@ -2872,28 +2885,20 @@ hash_spill_finish(AggState *aggstate, HashAggSpill 
*spill, int setno, int input_
 
        for (i = 0; i < spill->n_partitions; i++)
        {
-               BufFile         *file = spill->partitions[i];
                MemoryContext    oldContext;
                HashAggBatch    *new_batch;
 
-               /* partition is empty */
-               if (file == NULL)
-                       continue;
-
-               /* rewind file for reading */
-               if (BufFileSeek(file, 0, 0L, SEEK_SET))
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not rewind HashAgg 
temporary file: %m")));
-
                oldContext = 
MemoryContextSwitchTo(aggstate->ss.ps.state->es_query_cxt);
-               new_batch = hash_batch_new(file, setno, spill->ntuples[i],
-                                                                  
spill->partition_bits + input_bits);
+               LogicalTapeRewindForRead(spill->lts, spill->partitions[i], 0);
+               new_batch = hash_batch_new(spill->lts, spill->partitions[i], 
setno, spill->ntuples[i],
+                                          spill->partition_bits + input_bits);
                aggstate->hash_batches = lappend(aggstate->hash_batches, 
new_batch);
                aggstate->hash_batches_used++;
                MemoryContextSwitchTo(oldContext);
        }
 
+       if (!list_member_ptr(aggstate->lts_list, spill->lts))
+               aggstate->lts_list = lappend(aggstate->lts_list, spill->lts);
        pfree(spill->ntuples);
        pfree(spill->partitions);
 }
@@ -2904,13 +2909,10 @@ hash_spill_finish(AggState *aggstate, HashAggSpill 
*spill, int setno, int input_
 static void
 hash_reset_spill(HashAggSpill *spill)
 {
-       int i;
-       for (i = 0; i < spill->n_partitions; i++)
+       if (spill->lts != NULL)
        {
-               BufFile         *file = spill->partitions[i];
-
-               if (file != NULL)
-                       BufFileClose(file);
+               LogicalTapeSetClose(spill->lts);
+               spill->lts = NULL;
        }
        if (spill->ntuples != NULL)
                pfree(spill->ntuples);
@@ -2940,16 +2942,19 @@ hash_reset_spills(AggState *aggstate)
        foreach(lc, aggstate->hash_batches)
        {
                HashAggBatch *batch = (HashAggBatch*) lfirst(lc);
-               if (batch->input_file != NULL)
-               {
-                       BufFileClose(batch->input_file);
-                       batch->input_file = NULL;
-               }
                hash_reset_spill(&batch->spill);
                pfree(batch);
        }
        list_free(aggstate->hash_batches);
        aggstate->hash_batches = NIL;
+
+       foreach(lc, aggstate->lts_list)
+       {
+               LogicalTapeSet *lts = (LogicalTapeSet *) lfirst(lc);
+               LogicalTapeSetClose(lts);
+       }
+       list_free(aggstate->lts_list);
+       aggstate->lts_list = NIL;
 }
 
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 8985b9e095..677b992743 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -202,7 +202,7 @@ struct LogicalTapeSet
 
        /* The array of logical tapes. */
        int                     nTapes;                 /* # of logical tapes 
in set */
-       LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER];       /* has nTapes nentries 
*/
+       LogicalTape *tapes;     /* has nTapes nentries */
 };
 
 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
@@ -518,8 +518,8 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, 
SharedFileSet *fileset,
         * Create top-level struct including per-tape LogicalTape structs.
         */
        Assert(ntapes > 0);
-       lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
-                                                                       ntapes 
* sizeof(LogicalTape));
+       lts = (LogicalTapeSet *) palloc0(sizeof(LogicalTapeSet));
+       lts->tapes = (LogicalTape *)palloc0(ntapes * sizeof(LogicalTape));
        lts->nBlocksAllocated = 0L;
        lts->nBlocksWritten = 0L;
        lts->nHoleBlocks = 0L;
@@ -577,6 +577,45 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, 
SharedFileSet *fileset,
        return lts;
 }
 
+LogicalTapeSet *
+LogicalTapeSetExtend(LogicalTapeSet *lts, int ntoextend)
+{
+       LogicalTape    *lt;
+       int            i;
+
+       /*
+        * Grow the per-tape array of an existing set by ntoextend more tapes.
+        */
+       Assert(ntoextend > 0);
+       lts->tapes = (LogicalTape *) repalloc(lts->tapes, (lts->nTapes + 
ntoextend) * sizeof(LogicalTape));
+       lts->nTapes           = lts->nTapes + ntoextend;
+
+       /*
+        * Initialize only the newly added per-tape structs (the existing ones
+        * are left untouched).  Note we allocate the I/O buffer and the first
+        * block for a tape only when it is first actually written to.  This
+        * avoids wasting memory space when we overestimate the number of
+        * tapes needed.
+        */
+       for (i = lts->nTapes - ntoextend; i < lts->nTapes; i++)
+       {
+               lt = &lts->tapes[i];
+               lt->writing           = true;
+               lt->frozen            = false;
+               lt->dirty             = false;
+               lt->firstBlockNumber  = -1L;
+               lt->curBlockNumber    = -1L;
+               lt->nextBlockNumber   = -1L;
+               lt->offsetBlockNumber = 0L;
+               lt->buffer            = NULL;
+               lt->buffer_size       = 0;
+               /* palloc() larger than MaxAllocSize would fail */
+               lt->max_size          = MaxAllocSize;
+               lt->pos               = 0;
+               lt->nbytes            = 0;
+       }
+
+       return lts;     /* same set pointer as passed in; only lts->tapes was reallocated */
+}
+
 /*
  * Close a logical tape set and release all resources.
  */
@@ -1083,3 +1122,9 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
        return lts->nBlocksAllocated - lts->nHoleBlocks;
 }
+
+int
+LogicalTapeGetNTapes(LogicalTapeSet *lts)
+{
+       return lts->nTapes;     /* current number of logical tapes in the set */
+}
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 8d4a36a353..d45473101c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2083,6 +2083,7 @@ typedef struct AggState
        uint64          hash_disk_used; /* bytes of disk space used */
        int                     hash_batches_used;      /* batches used during 
entire execution */
        List       *hash_batches;       /* hash batches remaining to be 
processed */
+       List       *lts_list;
 
        AggStatePerHash perhash;        /* array of per-hashtable data */
        AggStatePerGroup *hash_pergroup;        /* grouping set indexed array of
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 081b03880a..c2f5c72665 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -56,6 +56,7 @@ typedef struct TapeShare
 
 extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared,
                                                                                
        SharedFileSet *fileset, int worker);
+extern LogicalTapeSet * LogicalTapeSetExtend(LogicalTapeSet *lts, int 
ntoextend);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
 extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
@@ -74,5 +75,6 @@ extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
                                                        long *blocknum, int 
*offset);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
+extern int LogicalTapeGetNTapes(LogicalTapeSet *lts);
 
 #endif                                                 /* LOGTAPE_H */

Reply via email to