On Thu, Feb 13, 2025 at 1:40 PM Melanie Plageman
<melanieplage...@gmail.com> wrote:
> Thomas mentioned this to me off-list, and I think he's right. We
> likely need to rethink the way parallel bitmap heap scan workers get
> block assignments for reading and prefetching to make it more similar
> to parallel sequential scan. The workers should probably get
> assignments of a range of blocks. On master, each worker does end up
> issuing reads/fadvises for a bunch of blocks in a row -- even though
> that isn't the intent of the parallel bitmap table scan
> implementation. We are losing some of that with the patch -- but only
> because it is behaving as you would expect given the implementation
> and design. I don't consider that a blocker, though.

I had a crack at this one a few weeks back, and wanted to share some
thoughts.  Getting sequential block range allocation to work was easy
(0001), but ramp-down at this level as a parallel query fairness
strategy seems to be fundamentally incompatible with AIO.  0002 was an
abandoned attempt before I got lost for a while researching the
futility of it all.  See further down.  I gave up for v18.

Of course we already have this problem in v18, but 0001 would magnify
it.  Whether gaining I/O combining is worth losing some parallel
fairness, IDK, but it can wait...

With just the 0001 patch, io_workers=1 and two parallel processes, you
can trace the IO worker to see the ramp-up in each parallel worker and
the maximised I/O combining, but also the abrupt and unfair
end-of-scan:

tmunro@x1:~/projects/postgresql/build$ strace -p 377335 -e
trace=preadv,pread64 -s0
strace: Process 377335 attached
pread64(16, ""..., 8192, 720896)        = 8192
pread64(16, ""..., 8192, 729088)        = 8192
pread64(16, ""..., 16384, 737280)       = 16384
pread64(16, ""..., 16384, 753664)       = 16384
pread64(16, ""..., 32768, 802816)       = 32768
pread64(16, ""..., 32768, 770048)       = 32768
pread64(16, ""..., 65536, 835584)       = 65536
pread64(16, ""..., 65536, 901120)       = 65536
pread64(16, ""..., 131072, 966656)      = 131072
pread64(16, ""..., 131072, 1228800)     = 131072
pread64(16, ""..., 131072, 1097728)     = 131072
pread64(16, ""..., 131072, 1490944)     = 131072
pread64(16, ""..., 131072, 1359872)     = 131072
pread64(16, ""..., 131072, 1753088)     = 131072
pread64(16, ""..., 131072, 1884160)     = 131072
pread64(16, ""..., 131072, 1622016)     = 131072
pread64(16, ""..., 131072, 2015232)     = 131072
preadv(16, [...], 7, 2146304)           = 131072
preadv(16, [...], 7, 2277376)           = 131072
pread64(16, ""..., 131072, 2539520)     = 131072
pread64(16, ""..., 131072, 2408448)     = 131072
pread64(16, ""..., 98304, 2801664)      = 98304
pread64(16, ""..., 131072, 2670592)     = 131072

For real parallel query fairness, I wonder if we should think about a
work stealing approach.  Let me sketch out a simple idea first, and
explain why it's still not enough, and then sketch out a bigger idea:

Imagine a thing called WorkQueueSet in shared memory that contains a
WorkQueue for each worker, and then a function
wqs_get_next_object(&state->tidbitmap_wqs, &object) that looks in your
own backend's queue first, and if there is nothing, it calls a
registered callback that tries to fill your queue up (the current TBM
iterator logic becomes that callback in this case, and the per-backend
WorkQueue replaces the little array that I added in the attached 0001
patch), and if that doesn't work it tries to steal from everyone
else's queue, and if that fails then there really is no more work to
be done and your scan is finished.  That should be perfectly fair,
unlike ramp-down schemes, which are only approximately fair under
certain assumptions (that are now false).  *If* we can figure out how to make the
interlocking so lightweight that the common case of consuming from
your own uncontended work queue is practically as fast as reading out
of a local array...  (I have a few ideas about that part but this is
already long enough...)
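
To make the shape of that a bit more concrete, here is a purely
illustrative sketch: the struct layouts, the constants and the helper
names are all invented, the explicit queue index argument is only there
to keep the example self-contained, and a per-queue spinlock stands in
for whatever lightweight interlocking would really be needed to make
the uncontended case cheap.

#include "postgres.h"
#include "storage/spin.h"

#define WQ_CAPACITY 64

typedef struct WorkQueue
{
	slock_t		mutex;			/* placeholder for real interlocking */
	int			head;			/* next item to consume or steal */
	int			tail;			/* next free slot */
	uint64		items[WQ_CAPACITY];
} WorkQueue;

typedef struct WorkQueueSet
{
	int			nqueues;		/* one queue per participating backend */
	/* callback that tries to refill one backend's queue, e.g. TBM iterator */
	bool		(*refill) (struct WorkQueueSet *wqs, int queue);
	WorkQueue	queues[FLEXIBLE_ARRAY_MEMBER];
} WorkQueueSet;

static bool
wq_take(WorkQueue *wq, uint64 *object)
{
	bool		found = false;

	SpinLockAcquire(&wq->mutex);
	if (wq->head < wq->tail)
	{
		*object = wq->items[wq->head++];
		found = true;
	}
	SpinLockRelease(&wq->mutex);
	return found;
}

/*
 * Own queue first, then the refill callback, then everybody else's
 * queues; returning false means the scan really is finished.
 */
static bool
wqs_get_next_object(WorkQueueSet *wqs, int my_queue, uint64 *object)
{
	if (wq_take(&wqs->queues[my_queue], object))
		return true;
	if (wqs->refill(wqs, my_queue) &&
		wq_take(&wqs->queues[my_queue], object))
		return true;
	for (int i = 0; i < wqs->nqueues; i++)
		if (i != my_queue && wq_take(&wqs->queues[i], object))
			return true;
	return false;
}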

But that only provides ramp-down for the block number ranges
*entering* each worker's ReadStream.  Our goal is to make the workers
run out of work as close in time as possible, and their work begins
when buffers *exit* each worker's ReadStream.  Thought experiment:
what if we could also steal blocks/buffers that someone else's
ReadStream holds, whether already valid, referenced by an inflight
I/O, or waiting to be combined with more blocks in the pending read?
The buffer manager can already deal with any and all races at its
level and do the right thing, but we'd somehow need to make sure the
theft victim doesn't also process that block.  The ReadStream's buffer
queue could be integrated with your backend's WorkQueue in a
WorkQueueSet in shared memory, so that others can steal your stuff,
and read_stream_next_buffer() would have to discard anything that has
been stolen between entering and exiting the stream.  The common
non-stolen case would have to be extremely cheap, read barriers and
ordering tricks I guess.  (This is basically a new take on an idea I
have mentioned before, parallel-aware ReadStream, except that all
previous versions collapsed in a heap of lock contention and other
design problems.  Work stealing seems to offer a way to keep
everything almost as it is now, except in a few weird corner cases at
end-of-stream when new magic fires up only if required.  It also seems
to be such a broadly useful technique that I'd want a general purpose
thing, not just a ReadStream internal trick.)
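
As a hand-wavy illustration of the "discard anything stolen" step:
wq_entry_was_stolen() is an invented placeholder for whatever carefully
ordered check the real ReadStream would do, and the wrapper name is
mine; the point is only that the consumer quietly skips anything a
thief has already claimed.

#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/read_stream.h"

static Buffer
read_stream_next_buffer_stealable(ReadStream *stream, void **per_buffer_data)
{
	for (;;)
	{
		Buffer		buffer = read_stream_next_buffer(stream, per_buffer_data);

		if (buffer == InvalidBuffer)
			return InvalidBuffer;	/* stream exhausted */

		/* Discard buffers another worker stole while they were in flight. */
		if (!wq_entry_was_stolen(stream, buffer))
			return buffer;

		ReleaseBuffer(buffer);	/* the thief will process this block */
	}
}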

Putting both things together, I'm wondering about *two* levels of
WorkQueueSet for stuff like Seq Scan and BHS.

1.  Preserve I/O combining opportunities:  The ReadStream's block
number callback uses WorkQueueSet to hand out block numbers as
described a couple of paragraphs above (perhaps
WORK_QUEUE_TYPE_BLOCK_RANGE would be a built-in mode with queue
elements [blockno, nblocks] so you can consume from them and dequeue
once depleted to implement tableam.c's parallel seq block allocator,
with the callback working just as it does now except with the
ramp-down part removed, and tidbitmap.c would use WORK_QUEUE_TYPE_BLOB
to traffic TBMWorkQueueItem objects replacing the current
TBMIterateResult; a sketch of the block-range mode follows after point 2).

2.  Fairness:  The ReadStream is put into a mode that attaches to your
backend's WorkQueue in a second WorkQueueSet, and uses it for buffers
and per-buffer-data.  This allows other ReadStreams in other
processes attached to the same WorkQueueSet to steal stuff.  In the
common case, there's no stealing, but at stream end you get perfect
fairness.
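
To make point 1 slightly more concrete, here is an invented sketch of
what a block-range queue element and its consumption could look like
(the names and layout are mine, not a proposal): consume one block at a
time and dequeue the element once depleted, so consecutive block
numbers flow into the ReadStream and can be combined.

#include "postgres.h"
#include "storage/block.h"

typedef struct WorkQueueBlockRange
{
	BlockNumber blockno;		/* next block in the range */
	int			nblocks;		/* blocks remaining */
} WorkQueueBlockRange;

static BlockNumber
wq_block_range_next(WorkQueueBlockRange *range)
{
	if (range->nblocks == 0)
		return InvalidBlockNumber;	/* depleted: dequeue, refill or steal */
	range->nblocks--;
	return range->blockno++;
}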

I suspect that a generic work stealing component would find many other
uses.  I can think of quite a few, but here's one: It might be able to
fix the PHJ unmatched scan unfairness problem, within the constraints
of our existing executor model.  Thought experiment: PHJ_BATCH_PROBE
and PHJ_BATCH_SCAN are merged into one phase.  Each worker emits
already joined tuples from a WorkQueueSet (IDK how they hold tuples
exactly, this is hand-waving level).  The callback that pulls those
tuples into your backend's WorkQueue does the actual joining and sets
the match bit, which has the crucial property that you can know when
all bits are set without waiting, and then the callback can switch
to feeding unmatched tuples into the work queue for FULL/RIGHT joins,
and every worker can participate.  No weird
detach-and-drop-to-single-process, no wait, no deadlock risk.  The
callback's batch size would become a new source of unfairness when
performing the join, so perhaps you need two levels of WorkQueueSet,
one for pre-joined tuples and one for post-joined tuples, and only
when both are completely exhausted do you have all match bits (there
is a small extra complication: I think you need a concept of 'dequeued
but busy' when moving items from pre-joined to post-joined, and a CV
so you can wait for that condition to clear, or something like that,
but that's a guaranteed-progress wait and only used in a very rare
edge case; if I'm making a mistake in all this, it may be related to
this type of thing, but I'm not seeing anything unsolvable).  Note
that I didn't say you could take over other workers' scans, which we
definitely *can't* do with our current executor model, I'm just saying
that if there is no stealable work left, matching must be finished.
If that sounds expensive, one thought is that it also provides a tidy
solution to my old conundrum "how to do batch-oriented outer scan with
the timing and control needed to drive software memory prefetching",
ie without falling down the executor volcano by calling ExecProcNode()
between prefetches; I strongly suspect that would provide so much
speedup that it could easily pay for the cost of the extra steps,
again assuming we can make the uncontended case blindingly fast.
Maybe, IDK, I could be missing things...

Anyway, that was a long explanation for why I didn't come up with an
easy patch to teach PBHS not to destroy I/O combining opportunities
for v18 :-)  The short version is that 0001 does address the immediate
problem we identified, but also helped me see the bigger picture a
little bit more clearly and find a few interesting connections.
Parallel query and I/O streaming are just not the best of friends,
yet.  R&D needed.
From 836c406337edda3a7e440d020d7d5a229f7e1099 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 18 Mar 2025 22:27:36 +1300
Subject: [PATCH 1/2] Teach Parallel Bitmap Heap Scan to combine I/O.

As 56788d21 did for parallel sequential scans, make TIDBitmap preserve
locality when giving block numbers to parallel workers, instead of
generating a stream of little islands with gaps in between.

Now it will fetch at least 8 block numbers from the shared bitmap at
once and then more if required to keep opportunities to combine I/O.  The
exception is at the start of the scan, following a common pattern for
such things.

XXX Not done: ramp down at the end of the scan, for fair parallel work
allocation.

Even when no I/O is involved, it didn't seem ideal to hand out single
blocks under an exclusive lock.

XXX But 8 was chosen arbitrarily.

Discussion: https://postgr.es/m/CAAKRu_YhYy6Te5ZNiijtayr6mM-7O0B3UXm7mFOCN8U%3DiFgoxg%40mail.gmail.com
---
 src/backend/nodes/tidbitmap.c | 228 ++++++++++++++++++++++++----------
 1 file changed, 161 insertions(+), 67 deletions(-)

diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c
index 41031aa8f2f..d9e6a957de4 100644
--- a/src/backend/nodes/tidbitmap.c
+++ b/src/backend/nodes/tidbitmap.c
@@ -44,6 +44,7 @@
 #include "common/int.h"
 #include "nodes/bitmapset.h"
 #include "nodes/tidbitmap.h"
+#include "storage/bufmgr.h"
 #include "storage/lwlock.h"
 #include "utils/dsa.h"
 
@@ -74,6 +75,14 @@
 /* number of active words for a lossy chunk: */
 #define WORDS_PER_CHUNK  ((PAGES_PER_CHUNK - 1) / BITS_PER_BITMAPWORD + 1)
 
+/*
+ * The number of blocks to fetch at once when iterating through a shared
+ * bitmap in a parallel query to reduce lock contention.  The number actually
+ * fetched may be higher if trailing sequential neighbors are included to
+ * encourage I/O combining.
+ */
+#define TBM_SHARED_FETCH 8
+
 /*
  * The hashtable entries are represented by this data structure.  For
  * an exact page, blockno is the page number and bit k of the bitmap
@@ -212,6 +221,12 @@ struct TBMSharedIterator
 	PTEntryArray *ptbase;		/* pagetable element array */
 	PTIterationArray *ptpages;	/* sorted exact page index list */
 	PTIterationArray *ptchunks; /* sorted lossy page index list */
+
+	/* Results fetched from shared memory in bulk, ready to be consumed. */
+	int			result_index;
+	int			result_count;
+	int			result_capacity;
+	TBMIterateResult results[FLEXIBLE_ARRAY_MEMBER];
 };
 
 /* Local function prototypes */
@@ -1046,6 +1061,34 @@ tbm_private_iterate(TBMPrivateIterator *iterator, TBMIterateResult *tbmres)
 	return false;
 }
 
+/*
+ * Decide whether to claim a candidate block for the current batch or stop
+ * here without including it.
+ */
+static inline bool
+tbm_shared_iterate_include_block_p(TBMSharedIterator *iterator,
+								   BlockNumber candidate_blockno,
+								   BlockNumber *next_blockno,
+								   int *seq_count)
+{
+	/* Track the length of the current sequential cluster of blocks. */
+	if (candidate_blockno == *next_blockno)
+		*seq_count += 1;
+	else
+		*seq_count = 1;
+	*next_blockno = candidate_blockno + 1;
+
+	/* If we haven't reached the target batch size yet, then definitely. */
+	if (iterator->result_count < TBM_SHARED_FETCH)
+		return true;
+
+	/*
+	 * We're prepared to go bigger, but only to avoid an arbitrary break in a
+	 * potential combined I/O.
+	 */
+	return *seq_count > 1 && *seq_count <= io_combine_limit;
+}
+
 /*
  *	tbm_shared_iterate - scan through next page of a TIDBitmap
  *
@@ -1056,87 +1099,128 @@ tbm_private_iterate(TBMPrivateIterator *iterator, TBMIterateResult *tbmres)
 bool
 tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres)
 {
-	TBMSharedIteratorState *istate = iterator->state;
-	PagetableEntry *ptbase = NULL;
-	int		   *idxpages = NULL;
-	int		   *idxchunks = NULL;
-
-	if (iterator->ptbase != NULL)
-		ptbase = iterator->ptbase->ptentry;
-	if (iterator->ptpages != NULL)
-		idxpages = iterator->ptpages->index;
-	if (iterator->ptchunks != NULL)
-		idxchunks = iterator->ptchunks->index;
+	TBMSharedIteratorState *istate;
+	TBMIterateResult *result;
+	PagetableEntry *ptbase;
+	int		   *idxpages;
+	int		   *idxchunks;
+	int			max_results;
+	int			seq_count;
+	BlockNumber next_blockno;
+
+	/* Return results fetched earlier until they run out. */
+	if (iterator->result_index < iterator->result_count)
+	{
+		*tbmres = iterator->results[iterator->result_index++];
+		return true;
+	}
+
+	/* Prepare to fetch a new batch of results from shared memory. */
+	istate = iterator->state;
+	ptbase = iterator->ptbase->ptentry;
+	idxpages = iterator->ptpages ? iterator->ptpages->index : NULL;
+	idxchunks = iterator->ptchunks ? iterator->ptchunks->index : NULL;
+
+	/* Ramp up from size 1 in case there is a small LIMIT. */
+	if (iterator->result_count == 0)
+		max_results = 1;		/* first time caller */
+	else if (iterator->result_count < iterator->result_capacity / 2)
+		max_results = iterator->result_count * 2;	/* ramp up */
+	else
+		max_results = iterator->result_capacity;	/* saturated */
+
+	/* Write first result in caller's object directly. */
+	result = tbmres;
+	iterator->result_index = 1;
+	iterator->result_count = 0;
+	next_blockno = InvalidBlockNumber;
+	seq_count = 0;
 
 	/* Acquire the LWLock before accessing the shared members */
 	LWLockAcquire(&istate->lock, LW_EXCLUSIVE);
 
-	/*
-	 * If lossy chunk pages remain, make sure we've advanced schunkptr/
-	 * schunkbit to the next set bit.
-	 */
-	while (istate->schunkptr < istate->nchunks)
+	do
 	{
-		PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
-		int			schunkbit = istate->schunkbit;
-
-		tbm_advance_schunkbit(chunk, &schunkbit);
-		if (schunkbit < PAGES_PER_CHUNK)
+		/*
+		 * If lossy chunk pages remain, make sure we've advanced schunkptr/
+		 * schunkbit to the next set bit.
+		 */
+		while (istate->schunkptr < istate->nchunks)
 		{
-			istate->schunkbit = schunkbit;
-			break;
-		}
-		/* advance to next chunk */
-		istate->schunkptr++;
-		istate->schunkbit = 0;
-	}
+			PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
+			int			schunkbit = istate->schunkbit;
 
-	/*
-	 * If both chunk and per-page data remain, must output the numerically
-	 * earlier page.
-	 */
-	if (istate->schunkptr < istate->nchunks)
-	{
-		PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
-		BlockNumber chunk_blockno;
-
-		chunk_blockno = chunk->blockno + istate->schunkbit;
+			tbm_advance_schunkbit(chunk, &schunkbit);
+			if (schunkbit < PAGES_PER_CHUNK)
+			{
+				istate->schunkbit = schunkbit;
+				break;
+			}
+			/* advance to next chunk */
+			istate->schunkptr++;
+			istate->schunkbit = 0;
+		}
 
-		if (istate->spageptr >= istate->npages ||
-			chunk_blockno < ptbase[idxpages[istate->spageptr]].blockno)
+		/*
+		 * If both chunk and per-page data remain, must output the numerically
+		 * earlier page.
+		 */
+		if (istate->schunkptr < istate->nchunks)
 		{
-			/* Return a lossy page indicator from the chunk */
-			tbmres->blockno = chunk_blockno;
-			tbmres->lossy = true;
-			tbmres->recheck = true;
-			tbmres->internal_page = NULL;
-			istate->schunkbit++;
+			PagetableEntry *chunk = &ptbase[idxchunks[istate->schunkptr]];
+			BlockNumber chunk_blockno;
 
-			LWLockRelease(&istate->lock);
-			return true;
-		}
-	}
-
-	if (istate->spageptr < istate->npages)
-	{
-		PagetableEntry *page = &ptbase[idxpages[istate->spageptr]];
+			chunk_blockno = chunk->blockno + istate->schunkbit;
 
-		tbmres->internal_page = page;
-		tbmres->blockno = page->blockno;
-		tbmres->lossy = false;
-		tbmres->recheck = page->recheck;
-		istate->spageptr++;
+			if (istate->spageptr >= istate->npages ||
+				chunk_blockno < ptbase[idxpages[istate->spageptr]].blockno)
+			{
+				if (!tbm_shared_iterate_include_block_p(iterator,
+														chunk_blockno,
+														&next_blockno,
+														&seq_count))
+					break;
+
+				/* Return a lossy page indicator from the chunk */
+				result->blockno = chunk_blockno;
+				result->lossy = true;
+				result->recheck = true;
+				result->internal_page = NULL;
+				istate->schunkbit++;
+				result = &iterator->results[++iterator->result_count];
+				continue;
+			}
+		}
 
-		LWLockRelease(&istate->lock);
+		if (istate->spageptr < istate->npages)
+		{
+			PagetableEntry *page = &ptbase[idxpages[istate->spageptr]];
+
+			if (!tbm_shared_iterate_include_block_p(iterator,
+													page->blockno,
+													&next_blockno,
+													&seq_count))
+				break;
+
+			result->internal_page = page;
+			result->blockno = page->blockno;
+			result->lossy = false;
+			result->recheck = page->recheck;
+			istate->spageptr++;
+			result = &iterator->results[++iterator->result_count];
+			continue;
+		}
 
-		return true;
+		/* Nothing more in the bitmap. */
+		Assert(istate->spageptr == istate->npages);
+		Assert(istate->schunkptr == istate->nchunks);
+		break;
 	}
+	while (iterator->result_count < max_results);
 
 	LWLockRelease(&istate->lock);
 
-	/* Nothing more in the bitmap */
-	tbmres->blockno = InvalidBlockNumber;
-	return false;
+	return iterator->result_count > 0;
 }
 
 /*
@@ -1466,12 +1550,22 @@ tbm_attach_shared_iterate(dsa_area *dsa, dsa_pointer dp)
 {
 	TBMSharedIterator *iterator;
 	TBMSharedIteratorState *istate;
+	int			result_capacity;
 
 	/*
-	 * Create the TBMSharedIterator struct, with enough trailing space to
-	 * serve the needs of the TBMIterateResult sub-struct.
+	 * How many blocks to fetch from the shared bitmap at a time, allowing
+	 * extra space for trailing sequential blocks.
+	 */
+	result_capacity = TBM_SHARED_FETCH + io_combine_limit - 1;
+
+	/*
+	 * Create the TBMSharedIterator struct, with enough trailing space for the
+	 * above.
 	 */
-	iterator = (TBMSharedIterator *) palloc0(sizeof(TBMSharedIterator));
+	iterator = (TBMSharedIterator *)
+		palloc0(offsetof(TBMSharedIterator, results) +
+				sizeof(iterator->results[0]) * result_capacity);
+	iterator->result_capacity = result_capacity;
 
 	istate = (TBMSharedIteratorState *) dsa_get_address(dsa, dp);
 
-- 
2.39.5

From 7f4cc9ae6a2c7d2363b09132f523a18414d9eb41 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 8 Apr 2025 00:50:18 +1200
Subject: [PATCH 2/2] XXX attempt at rampdown in PBHS

Might have a few fundamental issues...
---
 src/backend/executor/nodeBitmapHeapscan.c |   7 +-
 src/backend/nodes/tidbitmap.c             | 132 +++++++++++++++++++++-
 src/include/nodes/tidbitmap.h             |   2 +-
 3 files changed, 136 insertions(+), 5 deletions(-)

diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index bf24f3d7fe0..f594ebc1a34 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -75,6 +75,8 @@ BitmapTableScanSetup(BitmapHeapScanState *node)
 	}
 	else if (BitmapShouldInitializeSharedState(pstate))
 	{
+		int backends;
+
 		/*
 		 * The leader will immediately come out of the function, but others
 		 * will be blocked until leader populates the TBM and wakes them up.
@@ -83,12 +85,15 @@ BitmapTableScanSetup(BitmapHeapScanState *node)
 		if (!node->tbm || !IsA(node->tbm, TIDBitmap))
 			elog(ERROR, "unrecognized result from subplan");
 
+		/* How many backends are expected to iterate over the TBM? */
+		backends = node->ss.ps.state->es_parallel_workers_launched;
+
 		/*
 		 * Prepare to iterate over the TBM. This will return the dsa_pointer
 		 * of the iterator state which will be used by multiple processes to
 		 * iterate jointly.
 		 */
-		pstate->tbmiterator = tbm_prepare_shared_iterate(node->tbm);
+		pstate->tbmiterator = tbm_prepare_shared_iterate(node->tbm, backends);
 
 		/* We have initialized the shared state so wake up others. */
 		BitmapDoneInitializingSharedState(pstate);
diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c
index d9e6a957de4..39a71648db9 100644
--- a/src/backend/nodes/tidbitmap.c
+++ b/src/backend/nodes/tidbitmap.c
@@ -44,6 +44,7 @@
 #include "common/int.h"
 #include "nodes/bitmapset.h"
 #include "nodes/tidbitmap.h"
+#include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
 #include "storage/lwlock.h"
 #include "utils/dsa.h"
@@ -78,8 +79,8 @@
 /*
  * The number of blocks to fetch at once when iterating through a shared
  * bitmap in a parallel query to reduce lock contention.  The number actually
- * fetched may be higher if trailing sequential neighbors are included to
- * encourage I/O combining.
+ * fetched may be lower during ramp-up/ramp-down, or higher if trailing
+ * sequential neighbors are included to encourage I/O combining.
  */
 #define TBM_SHARED_FETCH 8
 
@@ -200,6 +201,12 @@ typedef struct TBMSharedIteratorState
 	int			spageptr;		/* next spages index */
 	int			schunkptr;		/* next schunks index */
 	int			schunkbit;		/* next bit to check in current schunk */
+
+	int			backends;		/* expected number of backends */
+	int			log2_backends;	/* log2 for fast approximate division */
+	int			schunkpages;	/* number of pages left, -1 = not yet known */
+	int			schunkcounted;	/* index of the highest page counted */
+	int			schunksubtotal; /* counted so far */
 } TBMSharedIteratorState;
 
 /*
@@ -766,7 +773,7 @@ tbm_begin_private_iterate(TIDBitmap *tbm)
  * into pagetable array.
  */
 dsa_pointer
-tbm_prepare_shared_iterate(TIDBitmap *tbm)
+tbm_prepare_shared_iterate(TIDBitmap *tbm, int backends)
 {
 	dsa_pointer dp;
 	TBMSharedIteratorState *istate;
@@ -900,6 +907,13 @@ tbm_prepare_shared_iterate(TIDBitmap *tbm)
 	istate->schunkptr = 0;
 	istate->spageptr = 0;
 
+	/* Initialize the ramp-down control state */
+	istate->backends = backends;
+	istate->log2_backends = pg_ceil_log2_32(backends);
+	istate->schunkpages = -1;
+	istate->schunksubtotal = 0;
+	istate->schunkcounted = istate->nchunks;
+
 	tbm->iterating = TBM_ITERATING_SHARED;
 
 	return dp;
@@ -1089,6 +1103,100 @@ tbm_shared_iterate_include_block_p(TBMSharedIterator *iterator,
 	return *seq_count > 1 && *seq_count <= io_combine_limit;
 }
 
+/*
+ * Count pages in one more tail chunk, until we have enough information to
+ * implement fair ramp-down.  How many it takes depends on their density and
+ * how many backends there are to ramp down.  Returns true when work is done.
+ */
+static bool
+tbm_shared_iterate_popcount(TBMSharedIterator *iterator)
+{
+	TBMSharedIteratorState *istate = iterator->state;
+	PagetableEntry *ptbase = iterator->ptbase->ptentry;
+	int		   *idxchunks;
+	int			ramp_down_pages;
+	int			exact_pages;
+
+	if (iterator->ptchunks == NULL)
+	{
+		istate->schunkpages = 0;	/* no chunks */
+		return true;
+	}
+	idxchunks = iterator->ptchunks->index;
+
+	Assert(LWLockHeldByMeInMode(&istate->lock, LW_EXCLUSIVE));
+	Assert(istate->schunkpages == -1);
+	Assert(istate->schunkbit == 0);
+
+	/* The number of pages needed to ramp down by halving. */
+	ramp_down_pages = pg_nextpower2_32(iterator->result_capacity) - 1;
+	ramp_down_pages *= 2;		/* double counting */
+	ramp_down_pages *= 1 << istate->log2_backends;	/* partial scan */
+
+	/* We always know the number of exact pages left to scan. */
+	exact_pages = istate->npages - istate->spageptr;
+
+	if (exact_pages >= ramp_down_pages ||
+		istate->schunkptr == istate->schunkcounted)
+	{
+		/*
+		 * If the remaining exact pages are already enough, or the scan has
+		 * caught up with the counting (low memory limit), we stop counting.
+		 * Note that we pessimistically assumed that every page might appear
+		 * in a chunk *and* a page entry.  The ramp-down will start too soon
+		 * and happen more gently as they get deduplicated by the scan if
+		 * that's really true, but that's OK.
+		 */
+		istate->schunkpages = istate->schunksubtotal;
+	}
+	else
+	{
+		int			chunk;
+
+		/* Count the pages in one more chunk. */
+		chunk = --istate->schunkcounted;
+		istate->schunksubtotal +=
+			pg_popcount((char *) ptbase[idxchunks[chunk]].words,
+						sizeof(ptbase[idxchunks[chunk]].words));
+
+		/* Have we counted enough pages yet? */
+		if (exact_pages + istate->schunksubtotal >= ramp_down_pages)
+			istate->schunkpages = istate->schunksubtotal;
+	}
+
+	if (istate->schunkpages != -1)
+		elog(DEBUG1, "tidbitmap counted %d bits in %d tail chunks",
+			 istate->schunkpages, istate->nchunks - istate->schunkcounted);
+
+	return istate->schunkpages != -1;
+}
+
+/*
+ * For two backends we want approximately ..., 4, 4, 2, 2, 1, 1 as we approach
+ * the end of the scan.  It doesn't matter what order they are really claimed
+ * in as the unfairness risk of being wrong decreases as we approach one.
+ */
+static inline BlockNumber
+tbm_shared_iterate_ramp_down(TBMSharedIteratorState *istate, int max_results)
+{
+	BlockNumber pages;
+	BlockNumber partial_pages;
+	BlockNumber clamp;
+
+	Assert(istate->schunkpages >= 0);
+	pages = (istate->npages - istate->spageptr) + istate->schunkpages;
+	partial_pages = pages >> istate->log2_backends;
+	clamp = partial_pages / 2;
+	if (clamp < 1)
+		clamp = 1;
+
+	if (max_results > clamp)
+		elog(DEBUG1, "pages = %d, partial_pages = %d, log2_backends = %d, clamped %d->%d", pages, partial_pages, istate->log2_backends, max_results, clamp);
+	Assert(pages > 0);
+
+	return Min(max_results, clamp);
+}
+
 /*
  *	tbm_shared_iterate - scan through next page of a TIDBitmap
  *
@@ -1139,6 +1247,13 @@ tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres)
 	/* Acquire the LWLock before accessing the shared members */
 	LWLockAcquire(&istate->lock, LW_EXCLUSIVE);
 
+	/*
+	 * Ramp down at the end of scan for fair block distribution if we have
+	 * enough information.  Otherwise see below.
+	 */
+	if (istate->schunkpages != -1)
+		max_results = tbm_shared_iterate_ramp_down(istate, max_results);
+
 	do
 	{
 		/*
@@ -1161,6 +1276,14 @@ tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres)
 			istate->schunkbit = 0;
 		}
 
+		/*
+		 * When starting a new chunk, also popcount one more tail chunk, if we
+		 * haven't counted enough to implement fair ramp-down yet.
+		 */
+		if (istate->schunkbit == 0 && istate->schunkpages == -1)
+			if (tbm_shared_iterate_popcount(iterator))
+				max_results = tbm_shared_iterate_ramp_down(istate, max_results);
+
 		/*
 		 * If both chunk and per-page data remain, must output the numerically
 		 * earlier page.
@@ -1187,6 +1310,8 @@ tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres)
 				result->recheck = true;
 				result->internal_page = NULL;
 				istate->schunkbit++;
+				if (istate->schunkpages > 0)
+					istate->schunkpages--;
 				result = &iterator->results[++iterator->result_count];
 				continue;
 			}
@@ -1214,6 +1339,7 @@ tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres)
 		/* Nothing more in the bitmap. */
 		Assert(istate->spageptr == istate->npages);
 		Assert(istate->schunkptr == istate->nchunks);
+		Assert(istate->schunkpages == 0);
 		break;
 	}
 	while (iterator->result_count < max_results);
diff --git a/src/include/nodes/tidbitmap.h b/src/include/nodes/tidbitmap.h
index 99f795ceab5..8ec07cda5c1 100644
--- a/src/include/nodes/tidbitmap.h
+++ b/src/include/nodes/tidbitmap.h
@@ -100,7 +100,7 @@ extern int	tbm_extract_page_tuple(TBMIterateResult *iteritem,
 extern bool tbm_is_empty(const TIDBitmap *tbm);
 
 extern TBMPrivateIterator *tbm_begin_private_iterate(TIDBitmap *tbm);
-extern dsa_pointer tbm_prepare_shared_iterate(TIDBitmap *tbm);
+extern dsa_pointer tbm_prepare_shared_iterate(TIDBitmap *tbm, int backends);
 extern bool tbm_private_iterate(TBMPrivateIterator *iterator, TBMIterateResult *tbmres);
 extern bool tbm_shared_iterate(TBMSharedIterator *iterator, TBMIterateResult *tbmres);
 extern void tbm_end_private_iterate(TBMPrivateIterator *iterator);
-- 
2.39.5
