On Wed, 2020-01-29 at 14:48 -0800, Jeff Davis wrote:
> 2) Use a different structure more capable of handling a large
> fraction of free space. A compressed bitmap might make sense, but
> that seems like overkill to waste effort tracking a lot of space
> that is unlikely to ever be used.

I ended up converting the freelist to a min heap.

Attached is a patch which makes three changes to better support
HashAgg:

1. Use a minheap for the freelist. The original design used an array
that had to be sorted between a read (which frees a block) and a write
(which needs to sort the array to consume the lowest block number). The
comments said:

  * sorted. This is an efficient way to handle it because we expect cycles
  * of releasing many blocks followed by re-using many blocks, due to
  * the larger read buffer.

But I didn't find a case where that actually wins over a simple
minheap. With that in mind, a minheap seems closer to what one might
expect for that purpose, and more robust when the assumptions don't
hold up as well. If someone knows of a case where the re-sorting
behavior is important, please let me know.

Changing to a minheap effectively solves the problem for HashAgg,
though in theory the memory consumption of the freelist itself could
become significant (though it's only 0.1% of the free space being
tracked).

2. Lazily-allocate the read buffer. The write buffer was always
lazily-allocated, so this patch creates better symmetry. More
importantly, it means freshly-rewound tapes don't have any buffer
allocated, so it greatly expands the number of tapes that can be
managed efficiently as long as only a limited number are active at
once.

3. Allow expanding the number of tapes for an existing tape set. This
is useful for HashAgg, which doesn't know how many tapes will be needed
in advance.

Regards,
	Jeff Davis

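To illustrate change 1 in isolation, here is a minimal stand-alone sketch of a min-heap freelist in C. It is not the patch code: it assumes plain malloc/realloc instead of palloc/repalloc, and the names FreeList, freelist_init, freelist_release and freelist_get are hypothetical, chosen only for this example. Releasing a block sifts it up; asking for a block pops the smallest free block number, or hands out a never-used block when the heap is empty.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-alone freelist; not the LogicalTapeSet struct. */
typedef struct FreeList
{
	long	   *heap;		/* binary min heap of free block numbers */
	int			n;			/* number of entries in use */
	int			cap;		/* allocated length of heap[] */
	long		next;		/* next never-allocated block number */
} FreeList;

static void
freelist_init(FreeList *fl)
{
	fl->cap = 32;
	fl->heap = malloc(fl->cap * sizeof(long));
	assert(fl->heap != NULL);
	fl->n = 0;
	fl->next = 0;
}

static void
swap_long(long *a, long *b)
{
	long		tmp = *a;

	*a = *b;
	*b = tmp;
}

/* Remember a freed block: append at the end, then sift up. */
static void
freelist_release(FreeList *fl, long blocknum)
{
	int			pos;

	if (fl->n >= fl->cap)
	{
		fl->cap *= 2;
		fl->heap = realloc(fl->heap, fl->cap * sizeof(long));
		assert(fl->heap != NULL);
	}

	pos = fl->n++;
	fl->heap[pos] = blocknum;
	while (pos > 0)
	{
		int			parent = (pos - 1) / 2;

		if (fl->heap[parent] <= fl->heap[pos])
			break;
		swap_long(&fl->heap[parent], &fl->heap[pos]);
		pos = parent;
	}
}

/* Return the lowest free block, or a new block if none is free. */
static long
freelist_get(FreeList *fl)
{
	long		result;
	int			pos = 0;

	if (fl->n == 0)
		return fl->next++;

	result = fl->heap[0];
	fl->heap[0] = fl->heap[--fl->n];

	/* sift down: swap with the smaller child until heap order holds */
	for (;;)
	{
		int			left = 2 * pos + 1;
		int			right = left + 1;
		int			min = pos;

		if (left < fl->n && fl->heap[left] < fl->heap[min])
			min = left;
		if (right < fl->n && fl->heap[right] < fl->heap[min])
			min = right;
		if (min == pos)
			break;
		swap_long(&fl->heap[min], &fl->heap[pos]);
		pos = min;
	}
	return result;
}
```

Both operations are O(log n) in the number of free blocks, so the cost does not depend on how reads and writes happen to interleave, which is the robustness argument made above.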
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 42cfb1f9f98..20b27b3558b 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -49,12 +49,8 @@
  * when reading, and read multiple blocks from the same tape in one go,
  * whenever the buffer becomes empty.
  *
- * To support the above policy of writing to the lowest free block,
- * ltsGetFreeBlock sorts the list of free block numbers into decreasing
- * order each time it is asked for a block and the list isn't currently
- * sorted. This is an efficient way to handle it because we expect cycles
- * of releasing many blocks followed by re-using many blocks, due to
- * the larger read buffer.
+ * To support the above policy of writing to the lowest free block, the
+ * freelist is a min heap.
  *
  * Since all the bookkeeping and buffer memory is allocated with palloc(),
  * and the underlying file(s) are made with OpenTemporaryFile, all resources
@@ -170,7 +166,7 @@ struct LogicalTapeSet
 	/*
 	 * File size tracking. nBlocksWritten is the size of the underlying file,
 	 * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
-	 * by ltsGetFreeBlock(), and it is always greater than or equal to
+	 * by ltsReleaseBlock(), and it is always greater than or equal to
 	 * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
 	 * blocks that have been allocated for a tape, but have not been written
 	 * to the underlying file yet. nHoleBlocks tracks the total number of
@@ -188,15 +184,9 @@ struct LogicalTapeSet
 	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
 	 * rather than being remembered in freeBlocks[]. See notes for
 	 * LogicalTapeSetForgetFreeSpace().
-	 *
-	 * If blocksSorted is true then the block numbers in freeBlocks are in
-	 * *decreasing* order, so that removing the last entry gives us the lowest
-	 * free block. We re-sort the blocks whenever a block is demanded; this
-	 * should be reasonably efficient given the expected usage pattern.
 	 */
 	bool		forgetFreeSpace;	/* are we remembering free blocks? */
-	bool		blocksSorted;	/* is freeBlocks[] currently in order? */
-	long	   *freeBlocks;		/* resizable array */
+	long	   *freeBlocks;		/* resizable array holding minheap */
 	int			nFreeBlocks;	/* # of currently free blocks */
 	int			freeBlocksLen;	/* current allocated length of freeBlocks[] */
 
@@ -211,6 +201,7 @@ static long ltsGetFreeBlock(LogicalTapeSet *lts);
 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 								 SharedFileSet *fileset);
+static void ltsInitTape(LogicalTape *lt);
 
 
 /*
@@ -321,46 +312,88 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 	return (lt->nbytes > 0);
 }
 
-/*
- * qsort comparator for sorting freeBlocks[] into decreasing order.
- */
-static int
-freeBlocks_cmp(const void *a, const void *b)
+static inline void
+swap_nodes(long *heap, int a, int b)
 {
-	long		ablk = *((const long *) a);
-	long		bblk = *((const long *) b);
-
-	/* can't just subtract because long might be wider than int */
-	if (ablk < bblk)
-		return 1;
-	if (ablk > bblk)
-		return -1;
-	return 0;
+	long	swap;
+
+	swap = heap[a];
+	heap[a] = heap[b];
+	heap[b] = swap;
+}
+
+static inline int
+left_offset(int i)
+{
+	return 2 * i + 1;
+}
+
+static inline int
+right_offset(int i)
+{
+	return 2 * i + 2;
+}
+
+static inline int
+parent_offset(int i)
+{
+	return (i - 1) / 2;
 }
 
 /*
- * Select a currently unused block for writing to.
+ * Select the lowest currently unused block by taking the first element from
+ * the freelist min heap.
  */
 static long
 ltsGetFreeBlock(LogicalTapeSet *lts)
 {
-	/*
-	 * If there are multiple free blocks, we select the one appearing last in
-	 * freeBlocks[] (after sorting the array if needed). If there are none,
-	 * assign the next block at the end of the file.
-	 */
-	if (lts->nFreeBlocks > 0)
+	long	*heap = lts->freeBlocks;
+	long	 blocknum;
+	int		 heapsize;
+	int		 pos;
+
+	/* freelist empty; allocate a new block */
+	if (lts->nFreeBlocks == 0)
+		return lts->nBlocksAllocated++;
+
+	if (lts->nFreeBlocks == 1)
 	{
-		if (!lts->blocksSorted)
-		{
-			qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
-				  sizeof(long), freeBlocks_cmp);
-			lts->blocksSorted = true;
-		}
-		return lts->freeBlocks[--lts->nFreeBlocks];
+		lts->nFreeBlocks--;
+		return lts->freeBlocks[0];
 	}
-	else
-		return lts->nBlocksAllocated++;
+
+	/* take top of minheap */
+	blocknum = heap[0];
+
+	/* replace with end of minheap array */
+	heap[0] = heap[--lts->nFreeBlocks];
+
+	/* sift down */
+	pos = 0;
+	heapsize = lts->nFreeBlocks;
+	while (true)
+	{
+		int left = left_offset(pos);
+		int right = right_offset(pos);
+		int min_child;
+
+		if (left < heapsize && right < heapsize)
+			min_child = (heap[left] < heap[right]) ? left : right;
+		else if (left < heapsize)
+			min_child = left;
+		else if (right < heapsize)
+			min_child = right;
+		else
+			break;
+
+		if (heap[min_child] >= heap[pos])
+			break;
+
+		swap_nodes(heap, min_child, pos);
+		pos = min_child;
+	}
+
+	return blocknum;
 }
 
 /*
@@ -369,7 +402,8 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
 static void
 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 {
-	int			ndx;
+	long	*heap;
+	int		 pos;
 
 	/*
 	 * Do nothing if we're no longer interested in remembering free space.
@@ -387,14 +421,23 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 											  lts->freeBlocksLen * sizeof(long));
 	}
 
-	/*
-	 * Add blocknum to array, and mark the array unsorted if it's no longer in
-	 * decreasing order.
-	 */
-	ndx = lts->nFreeBlocks++;
-	lts->freeBlocks[ndx] = blocknum;
-	if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
-		lts->blocksSorted = false;
+	heap = lts->freeBlocks;
+	pos = lts->nFreeBlocks;
+
+	/* place entry at end of minheap array */
+	heap[pos] = blocknum;
+	lts->nFreeBlocks++;
+
+	/* sift up */
+	while (pos != 0)
+	{
+		int parent = parent_offset(pos);
+		if (heap[parent] < heap[pos])
+			break;
+
+		swap_nodes(heap, parent, pos);
+		pos = parent;
+	}
 }
 
 /*
@@ -486,6 +529,30 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
 }
 
+/*
+ * Initialize per-tape struct. Note we allocate the I/O buffer and the first
+ * block for a tape only when it is first actually written to. This avoids
+ * wasting memory space when tuplesort.c overestimates the number of tapes
+ * needed.
+ */
+static void
+ltsInitTape(LogicalTape *lt)
+{
+	lt->writing = true;
+	lt->frozen = false;
+	lt->dirty = false;
+	lt->firstBlockNumber = -1L;
+	lt->curBlockNumber = -1L;
+	lt->nextBlockNumber = -1L;
+	lt->offsetBlockNumber = 0L;
+	lt->buffer = NULL;
+	lt->buffer_size = 0;
+	/* palloc() larger than MaxAllocSize would fail */
+	lt->max_size = MaxAllocSize;
+	lt->pos = 0;
+	lt->nbytes = 0;
+}
+
 /*
  * Create a set of logical tapes in a temporary underlying file.
  *
@@ -511,7 +578,6 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 					 int worker)
 {
 	LogicalTapeSet *lts;
-	LogicalTape *lt;
 	int			i;
 
 	/*
@@ -524,35 +590,13 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 	lts->nBlocksWritten = 0L;
 	lts->nHoleBlocks = 0L;
 	lts->forgetFreeSpace = false;
-	lts->blocksSorted = true;	/* a zero-length array is sorted ... */
 	lts->freeBlocksLen = 32;	/* reasonable initial guess */
 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 	lts->nFreeBlocks = 0;
 	lts->nTapes = ntapes;
 
-	/*
-	 * Initialize per-tape structs. Note we allocate the I/O buffer and the
-	 * first block for a tape only when it is first actually written to. This
-	 * avoids wasting memory space when tuplesort.c overestimates the number
-	 * of tapes needed.
-	 */
 	for (i = 0; i < ntapes; i++)
-	{
-		lt = &lts->tapes[i];
-		lt->writing = true;
-		lt->frozen = false;
-		lt->dirty = false;
-		lt->firstBlockNumber = -1L;
-		lt->curBlockNumber = -1L;
-		lt->nextBlockNumber = -1L;
-		lt->offsetBlockNumber = 0L;
-		lt->buffer = NULL;
-		lt->buffer_size = 0;
-		/* palloc() larger than MaxAllocSize would fail */
-		lt->max_size = MaxAllocSize;
-		lt->pos = 0;
-		lt->nbytes = 0;
-	}
+		ltsInitTape(&lts->tapes[i]);
 
 	/*
 	 * Create temp BufFile storage as required.
@@ -773,15 +817,12 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 	lt->buffer_size = 0;
 	if (lt->firstBlockNumber != -1L)
 	{
-		lt->buffer = palloc(buffer_size);
+		/*
+		 * The buffer is lazily allocated in LogicalTapeRead(), but we set the
+		 * size here.
+		 */
 		lt->buffer_size = buffer_size;
 	}
-
-	/* Read the first block, or reset if tape is empty */
-	lt->nextBlockNumber = lt->firstBlockNumber;
-	lt->pos = 0;
-	lt->nbytes = 0;
-	ltsReadFillBuffer(lts, lt);
 }
 
 /*
@@ -830,6 +871,22 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 	lt = &lts->tapes[tapenum];
 	Assert(!lt->writing);
 
+	if (lt->buffer == NULL)
+	{
+		/* lazily allocate buffer */
+		if (lt->firstBlockNumber != -1L)
+		{
+			Assert(lt->buffer_size > 0);
+			lt->buffer = palloc(lt->buffer_size);
+		}
+
+		/* Read the first block, or reset if tape is empty */
+		lt->nextBlockNumber = lt->firstBlockNumber;
+		lt->pos = 0;
+		lt->nbytes = 0;
+		ltsReadFillBuffer(lts, lt);
+	}
+
 	while (size > 0)
 	{
 		if (lt->pos >= lt->nbytes)
@@ -943,6 +1000,28 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 	}
 }
 
+/*
+ * Add additional tapes to this tape set.
+ */
+LogicalTapeSet *
+LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
+{
+	int i;
+	int nTapesOrig = lts->nTapes;
+	Size newSize;
+
+	lts->nTapes += nAdditional;
+	newSize = offsetof(LogicalTapeSet, tapes) +
+		lts->nTapes * sizeof(LogicalTape);
+
+	lts = (LogicalTapeSet *) repalloc(lts, newSize);
+
+	for (i = nTapesOrig; i < lts->nTapes; i++)
+		ltsInitTape(&lts->tapes[i]);
+
+	return lts;
+}
+
 /*
  * Backspace the tape a given number of bytes. (We also support a more
  * general seek interface, see below.)
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 695d2c00ee4..3ebe52239f8 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -67,6 +67,8 @@ extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
 extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
 							  TapeShare *share);
+extern LogicalTapeSet *LogicalTapeSetExtend(LogicalTapeSet *lts,
+											int nAdditional);
 extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
 								   size_t size);
 extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
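
One consequence of the LogicalTapeSetExtend() hunk is worth spelling out: because the tape array lives inside the LogicalTapeSet allocation and the function calls repalloc(), the set may move, so callers need to keep using the returned pointer. The following is a rough stand-alone sketch of that pattern together with the lazy buffer allocation from change 2. It assumes malloc/realloc rather than palloc/repalloc, and TapeSet, Tape, tapeset_create, tapeset_extend and tape_buffer are hypothetical names for this illustration only, not part of logtape.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, heavily reduced stand-ins for LogicalTape/LogicalTapeSet. */
typedef struct Tape
{
	char	   *buffer;			/* NULL until first use */
	size_t		buffer_size;	/* size to allocate on first use */
} Tape;

typedef struct TapeSet
{
	int			ntapes;
	Tape		tapes[];		/* flexible array member */
} TapeSet;

static void
tape_init(Tape *t)
{
	t->buffer = NULL;
	t->buffer_size = 0;
}

static TapeSet *
tapeset_create(int ntapes)
{
	TapeSet    *ts = malloc(offsetof(TapeSet, tapes) + ntapes * sizeof(Tape));
	int			i;

	assert(ts != NULL);
	ts->ntapes = ntapes;
	for (i = 0; i < ntapes; i++)
		tape_init(&ts->tapes[i]);
	return ts;
}

/* Grow the set; the result may live at a different address than ts. */
static TapeSet *
tapeset_extend(TapeSet *ts, int nadditional)
{
	int			orig = ts->ntapes;
	int			total = orig + nadditional;
	int			i;

	ts = realloc(ts, offsetof(TapeSet, tapes) + total * sizeof(Tape));
	assert(ts != NULL);
	ts->ntapes = total;
	for (i = orig; i < total; i++)
		tape_init(&ts->tapes[i]);
	return ts;
}

/* Allocate the buffer only when a tape is actually used. */
static char *
tape_buffer(Tape *t)
{
	if (t->buffer == NULL && t->buffer_size > 0)
	{
		t->buffer = malloc(t->buffer_size);
		assert(t->buffer != NULL);
	}
	return t->buffer;
}
```

A caller would write ts = tapeset_extend(ts, n) and must not keep Tape pointers obtained before the call. Tapes that are never touched cost only their struct, which is what lets a large number of mostly idle tapes stay cheap.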