On Wed, Sep 13, 2023 at 08:01:39PM -0400, Tom Lane wrote:
> Nathan Bossart <nathandboss...@gmail.com> writes:
>> Upon closer inspection, I found a rather nasty problem.  The qsort
>> comparator expects a TocEntry **, but the binaryheap comparator expects a
>> TocEntry *, and we simply pass the arguments through to the qsort
>> comparator.  In v9, I added the requisite ampersands.
>
> Ooops :-(
>
>> I'm surprised this worked at all.
>
> Probably it was not sorting things appropriately.  Might be worth adding
> some test scaffolding to check that bigger tasks are chosen before
> smaller ones.
Further testing revealed that the binaryheap comparator function was
actually generating a min-heap since the qsort comparator sorts by
decreasing dataLength.  This is fixed in v10.  And I am 0 for 2 today...

Now that this appears to be functioning as expected, I see that the larger
entries are typically picked up earlier, but we do sometimes pick entries
quite a bit further down the list, as anticipated.  The case I was testing
(10k tables with the number of rows equal to the table number) was much
faster with this patch (just over a minute) than without it (over 16
minutes).

Sincerest apologies for the noise.

--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
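In case it saves anyone a trip into the diff, the comparator in the attached
0003 now reads as follows (with a couple of extra comments added here); the
ampersands are the v9 fix mentioned above, and the negation is the new v10
fix that turns the accidental min-heap into a proper max-heap:

/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	/*
	 * The binaryheap hands us TocEntry pointers directly, whereas the qsort
	 * comparator expects pointers to TocEntry pointers, hence the
	 * ampersands.  Negating the result converts the qsort ordering
	 * (decreasing dataLength) into a max-heap on dataLength.
	 */
	return -TocEntrySizeCompareQsort(&p1, &p2);
}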
>From 59498e233d725d5c427e58f700402908fce2a10f Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 22 Jul 2023 15:04:45 -0700 Subject: [PATCH v10 1/4] Make binaryheap available to frontend code. There are a couple of places in frontend code that could make use of this simple binary heap implementation. This commit makes binaryheap usable in frontend code, much like commit 26aaf97b68 did for StringInfo. Like StringInfo, the header file is left in lib/ to reduce the likelihood of unnecessary breakage. The frontend version of binaryheap exposes a void *-based API since frontend code does not have access to the Datum definitions. This seemed like a better approach than switching all existing uses to void * or making the Datum definitions available to frontend code. Reviewed-by: Tom Lane, Alvaro Herrera Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us --- src/backend/lib/Makefile | 1 - src/backend/lib/meson.build | 1 - src/common/Makefile | 1 + src/{backend/lib => common}/binaryheap.c | 41 +++++++++++++++++------- src/common/meson.build | 1 + src/include/lib/binaryheap.h | 26 +++++++++++---- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 52 insertions(+), 20 deletions(-) rename src/{backend/lib => common}/binaryheap.c (90%) diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile index 9dad31398a..b6cefd9cca 100644 --- a/src/backend/lib/Makefile +++ b/src/backend/lib/Makefile @@ -13,7 +13,6 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = \ - binaryheap.o \ bipartite_match.o \ bloomfilter.o \ dshash.o \ diff --git a/src/backend/lib/meson.build b/src/backend/lib/meson.build index 974cab8776..b4e88f54ae 100644 --- a/src/backend/lib/meson.build +++ b/src/backend/lib/meson.build @@ -1,7 +1,6 @@ # Copyright (c) 2022-2023, PostgreSQL Global Development Group backend_sources += files( - 'binaryheap.c', 'bipartite_match.c', 'bloomfilter.c', 'dshash.c', diff --git a/src/common/Makefile b/src/common/Makefile index 113029bf7b..cc5c54dcee 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -48,6 +48,7 @@ LIBS += $(PTHREAD_LIBS) OBJS_COMMON = \ archive.o \ base64.o \ + binaryheap.o \ checksum_helper.o \ compression.o \ config_info.o \ diff --git a/src/backend/lib/binaryheap.c b/src/common/binaryheap.c similarity index 90% rename from src/backend/lib/binaryheap.c rename to src/common/binaryheap.c index 1737546757..39a8243a6d 100644 --- a/src/backend/lib/binaryheap.c +++ b/src/common/binaryheap.c @@ -6,15 +6,22 @@ * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group * * IDENTIFICATION - * src/backend/lib/binaryheap.c + * src/common/binaryheap.c * *------------------------------------------------------------------------- */ +#ifdef FRONTEND +#include "postgres_fe.h" +#else #include "postgres.h" +#endif #include <math.h> +#ifdef FRONTEND +#include "common/logging.h" +#endif #include "lib/binaryheap.h" static void sift_down(binaryheap *heap, int node_off); @@ -34,7 +41,7 @@ binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg) int sz; binaryheap *heap; - sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity; + sz = offsetof(binaryheap, bh_nodes) + sizeof(bh_node_type) * capacity; heap = (binaryheap *) palloc(sz); heap->bh_space = capacity; heap->bh_compare = compare; @@ -106,10 +113,16 @@ parent_offset(int i) * afterwards. 
*/ void -binaryheap_add_unordered(binaryheap *heap, Datum d) +binaryheap_add_unordered(binaryheap *heap, bh_node_type d) { if (heap->bh_size >= heap->bh_space) + { +#ifdef FRONTEND + pg_fatal("out of binary heap slots"); +#else elog(ERROR, "out of binary heap slots"); +#endif + } heap->bh_has_heap_property = false; heap->bh_nodes[heap->bh_size] = d; heap->bh_size++; @@ -138,10 +151,16 @@ binaryheap_build(binaryheap *heap) * the heap property. */ void -binaryheap_add(binaryheap *heap, Datum d) +binaryheap_add(binaryheap *heap, bh_node_type d) { if (heap->bh_size >= heap->bh_space) + { +#ifdef FRONTEND + pg_fatal("out of binary heap slots"); +#else elog(ERROR, "out of binary heap slots"); +#endif + } heap->bh_nodes[heap->bh_size] = d; heap->bh_size++; sift_up(heap, heap->bh_size - 1); @@ -154,7 +173,7 @@ binaryheap_add(binaryheap *heap, Datum d) * without modifying the heap. The caller must ensure that this * routine is not used on an empty heap. Always O(1). */ -Datum +bh_node_type binaryheap_first(binaryheap *heap) { Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); @@ -169,10 +188,10 @@ binaryheap_first(binaryheap *heap) * that this routine is not used on an empty heap. O(log n) worst * case. */ -Datum +bh_node_type binaryheap_remove_first(binaryheap *heap) { - Datum result; + bh_node_type result; Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); @@ -204,7 +223,7 @@ binaryheap_remove_first(binaryheap *heap) * sifting the new node down. */ void -binaryheap_replace_first(binaryheap *heap, Datum d) +binaryheap_replace_first(binaryheap *heap, bh_node_type d) { Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); @@ -221,7 +240,7 @@ binaryheap_replace_first(binaryheap *heap, Datum d) static void sift_up(binaryheap *heap, int node_off) { - Datum node_val = heap->bh_nodes[node_off]; + bh_node_type node_val = heap->bh_nodes[node_off]; /* * Within the loop, the node_off'th array entry is a "hole" that @@ -232,7 +251,7 @@ sift_up(binaryheap *heap, int node_off) { int cmp; int parent_off; - Datum parent_val; + bh_node_type parent_val; /* * If this node is smaller than its parent, the heap condition is @@ -264,7 +283,7 @@ sift_up(binaryheap *heap, int node_off) static void sift_down(binaryheap *heap, int node_off) { - Datum node_val = heap->bh_nodes[node_off]; + bh_node_type node_val = heap->bh_nodes[node_off]; /* * Within the loop, the node_off'th array entry is a "hole" that diff --git a/src/common/meson.build b/src/common/meson.build index 53942a9a61..3b97497d1a 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -3,6 +3,7 @@ common_sources = files( 'archive.c', 'base64.c', + 'binaryheap.c', 'checksum_helper.c', 'compression.c', 'controldata_utils.c', diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index 52f7b06b25..3647aeae65 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -11,11 +11,23 @@ #ifndef BINARYHEAP_H #define BINARYHEAP_H +/* + * We provide a Datum-based API for backend code and a void *-based API for + * frontend code (since the Datum definitions are not available to frontend + * code). You should typically avoid using bh_node_type directly and instead + * use Datum or void * as appropriate. + */ +#ifdef FRONTEND +typedef void *bh_node_type; +#else +typedef Datum bh_node_type; +#endif + /* * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b, * and >0 iff a > b. For a min-heap, the conditions are reversed. 
*/ -typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg); +typedef int (*binaryheap_comparator) (bh_node_type a, bh_node_type b, void *arg); /* * binaryheap @@ -34,7 +46,7 @@ typedef struct binaryheap bool bh_has_heap_property; /* debugging cross-check */ binaryheap_comparator bh_compare; void *bh_arg; - Datum bh_nodes[FLEXIBLE_ARRAY_MEMBER]; + bh_node_type bh_nodes[FLEXIBLE_ARRAY_MEMBER]; } binaryheap; extern binaryheap *binaryheap_allocate(int capacity, @@ -42,12 +54,12 @@ extern binaryheap *binaryheap_allocate(int capacity, void *arg); extern void binaryheap_reset(binaryheap *heap); extern void binaryheap_free(binaryheap *heap); -extern void binaryheap_add_unordered(binaryheap *heap, Datum d); +extern void binaryheap_add_unordered(binaryheap *heap, bh_node_type d); extern void binaryheap_build(binaryheap *heap); -extern void binaryheap_add(binaryheap *heap, Datum d); -extern Datum binaryheap_first(binaryheap *heap); -extern Datum binaryheap_remove_first(binaryheap *heap); -extern void binaryheap_replace_first(binaryheap *heap, Datum d); +extern void binaryheap_add(binaryheap *heap, bh_node_type d); +extern bh_node_type binaryheap_first(binaryheap *heap); +extern bh_node_type binaryheap_remove_first(binaryheap *heap); +extern void binaryheap_replace_first(binaryheap *heap, bh_node_type d); #define binaryheap_empty(h) ((h)->bh_size == 0) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index f3d8a2a855..b5bbdd1608 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3198,6 +3198,7 @@ bbstreamer_tar_archiver bbstreamer_tar_parser bbstreamer_zstd_frame bgworker_main_type +bh_node_type binaryheap binaryheap_comparator bitmapword -- 2.25.1
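To give a feel for the frontend-facing API that 0001 exposes, here is a tiny
standalone sketch (the "task" struct, its fields, and the output are made up
for illustration; 0003 does essentially the same thing with TocEntry
pointers keyed on dataLength):

#include "postgres_fe.h"

#include "lib/binaryheap.h"

typedef struct task
{
	const char *name;
	int			size;
} task;

/* per the header comment, returning >0 iff a > b yields a max-heap */
static int
task_cmp(void *a, void *b, void *arg)
{
	const task *ta = (const task *) a;
	const task *tb = (const task *) b;

	return (ta->size > tb->size) - (ta->size < tb->size);
}

int
main(void)
{
	task		tasks[] = {{"small", 1}, {"big", 100}, {"medium", 10}};
	binaryheap *heap = binaryheap_allocate(lengthof(tasks), task_cmp, NULL);

	/* add one at a time; the heap property is maintained as we go */
	for (int i = 0; i < (int) lengthof(tasks); i++)
		binaryheap_add(heap, &tasks[i]);

	/* drains largest-first: big, medium, small */
	while (!binaryheap_empty(heap))
	{
		task	   *t = (task *) binaryheap_remove_first(heap);

		printf("%s\n", t->name);
	}

	binaryheap_free(heap);
	return 0;
}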
>From 303cd18beaaf4a4133a0988d71bfea0bd2b4444b Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Thu, 20 Jul 2023 09:52:20 -0700 Subject: [PATCH v10 2/4] Add function for removing arbitrary nodes in binaryheap. This commit introduces binaryheap_remove_node(), which can be used to remove any node from a binary heap. The implementation is straightforward. The target node is replaced with the last node in the heap, and then we sift as needed to preserve the heap property. This new function is intended for use in a follow-up commit that will improve the performance of pg_restore. Reviewed-by: Tom Lane Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us --- src/common/binaryheap.c | 29 +++++++++++++++++++++++++++++ src/include/lib/binaryheap.h | 3 +++ 2 files changed, 32 insertions(+) diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c index 39a8243a6d..19e095f1fb 100644 --- a/src/common/binaryheap.c +++ b/src/common/binaryheap.c @@ -215,6 +215,35 @@ binaryheap_remove_first(binaryheap *heap) return result; } +/* + * binaryheap_remove_node + * + * Removes the nth (zero based) node from the heap. The caller must ensure + * that there are at least (n + 1) nodes in the heap. O(log n) worst case. + */ +void +binaryheap_remove_node(binaryheap *heap, int n) +{ + int cmp; + + Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); + Assert(n >= 0 && n < heap->bh_size); + + /* compare last node to the one that is being removed */ + cmp = heap->bh_compare(heap->bh_nodes[--heap->bh_size], + heap->bh_nodes[n], + heap->bh_arg); + + /* remove the last node, placing it in the vacated entry */ + heap->bh_nodes[n] = heap->bh_nodes[heap->bh_size]; + + /* sift as needed to preserve the heap property */ + if (cmp > 0) + sift_up(heap, n); + else if (cmp < 0) + sift_down(heap, n); +} + /* * binaryheap_replace_first * diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index 3647aeae65..9525dcaec4 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -59,8 +59,11 @@ extern void binaryheap_build(binaryheap *heap); extern void binaryheap_add(binaryheap *heap, bh_node_type d); extern bh_node_type binaryheap_first(binaryheap *heap); extern bh_node_type binaryheap_remove_first(binaryheap *heap); +extern void binaryheap_remove_node(binaryheap *heap, int n); extern void binaryheap_replace_first(binaryheap *heap, bh_node_type d); #define binaryheap_empty(h) ((h)->bh_size == 0) +#define binaryheap_size(h) ((h)->bh_size) +#define binaryheap_get_node(h, n) ((h)->bh_nodes[n]) #endif /* BINARYHEAP_H */ -- 2.25.1
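For context on how 0003 ends up using this, the consumer is a linear scan
over the live nodes that removes the first acceptable one.  Stripped of the
lock-conflict details (is_runnable() below is just a hypothetical stand-in
for that test), the pattern looks roughly like:

#include "postgres_fe.h"

#include "lib/binaryheap.h"
#include "pg_backup_archiver.h"

/* hypothetical stand-in for the lock-conflict check in pop_next_work_item() */
static bool
is_runnable(TocEntry *te)
{
	return true;
}

static TocEntry *
pick_runnable_entry(binaryheap *ready_heap)
{
	/* node 0 is the root, i.e. the largest remaining entry */
	for (int i = 0; i < binaryheap_size(ready_heap); i++)
	{
		TocEntry   *te = (TocEntry *) binaryheap_get_node(ready_heap, i);

		if (!is_runnable(te))
			continue;

		/* O(log n): the last node fills the hole, then sifts up or down */
		binaryheap_remove_node(ready_heap, i);
		return te;
	}

	return NULL;				/* nothing currently runnable */
}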
>From 06dd45a66995cd10db0b9a752b9d601b053cd244 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Thu, 20 Jul 2023 10:19:08 -0700 Subject: [PATCH v10 3/4] Convert pg_restore's ready_list to a priority queue. Presently, we spend a lot of time sorting this list so that we pick the largest items first. With many tables, this sorting can become a significant bottleneck. There are a couple of reports from the field about this, and it is easily reproducible, so this is not a hypothetical issue. This commit improves the performance of pg_restore with many tables by converting its ready_list to a priority queue, i.e., a binary heap. We will first try to run the highest priority item, but if it cannot be chosen due to the lock heuristic, we'll do a sequential scan through the heap nodes until we find one that is runnable. This means that we might end up picking an item with much lower priority, but since we expect that we'll typically be able to choose one of the first few nodes, we should usually pick an item with a relatively high priority. On my machine, a basic test with 100,000 tables takes 11.5 minutes without this patch and 1.5 minutes with it. Pierre Ducroquet claims to see a speedup from 30 minutes to 23 minutes for a real-world dump of over 50,000 tables. Suggested-by: Tom Lane Tested-by: Pierre Ducroquet Reviewed-by: Tom Lane Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us --- src/bin/pg_dump/pg_backup_archiver.c | 198 ++++++++------------------- 1 file changed, 58 insertions(+), 140 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 4d83381d84..b30b49702d 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -34,6 +34,7 @@ #include "compress_io.h" #include "dumputils.h" #include "fe_utils/string_utils.h" +#include "lib/binaryheap.h" #include "lib/stringinfo.h" #include "libpq/libpq-fs.h" #include "parallel.h" @@ -44,24 +45,6 @@ #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" -/* - * State for tracking TocEntrys that are ready to process during a parallel - * restore. (This used to be a list, and we still call it that, though now - * it's really an array so that we can apply qsort to it.) - * - * tes[] is sized large enough that we can't overrun it. - * The valid entries are indexed first_te .. last_te inclusive. - * We periodically sort the array to bring larger-by-dataLength entries to - * the front; "sorted" is true if the valid entries are known sorted. - */ -typedef struct _parallelReadyList -{ - TocEntry **tes; /* Ready-to-dump TocEntrys */ - int first_te; /* index of first valid entry in tes[] */ - int last_te; /* index of last valid entry in tes[] */ - bool sorted; /* are valid entries currently sorted? 
*/ -} ParallelReadyList; - static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, @@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH, static void pending_list_header_init(TocEntry *l); static void pending_list_append(TocEntry *l, TocEntry *te); static void pending_list_remove(TocEntry *te); -static void ready_list_init(ParallelReadyList *ready_list, int tocCount); -static void ready_list_free(ParallelReadyList *ready_list); -static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); -static void ready_list_remove(ParallelReadyList *ready_list, int i); -static void ready_list_sort(ParallelReadyList *ready_list); -static int TocEntrySizeCompare(const void *p1, const void *p2); -static void move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +static int TocEntrySizeCompareQsort(const void *p1, const void *p2); +static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg); +static void move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass); -static TocEntry *pop_next_work_item(ParallelReadyList *ready_list, +static TocEntry *pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list); + binaryheap *ready_heap); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) } if (ntes > 1) - qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare); + qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort); for (int i = 0; i < ntes; i++) DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, @@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) (void) restore_toc_entry(AH, next_work_item, false); - /* Reduce dependencies, but don't move anything to ready_list */ + /* Reduce dependencies, but don't move anything to ready_heap */ reduce_dependencies(AH, next_work_item, NULL); } else @@ -4027,24 +4006,26 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - ParallelReadyList ready_list; + binaryheap *ready_heap; TocEntry *next_work_item; pg_log_debug("entering restore_toc_entries_parallel"); - /* Set up ready_list with enough room for all known TocEntrys */ - ready_list_init(&ready_list, AH->tocCount); + /* Set up ready_heap with enough room for all known TocEntrys */ + ready_heap = binaryheap_allocate(AH->tocCount, + TocEntrySizeCompareBinaryheap, + NULL); /* * The pending_list contains all items that we need to restore. Move all - * items that are available to process immediately into the ready_list. + * items that are available to process immediately into the ready_heap. 
* After this setup, the pending list is everything that needs to be done - * but is blocked by one or more dependencies, while the ready list + * but is blocked by one or more dependencies, while the ready heap * contains items that have no remaining dependencies and are OK to * process in the current restore pass. */ AH->restorePass = RESTORE_PASS_MAIN; - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* * main parent loop @@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = pop_next_work_item(&ready_list, pstate); + next_work_item = pop_next_work_item(ready_heap, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); /* Update its dependencies as though we'd completed it */ - reduce_dependencies(AH, next_work_item, &ready_list); + reduce_dependencies(AH, next_work_item, ready_heap); /* Loop around to see if anything else can be dispatched */ continue; } @@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, - mark_restore_job_done, &ready_list); + mark_restore_job_done, ready_heap); } else if (IsEveryWorkerIdle(pstate)) { @@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Advance to next restore pass */ AH->restorePass++; /* That probably allows some stuff to be made ready */ - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* Loop around to see if anything's now ready */ continue; } @@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS); } - /* There should now be nothing in ready_list. */ - Assert(ready_list.first_te > ready_list.last_te); + /* There should now be nothing in ready_heap. */ + Assert(binaryheap_empty(ready_heap)); - ready_list_free(&ready_list); + binaryheap_free(ready_heap); pg_log_info("finished main parallel loop"); } @@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te) } -/* - * Initialize the ready_list with enough room for up to tocCount entries. - */ -static void -ready_list_init(ParallelReadyList *ready_list, int tocCount) -{ - ready_list->tes = (TocEntry **) - pg_malloc(tocCount * sizeof(TocEntry *)); - ready_list->first_te = 0; - ready_list->last_te = -1; - ready_list->sorted = false; -} - -/* - * Free storage for a ready_list. - */ -static void -ready_list_free(ParallelReadyList *ready_list) -{ - pg_free(ready_list->tes); -} - -/* Add te to the ready_list */ -static void -ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) -{ - ready_list->tes[++ready_list->last_te] = te; - /* List is (probably) not sorted anymore. */ - ready_list->sorted = false; -} - -/* Remove the i'th entry in the ready_list */ -static void -ready_list_remove(ParallelReadyList *ready_list, int i) -{ - int f = ready_list->first_te; - - Assert(i >= f && i <= ready_list->last_te); - - /* - * In the typical case where the item to be removed is the first ready - * entry, we need only increment first_te to remove it. 
Otherwise, move - * the entries before it to compact the list. (This preserves sortedness, - * if any.) We could alternatively move the entries after i, but there - * are typically many more of those. - */ - if (i > f) - { - TocEntry **first_te_ptr = &ready_list->tes[f]; - - memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); - } - ready_list->first_te++; -} - -/* Sort the ready_list into the desired order */ -static void -ready_list_sort(ParallelReadyList *ready_list) -{ - if (!ready_list->sorted) - { - int n = ready_list->last_te - ready_list->first_te + 1; - - if (n > 1) - qsort(ready_list->tes + ready_list->first_te, n, - sizeof(TocEntry *), - TocEntrySizeCompare); - ready_list->sorted = true; - } -} - /* qsort comparator for sorting TocEntries by dataLength */ static int -TocEntrySizeCompare(const void *p1, const void *p2) +TocEntrySizeCompareQsort(const void *p1, const void *p2) { const TocEntry *te1 = *(const TocEntry *const *) p1; const TocEntry *te2 = *(const TocEntry *const *) p2; @@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2) return 0; } +/* binaryheap comparator for sorting TocEntries by dataLength */ +static int +TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg) +{ + /* return opposite of qsort comparator for max-heap */ + return -TocEntrySizeCompareQsort(&p1, &p2); +} + /* - * Move all immediately-ready items from pending_list to ready_list. + * Move all immediately-ready items from pending_list to ready_heap. * * Items are considered ready if they have no remaining dependencies and * they belong in the current restore pass. (See also reduce_dependencies, * which applies the same logic one-at-a-time.) */ static void -move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass) { TocEntry *te; @@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list, { /* Remove it from pending_list ... */ pending_list_remove(te); - /* ... and add to ready_list */ - ready_list_insert(ready_list, te); + /* ... and add to ready_heap */ + binaryheap_add(ready_heap, te); } } } /* * Find the next work item (if any) that is capable of being run now, - * and remove it from the ready_list. + * and remove it from the ready_heap. * * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with - * items currently running. Items in the ready_list are known to have + * items currently running. Items in the ready_heap are known to have * no remaining dependencies, but we have to check for lock conflicts. */ static TocEntry * -pop_next_work_item(ParallelReadyList *ready_list, +pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate) { /* - * Sort the ready_list so that we'll tackle larger jobs first. - */ - ready_list_sort(ready_list); - - /* - * Search the ready_list until we find a suitable item. + * Search the ready_heap until we find a suitable item. Note that we do a + * sequential scan through the heap nodes, so even though we will first + * try to choose the highest-priority item, we might end up picking + * something with a much lower priority. However, it is expected that we + * will typically be able to pick one of the first few items, which should + * usually have a relatively high priority. 
*/ - for (int i = ready_list->first_te; i <= ready_list->last_te; i++) + for (int i = 0; i < binaryheap_size(ready_heap); i++) { - TocEntry *te = ready_list->tes[i]; + TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i); bool conflicts = false; /* @@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list, continue; /* passed all tests, so this item can run */ - ready_list_remove(ready_list, i); + binaryheap_remove_node(ready_heap, i); return te; } @@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; + binaryheap *ready_heap = (binaryheap *) callback_data; pg_log_info("finished item %d %s %s", te->dumpId, te->desc, te->tag); @@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH, pg_fatal("worker process failed: exit code %d", status); - reduce_dependencies(AH, te, ready_list); + reduce_dependencies(AH, te, ready_heap); } @@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) /* * Remove the specified TOC entry from the depCounts of items that depend on * it, thereby possibly making them ready-to-run. Any pending item that - * becomes ready should be moved to the ready_list, if that's provided. + * becomes ready should be moved to the ready_heap, if that's provided. */ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list) + binaryheap *ready_heap) { int i; @@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, * the current restore pass, and it is currently a member of the * pending list (that check is needed to prevent double restore in * some cases where a list-file forces out-of-order restoring). - * However, if ready_list == NULL then caller doesn't want any list + * However, if ready_heap == NULL then caller doesn't want any list * memberships changed. */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && otherte->pending_prev != NULL && - ready_list != NULL) + ready_heap != NULL) { /* Remove it from pending list ... */ pending_list_remove(otherte); - /* ... and add to ready_list */ - ready_list_insert(ready_list, otherte); + /* ... and add to ready_heap */ + binaryheap_add(ready_heap, otherte); } } } -- 2.25.1
>From 60ae7a331e6445f9630695b1a13d64dd4287ce62 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Tue, 25 Jul 2023 11:18:52 -0700 Subject: [PATCH v10 4/4] Remove open-coded binary heap in pg_dump_sort.c. Thanks to commit XXXXXXXXXX, binaryheap is available to frontend code. This commit replaces the open-coded heap implementation in pg_dump_sort.c with a binaryheap, saving a few lines of code. Reviewed-by: Tom Lane Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us --- src/bin/pg_dump/pg_dump_sort.c | 105 +++++++-------------------------- 1 file changed, 22 insertions(+), 83 deletions(-) diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 523a19c155..8436c1d757 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -16,6 +16,7 @@ #include "postgres_fe.h" #include "catalog/pg_class_d.h" +#include "lib/binaryheap.h" #include "pg_backup_archiver.h" #include "pg_backup_utils.h" #include "pg_dump.h" @@ -161,8 +162,6 @@ static bool TopoSort(DumpableObject **objs, int numObjs, DumpableObject **ordering, int *nOrdering); -static void addHeapElement(int val, int *heap, int heapLength); -static int removeHeapElement(int *heap, int heapLength); static void findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs); static int findLoop(DumpableObject *obj, DumpId startPoint, @@ -174,6 +173,7 @@ static void repairDependencyLoop(DumpableObject **loop, int nLoop); static void describeDumpableObject(DumpableObject *obj, char *buf, int bufsize); +static int int_cmp(void *a, void *b, void *arg); /* @@ -374,11 +374,10 @@ TopoSort(DumpableObject **objs, int *nOrdering) /* output argument */ { DumpId maxDumpId = getMaxDumpId(); - int *pendingHeap; + binaryheap *pendingHeap; int *beforeConstraints; int *idMap; DumpableObject *obj; - int heapLength; int i, j, k; @@ -403,7 +402,7 @@ TopoSort(DumpableObject **objs, return true; /* Create workspace for the above-described heap */ - pendingHeap = (int *) pg_malloc(numObjs * sizeof(int)); + pendingHeap = binaryheap_allocate(numObjs, int_cmp, NULL); /* * Scan the constraints, and for each item in the input, generate a count @@ -434,19 +433,16 @@ TopoSort(DumpableObject **objs, * Now initialize the heap of items-ready-to-output by filling it with the * indexes of items that already have beforeConstraints[id] == 0. * - * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each j - * in the range 1..heapLength-1 (note we are using 0-based subscripts - * here, while the discussion in Knuth assumes 1-based subscripts). So, if - * we simply enter the indexes into pendingHeap[] in decreasing order, we - * a-fortiori have the heap invariant satisfied at completion of this - * loop, and don't need to do any sift-up comparisons. + * We enter the objects into pendingHeap in decreasing order so that the + * heap invariant is satisfied at the completion of this loop. This + * reduces the amount of work that binaryheap_build() must do. */ - heapLength = 0; for (i = numObjs; --i >= 0;) { if (beforeConstraints[objs[i]->dumpId] == 0) - pendingHeap[heapLength++] = i; + binaryheap_add_unordered(pendingHeap, (void *) (intptr_t) i); } + binaryheap_build(pendingHeap); /*-------------------- * Now emit objects, working backwards in the output list. 
At each step, @@ -464,10 +460,10 @@ TopoSort(DumpableObject **objs, *-------------------- */ i = numObjs; - while (heapLength > 0) + while (!binaryheap_empty(pendingHeap)) { /* Select object to output by removing largest heap member */ - j = removeHeapElement(pendingHeap, heapLength--); + j = (int) (intptr_t) binaryheap_remove_first(pendingHeap); obj = objs[j]; /* Output candidate to ordering[] */ ordering[--i] = obj; @@ -477,7 +473,7 @@ TopoSort(DumpableObject **objs, int id = obj->dependencies[k]; if ((--beforeConstraints[id]) == 0) - addHeapElement(idMap[id], pendingHeap, heapLength++); + binaryheap_add(pendingHeap, (void *) (intptr_t) idMap[id]); } } @@ -497,79 +493,13 @@ TopoSort(DumpableObject **objs, } /* Done */ - free(pendingHeap); + binaryheap_free(pendingHeap); free(beforeConstraints); free(idMap); return (i == 0); } -/* - * Add an item to a heap (priority queue) - * - * heapLength is the current heap size; caller is responsible for increasing - * its value after the call. There must be sufficient storage at *heap. - */ -static void -addHeapElement(int val, int *heap, int heapLength) -{ - int j; - - /* - * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is - * using 1-based array indexes, not 0-based. - */ - j = heapLength; - while (j > 0) - { - int i = (j - 1) >> 1; - - if (val <= heap[i]) - break; - heap[j] = heap[i]; - j = i; - } - heap[j] = val; -} - -/* - * Remove the largest item present in a heap (priority queue) - * - * heapLength is the current heap size; caller is responsible for decreasing - * its value after the call. - * - * We remove and return heap[0], which is always the largest element of - * the heap, and then "sift up" to maintain the heap invariant. - */ -static int -removeHeapElement(int *heap, int heapLength) -{ - int result = heap[0]; - int val; - int i; - - if (--heapLength <= 0) - return result; - val = heap[heapLength]; /* value that must be reinserted */ - i = 0; /* i is where the "hole" is */ - for (;;) - { - int j = 2 * i + 1; - - if (j >= heapLength) - break; - if (j + 1 < heapLength && - heap[j] < heap[j + 1]) - j++; - if (val >= heap[j]) - break; - heap[i] = heap[j]; - i = j; - } - heap[i] = val; - return result; -} - /* * findDependencyLoops - identify loops in TopoSort's failure output, * and pass each such loop to repairDependencyLoop() for action @@ -1559,3 +1489,12 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) (int) obj->objType, obj->dumpId, obj->catId.oid); } + +/* + * Compares "a" and "b" as integers. + */ +static int +int_cmp(void *a, void *b, void *arg) +{ + return (int) (intptr_t) a - (int) (intptr_t) b; +} -- 2.25.1
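One detail of 0004 that may be worth spelling out: since the frontend node
type is void *, TopoSort() stores plain object indexes by round-tripping
them through intptr_t rather than pointing at anything.  A standalone sketch
of that pattern (index_cmp() is just a stand-in for the patch's int_cmp()):

#include "postgres_fe.h"

#include "lib/binaryheap.h"

/* max-heap on the encoded integer, like int_cmp() in 0004 */
static int
index_cmp(void *a, void *b, void *arg)
{
	intptr_t	ia = (intptr_t) a;
	intptr_t	ib = (intptr_t) b;

	return (ia > ib) - (ia < ib);
}

int
main(void)
{
	binaryheap *heap = binaryheap_allocate(8, index_cmp, NULL);

	/*
	 * As in TopoSort(): bulk-load the indexes in decreasing order so the
	 * array already satisfies the heap invariant, then heapify once.
	 */
	for (int i = 7; i >= 0; i--)
		binaryheap_add_unordered(heap, (void *) (intptr_t) i);
	binaryheap_build(heap);

	/* the largest remaining index always comes out first: 7, 6, ..., 0 */
	while (!binaryheap_empty(heap))
		printf("%d\n", (int) (intptr_t) binaryheap_remove_first(heap));

	binaryheap_free(heap);
	return 0;
}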