On Tue, 2020-02-04 at 15:08 -0800, Peter Geoghegan wrote:
> Have you tested this against tuplesort.c, particularly parallel CREATE
> INDEX? It would be worth trying to measure any performance impact.
> Note that most parallel CREATE INDEX tuplesorts will do a merge within
> each worker, and one big merge in the leader. It's much more likely to
> have multiple passes than a regular serial external sort.
I did not observe any performance regression when creating an index in
parallel over 20M ints (random ints in random order). I tried 2 parallel
workers with work_mem=4MB and also 4 parallel workers with
work_mem=256kB.

> Have you thought about integer overflow in your heap related routines?
> This isn't as unlikely as you might think. See commit 512f67c8, for
> example.

The heap deals with block numbers rather than tuples, so overflow is a
bit less likely. But I changed the offset arithmetic to use "unsigned
long" anyway.

> Have you thought about the MaxAllocSize restriction as it concerns
> lts->freeBlocks? Will that be okay when you have many more tapes than
> before?

I added a check. Before enlarging the freelist array, if the new
allocation would exceed MaxAllocSize, the block is simply leaked rather
than added to the freelist. Perhaps there's a use case for an
extraordinarily long freelist, but it's outside the scope of this patch.

> LogicalTapeSetExtend() seems to work in a way that assumes that the
> tape is frozen. It would be good to document that assumption, and
> possibly enforce it by way of an assertion. The same remark applies to
> any other assumptions you're making there.

Can you explain? I am not freezing any tapes in Hash Aggregation, so what
about LogicalTapeSetExtend() assumes the tape is frozen?

Attached new logtape.c patches.

Regards,
Jeff Davis
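For illustration only, here is a standalone sketch of the "leak rather than exceed MaxAllocSize" guard described above. SKETCH_MAX_ALLOC_SIZE, FreeList, freelist_can_grow() and freelist_add() are hypothetical names, and plain realloc() stands in for repalloc(). The sketch assumes the limit is a byte count, like MaxAllocSize (1 GB - 1). For that reason the test multiplies in sizeof(long); comparing only the element count against the limit would permit an array sizeof(long) times too large.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/*
 * Assumption: a local stand-in for PostgreSQL's MaxAllocSize (1 GB - 1),
 * which is really defined in utils/memutils.h.
 */
#define SKETCH_MAX_ALLOC_SIZE ((size_t) 0x3fffffff)

typedef struct FreeList
{
	long	   *blocks;
	size_t		nblocks;		/* entries in use */
	size_t		capacity;		/* allocated entries, must be > 0 */
} FreeList;

/*
 * Can the array be doubled without exceeding the limit?  The limit is in
 * bytes, so the element size is included.  Dividing the limit instead of
 * multiplying the capacity means the test itself cannot overflow.
 */
static bool
freelist_can_grow(size_t capacity)
{
	return capacity <= SKETCH_MAX_ALLOC_SIZE / (2 * sizeof(long));
}

/*
 * Remember a free block.  If the array is full and may not grow, the block
 * is forgotten ("leaked") instead.  Returns true if it was remembered.
 */
static bool
freelist_add(FreeList *fl, long blocknum)
{
	assert(fl->capacity > 0);	/* doubling zero would never grow */

	if (fl->nblocks >= fl->capacity)
	{
		long	   *grown;

		if (!freelist_can_grow(fl->capacity))
			return false;
		grown = realloc(fl->blocks, fl->capacity * 2 * sizeof(long));
		if (grown == NULL)
			return false;
		fl->blocks = grown;
		fl->capacity *= 2;
	}
	fl->blocks[fl->nblocks++] = blocknum;
	return true;
}
```

Leaking a block only wastes temp-file space. It cannot corrupt anything, because a forgotten block is never handed out twice.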
From d3593ff34c83c20c75165624faf6d84803390b36 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 31 Jan 2020 16:43:41 -0800
Subject: [PATCH 1/3] Logical Tape Set: lazily allocate read buffer.

The write buffer was already lazily-allocated, so this is more
symmetric. It also means that a freshly-rewound tape (whether for
reading or writing) is not consuming memory for the buffer.
---
 src/backend/utils/sort/logtape.c | 27 ++++++++++++++++++++-------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 42cfb1f9f98..ba6d6e1f80a 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -773,15 +773,12 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 	lt->buffer_size = 0;
 	if (lt->firstBlockNumber != -1L)
 	{
-		lt->buffer = palloc(buffer_size);
+		/*
+		 * The buffer is lazily allocated in LogicalTapeRead(), but we set the
+		 * size here.
+		 */
 		lt->buffer_size = buffer_size;
 	}
-
-	/* Read the first block, or reset if tape is empty */
-	lt->nextBlockNumber = lt->firstBlockNumber;
-	lt->pos = 0;
-	lt->nbytes = 0;
-	ltsReadFillBuffer(lts, lt);
 }
 
 /*
@@ -830,6 +827,22 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 	lt = &lts->tapes[tapenum];
 	Assert(!lt->writing);
 
+	if (lt->buffer == NULL)
+	{
+		/* lazily allocate buffer */
+		if (lt->firstBlockNumber != -1L)
+		{
+			Assert(lt->buffer_size > 0);
+			lt->buffer = palloc(lt->buffer_size);
+		}
+
+		/* Read the first block, or reset if tape is empty */
+		lt->nextBlockNumber = lt->firstBlockNumber;
+		lt->pos = 0;
+		lt->nbytes = 0;
+		ltsReadFillBuffer(lts, lt);
+	}
+
 	while (size > 0)
 	{
 		if (lt->pos >= lt->nbytes)
-- 
2.17.1
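The lazy read buffer in patch 1 can be illustrated with a simplified, self-contained sketch. Rewinding only records the desired buffer size, and the first read allocates the buffer and fills it. Everything here is hypothetical and heavily simplified: LazyTape, lazy_rewind_for_read(), lazy_fill_buffer() and lazy_read() are invented names. An in-memory byte array stands in for the BufFile blocks, and malloc() stands in for palloc(). This is not logtape.c's actual code.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical tape whose "file" is an in-memory byte array. */
typedef struct LazyTape
{
	const char *data;			/* backing "file" contents */
	size_t		datalen;
	size_t		filepos;		/* next byte of data to load */
	char	   *buffer;			/* NULL until the first read */
	size_t		buffer_size;	/* chosen at rewind time */
	size_t		pos;			/* read position within buffer */
	size_t		nbytes;			/* valid bytes in buffer */
} LazyTape;

/* Rewind: record the buffer size, but neither allocate nor read yet. */
static void
lazy_rewind_for_read(LazyTape *t, size_t buffer_size)
{
	free(t->buffer);
	t->buffer = NULL;
	t->buffer_size = buffer_size;
	t->filepos = 0;
	t->pos = t->nbytes = 0;
}

/* Load the next chunk of the backing data into the buffer. */
static void
lazy_fill_buffer(LazyTape *t)
{
	size_t		n = t->datalen - t->filepos;

	if (n > t->buffer_size)
		n = t->buffer_size;
	memcpy(t->buffer, t->data + t->filepos, n);
	t->filepos += n;
	t->pos = 0;
	t->nbytes = n;
}

/* Read up to size bytes, allocating the buffer on first use. */
static size_t
lazy_read(LazyTape *t, char *out, size_t size)
{
	size_t		nread = 0;

	if (t->buffer == NULL)
	{
		assert(t->buffer_size > 0);
		t->buffer = malloc(t->buffer_size);
		if (t->buffer == NULL)
			return 0;
		lazy_fill_buffer(t);
	}

	while (size > 0)
	{
		size_t		avail;

		if (t->pos >= t->nbytes)
		{
			lazy_fill_buffer(t);
			if (t->nbytes == 0)
				break;			/* end of data */
		}
		avail = t->nbytes - t->pos;
		if (avail > size)
			avail = size;
		memcpy(out + nread, t->buffer + t->pos, avail);
		t->pos += avail;
		nread += avail;
		size -= avail;
	}
	return nread;
}
```

The point of the design is that a tape which is rewound but never read, for example one of many tapes created up front, costs no buffer memory.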
From 667335c98e3ee830b042801b29b62053325070d2 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 31 Jan 2020 16:44:40 -0800
Subject: [PATCH 2/3] Logical Tape Set: change freelist to min heap.

Previously, the freelist of blocks was tracked as an
occasionally-sorted array. A min heap is more resilient to larger
freelists or more frequent changes between reading and writing.
---
 src/backend/utils/sort/logtape.c | 158 ++++++++++++++++++++-----------
 1 file changed, 103 insertions(+), 55 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index ba6d6e1f80a..8d934f6d44e 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -49,12 +49,8 @@
  * when reading, and read multiple blocks from the same tape in one go,
  * whenever the buffer becomes empty.
  *
- * To support the above policy of writing to the lowest free block,
- * ltsGetFreeBlock sorts the list of free block numbers into decreasing
- * order each time it is asked for a block and the list isn't currently
- * sorted. This is an efficient way to handle it because we expect cycles
- * of releasing many blocks followed by re-using many blocks, due to
- * the larger read buffer.
+ * To support the above policy of writing to the lowest free block, the
+ * freelist is a min heap.
  *
  * Since all the bookkeeping and buffer memory is allocated with palloc(),
  * and the underlying file(s) are made with OpenTemporaryFile, all resources
@@ -188,17 +184,11 @@ struct LogicalTapeSet
 	 * If forgetFreeSpace is true then any freed blocks are simply forgotten
 	 * rather than being remembered in freeBlocks[]. See notes for
 	 * LogicalTapeSetForgetFreeSpace().
-	 *
-	 * If blocksSorted is true then the block numbers in freeBlocks are in
-	 * *decreasing* order, so that removing the last entry gives us the lowest
-	 * free block. We re-sort the blocks whenever a block is demanded; this
-	 * should be reasonably efficient given the expected usage pattern.
 	 */
 	bool		forgetFreeSpace;	/* are we remembering free blocks? */
-	bool		blocksSorted;	/* is freeBlocks[] currently in order? */
-	long	   *freeBlocks;		/* resizable array */
-	int			nFreeBlocks;	/* # of currently free blocks */
-	int			freeBlocksLen;	/* current allocated length of freeBlocks[] */
+	long	   *freeBlocks;		/* resizable array holding minheap */
+	long		nFreeBlocks;	/* # of currently free blocks */
+	Size		freeBlocksLen;	/* current allocated length of freeBlocks[] */
 
 	/* The array of logical tapes. */
 	int			nTapes;			/* # of logical tapes in set */
@@ -321,46 +311,88 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 	return (lt->nbytes > 0);
 }
 
-/*
- * qsort comparator for sorting freeBlocks[] into decreasing order.
- */
-static int
-freeBlocks_cmp(const void *a, const void *b)
+static inline void
+swap_nodes(long *heap, unsigned long a, unsigned long b)
+{
+	long		swap;
+
+	swap = heap[a];
+	heap[a] = heap[b];
+	heap[b] = swap;
+}
+
+static inline unsigned long
+left_offset(unsigned long i)
+{
+	return 2 * i + 1;
+}
+
+static inline unsigned long
+right_offset(unsigned long i)
+{
+	return 2 * i + 2;
+}
+
+static inline unsigned long
+parent_offset(unsigned long i)
 {
-	long		ablk = *((const long *) a);
-	long		bblk = *((const long *) b);
-
-	/* can't just subtract because long might be wider than int */
-	if (ablk < bblk)
-		return 1;
-	if (ablk > bblk)
-		return -1;
-	return 0;
+	return (i - 1) / 2;
 }
 
 /*
- * Select a currently unused block for writing to.
+ * Select the lowest currently unused block by taking the first element from
+ * the freelist min heap.
  */
 static long
 ltsGetFreeBlock(LogicalTapeSet *lts)
 {
-	/*
-	 * If there are multiple free blocks, we select the one appearing last in
-	 * freeBlocks[] (after sorting the array if needed). If there are none,
-	 * assign the next block at the end of the file.
-	 */
-	if (lts->nFreeBlocks > 0)
+	long	   *heap = lts->freeBlocks;
+	long		blocknum;
+	unsigned long heapsize;
+	unsigned long pos;
+
+	/* freelist empty; allocate a new block */
+	if (lts->nFreeBlocks == 0)
+		return lts->nBlocksAllocated++;
+
+	if (lts->nFreeBlocks == 1)
 	{
-		if (!lts->blocksSorted)
-		{
-			qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
-				  sizeof(long), freeBlocks_cmp);
-			lts->blocksSorted = true;
-		}
-		return lts->freeBlocks[--lts->nFreeBlocks];
+		lts->nFreeBlocks--;
+		return lts->freeBlocks[0];
 	}
-	else
-		return lts->nBlocksAllocated++;
+
+	/* take top of minheap */
+	blocknum = heap[0];
+
+	/* replace with end of minheap array */
+	heap[0] = heap[--lts->nFreeBlocks];
+
+	/* sift down */
+	pos = 0;
+	heapsize = lts->nFreeBlocks;
+	while (true)
+	{
+		unsigned long left = left_offset(pos);
+		unsigned long right = right_offset(pos);
+		unsigned long min_child;
+
+		if (left < heapsize && right < heapsize)
+			min_child = (heap[left] < heap[right]) ? left : right;
+		else if (left < heapsize)
+			min_child = left;
+		else if (right < heapsize)
+			min_child = right;
+		else
+			break;
+
+		if (heap[min_child] >= heap[pos])
+			break;
+
+		swap_nodes(heap, min_child, pos);
+		pos = min_child;
+	}
+
+	return blocknum;
 }
 
 /*
@@ -369,7 +401,8 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
 static void
 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 {
-	int			ndx;
+	long	   *heap;
+	unsigned long pos;
 
 	/*
 	 * Do nothing if we're no longer interested in remembering free space.
@@ -382,19 +415,35 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 	 */
 	if (lts->nFreeBlocks >= lts->freeBlocksLen)
 	{
+		/*
+		 * If the freelist becomes very large, just return and leak this free
+		 * block.
+		 */
+		if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
+			return;
+
 		lts->freeBlocksLen *= 2;
 		lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
 											lts->freeBlocksLen * sizeof(long));
 	}
 
-	/*
-	 * Add blocknum to array, and mark the array unsorted if it's no longer in
-	 * decreasing order.
-	 */
-	ndx = lts->nFreeBlocks++;
-	lts->freeBlocks[ndx] = blocknum;
-	if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
-		lts->blocksSorted = false;
+	heap = lts->freeBlocks;
+	pos = lts->nFreeBlocks;
+
+	/* place entry at end of minheap array */
+	heap[pos] = blocknum;
+	lts->nFreeBlocks++;
+
+	/* sift up */
+	while (pos != 0)
+	{
+		unsigned long parent = parent_offset(pos);
+		if (heap[parent] < heap[pos])
+			break;
+
+		swap_nodes(heap, parent, pos);
+		pos = parent;
+	}
 }
 
 /*
@@ -524,7 +573,6 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 	lts->nBlocksWritten = 0L;
 	lts->nHoleBlocks = 0L;
 	lts->forgetFreeSpace = false;
-	lts->blocksSorted = true;	/* a zero-length array is sorted ... */
 	lts->freeBlocksLen = 32;	/* reasonable initial guess */
 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 	lts->nFreeBlocks = 0;
-- 
2.17.1
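Patch 2's freelist can be exercised outside PostgreSQL with the minimal sketch below. It shows the same technique: a binary min heap stored in an array, so the lowest free block number is always at index 0. Releasing a block appends it and sifts it up, and taking a block moves the last entry to the root and sifts it down. Both operations are O(log n), whereas the old scheme re-sorted the array whenever it was unsorted. BlockHeap, heap_push(), heap_pop() and the fixed SKETCH_HEAP_CAP capacity are hypothetical simplifications. The array is not resizable here, and an empty heap returns -1 where ltsGetFreeBlock() would instead return nBlocksAllocated++.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical fixed capacity; the real freelist is a resizable array. */
#define SKETCH_HEAP_CAP 64

typedef struct BlockHeap
{
	long		heap[SKETCH_HEAP_CAP];
	unsigned long n;			/* number of entries in use */
} BlockHeap;

static void
swap_nodes(long *heap, unsigned long a, unsigned long b)
{
	long		tmp = heap[a];

	heap[a] = heap[b];
	heap[b] = tmp;
}

/* Release a block: append it at the end, then sift it up. */
static void
heap_push(BlockHeap *h, long blocknum)
{
	unsigned long pos;

	assert(h->n < SKETCH_HEAP_CAP);
	pos = h->n++;
	h->heap[pos] = blocknum;
	while (pos != 0)
	{
		unsigned long parent = (pos - 1) / 2;

		if (h->heap[parent] <= h->heap[pos])
			break;
		swap_nodes(h->heap, parent, pos);
		pos = parent;
	}
}

/*
 * Take the lowest free block: move the last entry to the root, then sift
 * it down.  Returns -1 if the heap is empty.
 */
static long
heap_pop(BlockHeap *h)
{
	long		result;
	unsigned long pos = 0;

	if (h->n == 0)
		return -1;
	result = h->heap[0];
	h->heap[0] = h->heap[--h->n];

	for (;;)
	{
		unsigned long left = 2 * pos + 1;
		unsigned long right = 2 * pos + 2;
		unsigned long min_child;

		if (left >= h->n)
			break;				/* no children, so no right child either */
		min_child = left;
		if (right < h->n && h->heap[right] < h->heap[left])
			min_child = right;
		if (h->heap[min_child] >= h->heap[pos])
			break;
		swap_nodes(h->heap, min_child, pos);
		pos = min_child;
	}
	return result;
}
```

Because a right child can only exist when the left child does, checking `left` first covers every case. That is also why the `else if (right < heapsize)` branch in the patch's sift-down can never be taken.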
From 6e7b9f8b246150e36e7f7df513dbfcbef9f6102d Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Fri, 31 Jan 2020 16:42:38 -0800
Subject: [PATCH 3/3] Logical Tape Set: add API to extend with additional tapes.

---
 src/backend/utils/sort/logtape.c | 71 +++++++++++++++++++++-----------
 src/include/utils/logtape.h      |  2 +
 2 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 8d934f6d44e..7556abdb4aa 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -201,6 +201,7 @@ static long ltsGetFreeBlock(LogicalTapeSet *lts);
 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
 static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 								 SharedFileSet *fileset);
+static void ltsInitTape(LogicalTape *lt);
 
 
 /*
@@ -535,6 +536,30 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
 	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
 }
 
+/*
+ * Initialize per-tape struct. Note we allocate the I/O buffer and the first
+ * block for a tape only when it is first actually written to. This avoids
+ * wasting memory space when tuplesort.c overestimates the number of tapes
+ * needed.
+ */
+static void
+ltsInitTape(LogicalTape *lt)
+{
+	lt->writing = true;
+	lt->frozen = false;
+	lt->dirty = false;
+	lt->firstBlockNumber = -1L;
+	lt->curBlockNumber = -1L;
+	lt->nextBlockNumber = -1L;
+	lt->offsetBlockNumber = 0L;
+	lt->buffer = NULL;
+	lt->buffer_size = 0;
+	/* palloc() larger than MaxAllocSize would fail */
+	lt->max_size = MaxAllocSize;
+	lt->pos = 0;
+	lt->nbytes = 0;
+}
+
 /*
  * Create a set of logical tapes in a temporary underlying file.
  *
@@ -560,7 +585,6 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 					 int worker)
 {
 	LogicalTapeSet *lts;
-	LogicalTape *lt;
 	int			i;
 
 	/*
@@ -578,29 +602,8 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 	lts->nFreeBlocks = 0;
 	lts->nTapes = ntapes;
 
-	/*
-	 * Initialize per-tape structs. Note we allocate the I/O buffer and the
-	 * first block for a tape only when it is first actually written to. This
-	 * avoids wasting memory space when tuplesort.c overestimates the number
-	 * of tapes needed.
-	 */
 	for (i = 0; i < ntapes; i++)
-	{
-		lt = &lts->tapes[i];
-		lt->writing = true;
-		lt->frozen = false;
-		lt->dirty = false;
-		lt->firstBlockNumber = -1L;
-		lt->curBlockNumber = -1L;
-		lt->nextBlockNumber = -1L;
-		lt->offsetBlockNumber = 0L;
-		lt->buffer = NULL;
-		lt->buffer_size = 0;
-		/* palloc() larger than MaxAllocSize would fail */
-		lt->max_size = MaxAllocSize;
-		lt->pos = 0;
-		lt->nbytes = 0;
-	}
+		ltsInitTape(&lts->tapes[i]);
 
 	/*
 	 * Create temp BufFile storage as required.
@@ -1004,6 +1007,28 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 	}
 }
 
+/*
+ * Add additional tapes to this tape set.
+ */
+LogicalTapeSet *
+LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
+{
+	int			i;
+	int			nTapesOrig = lts->nTapes;
+	Size		newSize;
+
+	lts->nTapes += nAdditional;
+	newSize = offsetof(LogicalTapeSet, tapes) +
+		lts->nTapes * sizeof(LogicalTape);
+
+	lts = (LogicalTapeSet *) repalloc(lts, newSize);
+
+	for (i = nTapesOrig; i < lts->nTapes; i++)
+		ltsInitTape(&lts->tapes[i]);
+
+	return lts;
+}
+
 /*
  * Backspace the tape a given number of bytes. (We also support a more
  * general seek interface, see below.)
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 695d2c00ee4..3ebe52239f8 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -67,6 +67,8 @@ extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
 extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
 							  TapeShare *share);
+extern LogicalTapeSet *LogicalTapeSetExtend(LogicalTapeSet *lts,
+											int nAdditional);
 extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
 								   size_t size);
 extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-- 
2.17.1
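LogicalTapeSetExtend() in patch 3 relies on a common C pattern: a struct ending in a flexible array member is reallocated to a larger size, and only the new trailing elements are initialized. The struct may move in the process, so callers must switch to the returned pointer. The sketch below shows that pattern in isolation. DemoTape, DemoTapeSet, demo_create() and demo_extend() are hypothetical, and only a few LogicalTape fields are mimicked. It also assumes plain realloc(), which can return NULL and leave the old block valid. repalloc() instead raises an error on failure, which is why the patch does not need a NULL check.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-ins for LogicalTape and LogicalTapeSet. */
typedef struct DemoTape
{
	int			writing;
	long		firstBlockNumber;
	char	   *buffer;
} DemoTape;

typedef struct DemoTapeSet
{
	int			nTapes;
	DemoTape	tapes[];		/* flexible array member */
} DemoTapeSet;

static void
demo_init_tape(DemoTape *t)
{
	t->writing = 1;
	t->firstBlockNumber = -1L;
	t->buffer = NULL;			/* buffers are allocated lazily */
}

static DemoTapeSet *
demo_create(int ntapes)
{
	DemoTapeSet *ts = malloc(offsetof(DemoTapeSet, tapes) +
							 ntapes * sizeof(DemoTape));

	if (ts == NULL)
		return NULL;
	ts->nTapes = ntapes;
	for (int i = 0; i < ntapes; i++)
		demo_init_tape(&ts->tapes[i]);
	return ts;
}

/*
 * Grow the set by nAdditional tapes.  Existing tapes keep their contents,
 * but the struct may move, so the caller must use the returned pointer.
 * Returns NULL (leaving ts valid) if realloc fails.
 */
static DemoTapeSet *
demo_extend(DemoTapeSet *ts, int nAdditional)
{
	int			nOrig = ts->nTapes;
	int			nNew;
	DemoTapeSet *grown;

	assert(nAdditional >= 0);
	nNew = nOrig + nAdditional;
	grown = realloc(ts, offsetof(DemoTapeSet, tapes) +
					nNew * sizeof(DemoTape));
	if (grown == NULL)
		return NULL;
	grown->nTapes = nNew;
	for (int i = nOrig; i < nNew; i++)
		demo_init_tape(&grown->tapes[i]);
	return grown;
}
```

One consequence is that any saved pointer into the old tapes[] array, such as a `LogicalTape *` held across the call, is invalid after the set is extended.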