Hi, I did a basic review and testing of this patch today. Overall I think the patch is in very good shape - I agree with the tradeoffs it makes, and I like the approach in general. I do have a couple minor comments about the code, and then maybe a couple thoughts about the approach.
First, some comments - I'll put them here, but I also kept them in "review" commits, because that makes it easier to show the exact place in the code each comment is about.

1) binaryheap_allocate got a new "indexed" argument, but the comment is not updated to document it.

2) I think it's preferable to use descriptive argument names for bh_set_node. I don't think there's a good reason to keep them short.

3) In a couple of places we have code like this:

    if (heap->bh_indexed)
        bh_nodeidx_delete(heap->bh_nodeidx, result);

Maybe it'd be better to move the if condition into bh_nodeidx_delete, so that it can be called unconditionally.

4) Could we check the "found" flag in bh_set_node, somehow? I mean, we either expect to find the node (update of an already tracked transaction) or not (when inserting it). The life cycle may be non-trivial (node added, updated and removed, ...), so I think that would be a useful assert.

5) Do we actually need the various mem_freed local variables in a couple of places, when we expect the value to be equal to txn->size (there's even an assert enforcing that)?

6) ReorderBufferCleanupTXN has a comment about maybe not using the same threshold both to enable & disable usage of the binaryheap. I agree with that, otherwise we could easily end up "thrashing" if we add/remove transactions right around the threshold. I think 90-95% of the threshold for disabling the heap would work fine.

7) The code disabling the binaryheap (based on the threshold) is copied in a couple of places, perhaps it should be a separate function called from those places.

8) Similarly to (3), maybe ReorderBufferTXNMemoryUpdate should do the memory size check internally, to make the calls simpler.

9) The ReorderBufferChangeMemoryUpdate / ReorderBufferTXNMemoryUpdate split is maybe not very clear. It's not obvious to me why it's divided like this, or why we can't simply call ReorderBufferTXNMemoryUpdate directly.

performance
-----------

I did some benchmarks, to see the behavior in simple good/bad cases (see the attached scripts.tgz).
"large" is one large transaction inserting 1M rows, "small" is 64k single-row inserts, and "subxacts" is the original case with ~100k subxacts. Finally, "subxacts-small" is many transactions with 128 subxacts each (the main transactions are concurrent).

The results are pretty good, I think:

    test              master   patched
    -----------------------------------------
    large               2587      2459    95%
    small                956       856    89%
    subxacts          138915      2911     2%
    subxacts-small     13632     13187    97%

This is timing (in ms) with logical_decoding_work_mem=4MB. I also tried with 64MB, where the subxact timing goes way down, but the overall conclusions do not change.

I was a bit surprised I haven't seen any clear regression, but in the end that's a good thing, right? There are a couple of results in this thread showing ~10% regression, but I've been unable to reproduce those. Perhaps the newer patch versions fix that, I guess. Anyway, I think at some point we'd have to accept that some cases may see a slight regression. That's inherent for almost any heuristic - there's always going to be some rare case that defeats it. What's important is that such cases be rare and/or the impact very limited. And I think that's true here.

overall design
--------------

As for the design, I agree with the approach of using a binaryheap to track transactions by size. When going over the thread history and the initial approach of only keeping "large" transactions above some threshold (e.g. 10%), I was really concerned that it would either lead to abrupt changes in behavior (when transactions move just around the 10%), or wouldn't help with many common cases (with most transactions being below the limit).

I was going to suggest some sort of "binning" - keeping lists for transactions of similar size (e.g. <1kB, 1-2kB, 2-4kB, 4-8kB, ...) and evicting transactions from a list, i.e. based on approximate size. But if the indexed binary heap is cheap enough, I think it's a better solution.
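For the record, the bucket mapping for that binning would have been trivial - roughly a power-of-two log of the size, something like this sketch (hypothetical names, not part of the patch; eviction would then just pop from the highest non-empty bucket):

```c
#include <assert.h>
#include <stddef.h>

/* cap the number of buckets, everything huge lands in the last one */
#define TXN_BIN_MAX 32

/*
 * Map a transaction size (in bytes) to a size bucket:
 * <1kB -> 0, 1-2kB -> 1, 2-4kB -> 2, 4-8kB -> 3, ...
 */
static int
txn_size_bin(size_t size)
{
	int		bin = 0;

	/* everything below 1kB falls into bucket 0 */
	size /= 1024;

	/* each further doubling of size moves us up one bucket */
	while (size > 0 && bin < TXN_BIN_MAX)
	{
		size /= 2;
		bin++;
	}

	return bin;
}
```

The point is that maintaining this is O(1) per memory-counter update (just unlink/relink in a dlist), at the cost of only approximate ordering - which is why the indexed heap, if cheap enough, is strictly nicer.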
The one thing I'm a bit concerned about is the threshold used to start using the binary heap - such binary thresholds can easily lead to a "cliff" and robustness issues, i.e. an abrupt change in behavior with a significant runtime cost (e.g. you add/remove one transaction and the code takes a much more expensive path). The value (1024) seems rather arbitrary, I wonder if there's something to justify that choice.

In any case, I agree it'd be good to have some dampening factor, to reduce the risk of thrashing because of adding/removing a single transaction to the decoding.

related stuff / GenerationContext
---------------------------------

It's not the fault of this patch, but this reminds me I have some doubts about how the eviction interferes with using the GenerationContext for some of the data. I suspect we can easily get into a situation where we evict the largest transaction, but that doesn't actually reduce the memory usage at all, because the memory context blocks are shared with some other transactions and don't get 100% empty (so we can't release them). But it's actually worse, because GenerationContext does not even reuse this memory. So do we even gain anything by the eviction?

When the earlier patch versions also considered the age of the transaction, to try evicting the older ones first, I thought that was interesting. I think we may want to do something like that even with the binary heap.

related stuff / increase of logical_decoding_work_mem
-----------------------------------------------------

One of the "alternatives to spilling" suggested in the thread was to enable streaming, but I think there's often a much more efficient alternative - increase the amount of memory, so that we don't actually need to spill. For example, a system may be doing a lot of eviction / spilling with logical_decoding_work_mem=64MB, while setting 128MB might completely eliminate it.
Of course, if there are large transactions, this may not be possible (the GUC would have to exceed RAM). But I don't think that's very common - the incidents I've observed were often resolved by bumping logical_decoding_work_mem up a little bit.

I wonder if there's something we might do to help users tune this. We should be able to measure the "peak" memory usage (how much memory we'd need to not spill), so maybe we could log that as a WARNING, similarly to checkpoints - there we only log "checkpoints too frequent, tune WAL limits", but perhaps we could do more here? Or maybe we could expose the watermark in a system catalog?

regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 6dfeb61ffddeedc8e00f8de5eb6b644b28ae1f62 Mon Sep 17 00:00:00 2001 From: Tomas Vondra <to...@2ndquadrant.com> Date: Fri, 23 Feb 2024 13:15:44 +0100 Subject: [PATCH v5 5/5] review --- .../replication/logical/reorderbuffer.c | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f22cf2fb9b8..40fa2ba9843 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1537,7 +1537,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { bool found; dlist_mutable_iter iter; - Size mem_freed = 0; + Size mem_freed = 0; /* XXX why don't we use txn->size directly? */ /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1571,11 +1571,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferReturnChange(rb, change, false); } - /* Update the memory counter */ - Assert(mem_freed == txn->size); - if (mem_freed > 0) - ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); - /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -1635,14 +1630,21 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* deallocate */ ReorderBufferReturnTXN(rb, txn); + /* Update the memory counter */ + Assert(mem_freed == txn->size); + ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); + /* - * Check if the number of transactions get lower than the threshold. If + * Check if the number of transactions got lower than the threshold. If * so, switch to NO_MAXHEAP state and reset the max-heap. * - * XXX: If a new transaction is added and the memory usage reached the + * XXX: If a new transaction is added and the memory usage reaches the * limit soon, we will end up building the max-heap again. 
It might be * more efficient if we accept a certain amount of transactions to switch * back to the NO_MAXHEAP state, say 95% of the threshold. + * + * XXX Yes, having the enable/disable threshold exactly the same can lead + * to trashing. Something like 90% would work, I think. */ if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP && (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD)) @@ -3257,6 +3259,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, * counters instead - we don't really care about subtransactions as we * can't stream them individually anyway, and we only pick toplevel * transactions for eviction. So only toplevel transactions matter. + * + * XXX Not sure the naming is great, it seems pretty similar to the earlier + * function, can be quite confusing. Why do we even need the separate function + * and can't simply call ReorderBufferChangeMemoryUpdate from everywhere? */ static void ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, @@ -3264,6 +3270,9 @@ ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, { ReorderBufferTXN *toptxn; + if (sz == 0) + return; + /* * Update the total size in top level as well. This is later used to * compute the decoding stats. @@ -3745,6 +3754,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) * Check the number of transactions in max-heap after evicting large * transactions. If the number of transactions is small, we switch back * to the NO_MAXHEAP state, and reset the current the max-heap. + * + * XXX We already have this block elsewhere, maybe have a function? */ if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP && (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD)) @@ -3769,7 +3780,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) XLogSegNo curOpenSegNo = 0; Size spilled = 0; Size size = txn->size; - Size mem_freed = 0; + Size mem_freed = 0; /* XXX why needed? 
can't we just use txn->size? */ elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -3831,8 +3842,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Update the memory counter */ Assert(mem_freed == txn->size); - if (mem_freed > 0) - ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); + ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); /* update the statistics iff we have spilled anything */ if (spilled) -- 2.43.0
From 889d0dc3a3ff203fd382e5020029a78b9334c586 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v5 4/5] Use max-heap to evict largest transactions in
 ReorderBuffer.

Previously, when selecting the transaction to evict, we checked all
transactions to find the largest one, which could lead to significant
replication lag, especially in cases with many subtransactions.

This commit improves the eviction algorithm in ReorderBuffer, using a
max-heap with transaction size as the key to find the largest
transaction. The max-heap is managed in two states.

Overall algorithm: REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting
state, where we do not update the max-heap when updating the memory
counter. We build the max-heap just before selecting large
transactions. Therefore, in this state, we can update the memory
counter with no additional cost, but need O(n) time to get the largest
transaction, where n is the number of transactions including top-level
transactions and subtransactions.

Once we build the max-heap, we switch to
REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
the max-heap when updating the memory counter. The intention is to
efficiently retrieve the largest transaction in O(1) time, at the cost
of O(log n) memory counter updates. We remain in this state as long as
the number of transactions is larger than the threshold,
REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back to
REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.

The performance benchmark results showed significant speed up (more
than x30 speed up on my machine) in decoding a transaction with 100k
subtransactions, whereas there is no visible overhead in other cases.
XXX: update typedef.list Author: Reviewed-by: Discussion: https://postgr.es/m/ --- .../replication/logical/reorderbuffer.c | 197 +++++++++++++++--- src/include/replication/reorderbuffer.h | 21 ++ 2 files changed, 189 insertions(+), 29 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 91b9618d7ec..f22cf2fb9b8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -67,6 +67,26 @@ * allocator, evicting the oldest changes would make it more likely the * memory gets actually freed. * + * We use a max-heap with transaction size as the key to efficiently find + * the largest transaction. The max-heap state is managed in two states: + * REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP. + * + * REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do + * not update the max-heap when updating the memory counter. We build the + * max-heap just before selecting large transactions. Therefore, in this + * state, we can update the memory counter with no additional costs but + * need O(n) time to get the largest transaction, where n is the number of + * transactions including top-level transactions and subtransactions. + * + * Once we build the max-heap, we switch to + * REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update + * the max-heap when updating the memory counter. The intention is to + * efficiently retrieve the largest transaction in O(1) time instead of + * incurring the cost of memory counter updates (O(log n)). We remain in + * this state as long as the number of transactions is larger than the + * threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back + * to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap. + * * We still rely on max_changes_in_memory when loading serialized changes * back into memory. 
At that point we can't use the memory limit directly * as we load the subxacts independently. One option to deal with this @@ -109,6 +129,11 @@ #include "utils/rel.h" #include "utils/relfilenumbermap.h" +/* + * The threshold of the number of transactions in the max-heap (rb->txn_heap) + * to switch the state. + */ +#define REORDER_BUFFER_MEM_TRACK_THRESHOLD 1024 /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt @@ -296,6 +321,9 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz); +static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg); +static void ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool addition, Size sz); /* * Allocate a new ReorderBuffer and clean out any old serialized state from @@ -357,6 +385,15 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; + /* + * Don't start with a lower number than REORDER_BUFFER_MEM_TRACK_THRESHOLD, since + * we add at least REORDER_BUFFER_MEM_TRACK_THRESHOLD entries at once. + */ + buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP; + buffer->txn_heap = binaryheap_allocate(REORDER_BUFFER_MEM_TRACK_THRESHOLD * 2, + ReorderBufferTXNSizeCompare, + true, NULL); + buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; @@ -1500,6 +1537,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { bool found; dlist_mutable_iter iter; + Size mem_freed = 0; /* cleanup subtransactions & their changes */ dlist_foreach_modify(iter, &txn->subtxns) @@ -1529,9 +1567,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. 
*/ Assert(change->txn == txn); - ReorderBufferReturnChange(rb, change, true); + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); } + /* Update the memory counter */ + Assert(mem_freed == txn->size); + if (mem_freed > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -1590,6 +1634,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* deallocate */ ReorderBufferReturnTXN(rb, txn); + + /* + * Check if the number of transactions get lower than the threshold. If + * so, switch to NO_MAXHEAP state and reset the max-heap. + * + * XXX: If a new transaction is added and the memory usage reached the + * limit soon, we will end up building the max-heap again. It might be + * more efficient if we accept a certain amount of transactions to switch + * back to the NO_MAXHEAP state, say 95% of the threshold. + */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP && + (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD)) + { + rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP; + binaryheap_reset(rb->txn_heap); + } } /* @@ -3162,16 +3222,6 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, /* * Update memory counters to account for the new or removed change. - * - * We update two counters - in the reorder buffer, and in the transaction - * containing the change. The reorder buffer counter allows us to quickly - * decide if we reached the memory limit, the transaction counter allows - * us to quickly pick the largest transaction for eviction. - * - * When streaming is enabled, we need to update the toplevel transaction - * counters instead - we don't really care about subtransactions as we - * can't stream them individually anyway, and we only pick toplevel - * transactions for eviction. 
So only toplevel transactions matter. */ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, @@ -3179,7 +3229,6 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, bool addition, Size sz) { ReorderBufferTXN *txn; - ReorderBufferTXN *toptxn; Assert(change->txn); @@ -3193,6 +3242,28 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, txn = change->txn; + ReorderBufferTXNMemoryUpdate(rb, txn, addition, sz); +} + +/* + * Update memory counter of the given transaction. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. + */ +static void +ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool addition, Size sz) +{ + ReorderBufferTXN *toptxn; + /* * Update the total size in top level as well. This is later used to * compute the decoding stats. @@ -3206,6 +3277,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. */ toptxn->total_size += sz; + + /* Update the max-heap as well if necessary */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP) + { + if ((txn->size - sz) == 0) + binaryheap_add(rb->txn_heap, PointerGetDatum(txn)); + else + binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn)); + } } else { @@ -3215,6 +3295,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. 
*/ toptxn->total_size -= sz; + + /* Update the max-heap as well if necessary */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP) + { + if (txn->size == 0) + binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn)); + else + binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn)); + } } Assert(txn->size <= rb->size); @@ -3472,31 +3561,45 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) /* * Find the largest transaction (toplevel or subxact) to evict (spill to disk). - * - * XXX With many subtransactions this might be quite slow, because we'll have - * to walk through all of them. There are some options how we could improve - * that: (a) maintain some secondary structure with transactions sorted by - * amount of changes, (b) not looking for the entirely largest transaction, - * but e.g. for transaction using at least some fraction of the memory limit, - * and (c) evicting multiple transactions at once, e.g. to free a given portion - * of the memory limit (e.g. 50%). */ static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb) { - HASH_SEQ_STATUS hash_seq; - ReorderBufferTXNByIdEnt *ent; ReorderBufferTXN *largest = NULL; - hash_seq_init(&hash_seq, rb->by_txn); - while ((ent = hash_seq_search(&hash_seq)) != NULL) + /* + * Build the max-heap to pick the largest transaction if not yet. We will + * run a heap assembly step at the end, which is more efficient. 
+ */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP) { - ReorderBufferTXN *txn = ent->txn; + HASH_SEQ_STATUS hash_seq; + ReorderBufferTXNByIdEnt *ent; - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; + hash_seq_init(&hash_seq, rb->by_txn); + while ((ent = hash_seq_search(&hash_seq)) != NULL) + { + ReorderBufferTXN *txn = ent->txn; + + if (txn->size == 0) + continue; + + binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn)); + } + + binaryheap_build(rb->txn_heap); + + /* + * The max-heap is ready now. We remain in this state at least until + * we free up enough transactions to bring the total memory usage + * below the limit. + */ + rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP; } + else + Assert(binaryheap_size(rb->txn_heap) > 0); + + largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap)); Assert(largest); Assert(largest->size > 0); @@ -3638,6 +3741,18 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->nentries_mem == 0); } + /* + * Check the number of transactions in max-heap after evicting large + * transactions. If the number of transactions is small, we switch back + * to the NO_MAXHEAP state, and reset the current the max-heap. + */ + if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP && + (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD)) + { + rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP; + binaryheap_reset(rb->txn_heap); + } + /* We must be under the memory limit now. 
*/ Assert(rb->size < logical_decoding_work_mem * 1024L); } @@ -3654,6 +3769,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) XLogSegNo curOpenSegNo = 0; Size spilled = 0; Size size = txn->size; + Size mem_freed = 0; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -3707,11 +3823,17 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeChange(rb, txn, fd, change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + mem_freed += ReorderBufferChangeSize(change); + ReorderBufferReturnChange(rb, change, false); spilled++; } + /* Update the memory counter */ + Assert(mem_freed == txn->size); + if (mem_freed > 0) + ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed); + /* update the statistics iff we have spilled anything */ if (spilled) { @@ -5273,3 +5395,20 @@ restart: *cmax = ent->cmax; return true; } + +/* + * Compare between sizes of two transactions. This is for a binary heap + * comparison function. 
+ */ +static int +ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg) +{ + ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a); + ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b); + + if (ta->size < tb->size) + return -1; + if (ta->size > tb->size) + return 1; + return 0; +} diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0b2c95f7aa0..f0d352cfcc6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "lib/binaryheap.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -531,6 +532,22 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) ( ReorderBufferTXN *txn, XLogRecPtr lsn); +/* State of how to track the memory usage of each transaction being decoded */ +typedef enum ReorderBufferMemTrackState +{ + /* + * We don't update max-heap while updating the memory counter. The + * max-heap is built before use. + */ + REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP, + + /* + * We also update the max-heap when updating the memory counter so + * the heap property is always preserved. + */ + REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP, +} ReorderBufferMemTrackState; + struct ReorderBuffer { /* @@ -631,6 +648,10 @@ struct ReorderBuffer /* memory accounting */ Size size; + /* Max-heap for sizes of all top-level and sub transactions */ + ReorderBufferMemTrackState memtrack_state; + binaryheap *txn_heap; + /* * Statistics about transactions spilled to disk. * -- 2.43.0
From f2b54fbb2bc0b6a74d10f46b086e238d76fe822f Mon Sep 17 00:00:00 2001 From: Tomas Vondra <to...@2ndquadrant.com> Date: Fri, 23 Feb 2024 13:32:04 +0100 Subject: [PATCH v5 3/5] review --- src/common/binaryheap.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c index ff03c477dc9..f656c47524e 100644 --- a/src/common/binaryheap.c +++ b/src/common/binaryheap.c @@ -54,6 +54,8 @@ static void sift_up(binaryheap *heap, int node_off); * store the given number of nodes, with the heap property defined by * the given comparator function, which will be invoked with the additional * argument specified by 'arg'. + * + * XXX Should document the new "indexed" argument. */ binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, @@ -110,6 +112,7 @@ binaryheap_free(binaryheap *heap) { if (heap->bh_indexed) bh_nodeidx_destroy(heap->bh_nodeidx); + pfree(heap); } @@ -152,28 +155,34 @@ bh_enlarge_node_array(binaryheap *heap) } /* - * Set the given node at the 'idx' and updates its position accordingly. + * Set the given node at the 'index' and updates its position accordingly. + * + * XXX No need to shorten the argument names, I think. + * + * XXX Should this return "found" maybe? */ static void -bh_set_node(binaryheap *heap, bh_node_type d, int idx) +bh_set_node(binaryheap *heap, bh_node_type node, int index) { bh_nodeidx_entry *ent; bool found; /* Set the node to the nodes array */ - heap->bh_nodes[idx] = d; + heap->bh_nodes[index] = node; if (heap->bh_indexed) { /* Remember its index in the nodes array */ - ent = bh_nodeidx_insert(heap->bh_nodeidx, d, &found); - ent->idx = idx; + ent = bh_nodeidx_insert(heap->bh_nodeidx, node, &found); + ent->idx = index; } } /* * Replace the node at 'idx' with the given node 'replaced_by'. Also * update their positions accordingly. + * + * XXX can we do Assert(found) here? 
if bh_set_node returns it, ofc */ static void bh_replace_node(binaryheap *heap, int idx, bh_node_type replaced_by) @@ -280,6 +289,8 @@ binaryheap_remove_first(binaryheap *heap) { heap->bh_size--; + /* XXX maybe it'd be good to make the check in bh_nodeidx_delete, so that + * we don't need to do it everywhere. */ if (heap->bh_indexed) bh_nodeidx_delete(heap->bh_nodeidx, result); -- 2.43.0
From a2a7db6e02344982764b07ec4bf4d509d1dd7ae4 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 26 Jan 2024 11:20:23 +0900
Subject: [PATCH v5 2/5] Add functions to binaryheap to efficiently
 remove/update keys.

Previously, binaryheap didn't support key updates or removing nodes in
an efficient way. For example, in order to remove a node from the
binaryheap, the caller had to pass the node's position within the array
that the binaryheap internally has. Removing a node from the binaryheap
is done in O(log n), but searching for the key's position is done in
O(n).

This commit adds a hash table to binaryheap to track the position of
each node in the binaryheap. That way, by using newly added functions
such as binaryheap_update_up() etc., both updating a key and removing a
node can be done in O(1) on average and O(log n) in the worst case.
This is known as an indexed priority queue. The caller can specify to
use the indexed binaryheap by passing indexed = true.

There is no user of it yet, but it will be used by an upcoming patch.
XXX: update typedef.list

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 src/backend/executor/nodeGatherMerge.c  |   1 +
 src/backend/executor/nodeMergeAppend.c  |   2 +-
 src/backend/postmaster/pgarch.c         |   3 +-
 .../replication/logical/reorderbuffer.c |   1 +
 src/backend/storage/buffer/bufmgr.c     |   1 +
 src/bin/pg_dump/pg_backup_archiver.c    |   1 +
 src/bin/pg_dump/pg_dump_sort.c          |   2 +-
 src/common/binaryheap.c                 | 167 ++++++++++++++++--
 src/include/lib/binaryheap.h            |  35 +++-
 9 files changed, 199 insertions(+), 14 deletions(-)

diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 2d552f42240..250f226d5f8 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -427,6 +427,7 @@ gather_merge_setup(GatherMergeState *gm_state)
 	/* Allocate the resources for the merge */
 	gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
 											heap_compare_slots,
+											false,
 											gm_state);
 }
 
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 08178684528..1980794cb7a 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -125,7 +125,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	mergestate->ms_nplans = nplans;
 
 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
-	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, false,
 											  mergestate);
 
 	/*
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 9c18e4b3efb..36522940dd4 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -250,7 +250,8 @@ PgArchiverMain(void)
 
 	/* Initialize our max-heap for prioritizing files to archive. */
 	arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
-												ready_file_comparator, NULL);
+												ready_file_comparator, false,
+												NULL);
 
 	/* Load the archive_library. */
 	LoadArchiveLibrary();
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5446df3c647..91b9618d7ec 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1296,6 +1296,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	/* allocate heap */
 	state->heap = binaryheap_allocate(state->nr_txns,
 									  ReorderBufferIterCompare,
+									  false,
 									  state);
 
 	/* Now that the state fields are initialized, it is safe to return it. */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index bdf89bbc4dc..69f071321dd 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2725,6 +2725,7 @@ BufferSync(int flags)
 	 */
 	ts_heap = binaryheap_allocate(num_spaces,
 								  ts_ckpt_progress_comparator,
+								  false,
 								  NULL);
 
 	for (i = 0; i < num_spaces; i++)
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index d97ebaff5b8..6587a7b0814 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -4033,6 +4033,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	/* Set up ready_heap with enough room for all known TocEntrys */
 	ready_heap = binaryheap_allocate(AH->tocCount,
 									 TocEntrySizeCompareBinaryheap,
+									 false,
 									 NULL);
 
 	/*
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 8ee8a42781a..4d10af3a344 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -405,7 +405,7 @@ TopoSort(DumpableObject **objs,
 		return true;
 
 	/* Create workspace for the above-described heap */
-	pendingHeap = binaryheap_allocate(numObjs, int_cmp, NULL);
+	pendingHeap = binaryheap_allocate(numObjs, int_cmp, false, NULL);
 
 	/*
 	 * Scan the constraints, and for each item in the input, generate a count
diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index 6f16c83295d..ff03c477dc9 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -22,8 +22,28 @@
 #ifdef FRONTEND
 #include "common/logging.h"
 #endif
+#include "common/hashfn.h"
 #include "lib/binaryheap.h"
 
+/*
+ * Define parameters for hash table code generation. The interface is *also*
+ * declared in binaryheap.h (to generate the types, which are externally
+ * visible).
+ */
+#define SH_PREFIX bh_nodeidx
+#define SH_ELEMENT_TYPE bh_nodeidx_entry
+#define SH_KEY_TYPE bh_node_type
+#define SH_KEY key
+#define SH_HASH_KEY(tb, key) \
+	hash_bytes((const unsigned char *) &key, sizeof(bh_node_type))
+#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(bh_node_type)) == 0)
+#define SH_SCOPE extern
+#ifdef FRONTEND
+#define SH_RAW_ALLOCATOR pg_malloc0
+#endif
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
 static void sift_down(binaryheap *heap, int node_off);
 static void sift_up(binaryheap *heap, int node_off);
 
@@ -36,7 +56,8 @@ static void sift_up(binaryheap *heap, int node_off);
  * argument specified by 'arg'.
  */
 binaryheap *
-binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
+binaryheap_allocate(int capacity, binaryheap_comparator compare,
+					bool indexed, void *arg)
 {
 	binaryheap *heap;
 
@@ -49,6 +70,17 @@ binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
 	heap->bh_has_heap_property = true;
 	heap->bh_nodes = (bh_node_type *) palloc(sizeof(bh_node_type) * capacity);
 
+	heap->bh_indexed = indexed;
+	if (heap->bh_indexed)
+	{
+#ifdef FRONTEND
+		heap->bh_nodeidx = bh_nodeidx_create(capacity, NULL);
+#else
+		heap->bh_nodeidx = bh_nodeidx_create(CurrentMemoryContext, capacity,
+											 NULL);
+#endif
+	}
+
 	return heap;
 }
 
@@ -63,6 +95,9 @@ binaryheap_reset(binaryheap *heap)
 {
 	heap->bh_size = 0;
 	heap->bh_has_heap_property = true;
+
+	if (heap->bh_indexed)
+		bh_nodeidx_reset(heap->bh_nodeidx);
 }
 
 /*
@@ -73,6 +108,8 @@ binaryheap_reset(binaryheap *heap)
 void
 binaryheap_free(binaryheap *heap)
 {
+	if (heap->bh_indexed)
+		bh_nodeidx_destroy(heap->bh_nodeidx);
 	pfree(heap);
 }
 
@@ -114,6 +151,44 @@ bh_enlarge_node_array(binaryheap *heap)
 							  sizeof(bh_node_type) * heap->bh_space);
 }
 
+/*
+ * Set the given node at 'idx' and update its position accordingly.
+ */
+static void
+bh_set_node(binaryheap *heap, bh_node_type d, int idx)
+{
+	bh_nodeidx_entry *ent;
+	bool		found;
+
+	/* Set the node to the nodes array */
+	heap->bh_nodes[idx] = d;
+
+	if (heap->bh_indexed)
+	{
+		/* Remember its index in the nodes array */
+		ent = bh_nodeidx_insert(heap->bh_nodeidx, d, &found);
+		ent->idx = idx;
+	}
+}
+
+/*
+ * Replace the node at 'idx' with the given node 'replaced_by'. Also
+ * update their positions accordingly.
+ */
+static void
+bh_replace_node(binaryheap *heap, int idx, bh_node_type replaced_by)
+{
+	bh_node_type node = heap->bh_nodes[idx];
+
+	/* Remove overwritten node's index */
+	if (heap->bh_indexed)
+		(void) bh_nodeidx_delete(heap->bh_nodeidx, node);
+
+	/* Replace it with the given new node */
+	if (idx < heap->bh_size)
+		bh_set_node(heap, replaced_by, idx);
+}
+
 /*
  * binaryheap_add_unordered
  *
@@ -130,7 +205,7 @@ binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
 		bh_enlarge_node_array(heap);
 
 	heap->bh_has_heap_property = false;
-	heap->bh_nodes[heap->bh_size] = d;
+	bh_set_node(heap, d, heap->bh_size);
 	heap->bh_size++;
 }
 
@@ -163,7 +238,7 @@ binaryheap_add(binaryheap *heap, bh_node_type d)
 	if (heap->bh_size >= heap->bh_space)
 		bh_enlarge_node_array(heap);
 
-	heap->bh_nodes[heap->bh_size] = d;
+	bh_set_node(heap, d, heap->bh_size);
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
 }
 
@@ -204,6 +279,10 @@ binaryheap_remove_first(binaryheap *heap)
 	if (heap->bh_size == 1)
 	{
 		heap->bh_size--;
+
+		if (heap->bh_indexed)
+			bh_nodeidx_delete(heap->bh_nodeidx, result);
+
 		return result;
 	}
 
@@ -211,7 +290,7 @@ binaryheap_remove_first(binaryheap *heap)
 	 * Remove the last node, placing it in the vacated root entry, and sift
 	 * the new root node down to its correct position.
 	 */
-	heap->bh_nodes[0] = heap->bh_nodes[--heap->bh_size];
+	bh_replace_node(heap, 0, heap->bh_nodes[--heap->bh_size]);
 	sift_down(heap, 0);
 
 	return result;
@@ -237,7 +316,7 @@ binaryheap_remove_node(binaryheap *heap, int n)
 				   heap->bh_arg);
 
 	/* remove the last node, placing it in the vacated entry */
-	heap->bh_nodes[n] = heap->bh_nodes[heap->bh_size];
+	bh_replace_node(heap, n, heap->bh_nodes[heap->bh_size]);
 
 	/* sift as needed to preserve the heap property */
 	if (cmp > 0)
@@ -246,6 +325,74 @@ binaryheap_remove_node(binaryheap *heap, int n)
 		sift_down(heap, n);
 }
 
+/*
+ * binaryheap_remove_node_ptr
+ *
+ * Similar to binaryheap_remove_node() but removes the given node. The caller
+ * must ensure that the given node is in the heap. O(log n) worst case.
+ *
+ * This function can be used only if bh_indexed is true.
+ */
+void
+binaryheap_remove_node_ptr(binaryheap *heap, bh_node_type d)
+{
+	bh_nodeidx_entry *ent;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(heap->bh_indexed);
+
+	ent = bh_nodeidx_lookup(heap->bh_nodeidx, d);
+	Assert(ent);
+
+	binaryheap_remove_node(heap, ent->idx);
+}
+
+/*
+ * binaryheap_update_up
+ *
+ * Sift the given node up after the node's key is updated. The caller must
+ * ensure that the given node is in the heap. O(log n) worst case.
+ *
+ * This function can be used only if bh_indexed is true.
+ */
+void
+binaryheap_update_up(binaryheap *heap, bh_node_type d)
+{
+	bh_nodeidx_entry *ent;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(heap->bh_indexed);
+
+	ent = bh_nodeidx_lookup(heap->bh_nodeidx, d);
+	Assert(ent);
+	Assert(ent->idx >= 0 && ent->idx < heap->bh_size);
+
+	sift_up(heap, ent->idx);
+}
+
+/*
+ * binaryheap_update_down
+ *
+ * Sift the given node down after the node's key is updated. The caller must
+ * ensure that the given node is in the heap. O(log n) worst case.
+ *
+ * This function can be used only if bh_indexed is true.
+ */
+void
+binaryheap_update_down(binaryheap *heap, bh_node_type d)
+{
+	bh_nodeidx_entry *ent;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(heap->bh_indexed);
+
+	ent = bh_nodeidx_lookup(heap->bh_nodeidx, d);
+	Assert(ent);
+	Assert(ent->idx >= 0 && ent->idx < heap->bh_size);
+
+	sift_down(heap, ent->idx);
+}
+
 /*
  * binaryheap_replace_first
  *
@@ -258,7 +405,7 @@ binaryheap_replace_first(binaryheap *heap, bh_node_type d)
 {
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
-	heap->bh_nodes[0] = d;
+	bh_replace_node(heap, 0, d);
 
 	if (heap->bh_size > 1)
 		sift_down(heap, 0);
@@ -300,11 +447,11 @@ sift_up(binaryheap *heap, int node_off)
 		 * Otherwise, swap the parent value with the hole, and go on to check
 		 * the node's new parent.
 		 */
-		heap->bh_nodes[node_off] = parent_val;
+		bh_set_node(heap, parent_val, node_off);
 		node_off = parent_off;
 	}
 	/* Re-fill the hole */
-	heap->bh_nodes[node_off] = node_val;
+	bh_set_node(heap, node_val, node_off);
 }
 
 /*
@@ -359,9 +506,9 @@ sift_down(binaryheap *heap, int node_off)
 		 * Otherwise, swap the hole with the child that violates the heap
 		 * property; then go on to check its children.
 		 */
-		heap->bh_nodes[node_off] = heap->bh_nodes[swap_off];
+		bh_set_node(heap, heap->bh_nodes[swap_off], node_off);
 		node_off = swap_off;
 	}
 	/* Re-fill the hole */
-	heap->bh_nodes[node_off] = node_val;
+	bh_set_node(heap, node_val, node_off);
 }
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 1439f208033..48c2de33b48 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -29,6 +29,28 @@ typedef Datum bh_node_type;
  */
 typedef int (*binaryheap_comparator) (bh_node_type a, bh_node_type b, void *arg);
 
+/*
+ * Struct for a hash table element to store the node's index in the bh_nodes
+ * array.
+ */
+typedef struct bh_nodeidx_entry
+{
+	bh_node_type key;
+	char		status;
+	int			idx;
+} bh_nodeidx_entry;
+
+/* define parameters necessary to generate the hash table interface */
+#define SH_PREFIX bh_nodeidx
+#define SH_ELEMENT_TYPE bh_nodeidx_entry
+#define SH_KEY_TYPE bh_node_type
+#define SH_SCOPE extern
+#ifdef FRONTEND
+#define SH_RAW_ALLOCATOR pg_malloc0
+#endif
+#define SH_DECLARE
+#include "lib/simplehash.h"
+
 /*
  * binaryheap
  *
@@ -47,11 +69,19 @@ typedef struct binaryheap
 	binaryheap_comparator bh_compare;
 	void	   *bh_arg;
 	bh_node_type *bh_nodes;
+
+	/*
+	 * If bh_indexed is true, bh_nodeidx is used to track each node's index
+	 * in bh_nodes. This enables the caller to perform
+	 * binaryheap_remove_node_ptr(), binaryheap_update_up/down in O(log n).
+	 */
+	bool		bh_indexed;
+	bh_nodeidx_hash *bh_nodeidx;
 } binaryheap;
 
 extern binaryheap *binaryheap_allocate(int capacity,
									   binaryheap_comparator compare,
-									   void *arg);
+									   bool indexed, void *arg);
 extern void binaryheap_reset(binaryheap *heap);
 extern void binaryheap_free(binaryheap *heap);
 extern void binaryheap_add_unordered(binaryheap *heap, bh_node_type d);
@@ -60,7 +90,10 @@ extern void binaryheap_add(binaryheap *heap, bh_node_type d);
 extern bh_node_type binaryheap_first(binaryheap *heap);
 extern bh_node_type binaryheap_remove_first(binaryheap *heap);
 extern void binaryheap_remove_node(binaryheap *heap, int n);
+extern void binaryheap_remove_node_ptr(binaryheap *heap, bh_node_type d);
 extern void binaryheap_replace_first(binaryheap *heap, bh_node_type d);
+extern void binaryheap_update_up(binaryheap *heap, bh_node_type d);
+extern void binaryheap_update_down(binaryheap *heap, bh_node_type d);
 
 #define binaryheap_empty(h)			((h)->bh_size == 0)
 #define binaryheap_size(h)			((h)->bh_size)
-- 
2.43.0
From 540bfa5568ee07205bc3e18aaec78e02ef2051c0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 26 Jan 2024 17:12:20 +0900
Subject: [PATCH v5 1/5] Make binaryheap enlargeable.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 src/common/binaryheap.c      | 36 +++++++++++++++++++-----------------
 src/include/lib/binaryheap.h |  2 +-
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index 7377ebdf156..6f16c83295d 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -38,17 +38,16 @@ static void sift_up(binaryheap *heap, int node_off);
 binaryheap *
 binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
 {
-	int			sz;
 	binaryheap *heap;
 
-	sz = offsetof(binaryheap, bh_nodes) + sizeof(bh_node_type) * capacity;
-	heap = (binaryheap *) palloc(sz);
+	heap = (binaryheap *) palloc(sizeof(binaryheap));
 	heap->bh_space = capacity;
 	heap->bh_compare = compare;
 	heap->bh_arg = arg;
 
 	heap->bh_size = 0;
 	heap->bh_has_heap_property = true;
+	heap->bh_nodes = (bh_node_type *) palloc(sizeof(bh_node_type) * capacity);
 
 	return heap;
 }
@@ -104,6 +103,17 @@ parent_offset(int i)
 	return (i - 1) / 2;
 }
 
+/*
+ * Make sure there is enough space for nodes.
+ */
+static void
+bh_enlarge_node_array(binaryheap *heap)
+{
+	heap->bh_space *= 2;
+	heap->bh_nodes = repalloc(heap->bh_nodes,
+							  sizeof(bh_node_type) * heap->bh_space);
+}
+
 /*
  * binaryheap_add_unordered
  *
@@ -115,14 +125,10 @@ parent_offset(int i)
 void
 binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
 {
+	/* make sure enough space for a new node */
 	if (heap->bh_size >= heap->bh_space)
-	{
-#ifdef FRONTEND
-		pg_fatal("out of binary heap slots");
-#else
-		elog(ERROR, "out of binary heap slots");
-#endif
-	}
+		bh_enlarge_node_array(heap);
+
 	heap->bh_has_heap_property = false;
 	heap->bh_nodes[heap->bh_size] = d;
 	heap->bh_size++;
@@ -153,14 +159,10 @@ binaryheap_build(binaryheap *heap)
 void
 binaryheap_add(binaryheap *heap, bh_node_type d)
 {
+	/* make sure enough space for a new node */
 	if (heap->bh_size >= heap->bh_space)
-	{
-#ifdef FRONTEND
-		pg_fatal("out of binary heap slots");
-#else
-		elog(ERROR, "out of binary heap slots");
-#endif
-	}
+		bh_enlarge_node_array(heap);
+
 	heap->bh_nodes[heap->bh_size] = d;
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 19025c08ef1..1439f208033 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -46,7 +46,7 @@ typedef struct binaryheap
 	bool		bh_has_heap_property;	/* debugging cross-check */
 	binaryheap_comparator bh_compare;
 	void	   *bh_arg;
-	bh_node_type bh_nodes[FLEXIBLE_ARRAY_MEMBER];
+	bh_node_type *bh_nodes;
 } binaryheap;
 
 extern binaryheap *binaryheap_allocate(int capacity,
-- 
2.43.0
test-scripts.tgz