Hi Jeff, I tried to use the logical tape APIs for hash agg spilling, based on your 1220 version.
It turns out it doesn't make much of a performance difference with the default 8K block size (might be my patch's problem), but a lot of disk space (not I/O) is saved because I force the respilling to reuse the same LogicalTapeSet. Logtape APIs with default block size 8K: ``` postgres=# EXPLAIN ANALYZE SELECT avg(g) FROM generate_series(0,5000000) g GROUP BY g; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- HashAggregate (cost=75000.02..75002.52 rows=200 width=36) (actual time=7701.706..24473.002 rows=5000001 loops=1) Group Key: g Memory Usage: 4096kB Batches: 516 Disk: 116921kB -> Function Scan on generate_series g (cost=0.00..50000.01 rows=5000001 width=4) (actual time=1611.829..3253.150 rows=5000001 loops=1) Planning Time: 0.194 ms Execution Time: 25129.239 ms (6 rows) ``` Bare BufFile APIs: ``` postgres=# EXPLAIN ANALYZE SELECT avg(g) FROM generate_series(0,5000000) g GROUP BY g; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- HashAggregate (cost=75000.02..75002.52 rows=200 width=36) (actual time=7339.835..24472.466 rows=5000001 loops=1) Group Key: g Memory Usage: 4096kB Batches: 516 Disk: 232773kB -> Function Scan on generate_series g (cost=0.00..50000.01 rows=5000001 width=4) (actual time=1580.057..3128.749 rows=5000001 loops=1) Planning Time: 0.769 ms Execution Time: 26696.502 ms (6 rows) ``` Even so, I'm not sure which API is better, because we should avoid respilling as much as we can in the planner, and hash join uses the bare BufFile. Attached is my hacky and probably not robust diff for your reference. -- Adam Lee
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index f1989b10ea..8c743d7561 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -247,6 +247,7 @@ #include "utils/datum.h" #include "utils/dynahash.h" #include "utils/expandeddatum.h" +#include "utils/logtape.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -288,8 +289,9 @@ typedef struct HashAggSpill int n_partitions; /* number of output partitions */ int partition_bits; /* number of bits for partition mask log2(n_partitions) parent partition bits */ - BufFile **partitions; /* output partition files */ + int *partitions; /* output logtape numbers */ int64 *ntuples; /* number of tuples in each partition */ + LogicalTapeSet *lts; } HashAggSpill; /* @@ -298,11 +300,12 @@ typedef struct HashAggSpill */ typedef struct HashAggBatch { - BufFile *input_file; /* input partition */ + int input_tape; /* input partition */ int input_bits; /* number of bits for input partition mask */ int64 input_tuples; /* number of tuples in this batch */ int setno; /* grouping set */ HashAggSpill spill; /* spill output */ + LogicalTapeSet *lts; } HashAggBatch; static void select_current_set(AggState *aggstate, int setno, bool is_hash); @@ -359,9 +362,8 @@ static void hash_spill_init(HashAggSpill *spill, int input_bits, uint64 input_tuples, double hashentrysize); static Size hash_spill_tuple(HashAggSpill *spill, int input_bits, TupleTableSlot *slot, uint32 hash); -static MinimalTuple hash_read_spilled(BufFile *file, uint32 *hashp); -static HashAggBatch *hash_batch_new(BufFile *input_file, int setno, - int64 input_tuples, int input_bits); +static MinimalTuple hash_read_spilled(LogicalTapeSet *lts, int tapenum, uint32 *hashp); +static HashAggBatch *hash_batch_new(LogicalTapeSet *lts, int tapenum, int setno, int64 input_tuples, int input_bits); static void hash_finish_initial_spills(AggState *aggstate); static void hash_spill_finish(AggState 
*aggstate, HashAggSpill *spill, int setno, int input_bits); @@ -2462,7 +2464,7 @@ agg_refill_hash_table(AggState *aggstate) CHECK_FOR_INTERRUPTS(); - tuple = hash_read_spilled(batch->input_file, &hash); + tuple = hash_read_spilled(batch->lts, batch->input_tape, &hash); if (tuple == NULL) break; @@ -2490,8 +2492,8 @@ agg_refill_hash_table(AggState *aggstate) batch->input_tuples, aggstate->hashentrysize); } - aggstate->hash_disk_used += hash_spill_tuple( - &batch->spill, batch->input_bits, slot, hash); + //aggstate->hash_disk_used += + hash_spill_tuple(&batch->spill, batch->input_bits, slot, hash); } /* Advance the aggregates (or combine functions) */ @@ -2504,8 +2506,6 @@ agg_refill_hash_table(AggState *aggstate) ResetExprContext(aggstate->tmpcontext); } - BufFileClose(batch->input_file); - aggstate->current_phase = 0; aggstate->phase = &aggstate->phases[aggstate->current_phase]; @@ -2690,6 +2690,9 @@ hash_spill_init(HashAggSpill *spill, int input_bits, uint64 input_groups, { int npartitions; int partition_bits; + int i; + int j; + int old_npartitions; npartitions = hash_choose_num_spill_partitions(input_groups, hashentrysize); @@ -2702,10 +2705,33 @@ hash_spill_init(HashAggSpill *spill, int input_bits, uint64 input_groups, /* number of partitions will be a power of two */ npartitions = 1L << partition_bits; - spill->partition_bits = partition_bits; - spill->n_partitions = npartitions; - spill->partitions = palloc0(sizeof(BufFile *) * npartitions); - spill->ntuples = palloc0(sizeof(int64) * npartitions); + if (spill->lts == NULL) + { + spill->partition_bits = partition_bits; + spill->n_partitions = npartitions; + spill->partitions = palloc0(sizeof(int) * npartitions); + for (i = 0; i < spill->n_partitions; ++i) + { + spill->partitions[i] = i; + } + spill->ntuples = palloc0(sizeof(int64) * spill->n_partitions); + spill->lts = LogicalTapeSetCreate(npartitions, NULL, NULL, 0); // TODO: worker is 0? 
+ } + else // respill + { + old_npartitions = LogicalTapeGetNTapes(spill->lts); + spill->partition_bits = my_log2(npartitions); + spill->n_partitions = (1L << spill->partition_bits); + spill->partitions = palloc0(sizeof(int) * npartitions); + j = old_npartitions; + for (i = 0; i < spill->n_partitions; ++i) + { + spill->partitions[i] = j; + j++; + } + spill->ntuples = palloc0(sizeof(int64) * spill->n_partitions); + spill->lts = LogicalTapeSetExtend(spill->lts, spill->n_partitions); + } } /* @@ -2720,8 +2746,6 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, TupleTableSlot *slot, { int partition; MinimalTuple tuple; - BufFile *file; - int written; int total_written = 0; bool shouldFree; @@ -2743,23 +2767,11 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, TupleTableSlot *slot, spill->ntuples[partition]++; - if (spill->partitions[partition] == NULL) - spill->partitions[partition] = BufFileCreateTemp(false); - file = spill->partitions[partition]; - - written = BufFileWrite(file, (void *) &hash, sizeof(uint32)); - if (written != sizeof(uint32)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to HashAgg temporary file: %m"))); - total_written += written; + LogicalTapeWrite(spill->lts, spill->partitions[partition], (void *) &hash, sizeof(uint32)); + total_written += sizeof(uint32); - written = BufFileWrite(file, (void *) tuple, tuple->t_len); - if (written != tuple->t_len) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not write to HashAgg temporary file: %m"))); - total_written += written; + LogicalTapeWrite(spill->lts, spill->partitions[partition], (void *) tuple, tuple->t_len); + total_written += tuple->t_len; if (shouldFree) pfree(tuple); @@ -2772,38 +2784,37 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, TupleTableSlot *slot, * read the next tuple from a batch file. Return NULL if no more. 
*/ static MinimalTuple -hash_read_spilled(BufFile *file, uint32 *hashp) +hash_read_spilled(LogicalTapeSet *lts, int tapenum, uint32 *hashp) { MinimalTuple tuple; uint32 t_len; size_t nread; uint32 hash; - nread = BufFileRead(file, &hash, sizeof(uint32)); + nread = LogicalTapeRead(lts, tapenum, &hash, sizeof(uint32)); if (nread == 0) return NULL; if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from HashAgg temporary file: %m"))); + errmsg("could not read the hash from HashAgg spilled tape: %m"))); if (hashp != NULL) *hashp = hash; - nread = BufFileRead(file, &t_len, sizeof(t_len)); + nread = LogicalTapeRead(lts, tapenum, &t_len, sizeof(t_len)); if (nread != sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from HashAgg temporary file: %m"))); + errmsg("could not read the t_len from HashAgg spilled tape: %m"))); tuple = (MinimalTuple) palloc(t_len); tuple->t_len = t_len; - nread = BufFileRead(file, (void *)((char *)tuple + sizeof(uint32)), - t_len - sizeof(uint32)); + nread = LogicalTapeRead(lts, tapenum, (void *)((char *)tuple + sizeof(uint32)), t_len - sizeof(uint32)); if (nread != t_len - sizeof(uint32)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from HashAgg temporary file: %m"))); + errmsg("could not read the data from HashAgg spilled tape: %m"))); return tuple; } @@ -2815,15 +2826,17 @@ hash_read_spilled(BufFile *file, uint32 *hashp) * be done. Should be called in the aggregate's memory context. 
*/ static HashAggBatch * -hash_batch_new(BufFile *input_file, int setno, int64 input_tuples, +hash_batch_new(LogicalTapeSet *lts, int tapenum, int setno, int64 input_tuples, int input_bits) { HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); - batch->input_file = input_file; + batch->input_tape = tapenum; batch->input_bits = input_bits; batch->input_tuples = input_tuples; batch->setno = setno; + batch->lts = lts; + batch->spill.lts = lts; // share same logical tape set /* batch->spill will be set only after spilling this batch */ @@ -2860,7 +2873,7 @@ hash_finish_initial_spills(AggState *aggstate) /* * hash_spill_finish * - * Transform spill files into new batches. + * Transform spill files into new batches. // XXX so the partitions are empty and ready to be reused */ static void hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_bits) @@ -2872,28 +2885,20 @@ hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_ for (i = 0; i < spill->n_partitions; i++) { - BufFile *file = spill->partitions[i]; MemoryContext oldContext; HashAggBatch *new_batch; - /* partition is empty */ - if (file == NULL) - continue; - - /* rewind file for reading */ - if (BufFileSeek(file, 0, 0L, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind HashAgg temporary file: %m"))); - oldContext = MemoryContextSwitchTo(aggstate->ss.ps.state->es_query_cxt); - new_batch = hash_batch_new(file, setno, spill->ntuples[i], - spill->partition_bits + input_bits); + LogicalTapeRewindForRead(spill->lts, spill->partitions[i], 0); + new_batch = hash_batch_new(spill->lts, spill->partitions[i], setno, spill->ntuples[i], + spill->partition_bits + input_bits); aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch); aggstate->hash_batches_used++; MemoryContextSwitchTo(oldContext); } + if (!list_member_ptr(aggstate->lts_list, spill->lts)) + aggstate->lts_list = lappend(aggstate->lts_list, spill->lts); 
pfree(spill->ntuples); pfree(spill->partitions); } @@ -2904,13 +2909,10 @@ hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_ static void hash_reset_spill(HashAggSpill *spill) { - int i; - for (i = 0; i < spill->n_partitions; i++) + if (spill->lts != NULL) { - BufFile *file = spill->partitions[i]; - - if (file != NULL) - BufFileClose(file); + LogicalTapeSetClose(spill->lts); + spill->lts = NULL; } if (spill->ntuples != NULL) pfree(spill->ntuples); @@ -2940,16 +2942,19 @@ hash_reset_spills(AggState *aggstate) foreach(lc, aggstate->hash_batches) { HashAggBatch *batch = (HashAggBatch*) lfirst(lc); - if (batch->input_file != NULL) - { - BufFileClose(batch->input_file); - batch->input_file = NULL; - } hash_reset_spill(&batch->spill); pfree(batch); } list_free(aggstate->hash_batches); aggstate->hash_batches = NIL; + + foreach(lc, aggstate->lts_list) + { + LogicalTapeSet *lts = (LogicalTapeSet *) lfirst(lc); + LogicalTapeSetClose(lts); + } + list_free(aggstate->lts_list); + aggstate->lts_list = NIL; } diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c index 8985b9e095..677b992743 100644 --- a/src/backend/utils/sort/logtape.c +++ b/src/backend/utils/sort/logtape.c @@ -202,7 +202,7 @@ struct LogicalTapeSet /* The array of logical tapes. */ int nTapes; /* # of logical tapes in set */ - LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER]; /* has nTapes nentries */ + LogicalTape *tapes; /* has nTapes nentries */ }; static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer); @@ -518,8 +518,8 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, * Create top-level struct including per-tape LogicalTape structs. 
*/ Assert(ntapes > 0); - lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) + - ntapes * sizeof(LogicalTape)); + lts = (LogicalTapeSet *) palloc0(sizeof(LogicalTapeSet)); + lts->tapes = (LogicalTape *)palloc0(ntapes * sizeof(LogicalTape)); lts->nBlocksAllocated = 0L; lts->nBlocksWritten = 0L; lts->nHoleBlocks = 0L; @@ -577,6 +577,45 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, return lts; } +LogicalTapeSet * +LogicalTapeSetExtend(LogicalTapeSet *lts, int ntoextend) +{ + LogicalTape *lt; + int i; + + /* + * Create top-level struct including per-tape LogicalTape structs. + */ + Assert(ntoextend > 0); + lts->tapes = (LogicalTape *) repalloc(lts->tapes, (lts->nTapes + ntoextend) * sizeof(LogicalTape)); + lts->nTapes = lts->nTapes + ntoextend; + + /* + * Initialize per-tape structs. Note we allocate the I/O buffer and the + * first block for a tape only when it is first actually written to. This + * avoids wasting memory space when we overestimate the number of tapes needed. + */ + for (i = lts->nTapes - ntoextend; i < lts->nTapes; i++) + { + lt = &lts->tapes[i]; + lt->writing = true; + lt->frozen = false; + lt->dirty = false; + lt->firstBlockNumber = -1L; + lt->curBlockNumber = -1L; + lt->nextBlockNumber = -1L; + lt->offsetBlockNumber = 0L; + lt->buffer = NULL; + lt->buffer_size = 0; + /* palloc() larger than MaxAllocSize would fail */ + lt->max_size = MaxAllocSize; + lt->pos = 0; + lt->nbytes = 0; + } + + return lts; +} + /* * Close a logical tape set and release all resources. 
*/ @@ -1083,3 +1122,9 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts) { return lts->nBlocksAllocated - lts->nHoleBlocks; } + +int +LogicalTapeGetNTapes(LogicalTapeSet *lts) +{ + return lts->nTapes; +} diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 8d4a36a353..d45473101c 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2083,6 +2083,7 @@ typedef struct AggState uint64 hash_disk_used; /* bytes of disk space used */ int hash_batches_used; /* batches used during entire execution */ List *hash_batches; /* hash batches remaining to be processed */ + List *lts_list; AggStatePerHash perhash; /* array of per-hashtable data */ AggStatePerGroup *hash_pergroup; /* grouping set indexed array of diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h index 081b03880a..c2f5c72665 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -56,6 +56,7 @@ typedef struct TapeShare extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset, int worker); +extern LogicalTapeSet * LogicalTapeSetExtend(LogicalTapeSet *lts, int ntoextend); extern void LogicalTapeSetClose(LogicalTapeSet *lts); extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts); extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, @@ -74,5 +75,6 @@ extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset); extern long LogicalTapeSetBlocks(LogicalTapeSet *lts); +extern int LogicalTapeGetNTapes(LogicalTapeSet *lts); #endif /* LOGTAPE_H */