On Sat, Sep 02, 2023 at 11:55:21AM -0700, Nathan Bossart wrote: > I ended up hacking together a (nowhere near committable) patch to see how > hard it would be to allow using any type with binaryheap. It doesn't seem > too bad.
I spent some more time on this patch and made the relevant adjustments to the rest of the set. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From 3a51f25ae712b792bdde90b133c09b0b940f4198 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 4 Sep 2023 15:04:40 -0700 Subject: [PATCH v7 1/5] Allow binaryheap to use any type. We would like to make binaryheap available to frontend code in a future commit, but presently the implementation uses Datum for the node type, which is inaccessible in the frontend. This commit modifies the implementation to allow using any type for the nodes. binaryheap_allocate() now requires callers to specify the size of the node, and several of the other functions now use void pointers. --- src/backend/executor/nodeGatherMerge.c | 19 ++-- src/backend/executor/nodeMergeAppend.c | 22 ++--- src/backend/lib/binaryheap.c | 99 +++++++++++-------- src/backend/postmaster/pgarch.c | 36 ++++--- .../replication/logical/reorderbuffer.c | 21 ++-- src/backend/storage/buffer/bufmgr.c | 20 ++-- src/include/lib/binaryheap.h | 21 ++-- 7 files changed, 137 insertions(+), 101 deletions(-) diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 9d5e1a46e9..f76406b575 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -52,7 +52,7 @@ typedef struct GMReaderTupleBuffer } GMReaderTupleBuffer; static TupleTableSlot *ExecGatherMerge(PlanState *pstate); -static int32 heap_compare_slots(Datum a, Datum b, void *arg); +static int32 heap_compare_slots(const void *a, const void *b, void *arg); static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done); @@ -429,6 +429,7 @@ gather_merge_setup(GatherMergeState *gm_state) /* Allocate the resources for the merge */ gm_state->gm_heap = binaryheap_allocate(nreaders + 1, + sizeof(int), heap_compare_slots, gm_state); } @@ -489,7 +490,7 @@ reread: /* Don't have a tuple yet, try to get one */ if (gather_merge_readnext(gm_state, i, nowait)) 
binaryheap_add_unordered(gm_state->gm_heap, - Int32GetDatum(i)); + &i); } else { @@ -565,14 +566,14 @@ gather_merge_getnext(GatherMergeState *gm_state) * the heap, because it might now compare differently against the * other elements of the heap. */ - i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + binaryheap_first(gm_state->gm_heap, &i); if (gather_merge_readnext(gm_state, i, false)) - binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); + binaryheap_replace_first(gm_state->gm_heap, &i); else { /* reader exhausted, remove it from heap */ - (void) binaryheap_remove_first(gm_state->gm_heap); + binaryheap_remove_first(gm_state->gm_heap, NULL); } } @@ -585,7 +586,7 @@ gather_merge_getnext(GatherMergeState *gm_state) else { /* Return next tuple from whichever participant has the leading one */ - i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + binaryheap_first(gm_state->gm_heap, &i); return gm_state->gm_slots[i]; } } @@ -750,11 +751,11 @@ typedef int32 SlotNumber; * Compare the tuples in the two given slots. 
*/ static int32 -heap_compare_slots(Datum a, Datum b, void *arg) +heap_compare_slots(const void *a, const void *b, void *arg) { GatherMergeState *node = (GatherMergeState *) arg; - SlotNumber slot1 = DatumGetInt32(a); - SlotNumber slot2 = DatumGetInt32(b); + SlotNumber slot1 = *((const int *) a); + SlotNumber slot2 = *((const int *) b); TupleTableSlot *s1 = node->gm_slots[slot1]; TupleTableSlot *s2 = node->gm_slots[slot2]; diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index 21b5726e6e..e0ace294a0 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -52,7 +52,7 @@ typedef int32 SlotNumber; static TupleTableSlot *ExecMergeAppend(PlanState *pstate); -static int heap_compare_slots(Datum a, Datum b, void *arg); +static int heap_compare_slots(const void *a, const void *b, void *arg); /* ---------------------------------------------------------------- @@ -125,8 +125,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) mergestate->ms_nplans = nplans; mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); - mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, - mergestate); + mergestate->ms_heap = binaryheap_allocate(nplans, sizeof(int), + heap_compare_slots, mergestate); /* * Miscellaneous initialization @@ -229,7 +229,7 @@ ExecMergeAppend(PlanState *pstate) { node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) - binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); + binaryheap_add_unordered(node->ms_heap, &i); } binaryheap_build(node->ms_heap); node->ms_initialized = true; @@ -244,12 +244,12 @@ ExecMergeAppend(PlanState *pstate) * by doing this before returning from the prior call, but it's better * to not pull tuples until necessary.) 
*/ - i = DatumGetInt32(binaryheap_first(node->ms_heap)); + binaryheap_first(node->ms_heap, &i); node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) - binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); + binaryheap_replace_first(node->ms_heap, &i); else - (void) binaryheap_remove_first(node->ms_heap); + binaryheap_remove_first(node->ms_heap, NULL); } if (binaryheap_empty(node->ms_heap)) @@ -259,7 +259,7 @@ ExecMergeAppend(PlanState *pstate) } else { - i = DatumGetInt32(binaryheap_first(node->ms_heap)); + binaryheap_first(node->ms_heap, &i); result = node->ms_slots[i]; } @@ -270,11 +270,11 @@ ExecMergeAppend(PlanState *pstate) * Compare the tuples in the two given slots. */ static int32 -heap_compare_slots(Datum a, Datum b, void *arg) +heap_compare_slots(const void *a, const void *b, void *arg) { MergeAppendState *node = (MergeAppendState *) arg; - SlotNumber slot1 = DatumGetInt32(a); - SlotNumber slot2 = DatumGetInt32(b); + SlotNumber slot1 = *((const int *) a); + SlotNumber slot2 = *((const int *) b); TupleTableSlot *s1 = node->ms_slots[slot1]; TupleTableSlot *s2 = node->ms_slots[slot2]; diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c index 1737546757..6f63181c4c 100644 --- a/src/backend/lib/binaryheap.c +++ b/src/backend/lib/binaryheap.c @@ -29,16 +29,19 @@ static void sift_up(binaryheap *heap, int node_off); * argument specified by 'arg'. 
*/ binaryheap * -binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg) +binaryheap_allocate(int capacity, size_t elem_size, + binaryheap_comparator compare, void *arg) { int sz; binaryheap *heap; - sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity; + sz = offsetof(binaryheap, bh_nodes) + elem_size * (capacity + 1); heap = (binaryheap *) palloc(sz); heap->bh_space = capacity; + heap->bh_elem_size = elem_size; heap->bh_compare = compare; heap->bh_arg = arg; + heap->bh_hole = &heap->bh_nodes[capacity * elem_size]; heap->bh_size = 0; heap->bh_has_heap_property = true; @@ -97,6 +100,27 @@ parent_offset(int i) return (i - 1) / 2; } +/* + * This utility function returns a pointer to the nth node of the binary heap. + */ +static inline void * +bh_node(binaryheap *heap, int n) +{ + return &heap->bh_nodes[n * heap->bh_elem_size]; +} + +/* + * This utility function sets the nth node of the binary heap to the value that + * d points to. + */ +static inline void +bh_set(binaryheap *heap, int n, const void *d) +{ + void *node = bh_node(heap, n); + + memmove(node, d, heap->bh_elem_size); +} + /* * binaryheap_add_unordered * @@ -106,12 +130,12 @@ parent_offset(int i) * afterwards. */ void -binaryheap_add_unordered(binaryheap *heap, Datum d) +binaryheap_add_unordered(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) elog(ERROR, "out of binary heap slots"); heap->bh_has_heap_property = false; - heap->bh_nodes[heap->bh_size] = d; + bh_set(heap, heap->bh_size, d); heap->bh_size++; } @@ -138,11 +162,11 @@ binaryheap_build(binaryheap *heap) * the heap property. 
*/ void -binaryheap_add(binaryheap *heap, Datum d) +binaryheap_add(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) elog(ERROR, "out of binary heap slots"); - heap->bh_nodes[heap->bh_size] = d; + bh_set(heap, heap->bh_size, d); heap->bh_size++; sift_up(heap, heap->bh_size - 1); } @@ -154,11 +178,11 @@ binaryheap_add(binaryheap *heap, Datum d) * without modifying the heap. The caller must ensure that this * routine is not used on an empty heap. Always O(1). */ -Datum -binaryheap_first(binaryheap *heap) +void +binaryheap_first(binaryheap *heap, void *result) { Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); - return heap->bh_nodes[0]; + memmove(result, heap->bh_nodes, heap->bh_elem_size); } /* @@ -169,31 +193,28 @@ binaryheap_first(binaryheap *heap) * that this routine is not used on an empty heap. O(log n) worst * case. */ -Datum -binaryheap_remove_first(binaryheap *heap) +void +binaryheap_remove_first(binaryheap *heap, void *result) { - Datum result; - Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); /* extract the root node, which will be the result */ - result = heap->bh_nodes[0]; + if (result) + memmove(result, heap->bh_nodes, heap->bh_elem_size); /* easy if heap contains one element */ if (heap->bh_size == 1) { heap->bh_size--; - return result; + return; } /* * Remove the last node, placing it in the vacated root entry, and sift * the new root node down to its correct position. */ - heap->bh_nodes[0] = heap->bh_nodes[--heap->bh_size]; + bh_set(heap, 0, bh_node(heap, --heap->bh_size)); sift_down(heap, 0); - - return result; } /* @@ -204,11 +225,11 @@ binaryheap_remove_first(binaryheap *heap) * sifting the new node down. 
*/ void -binaryheap_replace_first(binaryheap *heap, Datum d) +binaryheap_replace_first(binaryheap *heap, void *d) { Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); - heap->bh_nodes[0] = d; + bh_set(heap, 0, d); if (heap->bh_size > 1) sift_down(heap, 0); @@ -221,26 +242,26 @@ binaryheap_replace_first(binaryheap *heap, Datum d) static void sift_up(binaryheap *heap, int node_off) { - Datum node_val = heap->bh_nodes[node_off]; + memmove(heap->bh_hole, bh_node(heap, node_off), heap->bh_elem_size); /* * Within the loop, the node_off'th array entry is a "hole" that - * notionally holds node_val, but we don't actually store node_val there - * till the end, saving some unnecessary data copying steps. + * notionally holds the swapped value, but we don't actually store the + * value there till the end, saving some unnecessary data copying steps. */ while (node_off != 0) { int cmp; int parent_off; - Datum parent_val; + void *parent_val; /* * If this node is smaller than its parent, the heap condition is * satisfied, and we're done. */ parent_off = parent_offset(node_off); - parent_val = heap->bh_nodes[parent_off]; - cmp = heap->bh_compare(node_val, + parent_val = bh_node(heap, parent_off); + cmp = heap->bh_compare(heap->bh_hole, parent_val, heap->bh_arg); if (cmp <= 0) @@ -250,11 +271,11 @@ sift_up(binaryheap *heap, int node_off) * Otherwise, swap the parent value with the hole, and go on to check * the node's new parent. 
*/ - heap->bh_nodes[node_off] = parent_val; + bh_set(heap, node_off, parent_val); node_off = parent_off; } /* Re-fill the hole */ - heap->bh_nodes[node_off] = node_val; + bh_set(heap, node_off, heap->bh_hole); } /* @@ -264,12 +285,12 @@ sift_up(binaryheap *heap, int node_off) static void sift_down(binaryheap *heap, int node_off) { - Datum node_val = heap->bh_nodes[node_off]; + memmove(heap->bh_hole, bh_node(heap, node_off), heap->bh_elem_size); /* * Within the loop, the node_off'th array entry is a "hole" that - * notionally holds node_val, but we don't actually store node_val there - * till the end, saving some unnecessary data copying steps. + * notionally holds the swapped value, but we don't actually store the + * value there till the end, saving some unnecessary data copying steps. */ while (true) { @@ -279,21 +300,21 @@ sift_down(binaryheap *heap, int node_off) /* Is the left child larger than the parent? */ if (left_off < heap->bh_size && - heap->bh_compare(node_val, - heap->bh_nodes[left_off], + heap->bh_compare(heap->bh_hole, + bh_node(heap, left_off), heap->bh_arg) < 0) swap_off = left_off; /* Is the right child larger than the parent? */ if (right_off < heap->bh_size && - heap->bh_compare(node_val, - heap->bh_nodes[right_off], + heap->bh_compare(heap->bh_hole, + bh_node(heap, right_off), heap->bh_arg) < 0) { /* swap with the larger child */ if (!swap_off || - heap->bh_compare(heap->bh_nodes[left_off], - heap->bh_nodes[right_off], + heap->bh_compare(bh_node(heap, left_off), + bh_node(heap, right_off), heap->bh_arg) < 0) swap_off = right_off; } @@ -309,9 +330,9 @@ sift_down(binaryheap *heap, int node_off) * Otherwise, swap the hole with the child that violates the heap * property; then go on to check its children. 
*/ - heap->bh_nodes[node_off] = heap->bh_nodes[swap_off]; + bh_set(heap, node_off, bh_node(heap, swap_off)); node_off = swap_off; } /* Re-fill the hole */ - heap->bh_nodes[node_off] = node_val; + bh_set(heap, node_off, heap->bh_hole); } diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c index 46af349564..f7ee95d8f9 100644 --- a/src/backend/postmaster/pgarch.c +++ b/src/backend/postmaster/pgarch.c @@ -145,7 +145,7 @@ static bool pgarch_readyXlog(char *xlog); static void pgarch_archiveDone(char *xlog); static void pgarch_die(int code, Datum arg); static void HandlePgArchInterrupts(void); -static int ready_file_comparator(Datum a, Datum b, void *arg); +static int ready_file_comparator(const void *a, const void *b, void *arg); static void LoadArchiveLibrary(void); static void pgarch_call_module_shutdown_cb(int code, Datum arg); @@ -250,6 +250,7 @@ PgArchiverMain(void) /* Initialize our max-heap for prioritizing files to archive. */ arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN, + sizeof(char *), ready_file_comparator, NULL); /* Load the archive_library. */ @@ -631,22 +632,27 @@ pgarch_readyXlog(char *xlog) /* If the heap isn't full yet, quickly add it. */ arch_file = arch_files->arch_filenames[arch_files->arch_heap->bh_size]; strcpy(arch_file, basename); - binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file)); + binaryheap_add_unordered(arch_files->arch_heap, &arch_file); /* If we just filled the heap, make it a valid one. */ if (arch_files->arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN) binaryheap_build(arch_files->arch_heap); } - else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap), - CStringGetDatum(basename), NULL) > 0) + else { - /* - * Remove the lowest priority file and add the current one to the - * heap. 
- */ - arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap)); - strcpy(arch_file, basename); - binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file)); + char *root; + + binaryheap_first(arch_files->arch_heap, &root); + if (ready_file_comparator(&root, &basename, NULL) > 0) + { + /* + * Remove the lowest priority file and add the current one to + * the heap. + */ + binaryheap_remove_first(arch_files->arch_heap, &arch_file); + strcpy(arch_file, basename); + binaryheap_add(arch_files->arch_heap, &arch_file); + } } } FreeDir(rldir); @@ -668,7 +674,7 @@ pgarch_readyXlog(char *xlog) */ arch_files->arch_files_size = arch_files->arch_heap->bh_size; for (int i = 0; i < arch_files->arch_files_size; i++) - arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap)); + binaryheap_remove_first(arch_files->arch_heap, &arch_files->arch_files[i]); /* Return the highest priority file. */ arch_files->arch_files_size--; @@ -686,10 +692,10 @@ pgarch_readyXlog(char *xlog) * If "a" and "b" have equivalent values, 0 will be returned. */ static int -ready_file_comparator(Datum a, Datum b, void *arg) +ready_file_comparator(const void *a, const void *b, void *arg) { - char *a_str = DatumGetCString(a); - char *b_str = DatumGetCString(b); + const char *a_str = *((const char **) a); + const char *b_str = *((const char **) b); bool a_history = IsTLHistoryFileName(a_str); bool b_history = IsTLHistoryFileName(b_str); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 12edc5772a..e22680da0b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1223,11 +1223,11 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, * Binary heap comparison function. 
*/ static int -ReorderBufferIterCompare(Datum a, Datum b, void *arg) +ReorderBufferIterCompare(const void *a, const void *b, void *arg) { ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg; - XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; - XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; + XLogRecPtr pos_a = state->entries[*((const int *) a)].lsn; + XLogRecPtr pos_b = state->entries[*((const int *) b)].lsn; if (pos_a < pos_b) return 1; @@ -1297,6 +1297,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, /* allocate heap */ state->heap = binaryheap_allocate(state->nr_txns, + sizeof(int), ReorderBufferIterCompare, state); @@ -1330,7 +1331,8 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + binaryheap_add_unordered(state->heap, &off); + off++; } /* add subtransactions if they contain changes */ @@ -1359,7 +1361,8 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + binaryheap_add_unordered(state->heap, &off); + off++; } } @@ -1384,7 +1387,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) if (state->heap->bh_size == 0) return NULL; - off = DatumGetInt32(binaryheap_first(state->heap)); + binaryheap_first(state->heap, &off); entry = &state->entries[off]; /* free memory we might have "leaked" in the previous *Next call */ @@ -1414,7 +1417,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) state->entries[off].lsn = next_change->lsn; state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + binaryheap_replace_first(state->heap, &off); return change; } @@ -1450,14 +1453,14 @@ 
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) /* txn stays the same */ state->entries[off].lsn = next_change->lsn; state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + binaryheap_replace_first(state->heap, &off); return change; } } /* ok, no changes there anymore, remove */ - binaryheap_remove_first(state->heap); + binaryheap_remove_first(state->heap, NULL); return change; } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 3bd82dbfca..7c78a0d625 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -501,7 +501,7 @@ static void CheckForBufferLeaks(void); static int rlocator_comparator(const void *p1, const void *p2); static inline int buffertag_comparator(const BufferTag *ba, const BufferTag *bb); static inline int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b); -static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg); +static int ts_ckpt_progress_comparator(const void *a, const void *b, void *arg); /* @@ -2640,6 +2640,7 @@ BufferSync(int flags) * processed buffer is. */ ts_heap = binaryheap_allocate(num_spaces, + sizeof(CkptTsStatus *), ts_ckpt_progress_comparator, NULL); @@ -2649,7 +2650,7 @@ BufferSync(int flags) ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan; - binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat)); + binaryheap_add_unordered(ts_heap, &ts_stat); } binaryheap_build(ts_heap); @@ -2665,8 +2666,9 @@ BufferSync(int flags) while (!binaryheap_empty(ts_heap)) { BufferDesc *bufHdr = NULL; - CkptTsStatus *ts_stat = (CkptTsStatus *) - DatumGetPointer(binaryheap_first(ts_heap)); + CkptTsStatus *ts_stat; + + binaryheap_first(ts_heap, &ts_stat); buf_id = CkptBufferIds[ts_stat->index].buf_id; Assert(buf_id != -1); @@ -2708,12 +2710,12 @@ BufferSync(int flags) /* Have all the buffers from the tablespace been processed? 
*/ if (ts_stat->num_scanned == ts_stat->num_to_scan) { - binaryheap_remove_first(ts_heap); + binaryheap_remove_first(ts_heap, NULL); } else { /* update heap with the new progress */ - binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat)); + binaryheap_replace_first(ts_heap, &ts_stat); } /* @@ -5416,10 +5418,10 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b) * progress. */ static int -ts_ckpt_progress_comparator(Datum a, Datum b, void *arg) +ts_ckpt_progress_comparator(const void *a, const void *b, void *arg) { - CkptTsStatus *sa = (CkptTsStatus *) a; - CkptTsStatus *sb = (CkptTsStatus *) b; + const CkptTsStatus *sa = *((const CkptTsStatus **) a); + const CkptTsStatus *sb = *((const CkptTsStatus **) b); /* we want a min-heap, so return 1 for the a < b */ if (sa->progress < sb->progress) diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index 52f7b06b25..b8c7ef81ef 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -15,7 +15,7 @@ * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b, * and >0 iff a > b. For a min-heap, the conditions are reversed. 
*/ -typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg); +typedef int (*binaryheap_comparator) (const void *a, const void *b, void *arg); /* * binaryheap @@ -25,29 +25,32 @@ typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg); * bh_has_heap_property no unordered operations since last heap build * bh_compare comparison function to define the heap property * bh_arg user data for comparison function - * bh_nodes variable-length array of "space" nodes + * bh_hole extra node used during sifting operations + * bh_nodes variable-length array of "space + 1" nodes */ typedef struct binaryheap { int bh_size; int bh_space; + size_t bh_elem_size; bool bh_has_heap_property; /* debugging cross-check */ binaryheap_comparator bh_compare; void *bh_arg; - Datum bh_nodes[FLEXIBLE_ARRAY_MEMBER]; + void *bh_hole; + char bh_nodes[FLEXIBLE_ARRAY_MEMBER]; } binaryheap; -extern binaryheap *binaryheap_allocate(int capacity, +extern binaryheap *binaryheap_allocate(int capacity, size_t elem_size, binaryheap_comparator compare, void *arg); extern void binaryheap_reset(binaryheap *heap); extern void binaryheap_free(binaryheap *heap); -extern void binaryheap_add_unordered(binaryheap *heap, Datum d); +extern void binaryheap_add_unordered(binaryheap *heap, void *d); extern void binaryheap_build(binaryheap *heap); -extern void binaryheap_add(binaryheap *heap, Datum d); -extern Datum binaryheap_first(binaryheap *heap); -extern Datum binaryheap_remove_first(binaryheap *heap); -extern void binaryheap_replace_first(binaryheap *heap, Datum d); +extern void binaryheap_add(binaryheap *heap, void *d); +extern void binaryheap_first(binaryheap *heap, void *result); +extern void binaryheap_remove_first(binaryheap *heap, void *result); +extern void binaryheap_replace_first(binaryheap *heap, void *d); #define binaryheap_empty(h) ((h)->bh_size == 0) -- 2.25.1
>From a0dad80775e9e1947d49bb1483cbdc2f2b86415a Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 4 Sep 2023 15:17:34 -0700 Subject: [PATCH v7 2/5] Make binaryheap available to frontend code. There are a couple of places in frontend code that could make use of this simple binary heap implementation. This commit makes binaryheap usable in frontend code, much like commit 26aaf97b68 did for StringInfo. Like StringInfo, the header file is left in lib/ to reduce the likelihood of unnecessary breakage. --- src/backend/lib/Makefile | 1 - src/backend/lib/meson.build | 1 - src/common/Makefile | 1 + src/{backend/lib => common}/binaryheap.c | 17 ++++++++++++++++- src/common/meson.build | 1 + 5 files changed, 18 insertions(+), 3 deletions(-) rename src/{backend/lib => common}/binaryheap.c (96%) diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile index 9dad31398a..b6cefd9cca 100644 --- a/src/backend/lib/Makefile +++ b/src/backend/lib/Makefile @@ -13,7 +13,6 @@ top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global OBJS = \ - binaryheap.o \ bipartite_match.o \ bloomfilter.o \ dshash.o \ diff --git a/src/backend/lib/meson.build b/src/backend/lib/meson.build index 974cab8776..b4e88f54ae 100644 --- a/src/backend/lib/meson.build +++ b/src/backend/lib/meson.build @@ -1,7 +1,6 @@ # Copyright (c) 2022-2023, PostgreSQL Global Development Group backend_sources += files( - 'binaryheap.c', 'bipartite_match.c', 'bloomfilter.c', 'dshash.c', diff --git a/src/common/Makefile b/src/common/Makefile index 113029bf7b..cc5c54dcee 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -48,6 +48,7 @@ LIBS += $(PTHREAD_LIBS) OBJS_COMMON = \ archive.o \ base64.o \ + binaryheap.o \ checksum_helper.o \ compression.o \ config_info.o \ diff --git a/src/backend/lib/binaryheap.c b/src/common/binaryheap.c similarity index 96% rename from src/backend/lib/binaryheap.c rename to src/common/binaryheap.c index 6f63181c4c..07021d10b8 100644 --- a/src/backend/lib/binaryheap.c +++ b/src/common/binaryheap.c @@ -6,15 +6,22 @@ * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group * * IDENTIFICATION - * src/backend/lib/binaryheap.c + * src/common/binaryheap.c * *------------------------------------------------------------------------- */ +#ifdef FRONTEND +#include "postgres_fe.h" +#else #include "postgres.h" +#endif #include <math.h> +#ifdef FRONTEND +#include "common/logging.h" +#endif #include "lib/binaryheap.h" static void sift_down(binaryheap *heap, int node_off); @@ -133,7 +140,11 @@ void binaryheap_add_unordered(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) +#ifdef FRONTEND + pg_fatal("out of binary heap slots"); +#else elog(ERROR, "out of binary heap slots"); +#endif heap->bh_has_heap_property = false; bh_set(heap, heap->bh_size, d); heap->bh_size++; @@ -165,7 +176,11 @@ void binaryheap_add(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) +#ifdef FRONTEND + pg_fatal("out of binary heap slots"); +#else 
elog(ERROR, "out of binary heap slots"); +#endif bh_set(heap, heap->bh_size, d); heap->bh_size++; sift_up(heap, heap->bh_size - 1); diff --git a/src/common/meson.build b/src/common/meson.build index 53942a9a61..3b97497d1a 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -3,6 +3,7 @@ common_sources = files( 'archive.c', 'base64.c', + 'binaryheap.c', 'checksum_helper.c', 'compression.c', 'controldata_utils.c', -- 2.25.1
>From 658f4c1899b4721275986e50f388dd5be88e8c20 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Thu, 20 Jul 2023 09:52:20 -0700 Subject: [PATCH v7 3/5] Add function for removing arbitrary nodes in binaryheap. This commit introduces binaryheap_remove_nth(), which can be used to remove any node from a binary heap. The implementation is straightforward. The target node is replaced with the last node in the heap, and then we sift as needed to preserve the heap property. This new function is intended for use in a follow-up commit that will improve the performance of pg_restore. --- src/common/binaryheap.c | 44 ++++++++++++++++++++++++++++++++++++ src/include/lib/binaryheap.h | 3 +++ 2 files changed, 47 insertions(+) diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c index 07021d10b8..f1ed0e157e 100644 --- a/src/common/binaryheap.c +++ b/src/common/binaryheap.c @@ -232,6 +232,50 @@ binaryheap_remove_first(binaryheap *heap, void *result) sift_down(heap, 0); } +/* + * binaryheap_nth + * + * Returns the nth node from the heap. The caller must ensure that there are + * at least (n + 1) nodes in the heap. Always O(1). + */ +void +binaryheap_nth(binaryheap *heap, int n, void *result) +{ + Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); + Assert(n >= 0 && n < heap->bh_size); + + memmove(result, bh_node(heap, n), heap->bh_elem_size); +} + +/* + * binaryheap_remove_nth + * + * Removes the nth node from the heap. The caller must ensure that there are + * at least (n + 1) nodes in the heap. O(log n) worst case. 
+ */ +void +binaryheap_remove_nth(binaryheap *heap, int n) +{ + int cmp; + + Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); + Assert(n >= 0 && n < heap->bh_size); + + /* compare last node to the one that is being removed */ + cmp = heap->bh_compare(bh_node(heap, --heap->bh_size), + bh_node(heap, n), + heap->bh_arg); + + /* remove the last node, placing it in the vacated entry */ + bh_set(heap, n, bh_node(heap, heap->bh_size)); + + /* sift as needed to preserve the heap property */ + if (cmp > 0) + sift_up(heap, n); + else if (cmp < 0) + sift_down(heap, n); +} + /* * binaryheap_replace_first * diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index b8c7ef81ef..083821ddbd 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -50,8 +50,11 @@ extern void binaryheap_build(binaryheap *heap); extern void binaryheap_add(binaryheap *heap, void *d); extern void binaryheap_first(binaryheap *heap, void *result); extern void binaryheap_remove_first(binaryheap *heap, void *result); +extern void binaryheap_nth(binaryheap *heap, int n, void *result); +extern void binaryheap_remove_nth(binaryheap *heap, int n); extern void binaryheap_replace_first(binaryheap *heap, void *d); #define binaryheap_empty(h) ((h)->bh_size == 0) +#define binaryheap_size(h) ((h)->bh_size) #endif /* BINARYHEAP_H */ -- 2.25.1
>From 65de4421ba130a5716fc226f6de05b97b30c2711 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 4 Sep 2023 15:35:08 -0700 Subject: [PATCH v7 4/5] Convert pg_restore's ready_list to a priority queue. Presently, we spend a lot of time sorting this list so that we pick the largest items first. With many tables, this sorting can become a significant bottleneck. There are a couple of reports from the field about this, and it is easily reproducible, so this is not a hypothetical issue. This commit improves the performance of pg_restore with many tables by converting its ready_list to a priority queue, i.e., a binary heap. We will first try to run the highest priority item, but if it cannot be chosen due to the lock heuristic, we'll do a sequential scan through the heap nodes until we find one that is runnable. This means that we might end up picking an item with much lower priority, but since we expect that we'll typically be able to choose one of the first few nodes, we should usually pick an item with a relatively high priority. On my machine, a basic test with 100,000 tables takes 11.5 minutes without this patch and 1.5 minutes with it. Pierre Ducroquet claims to see a speedup from 30 minutes to 23 minutes for a real-world dump of over 50,000 tables. 
--- src/bin/pg_dump/pg_backup_archiver.c | 201 ++++++++------------------- 1 file changed, 61 insertions(+), 140 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 39ebcfec32..ff7349537e 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -34,6 +34,7 @@ #include "compress_io.h" #include "dumputils.h" #include "fe_utils/string_utils.h" +#include "lib/binaryheap.h" #include "lib/stringinfo.h" #include "libpq/libpq-fs.h" #include "parallel.h" @@ -44,24 +45,6 @@ #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" -/* - * State for tracking TocEntrys that are ready to process during a parallel - * restore. (This used to be a list, and we still call it that, though now - * it's really an array so that we can apply qsort to it.) - * - * tes[] is sized large enough that we can't overrun it. - * The valid entries are indexed first_te .. last_te inclusive. - * We periodically sort the array to bring larger-by-dataLength entries to - * the front; "sorted" is true if the valid entries are known sorted. - */ -typedef struct _parallelReadyList -{ - TocEntry **tes; /* Ready-to-dump TocEntrys */ - int first_te; /* index of first valid entry in tes[] */ - int last_te; /* index of last valid entry in tes[] */ - bool sorted; /* are valid entries currently sorted? 
*/ -} ParallelReadyList; - static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, @@ -110,16 +93,13 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH, static void pending_list_header_init(TocEntry *l); static void pending_list_append(TocEntry *l, TocEntry *te); static void pending_list_remove(TocEntry *te); -static void ready_list_init(ParallelReadyList *ready_list, int tocCount); -static void ready_list_free(ParallelReadyList *ready_list); -static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); -static void ready_list_remove(ParallelReadyList *ready_list, int i); -static void ready_list_sort(ParallelReadyList *ready_list); -static int TocEntrySizeCompare(const void *p1, const void *p2); -static void move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +static int TocEntrySizeCompareQsort(const void *p1, const void *p2); +static int TocEntrySizeCompareBinaryheap(const void *p1, const void *p2, + void *arg); +static void move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass); -static TocEntry *pop_next_work_item(ParallelReadyList *ready_list, +static TocEntry *pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -134,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list); + binaryheap *ready_heap); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -2380,7 +2360,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) } if (ntes > 1) - qsort(tes, ntes, sizeof(TocEntry *), 
TocEntrySizeCompare); + qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort); for (int i = 0; i < ntes; i++) DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, @@ -3980,7 +3960,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) (void) restore_toc_entry(AH, next_work_item, false); - /* Reduce dependencies, but don't move anything to ready_list */ + /* Reduce dependencies, but don't move anything to ready_heap */ reduce_dependencies(AH, next_work_item, NULL); } else @@ -4023,24 +4003,27 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - ParallelReadyList ready_list; + binaryheap *ready_heap; TocEntry *next_work_item; pg_log_debug("entering restore_toc_entries_parallel"); - /* Set up ready_list with enough room for all known TocEntrys */ - ready_list_init(&ready_list, AH->tocCount); + /* Set up ready_heap with enough room for all known TocEntrys */ + ready_heap = binaryheap_allocate(AH->tocCount, + sizeof(TocEntry *), + TocEntrySizeCompareBinaryheap, + NULL); /* * The pending_list contains all items that we need to restore. Move all - * items that are available to process immediately into the ready_list. + * items that are available to process immediately into the ready_heap. * After this setup, the pending list is everything that needs to be done - * but is blocked by one or more dependencies, while the ready list + * but is blocked by one or more dependencies, while the ready heap * contains items that have no remaining dependencies and are OK to * process in the current restore pass. 
*/ AH->restorePass = RESTORE_PASS_MAIN; - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* * main parent loop @@ -4054,7 +4037,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = pop_next_work_item(&ready_list, pstate); + next_work_item = pop_next_work_item(ready_heap, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4064,7 +4047,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); /* Update its dependencies as though we'd completed it */ - reduce_dependencies(AH, next_work_item, &ready_list); + reduce_dependencies(AH, next_work_item, ready_heap); /* Loop around to see if anything else can be dispatched */ continue; } @@ -4075,7 +4058,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, - mark_restore_job_done, &ready_list); + mark_restore_job_done, ready_heap); } else if (IsEveryWorkerIdle(pstate)) { @@ -4089,7 +4072,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Advance to next restore pass */ AH->restorePass++; /* That probably allows some stuff to be made ready */ - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* Loop around to see if anything's now ready */ continue; } @@ -4118,10 +4101,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS); } - /* There should now be nothing in ready_list. */ - Assert(ready_list.first_te > ready_list.last_te); + /* There should now be nothing in ready_heap. 
*/ + Assert(binaryheap_empty(ready_heap)); - ready_list_free(&ready_list); + binaryheap_free(ready_heap); pg_log_info("finished main parallel loop"); } @@ -4221,80 +4204,9 @@ pending_list_remove(TocEntry *te) } -/* - * Initialize the ready_list with enough room for up to tocCount entries. - */ -static void -ready_list_init(ParallelReadyList *ready_list, int tocCount) -{ - ready_list->tes = (TocEntry **) - pg_malloc(tocCount * sizeof(TocEntry *)); - ready_list->first_te = 0; - ready_list->last_te = -1; - ready_list->sorted = false; -} - -/* - * Free storage for a ready_list. - */ -static void -ready_list_free(ParallelReadyList *ready_list) -{ - pg_free(ready_list->tes); -} - -/* Add te to the ready_list */ -static void -ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) -{ - ready_list->tes[++ready_list->last_te] = te; - /* List is (probably) not sorted anymore. */ - ready_list->sorted = false; -} - -/* Remove the i'th entry in the ready_list */ -static void -ready_list_remove(ParallelReadyList *ready_list, int i) -{ - int f = ready_list->first_te; - - Assert(i >= f && i <= ready_list->last_te); - - /* - * In the typical case where the item to be removed is the first ready - * entry, we need only increment first_te to remove it. Otherwise, move - * the entries before it to compact the list. (This preserves sortedness, - * if any.) We could alternatively move the entries after i, but there - * are typically many more of those. 
- */ - if (i > f) - { - TocEntry **first_te_ptr = &ready_list->tes[f]; - - memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); - } - ready_list->first_te++; -} - -/* Sort the ready_list into the desired order */ -static void -ready_list_sort(ParallelReadyList *ready_list) -{ - if (!ready_list->sorted) - { - int n = ready_list->last_te - ready_list->first_te + 1; - - if (n > 1) - qsort(ready_list->tes + ready_list->first_te, n, - sizeof(TocEntry *), - TocEntrySizeCompare); - ready_list->sorted = true; - } -} - /* qsort comparator for sorting TocEntries by dataLength */ static int -TocEntrySizeCompare(const void *p1, const void *p2) +TocEntrySizeCompareQsort(const void *p1, const void *p2) { const TocEntry *te1 = *(const TocEntry *const *) p1; const TocEntry *te2 = *(const TocEntry *const *) p2; @@ -4314,17 +4226,24 @@ TocEntrySizeCompare(const void *p1, const void *p2) return 0; } +/* binaryheap comparator for sorting TocEntries by dataLength */ +static int +TocEntrySizeCompareBinaryheap(const void *p1, const void *p2, void *arg) +{ + return TocEntrySizeCompareQsort(p1, p2); +} + /* - * Move all immediately-ready items from pending_list to ready_list. + * Move all immediately-ready items from pending_list to ready_heap. * * Items are considered ready if they have no remaining dependencies and * they belong in the current restore pass. (See also reduce_dependencies, * which applies the same logic one-at-a-time.) */ static void -move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass) { TocEntry *te; @@ -4340,40 +4259,42 @@ move_to_ready_list(TocEntry *pending_list, { /* Remove it from pending_list ... */ pending_list_remove(te); - /* ... and add to ready_list */ - ready_list_insert(ready_list, te); + /* ... 
and add to ready_heap */ + binaryheap_add(ready_heap, &te); } } } /* * Find the next work item (if any) that is capable of being run now, - * and remove it from the ready_list. + * and remove it from the ready_heap. * * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with - * items currently running. Items in the ready_list are known to have + * items currently running. Items in the ready_heap are known to have * no remaining dependencies, but we have to check for lock conflicts. */ static TocEntry * -pop_next_work_item(ParallelReadyList *ready_list, +pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate) { /* - * Sort the ready_list so that we'll tackle larger jobs first. - */ - ready_list_sort(ready_list); - - /* - * Search the ready_list until we find a suitable item. + * Search the ready_heap until we find a suitable item. Note that we do a + * sequential scan through the heap nodes, so even though we will first + * try to choose the highest-priority item, we might end up picking + * something with a much lower priority. However, it is expected that we + * will typically be able to pick one of the first few items, which should + * usually have a relatively high priority. */ - for (int i = ready_list->first_te; i <= ready_list->last_te; i++) + for (int i = 0; i < binaryheap_size(ready_heap); i++) { - TocEntry *te = ready_list->tes[i]; + TocEntry *te; bool conflicts = false; + binaryheap_nth(ready_heap, i, &te); + /* * Check to see if the item would need exclusive lock on something * that a currently running item also needs lock on, or vice versa. 
If @@ -4397,7 +4318,7 @@ pop_next_work_item(ParallelReadyList *ready_list, continue; /* passed all tests, so this item can run */ - ready_list_remove(ready_list, i); + binaryheap_remove_nth(ready_heap, i); return te; } @@ -4443,7 +4364,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; + binaryheap *ready_heap = (binaryheap *) callback_data; pg_log_info("finished item %d %s %s", te->dumpId, te->desc, te->tag); @@ -4461,7 +4382,7 @@ mark_restore_job_done(ArchiveHandle *AH, pg_fatal("worker process failed: exit code %d", status); - reduce_dependencies(AH, te, ready_list); + reduce_dependencies(AH, te, ready_heap); } @@ -4704,11 +4625,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) /* * Remove the specified TOC entry from the depCounts of items that depend on * it, thereby possibly making them ready-to-run. Any pending item that - * becomes ready should be moved to the ready_list, if that's provided. + * becomes ready should be moved to the ready_heap, if that's provided. */ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list) + binaryheap *ready_heap) { int i; @@ -4726,18 +4647,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, * the current restore pass, and it is currently a member of the * pending list (that check is needed to prevent double restore in * some cases where a list-file forces out-of-order restoring). - * However, if ready_list == NULL then caller doesn't want any list + * However, if ready_heap == NULL then caller doesn't want any list * memberships changed. */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && otherte->pending_prev != NULL && - ready_list != NULL) + ready_heap != NULL) { /* Remove it from pending list ... */ pending_list_remove(otherte); - /* ... and add to ready_list */ - ready_list_insert(ready_list, otherte); + /* ... 
and add to ready_heap */ + binaryheap_add(ready_heap, &otherte); } } } -- 2.25.1
>From 51ebcf055faadf2c174bbe533479843837c3945a Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Tue, 25 Jul 2023 11:18:52 -0700 Subject: [PATCH v7 5/5] Remove open-coded binary heap in pg_dump_sort.c. Thanks to commit XXXXXXXXXX, binaryheap is available to frontend code. This commit replaces the open-coded heap implementation in pg_dump_sort.c with a binaryheap, saving a few lines of code. --- src/bin/pg_dump/pg_dump_sort.c | 103 +++++++-------------------------- 1 file changed, 20 insertions(+), 83 deletions(-) diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 523a19c155..c9b161ed30 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -16,6 +16,7 @@ #include "postgres_fe.h" #include "catalog/pg_class_d.h" +#include "lib/binaryheap.h" #include "pg_backup_archiver.h" #include "pg_backup_utils.h" #include "pg_dump.h" @@ -161,8 +162,6 @@ static bool TopoSort(DumpableObject **objs, int numObjs, DumpableObject **ordering, int *nOrdering); -static void addHeapElement(int val, int *heap, int heapLength); -static int removeHeapElement(int *heap, int heapLength); static void findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs); static int findLoop(DumpableObject *obj, DumpId startPoint, @@ -174,6 +173,7 @@ static void repairDependencyLoop(DumpableObject **loop, int nLoop); static void describeDumpableObject(DumpableObject *obj, char *buf, int bufsize); +static int int_cmp(const void *a, const void *b, void *arg); /* @@ -374,11 +374,10 @@ TopoSort(DumpableObject **objs, int *nOrdering) /* output argument */ { DumpId maxDumpId = getMaxDumpId(); - int *pendingHeap; + binaryheap *pendingHeap; int *beforeConstraints; int *idMap; DumpableObject *obj; - int heapLength; int i, j, k; @@ -403,7 +402,7 @@ TopoSort(DumpableObject **objs, return true; /* Create workspace for the above-described heap */ - pendingHeap = (int *) pg_malloc(numObjs * sizeof(int)); + pendingHeap = 
binaryheap_allocate(numObjs, sizeof(int), int_cmp, NULL); /* * Scan the constraints, and for each item in the input, generate a count @@ -434,19 +433,16 @@ TopoSort(DumpableObject **objs, * Now initialize the heap of items-ready-to-output by filling it with the * indexes of items that already have beforeConstraints[id] == 0. * - * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each j - * in the range 1..heapLength-1 (note we are using 0-based subscripts - * here, while the discussion in Knuth assumes 1-based subscripts). So, if - * we simply enter the indexes into pendingHeap[] in decreasing order, we - * a-fortiori have the heap invariant satisfied at completion of this - * loop, and don't need to do any sift-up comparisons. + * We enter the objects into pendingHeap in decreasing order so that the + * heap invariant is satisfied at the completion of this loop. This + * reduces the amount of work that binaryheap_build() must do. */ - heapLength = 0; for (i = numObjs; --i >= 0;) { if (beforeConstraints[objs[i]->dumpId] == 0) - pendingHeap[heapLength++] = i; + binaryheap_add_unordered(pendingHeap, &i); } + binaryheap_build(pendingHeap); /*-------------------- * Now emit objects, working backwards in the output list. 
At each step, @@ -464,10 +460,10 @@ TopoSort(DumpableObject **objs, *-------------------- */ i = numObjs; - while (heapLength > 0) + while (!binaryheap_empty(pendingHeap)) { /* Select object to output by removing largest heap member */ - j = removeHeapElement(pendingHeap, heapLength--); + binaryheap_remove_first(pendingHeap, &j); obj = objs[j]; /* Output candidate to ordering[] */ ordering[--i] = obj; @@ -477,7 +473,7 @@ TopoSort(DumpableObject **objs, int id = obj->dependencies[k]; if ((--beforeConstraints[id]) == 0) - addHeapElement(idMap[id], pendingHeap, heapLength++); + binaryheap_add(pendingHeap, &idMap[id]); } } @@ -497,79 +493,13 @@ TopoSort(DumpableObject **objs, } /* Done */ - free(pendingHeap); + binaryheap_free(pendingHeap); free(beforeConstraints); free(idMap); return (i == 0); } -/* - * Add an item to a heap (priority queue) - * - * heapLength is the current heap size; caller is responsible for increasing - * its value after the call. There must be sufficient storage at *heap. - */ -static void -addHeapElement(int val, int *heap, int heapLength) -{ - int j; - - /* - * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is - * using 1-based array indexes, not 0-based. - */ - j = heapLength; - while (j > 0) - { - int i = (j - 1) >> 1; - - if (val <= heap[i]) - break; - heap[j] = heap[i]; - j = i; - } - heap[j] = val; -} - -/* - * Remove the largest item present in a heap (priority queue) - * - * heapLength is the current heap size; caller is responsible for decreasing - * its value after the call. - * - * We remove and return heap[0], which is always the largest element of - * the heap, and then "sift up" to maintain the heap invariant. 
- */ -static int -removeHeapElement(int *heap, int heapLength) -{ - int result = heap[0]; - int val; - int i; - - if (--heapLength <= 0) - return result; - val = heap[heapLength]; /* value that must be reinserted */ - i = 0; /* i is where the "hole" is */ - for (;;) - { - int j = 2 * i + 1; - - if (j >= heapLength) - break; - if (j + 1 < heapLength && - heap[j] < heap[j + 1]) - j++; - if (val >= heap[j]) - break; - heap[i] = heap[j]; - i = j; - } - heap[i] = val; - return result; -} - /* * findDependencyLoops - identify loops in TopoSort's failure output, * and pass each such loop to repairDependencyLoop() for action @@ -1559,3 +1489,10 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) (int) obj->objType, obj->dumpId, obj->catId.oid); } + +/* binaryheap comparator for integers */ +static int +int_cmp(const void *a, const void *b, void *arg) +{ + return *((const int *) a) - *((const int *) b); +} -- 2.25.1