On 11/28/23 16:39, Matthias van de Meent wrote:
> On Thu, 23 Nov 2023 at 14:35, Tomas Vondra
> <tomas.von...@enterprisedb.com> wrote:
>> On 11/23/23 13:33, Matthias van de Meent wrote:
>>> The union operator may leak (lots of) memory, so I think it makes
>>> sense to keep a context around that can be reset after we've extracted
>>> the merge result.
>>>
>>
>> But does the current code actually achieve that? It does create a "brin
>> union" context, but then it only does this:
>>
>> /* Use our own memory context to avoid retail pfree */
>> cxt = AllocSetContextCreate(CurrentMemoryContext,
>> "brin union",
>> ALLOCSET_DEFAULT_SIZES);
>> oldcxt = MemoryContextSwitchTo(cxt);
>> db = brin_deform_tuple(bdesc, b, NULL);
>> MemoryContextSwitchTo(oldcxt);
>>
>> Surely that does not limit the amount of memory used by the actual union
>> functions in any way?
>
> Oh, yes, of course. For some reason I thought that covered the calls
> to the union operator function too, but it indeed only covers
> deserialization. I do think it is still worthwhile to not do the
> create/delete cycle, but won't hold the patch back for that.
>
I think the union_tuples() changes are better left for a separate patch.
>>>> However, I don't think the number of union_tuples calls is likely to be
>>>> very high, especially for large tables. Because we split the table into
>>>> 2048 chunks, and then cap the chunk size by 8192. For large tables
>>>> (where this matters) we're likely close to 8192.
>>>
>>> I agree that the merging part of the index creation is the last part,
>>> and usually has no high impact on the total performance of the reindex
>>> operation, but in memory-constrained environments releasing and then
>>> requesting the same chunk of memory over and over again just isn't
>>> great.
>>
>> OK, I'll take a look at the scratch context you suggested.
>>
>> My point however was we won't actually do that very often, because on
>> large tables the BRIN ranges are likely smaller than the parallel scan
>> chunk size, so few overlaps. OTOH if the table is small, or if the BRIN
>> ranges are large, there'll be few of them.
>
> That's true, so maybe I'm concerned about something that amounts to
> only marginal gains.
>
However, after thinking about this a bit more, I think we actually do
need to do something about the memory management when merging tuples.
AFAIK the general assumption was that union_tuples() only runs for a
single range, and then the whole context gets freed. But the way the
merging was implemented, it all runs in a single context. And while a
single union_tuples() call may not need a lot of memory, in total it may
be annoying. I just added a palloc(1MB) into union_tuples() and ended up
with ~160MB allocated in the PortalContext on just a 2GB table. In
practice the memory will grow more slowly, but it's still not great :-/
The attached 0003 patch adds a memory context that's reset after
producing a merged BRIN tuple for each page range.
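To illustrate, the 0003 change boils down to roughly this (a simplified
sketch; the variable names match the patch):

  rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
                                   "brin union",
                                   ALLOCSET_DEFAULT_SIZES);
  oldCxt = MemoryContextSwitchTo(rangeCxt);

  while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
  {
      /* union_tuples() / brin_form_tuple() allocate in rangeCxt ... */

      /* ... and once the merged tuple is inserted, free it all at once */
      MemoryContextReset(rangeCxt);
  }

  MemoryContextSwitchTo(oldCxt);
  MemoryContextDelete(rangeCxt);

That way the per-range allocations (including whatever the union functions
happen to leak) can't accumulate across ranges.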
> I noticed that the v4 patch doesn't yet update the documentation in
> indexam.sgml with am->amcanbuildparallel.
Should be fixed by 0002. I decided to add a simple note to ambuild();
I'm not sure if anything more is needed.
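(For reference, an AM opts in simply by setting the new flag in its
handler, as 0001 does for BRIN:

  amroutine->amcanparallel = false;
  amroutine->amcanbuildparallel = true;

and index_build() then checks amcanbuildparallel instead of hard-coding
BTREE_AM_OID.)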
> Once that is included and reviewed I think this will be ready, unless
> you want to address any of my comments upthread (that I marked with
> 'not in this patch') in this patch.
>
Thanks. I believe the attached version addresses it. There's also 0004
with some indentation tweaks per pgindent.
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 219a451250cbbbc1235c60f4776c2ee6bc84432d Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 7 Nov 2023 17:04:28 +0100
Subject: [PATCH v20231128 1/4] parallel CREATE INDEX for BRIN v3
---
contrib/bloom/blutils.c | 1 +
src/backend/access/brin/brin.c | 866 +++++++++++++++++-
src/backend/access/gin/ginutil.c | 1 +
src/backend/access/gist/gist.c | 1 +
src/backend/access/hash/hash.c | 1 +
src/backend/access/nbtree/nbtree.c | 1 +
src/backend/access/spgist/spgutils.c | 1 +
src/backend/access/transam/parallel.c | 4 +
src/backend/catalog/index.c | 2 +-
src/backend/utils/sort/tuplesortvariants.c | 207 +++++
src/include/access/amapi.h | 2 +
src/include/access/brin.h | 3 +
src/include/utils/tuplesort.h | 11 +
.../modules/dummy_index_am/dummy_index_am.c | 1 +
14 files changed, 1096 insertions(+), 6 deletions(-)
diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 4830cb3fee6..a781c5d98d6 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -122,6 +122,7 @@ blhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = false;
amroutine->amcaninclude = false;
amroutine->amusemaintenanceworkmem = false;
amroutine->amparallelvacuumoptions =
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 4f2dfdd17b9..0c6ca1ac18c 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -33,6 +33,7 @@
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
+#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
@@ -40,7 +41,118 @@
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"
+#include "utils/tuplesort.h"
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001)
+#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
+#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
+#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
+
+/*
+ * Status record for spooling/sorting phase.
+ */
+typedef struct BrinSpool
+{
+ Tuplesortstate *sortstate; /* state data for tuplesort.c */
+ Relation heap;
+ Relation index;
+} BrinSpool;
+
+/*
+ * Status for index builds performed in parallel. This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+{
+ /*
+ * These fields are not modified during the build. They primarily exist for
+ * the benefit of worker processes that need to create state corresponding
+ * to that used by the leader.
+ */
+ Oid heaprelid;
+ Oid indexrelid;
+ bool isconcurrent;
+ BlockNumber pagesPerRange;
+ int scantuplesortstates;
+
+ /*
+ * workersdonecv is used to monitor the progress of workers. All parallel
+ * participants must indicate that they are done before leader can use
+ * results built by the workers (and before leader can write the data into
+ * the index).
+ */
+ ConditionVariable workersdonecv;
+
+ /*
+ * mutex protects all fields before heapdesc.
+ *
+ * These fields contain status information of interest to BRIN index
+ * builds that must work just the same when an index is built in parallel.
+ */
+ slock_t mutex;
+
+ /*
+ * Mutable state that is maintained by workers, and reported back to
+ * leader at end of the scans.
+ *
+ * nparticipantsdone is number of worker processes finished.
+ *
+ * reltuples is the total number of input heap tuples.
+ *
+ * indtuples is the total number of tuples that made it into the index.
+ */
+ int nparticipantsdone;
+ double reltuples;
+ double indtuples;
+
+ /*
+ * ParallelTableScanDescData data follows. Can't directly embed here, as
+ * implementations of the parallel table scan desc interface might need
+ * stronger alignment.
+ */
+} BrinShared;
+
+/*
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+ (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+ /* parallel context itself */
+ ParallelContext *pcxt;
+
+ /*
+ * nparticipanttuplesorts is the exact number of worker processes
+ * successfully launched, plus one leader process if it participates as a
+ * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+ * participating as a worker).
+ */
+ int nparticipanttuplesorts;
+
+ /*
+ * Leader process convenience pointers to shared state (leader avoids TOC
+ * lookups).
+ *
+ * brinshared is the shared state for entire build. sharedsort is the
+ * shared, tuplesort-managed state passed to each process tuplesort.
+ * snapshot is the snapshot used by the scan iff an MVCC snapshot is required.
+ */
+ BrinShared *brinshared;
+ Sharedsort *sharedsort;
+ Snapshot snapshot;
+ WalUsage *walusage;
+ BufferUsage *bufferusage;
+} BrinLeader;
/*
* We use a BrinBuildState during initial construction of a BRIN index.
@@ -49,13 +161,23 @@
typedef struct BrinBuildState
{
Relation bs_irel;
- int bs_numtuples;
+ double bs_numtuples;
+ double bs_reltuples;
Buffer bs_currentInsertBuf;
BlockNumber bs_pagesPerRange;
BlockNumber bs_currRangeStart;
BrinRevmap *bs_rmAccess;
BrinDesc *bs_bdesc;
BrinMemTuple *bs_dtuple;
+
+ /*
+ * bs_leader is only present when a parallel index build is performed, and
+ * only in the leader process. (Actually, only the leader process has a
+ * BrinBuildState.)
+ */
+ BrinLeader *bs_leader;
+ int bs_worker_id;
+ BrinSpool *bs_spool;
} BrinBuildState;
/*
@@ -88,6 +210,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -95,6 +218,20 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
BrinMemTuple *dtup, const Datum *values, const bool *nulls);
static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+ bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+ Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+ BrinSpool *brinspool,
+ BrinShared *brinshared,
+ Sharedsort *sharedsort,
+ Relation heap, Relation index,
+ int sortmem, bool progress);
+
/*
* BRIN handler function: return IndexAmRoutine with access method parameters
* and callbacks.
@@ -119,6 +256,7 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = true;
amroutine->amcaninclude = false;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = true;
@@ -874,6 +1012,63 @@ brinbuildCallback(Relation index,
values, isnull);
}
+/*
+ * A version of the callback, used by parallel index builds. The main difference
+ * is that instead of writing the BRIN tuples into the index, we write them into
+ * a shared tuplesort, and leave the insertion up to the leader (which may
+ * reorder them a bit etc.). The callback also does not generate empty ranges,
+ * those may be added by the leader when merging results from workers.
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *brstate)
+{
+ BrinBuildState *state = (BrinBuildState *) brstate;
+ BlockNumber thisblock;
+
+ thisblock = ItemPointerGetBlockNumber(tid);
+
+ /*
+ * If we're in a block that belongs to a future range, summarize what
+ * we've got and start afresh. Note the scan might have skipped many
+ * pages, if they were devoid of live tuples; we do not create empty
+ * BRIN ranges here - the leader is responsible for filling them in.
+ */
+ if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+ {
+
+ BRIN_elog((DEBUG2,
+ "brinbuildCallback: completed a range: %u--%u",
+ state->bs_currRangeStart,
+ state->bs_currRangeStart + state->bs_pagesPerRange));
+
+ /* create the index tuple and write it into the tuplesort */
+ form_and_spill_tuple(state);
+
+ /*
+ * Set state to correspond to the next range (for this block).
+ *
+ * This skips ranges that are either empty (and so we don't get any
+ * tuples to summarize), or processed by other workers. We can't
+ * differentiate those cases here easily, so we leave it up to the
+ * leader to fill empty ranges where needed.
+ */
+ state->bs_currRangeStart
+ = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
+
+ /* re-initialize state for it */
+ brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+ }
+
+ /* Accumulate the current tuple into the running state */
+ (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+ values, isnull);
+}
+
/*
* brinbuild() -- build a new BRIN index.
*/
@@ -935,18 +1130,89 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
revmap = brinRevmapInitialize(index, &pagesPerRange);
state = initialize_brin_buildstate(index, revmap, pagesPerRange);
+ state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+ state->bs_spool->heap = heap;
+ state->bs_spool->index = index;
+
+ /*
+ * Attempt to launch parallel worker scan when required
+ *
+ * XXX plan_create_index_workers makes the number of workers dependent on
+ * maintenance_work_mem, requiring 32MB for each worker. That makes sense
+ * for btree, but not for BRIN, which can get away with much less memory.
+ * So maybe make that somehow less strict, optionally?
+ */
+ if (indexInfo->ii_ParallelWorkers > 0)
+ _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+ indexInfo->ii_ParallelWorkers);
+
/*
* Now scan the relation. No syncscan allowed here because we want the
* heap blocks in physical order.
*/
- reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
- brinbuildCallback, (void *) state, NULL);
- /* process the final batch */
- form_and_insert_tuple(state);
+ /*
+ * If parallel build requested and at least one worker process was
+ * successfully launched, set up coordination state
+ */
+ if (state->bs_leader)
+ {
+ SortCoordinate coordinate;
+
+ coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+ coordinate->isWorker = false;
+ coordinate->nParticipants =
+ state->bs_leader->nparticipanttuplesorts;
+ coordinate->sharedsort = state->bs_leader->sharedsort;
+
+
+ /*
+ * Begin serial/leader tuplesort.
+ *
+ * In cases where parallelism is involved, the leader receives the same
+ * share of maintenance_work_mem as a serial sort (it is generally treated
+ * in the same way as a serial sort once we return). Parallel worker
+ * Tuplesortstates will have received only a fraction of
+ * maintenance_work_mem, though.
+ *
+ * We rely on the lifetime of the Leader Tuplesortstate almost not
+ * overlapping with any worker Tuplesortstate's lifetime. There may be
+ * some small overlap, but that's okay because we rely on leader
+ * Tuplesortstate only allocating a small, fixed amount of memory here.
+ * When its tuplesort_performsort() is called (by our caller), and
+ * significant amounts of memory are likely to be used, all workers must
+ * have already freed almost all memory held by their Tuplesortstates
+ * (they are about to go away completely, too). The overall effect is
+ * that maintenance_work_mem always represents an absolute high watermark
+ * on the amount of memory used by a CREATE INDEX operation, regardless of
+ * the use of parallelism or any other factor.
+ */
+ state->bs_spool->sortstate =
+ tuplesort_begin_index_brin(heap, index,
+ maintenance_work_mem, coordinate,
+ TUPLESORT_NONE);
+
+ /*
+ * In parallel mode, wait for workers to complete, and then read all
+ * tuples from the shared tuplesort and insert them into the index.
+ */
+ _brin_end_parallel(state->bs_leader, state);
+ }
+ else /* no parallel index build, just do the usual thing */
+ {
+ reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+ brinbuildCallback, (void *) state, NULL);
+
+ /* process the final batch */
+ form_and_insert_tuple(state);
+
+ /* track the number of relation tuples */
+ state->bs_reltuples = reltuples;
+ }
/* release resources */
idxtuples = state->bs_numtuples;
+ reltuples = state->bs_reltuples;
brinRevmapTerminate(state->bs_rmAccess);
terminate_brin_buildstate(state);
@@ -1366,12 +1632,16 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
state->bs_irel = idxRel;
state->bs_numtuples = 0;
+ state->bs_reltuples = 0;
state->bs_currentInsertBuf = InvalidBuffer;
state->bs_pagesPerRange = pagesPerRange;
state->bs_currRangeStart = 0;
state->bs_rmAccess = revmap;
state->bs_bdesc = brin_build_desc(idxRel);
state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+ state->bs_leader = NULL;
+ state->bs_worker_id = 0;
+ state->bs_spool = NULL;
return state;
}
@@ -1663,6 +1933,32 @@ form_and_insert_tuple(BrinBuildState *state)
pfree(tup);
}
+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a (shared) tuplesort (the leader will insert it
+ * into the index later).
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+{
+ BrinTuple *tup;
+ Size size;
+
+ /* don't insert empty tuples in parallel build */
+ if (state->bs_dtuple->bt_empty_range)
+ return;
+
+ tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+ state->bs_dtuple, &size);
+
+ /* write the BRIN tuple to the tuplesort */
+ tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);
+
+ state->bs_numtuples++;
+
+ pfree(tup);
+}
+
/*
* Given two deformed tuples, adjust the first one so that it's consistent
* with the summary values in both.
@@ -1982,3 +2278,563 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
return true;
}
+
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+ bool isconcurrent, int request)
+{
+ ParallelContext *pcxt;
+ int scantuplesortstates;
+ Snapshot snapshot;
+ Size estbrinshared;
+ Size estsort;
+ BrinShared *brinshared;
+ Sharedsort *sharedsort;
+ BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+ WalUsage *walusage;
+ BufferUsage *bufferusage;
+ bool leaderparticipates = true;
+ int querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+ leaderparticipates = false;
+#endif
+
+ /*
+ * Enter parallel mode, and create context for parallel build of brin
+ * index
+ */
+ EnterParallelMode();
+ Assert(request > 0);
+ pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+ request);
+
+ scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+ /*
+ * Prepare for scan of the base relation. In a normal index build, we use
+ * SnapshotAny because we must retrieve all tuples and do our own time
+ * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+ * concurrent build, we take a regular MVCC snapshot and index whatever's
+ * live according to that.
+ */
+ if (!isconcurrent)
+ snapshot = SnapshotAny;
+ else
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+ /*
+ * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+ */
+ estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+ estsort = tuplesort_estimate_shared(scantuplesortstates);
+ shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+ shm_toc_estimate_keys(&pcxt->estimator, 2);
+
+ /*
+ * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+ * and PARALLEL_KEY_BUFFER_USAGE.
+ *
+ * If there are no extensions loaded that care, we could skip this. We
+ * have no way of knowing whether anyone's looking at pgWalUsage or
+ * pgBufferUsage, so do it unconditionally.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+ if (debug_query_string)
+ {
+ querylen = strlen(debug_query_string);
+ shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+ else
+ querylen = 0; /* keep compiler quiet */
+
+ /* Everyone's had a chance to ask for space, so now create the DSM */
+ InitializeParallelDSM(pcxt);
+
+ /* If no DSM segment was available, back out (do serial build) */
+ if (pcxt->seg == NULL)
+ {
+ if (IsMVCCSnapshot(snapshot))
+ UnregisterSnapshot(snapshot);
+ DestroyParallelContext(pcxt);
+ ExitParallelMode();
+ return;
+ }
+
+ /* Store shared build state, for which we reserved space */
+ brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+ /* Initialize immutable state */
+ brinshared->heaprelid = RelationGetRelid(heap);
+ brinshared->indexrelid = RelationGetRelid(index);
+ brinshared->isconcurrent = isconcurrent;
+ brinshared->scantuplesortstates = scantuplesortstates;
+ brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+ ConditionVariableInit(&brinshared->workersdonecv);
+ SpinLockInit(&brinshared->mutex);
+
+ /* Initialize mutable state */
+ brinshared->nparticipantsdone = 0;
+ brinshared->reltuples = 0.0;
+ brinshared->indtuples = 0.0;
+
+ table_parallelscan_initialize(heap,
+ ParallelTableScanFromBrinShared(brinshared),
+ snapshot);
+
+ /*
+ * Store shared tuplesort-private state, for which we reserved space.
+ * Then, initialize opaque state using tuplesort routine.
+ */
+ sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+ tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+ pcxt->seg);
+
+ /*
+ * Store the shared state in the TOC, so that worker processes can look
+ * it up.
+ */
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+ /* Store query string for workers */
+ if (debug_query_string)
+ {
+ char *sharedquery;
+
+ sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+ memcpy(sharedquery, debug_query_string, querylen + 1);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+ }
+
+ /*
+ * Allocate space for each worker's WalUsage and BufferUsage; no need to
+ * initialize.
+ */
+ walusage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+ bufferusage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+ /* Launch workers, saving status for leader/caller */
+ LaunchParallelWorkers(pcxt);
+ brinleader->pcxt = pcxt;
+ brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+ if (leaderparticipates)
+ brinleader->nparticipanttuplesorts++;
+ brinleader->brinshared = brinshared;
+ brinleader->sharedsort = sharedsort;
+ brinleader->snapshot = snapshot;
+ brinleader->walusage = walusage;
+ brinleader->bufferusage = bufferusage;
+
+ /* If no workers were successfully launched, back out (do serial build) */
+ if (pcxt->nworkers_launched == 0)
+ {
+ _brin_end_parallel(brinleader, NULL);
+ return;
+ }
+
+ /* Save leader state now that it's clear build will be parallel */
+ buildstate->bs_leader = brinleader;
+
+ /* Join heap scan ourselves */
+ if (leaderparticipates)
+ _brin_leader_participate_as_worker(buildstate, heap, index);
+
+ /*
+ * Caller needs to wait for all launched workers when we return. Make
+ * sure that the failure-to-start case will not hang forever.
+ */
+ WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+{
+ int i;
+ BrinTuple *btup;
+ BrinMemTuple *memtuple = NULL;
+ Size tuplen;
+ BrinShared *brinshared = brinleader->brinshared;
+ BlockNumber prevblkno = InvalidBlockNumber;
+ BrinTuple *emptyTuple = NULL;
+ Size emptySize;
+ BrinSpool *spool;
+
+ /* Shutdown worker processes */
+ WaitForParallelWorkersToFinish(brinleader->pcxt);
+
+ if (!state)
+ return;
+
+ /* copy the data into leader state (we have to wait for the workers) */
+ state->bs_reltuples = brinshared->reltuples;
+ state->bs_numtuples = brinshared->indtuples;
+
+ /* do the actual sort in the leader */
+ spool = state->bs_spool;
+ tuplesort_performsort(spool->sortstate);
+
+ /*
+ * Initialize BrinMemTuple we'll use to union summaries from workers
+ * (in case they happened to produce parts of the same page range).
+ */
+ memtuple = brin_new_memtuple(state->bs_bdesc);
+
+ /*
+ * Read the BRIN tuples from the shared tuplesort, sorted by block number.
+ * That probably gives us an index that is cheaper to scan, thanks to mostly
+ * getting data from the same index page as before.
+ */
+ while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
+ {
+ /* Ranges should be multiples of pages_per_range for the index. */
+ Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
+ /*
+ * Do we need to union summaries for the same page range?
+ *
+ * If this is the first brin tuple we read, then just deform it into
+ * the memtuple, and continue with the next one from tuplesort. We
+ * however may need to insert empty summaries into the index.
+ *
+ * If it's the same block as the last we saw, we simply union the
+ * brin tuple into it, and we're done - we don't even need to insert
+ * empty ranges, because that was done earlier when we saw the first
+ * brin tuple (for this range).
+ *
+ * Finally, if it's not the first brin tuple, and it's not the same
+ * page range, we need to do the insert and then deform the tuple
+ * into the memtuple. Then we'll insert empty ranges before the
+ * new brin tuple, if needed.
+ */
+ if (prevblkno == InvalidBlockNumber)
+ {
+ /* First brin tuple, just deform into memtuple. */
+ memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+ /* continue to insert empty pages before thisblock */
+ }
+ else if (memtuple->bt_blkno == btup->bt_blkno)
+ {
+ /*
+ * Not the first brin tuple, but same page range as the previous
+ * one, so we can merge it into the memtuple.
+ */
+ union_tuples(state->bs_bdesc, memtuple, btup);
+ continue;
+ }
+ else
+ {
+ BrinTuple *tmp;
+ Size len;
+
+ /*
+ * We got brin tuple for a different page range, so form a brin
+ * tuple from the memtuple, insert it, and re-init the memtuple
+ * from the new brin tuple.
+ */
+ tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+ memtuple, &len);
+
+ brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+ &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+ /* free the formed on-disk tuple */
+ pfree(tmp);
+
+ memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+ /* continue to insert empty pages before thisblock */
+ }
+
+ /* Fill empty ranges for all ranges missing in the tuplesort. */
+ prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
+ while (prevblkno + state->bs_pagesPerRange < btup->bt_blkno)
+ {
+ /* the missing range */
+ prevblkno += state->bs_pagesPerRange;
+
+ /* Did we already build the empty range? If not, do it now. */
+ if (emptyTuple == NULL)
+ {
+ BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+ emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+ }
+ else
+ {
+ /* we already have an "empty range" tuple, just set the block */
+ emptyTuple->bt_blkno = prevblkno;
+ }
+
+ brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+ &state->bs_currentInsertBuf,
+ emptyTuple->bt_blkno, emptyTuple, emptySize);
+ }
+
+ prevblkno = btup->bt_blkno;
+ }
+
+ tuplesort_end(spool->sortstate);
+
+ /* Fill empty ranges for all ranges missing in the tuplesort. */
+ prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
+ while (prevblkno + state->bs_pagesPerRange < memtuple->bt_blkno)
+ {
+ /* the missing range */
+ prevblkno += state->bs_pagesPerRange;
+
+ /* Did we already build the empty range? If not, do it now. */
+ if (emptyTuple == NULL)
+ {
+ BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+ emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+ }
+ else
+ {
+ /* we already have an "empty range" tuple, just set the block */
+ emptyTuple->bt_blkno = prevblkno;
+ }
+
+ brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+ &state->bs_currentInsertBuf,
+ emptyTuple->bt_blkno, emptyTuple, emptySize);
+ }
+
+ /**/
+ if (prevblkno != InvalidBlockNumber)
+ {
+ BrinTuple *tmp;
+ Size len;
+
+ tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+ memtuple, &len);
+
+ brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+ &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+ pfree(tmp);
+ }
+
+ /*
+ * Next, accumulate WAL usage. (This must wait for the workers to finish,
+ * or we might get incomplete data.)
+ */
+ for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+ InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+ /* Free last reference to MVCC snapshot, if one was used */
+ if (IsMVCCSnapshot(brinleader->snapshot))
+ UnregisterSnapshot(brinleader->snapshot);
+ DestroyParallelContext(brinleader->pcxt);
+ ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+ /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+ return add_size(BUFFERALIGN(sizeof(BrinShared)),
+ table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+{
+ BrinLeader *brinleader = buildstate->bs_leader;
+ int sortmem;
+
+ /* Allocate memory and initialize private spool */
+ buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+ buildstate->bs_spool->heap = heap;
+ buildstate->bs_spool->index = index;
+
+ /*
+ * Might as well use reliable figure when doling out maintenance_work_mem
+ * (when requested number of workers were not launched, this will be
+ * somewhat higher than it is for other workers).
+ */
+ sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
+
+ /* Perform work common to all participants */
+ _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
+ brinleader->sharedsort, heap, index, sortmem, true);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed-in brinspool. All other
+ * spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
+ BrinShared *brinshared, Sharedsort *sharedsort,
+ Relation heap, Relation index, int sortmem,
+ bool progress)
+{
+ SortCoordinate coordinate;
+ TableScanDesc scan;
+ double reltuples;
+ IndexInfo *indexInfo;
+
+ /* Initialize local tuplesort coordination state */
+ coordinate = palloc0(sizeof(SortCoordinateData));
+ coordinate->isWorker = true;
+ coordinate->nParticipants = -1;
+ coordinate->sharedsort = sharedsort;
+
+ /* Begin "partial" tuplesort */
+ brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap,
+ brinspool->index,
+ sortmem, coordinate,
+ TUPLESORT_NONE);
+
+ /* Join parallel scan */
+ indexInfo = BuildIndexInfo(index);
+ indexInfo->ii_Concurrent = brinshared->isconcurrent;
+
+ scan = table_beginscan_parallel(heap,
+ ParallelTableScanFromBrinShared(brinshared));
+
+ reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+ brinbuildCallbackParallel, state, scan);
+
+ /* insert the last item */
+ form_and_spill_tuple(state);
+
+ /* sort the BRIN ranges built by this worker */
+ tuplesort_performsort(brinspool->sortstate);
+
+ state->bs_reltuples += reltuples;
+
+ /*
+ * Done. Record ambuild statistics.
+ */
+ SpinLockAcquire(&brinshared->mutex);
+ brinshared->nparticipantsdone++;
+ brinshared->reltuples += state->bs_reltuples;
+ brinshared->indtuples += state->bs_numtuples;
+ SpinLockRelease(&brinshared->mutex);
+
+ /* Notify leader */
+ ConditionVariableSignal(&brinshared->workersdonecv);
+
+ tuplesort_end(brinspool->sortstate);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+ char *sharedquery;
+ BrinShared *brinshared;
+ Sharedsort *sharedsort;
+ BrinBuildState *buildstate;
+ Relation heapRel;
+ Relation indexRel;
+ LOCKMODE heapLockmode;
+ LOCKMODE indexLockmode;
+ WalUsage *walusage;
+ BufferUsage *bufferusage;
+ int sortmem;
+
+ /*
+ * The only possible status flag that can be set to the parallel worker is
+ * PROC_IN_SAFE_IC.
+ */
+ Assert((MyProc->statusFlags == 0) ||
+ (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+ /* Set debug_query_string for individual workers first */
+ sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+ debug_query_string = sharedquery;
+
+ /* Report the query string from leader */
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+ /* Look up brin shared state */
+ brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+
+ /* Open relations using lock modes known to be obtained by index.c */
+ if (!brinshared->isconcurrent)
+ {
+ heapLockmode = ShareLock;
+ indexLockmode = AccessExclusiveLock;
+ }
+ else
+ {
+ heapLockmode = ShareUpdateExclusiveLock;
+ indexLockmode = RowExclusiveLock;
+ }
+
+ /* Open relations within worker */
+ heapRel = table_open(brinshared->heaprelid, heapLockmode);
+ indexRel = index_open(brinshared->indexrelid, indexLockmode);
+
+ buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange);
+
+ /* Initialize worker's own spool */
+ buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+ buildstate->bs_spool->heap = heapRel;
+ buildstate->bs_spool->index = indexRel;
+
+ /* Look up shared state private to tuplesort.c */
+ sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+ tuplesort_attach_shared(sharedsort, seg);
+
+ /* Prepare to track buffer usage during parallel execution */
+ InstrStartParallelQuery();
+
+ /*
+ * Might as well use reliable figure when doling out maintenance_work_mem
+ * (when requested number of workers were not launched, this will be
+ * somewhat higher than it is for other workers).
+ */
+ sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
+
+ _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
+ brinshared, sharedsort,
+ heapRel, indexRel, sortmem, false);
+
+ /* Report WAL/buffer usage during parallel execution */
+ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+ walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+ &walusage[ParallelWorkerNumber]);
+
+ index_close(indexRel, indexLockmode);
+ table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index a875c5d3d7a..9b1a0ac345d 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -54,6 +54,7 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = false;
amroutine->amcaninclude = false;
amroutine->amusemaintenanceworkmem = true;
amroutine->amsummarizing = false;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 9a1bf8f66cb..e052ba8bda2 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -76,6 +76,7 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = false;
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = false;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 6443ff21bda..905519692c6 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -73,6 +73,7 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = false;
amroutine->amcaninclude = false;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = false;
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 0930f9b37e3..6c8cd93fa0a 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -112,6 +112,7 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = true;
+ amroutine->amcanbuildparallel = true;
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = false;
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 30c00876a56..fd4b6157101 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -60,6 +60,7 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = false;
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = false;
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 194a1207be6..d78314062e0 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
#include "postgres.h"
+#include "access/brin.h"
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
@@ -145,6 +146,9 @@ static const struct
{
"_bt_parallel_build_main", _bt_parallel_build_main
},
+ {
+ "_brin_parallel_build_main", _brin_parallel_build_main
+ },
{
"parallel_vacuum_main", parallel_vacuum_main
}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 143fae01ebd..40abbaf476b 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2982,7 +2982,7 @@ index_build(Relation heapRelation,
* Note that planner considers parallel safety for us.
*/
if (parallel && IsNormalProcessingMode() &&
- indexRelation->rd_rel->relam == BTREE_AM_OID)
+ indexRelation->rd_indam->amcanbuildparallel)
indexInfo->ii_ParallelWorkers =
plan_create_index_workers(RelationGetRelid(heapRelation),
RelationGetRelid(indexRelation));
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 2cd508e5130..9b3a70e6ccf 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -19,6 +19,7 @@
#include "postgres.h"
+#include "access/brin_tuple.h"
#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
@@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups,
int count);
static void removeabbrev_index(Tuplesortstate *state, SortTuple *stups,
int count);
+static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups,
+ int count);
static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups,
int count);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
@@ -69,10 +72,16 @@ static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
+static int comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
LogicalTape *tape, unsigned int len);
+static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b,
@@ -128,6 +137,16 @@ typedef struct
uint32 max_buckets;
} TuplesortIndexHashArg;
+/*
+ * Data structure pointed by "TuplesortPublic.arg" for the index_brin subcase.
+ */
+typedef struct
+{
+ TuplesortIndexArg index;
+
+ /* XXX do we need something here? */
+} TuplesortIndexBrinArg;
+
/*
* Data struture pointed by "TuplesortPublic.arg" for the Datum case.
* Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@@ -140,6 +159,21 @@ typedef struct
int datumTypeLen;
} TuplesortDatumArg;
+/*
+ * Computing BrinTuple size with only the tuple is difficult, so we want to track
+ * the length referenced by the SortTuple. That's what BrinSortTuple is meant
+ * to do - it's essentially a BrinTuple prefixed by its length.
+ */
+typedef struct BrinSortTuple
+{
+ Size tuplen;
+ BrinTuple tuple;
+} BrinSortTuple;
+
+/* Size of the BrinSortTuple, given length of the BrinTuple. */
+#define BRINSORTTUPLE_SIZE(len) (offsetof(BrinSortTuple, tuple) + (len))
+
+
Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
int nkeys, AttrNumber *attNums,
@@ -527,6 +561,47 @@ tuplesort_begin_index_gist(Relation heapRel,
return state;
}
+Tuplesortstate *
+tuplesort_begin_index_brin(Relation heapRel,
+ Relation indexRel,
+ int workMem,
+ SortCoordinate coordinate,
+ int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ MemoryContext oldcontext;
+ TuplesortIndexBrinArg *arg;
+
+ oldcontext = MemoryContextSwitchTo(base->maincontext);
+ arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin index sort: workMem = %d, randomAccess = %c",
+ workMem,
+ sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ base->nKeys = 1; /* Only one sort column, the block number */
+
+ base->removeabbrev = removeabbrev_index_brin;
+ base->comparetup = comparetup_index_brin;
+ base->writetup = writetup_index_brin;
+ base->readtup = readtup_index_brin;
+ base->haveDatum1 = true;
+ base->arg = arg;
+
+ arg->index.heapRel = heapRel;
+ arg->index.indexRel = indexRel;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
bool nullsFirstFlag, int workMem,
@@ -707,6 +782,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
!stup.isnull1);
}
+/*
+ * Collect one BRIN tuple while collecting input data for sort.
+ */
+void
+tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size)
+{
+ SortTuple stup;
+ BrinSortTuple *bstup;
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext);
+
+ /* allocate space for the whole BRIN sort tuple */
+ bstup = palloc(BRINSORTTUPLE_SIZE(size));
+
+ bstup->tuplen = size;
+ memcpy(&bstup->tuple, tuple, size);
+
+ stup.tuple = bstup;
+ stup.datum1 = tuple->bt_blkno;
+ stup.isnull1 = false;
+
+ tuplesort_puttuple_common(state, &stup,
+ base->sortKeys &&
+ base->sortKeys->abbrev_converter &&
+ !stup.isnull1);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Accept one Datum while collecting input data for sort.
*
@@ -850,6 +954,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward)
return (IndexTuple) stup.tuple;
}
+/*
+ * Fetch the next BRIN tuple in either forward or back direction.
+ * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller. Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
+ */
+BrinTuple *
+tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward)
+{
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext);
+ SortTuple stup;
+ BrinSortTuple *btup;
+
+ if (!tuplesort_gettuple_common(state, forward, &stup))
+ stup.tuple = NULL;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (!stup.tuple)
+ return NULL;
+
+ btup = (BrinSortTuple *) stup.tuple;
+
+ *len = btup->tuplen;
+
+ return &btup->tuple;
+}
+
/*
* Fetch the next Datum in either forward or back direction.
* Returns false if no more datums.
@@ -1564,6 +1697,80 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
+/*
+ * Routines specialized for BrinTuple case
+ */
+
+static void
+removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count)
+{
+ int i;
+
+ for (i = 0; i < count; i++)
+ {
+ BrinSortTuple *tuple;
+
+ tuple = stups[i].tuple;
+ stups[i].datum1 = tuple->tuple.bt_blkno;
+ }
+}
+
+static int
+comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state)
+{
+ Assert(TuplesortstateGetPublic(state)->haveDatum1);
+
+ if (DatumGetUInt32(a->datum1) > DatumGetUInt32(b->datum1))
+ return 1;
+
+ if (DatumGetUInt32(a->datum1) < DatumGetUInt32(b->datum1))
+ return -1;
+
+ /* silence compilers */
+ return 0;
+}
+
+static void
+writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ BrinSortTuple *tuple = (BrinSortTuple *) stup->tuple;
+ unsigned int tuplen = tuple->tuplen;
+
+ tuplen = tuplen + sizeof(tuplen);
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuple->tuple, tuple->tuplen);
+ if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+}
+
+static void
+readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len)
+{
+ TuplesortPublic *base = TuplesortstateGetPublic(state);
+ unsigned int tuplen = len - sizeof(unsigned int);
+
+ /*
+ * Allocate space for the BRIN sort tuple, which is BrinTuple with an
+ * extra length field.
+ */
+ BrinSortTuple *tuple
+ = (BrinSortTuple *) tuplesort_readtup_alloc(state,
+ BRINSORTTUPLE_SIZE(tuplen));
+
+ tuple->tuplen = tuplen;
+
+ LogicalTapeReadExact(tape, &tuple->tuple, tuplen);
+ if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+ stup->tuple = (void *) tuple;
+
+ /* set up first-column key value, which is block number */
+ stup->datum1 = tuple->tuple.bt_blkno;
+}
+
/*
* Routines specialized for DatumTuple case
*/
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 244459587fc..df85ae3aace 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -243,6 +243,8 @@ typedef struct IndexAmRoutine
bool ampredlocks;
/* does AM support parallel scan? */
bool amcanparallel;
+ /* does AM support parallel build? */
+ bool amcanbuildparallel;
/* does AM support columns included with clause INCLUDE? */
bool amcaninclude;
/* does AM use maintenance_work_mem? */
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
#define BRIN_H
#include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
#include "utils/relcache.h"
@@ -52,4 +53,6 @@ typedef struct BrinStatsData
extern void brinGetStats(Relation index, BrinStatsData *stats);
+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
#endif /* BRIN_H */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 9ed2de76cd6..357eb35311d 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -21,6 +21,7 @@
#ifndef TUPLESORT_H
#define TUPLESORT_H
+#include "access/brin_tuple.h"
#include "access/itup.h"
#include "executor/tuptable.h"
#include "storage/dsm.h"
@@ -282,6 +283,9 @@ typedef struct
* The "index_hash" API is similar to index_btree, but the tuples are
* actually sorted by their hash codes not the raw data.
*
+ * The "index_brin" API is similar to index_btree, but the tuples are
+ * BrinTuple and are sorted by their block number not the raw data.
+ *
* Parallel sort callers are required to coordinate multiple tuplesort states
* in a leader process and one or more worker processes. The leader process
* must launch workers, and have each perform an independent "partial"
@@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
Relation indexRel,
int workMem, SortCoordinate coordinate,
int sortopt);
+extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel,
+ Relation indexRel,
+ int workMem, SortCoordinate coordinate,
+ int sortopt);
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, Oid sortCollation,
bool nullsFirstFlag,
@@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup);
extern void tuplesort_putindextuplevalues(Tuplesortstate *state,
Relation rel, ItemPointer self,
const Datum *values, const bool *isnull);
+extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len);
extern void tuplesort_putdatum(Tuplesortstate *state, Datum val,
bool isNull);
@@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
bool copy, TupleTableSlot *slot, Datum *abbrev);
extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward);
extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward);
+extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len,
+ bool forward);
extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy,
Datum *val, bool *isNull, Datum *abbrev);
diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c
index cbdae7ab7a5..eaa0c483b7e 100644
--- a/src/test/modules/dummy_index_am/dummy_index_am.c
+++ b/src/test/modules/dummy_index_am/dummy_index_am.c
@@ -294,6 +294,7 @@ dihandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
+ amroutine->amcanbuildparallel = false;
amroutine->amcaninclude = false;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = false;
--
2.42.0
From e3f42c711c0426b85edf3a88a538e58bc0588b6e Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 28 Nov 2023 18:37:42 +0100
Subject: [PATCH v20231128 2/4] add docs for amcanbuildparallel
---
doc/src/sgml/indexam.sgml | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index f107c43d6a6..cc4135e3940 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -123,6 +123,8 @@ typedef struct IndexAmRoutine
bool ampredlocks;
/* does AM support parallel scan? */
bool amcanparallel;
+ /* does AM support parallel build? */
+ bool amcanbuildparallel;
/* does AM support columns included with clause INCLUDE? */
bool amcaninclude;
/* does AM use maintenance_work_mem? */
@@ -286,6 +288,11 @@ ambuild (Relation heapRelation,
and compute the keys that need to be inserted into the index.
The function must return a palloc'd struct containing statistics about
the new index.
+ The <structfield>amcanbuildparallel</structfield> flag indicates whether
+ the access method supports parallel index builds. When set to <literal>true</literal>,
+ the system will attempt to allocate parallel workers for the build.
+ Access methods supporting only non-parallel index builds should leave
+ this flag set to <literal>false</literal>.
</para>
<para>
--
2.42.0
From 0b431ec837f00f852e28b7b5e8e59dbdf61af27f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 28 Nov 2023 18:38:32 +0100
Subject: [PATCH v20231128 3/4] use per-range memory context for merging in
leader
---
src/backend/access/brin/brin.c | 34 ++++++++++++++++++++++++++++++----
1 file changed, 30 insertions(+), 4 deletions(-)
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 0c6ca1ac18c..8d96d2ac9be 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -2475,6 +2475,8 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
BrinTuple *emptyTuple = NULL;
Size emptySize;
BrinSpool *spool;
+ MemoryContext rangeCxt,
+ oldCxt;
/* Shutdown worker processes */
WaitForParallelWorkersToFinish(brinleader->pcxt);
@@ -2496,6 +2498,19 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
*/
memtuple = brin_new_memtuple(state->bs_bdesc);
+ /*
+ * Create a memory context we'll reset to combine results for a single
+ * page range (received from the workers). We don't expect a huge number
+ * of overlaps under regular circumstances (because for large tables the
+ * chunk size is likely larger than the BRIN page range), but it can
+ * happen, and the union functions may allocate a fair amount of memory.
+ * So we'd better reset the context once in a while.
+ */
+ rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
+ "brin union",
+ ALLOCSET_DEFAULT_SIZES);
+ oldCxt = MemoryContextSwitchTo(rangeCxt);
+
/*
* Read the BRIN tuples from the shared tuplesort, sorted by block number.
* That probably gives us an index that is cheaper to scan, thanks to mostly
@@ -2555,8 +2570,12 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
&state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
- /* free the formed on-disk tuple */
- pfree(tmp);
+ /*
+ * Reset the per-output-range context. This frees all the memory
+ * possibly allocated by the union functions, and also the BRIN
+ * tuple we just formed and inserted.
+ */
+ MemoryContextReset(rangeCxt);
memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
@@ -2593,7 +2612,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
tuplesort_end(spool->sortstate);
- /* Fill empty ranges for all ranges missing in the tuplesort. */
+ /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
while (prevblkno + state->bs_pagesPerRange < memtuple->bt_blkno)
{
@@ -2618,7 +2637,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
emptyTuple->bt_blkno, emptyTuple, emptySize);
}
- /**/
+ /* Fill the BRIN tuple for the last page range. */
if (prevblkno != InvalidBlockNumber)
{
BrinTuple *tmp;
@@ -2633,6 +2652,13 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
pfree(tmp);
}
+ /*
+ * Switch back to the original memory context, and destroy the one we
+ * created to isolate the union_tuples() calls.
+ */
+ MemoryContextSwitchTo(oldCxt);
+ MemoryContextDelete(rangeCxt);
+
/*
* Next, accumulate WAL usage. (This must wait for the workers to finish,
* or we might get incomplete data.)
--
2.42.0
From d6c08a8426907292a352d85b8fecc12d3adc8cbc Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 28 Nov 2023 18:48:21 +0100
Subject: [PATCH v20231128 4/4] pgindent
---
src/backend/access/brin/brin.c | 96 +++++++++++-----------
src/backend/utils/sort/tuplesortvariants.c | 16 ++--
src/tools/pgindent/typedefs.list | 5 ++
3 files changed, 62 insertions(+), 55 deletions(-)
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 8d96d2ac9be..001cf04aac5 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -67,14 +67,14 @@ typedef struct BrinSpool
typedef struct BrinShared
{
/*
- * These fields are not modified during the build. They primarily exist for
- * the benefit of worker processes that need to create state corresponding
- * to that used by the leader.
+ * These fields are not modified during the build. They primarily exist
+ * for the benefit of worker processes that need to create state
+ * corresponding to that used by the leader.
*/
Oid heaprelid;
Oid indexrelid;
bool isconcurrent;
- BlockNumber pagesPerRange;
+ BlockNumber pagesPerRange;
int scantuplesortstates;
/*
@@ -145,9 +145,10 @@ typedef struct BrinLeader
*
* brinshared is the shared state for entire build. sharedsort is the
* shared, tuplesort-managed state passed to each process tuplesort.
- * snapshot is the snapshot used by the scan iff an MVCC snapshot is required.
+ * snapshot is the snapshot used by the scan iff an MVCC snapshot is
+ * required.
*/
- BrinShared *brinshared;
+ BrinShared *brinshared;
Sharedsort *sharedsort;
Snapshot snapshot;
WalUsage *walusage;
@@ -1035,8 +1036,8 @@ brinbuildCallbackParallel(Relation index,
/*
* If we're in a block that belongs to a future range, summarize what
* we've got and start afresh. Note the scan might have skipped many
- * pages, if they were devoid of live tuples; we do not create emptry
- * BRIN ranges here - the leader is responsible for filling them in.
+ * pages, if they were devoid of live tuples; we do not create empty BRIN
+ * ranges here - the leader is responsible for filling them in.
*/
if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
{
@@ -1169,23 +1170,24 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
/*
* Begin serial/leader tuplesort.
*
- * In cases where parallelism is involved, the leader receives the same
- * share of maintenance_work_mem as a serial sort (it is generally treated
- * in the same way as a serial sort once we return). Parallel worker
- * Tuplesortstates will have received only a fraction of
- * maintenance_work_mem, though.
+ * In cases where parallelism is involved, the leader receives the
+ * same share of maintenance_work_mem as a serial sort (it is
+ * generally treated in the same way as a serial sort once we return).
+ * Parallel worker Tuplesortstates will have received only a fraction
+ * of maintenance_work_mem, though.
*
* We rely on the lifetime of the Leader Tuplesortstate almost not
- * overlapping with any worker Tuplesortstate's lifetime. There may be
- * some small overlap, but that's okay because we rely on leader
- * Tuplesortstate only allocating a small, fixed amount of memory here.
- * When its tuplesort_performsort() is called (by our caller), and
- * significant amounts of memory are likely to be used, all workers must
- * have already freed almost all memory held by their Tuplesortstates
- * (they are about to go away completely, too). The overall effect is
- * that maintenance_work_mem always represents an absolute high watermark
- * on the amount of memory used by a CREATE INDEX operation, regardless of
- * the use of parallelism or any other factor.
+ * overlapping with any worker Tuplesortstate's lifetime. There may
+ * be some small overlap, but that's okay because we rely on leader
+ * Tuplesortstate only allocating a small, fixed amount of memory
+ * here. When its tuplesort_performsort() is called (by our caller),
+ * and significant amounts of memory are likely to be used, all
+ * workers must have already freed almost all memory held by their
+ * Tuplesortstates (they are about to go away completely, too). The
+ * overall effect is that maintenance_work_mem always represents an
+ * absolute high watermark on the amount of memory used by a CREATE
+ * INDEX operation, regardless of the use of parallelism or any other
+ * factor.
*/
state->bs_spool->sortstate =
tuplesort_begin_index_brin(heap, index,
@@ -1198,7 +1200,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
*/
_brin_end_parallel(state->bs_leader, state);
}
- else /* no parallel index build, just do the usual thing */
+ else /* no parallel index build */
{
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
brinbuildCallback, (void *) state, NULL);
@@ -2288,9 +2290,9 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
Snapshot snapshot;
Size estbrinshared;
Size estsort;
- BrinShared *brinshared;
- Sharedsort *sharedsort;
- BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+ BrinShared *brinshared;
+ Sharedsort *sharedsort;
+ BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
WalUsage *walusage;
BufferUsage *bufferusage;
bool leaderparticipates = true;
@@ -2471,12 +2473,12 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
BrinMemTuple *memtuple = NULL;
Size tuplen;
BrinShared *brinshared = brinleader->brinshared;
- BlockNumber prevblkno = InvalidBlockNumber;
+ BlockNumber prevblkno = InvalidBlockNumber;
BrinTuple *emptyTuple = NULL;
Size emptySize;
BrinSpool *spool;
- MemoryContext rangeCxt,
- oldCxt;
+ MemoryContext rangeCxt,
+ oldCxt;
/* Shutdown worker processes */
WaitForParallelWorkersToFinish(brinleader->pcxt);
@@ -2493,18 +2495,18 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
tuplesort_performsort(spool->sortstate);
/*
- * Initialize BrinMemTuple we'll use to union summaries from workers
- * (in case they happened to produce parts of the same paga range).
+ * Initialize BrinMemTuple we'll use to union summaries from workers (in
+ * case they happened to produce parts of the same page range).
*/
memtuple = brin_new_memtuple(state->bs_bdesc);
/*
* Create a memory context we'll reset to combine results for a single
- * page range (received from the workers). We don't expect huge number
- * of overlaps under regular circumstances, because for large tables
- * the chunk size is likely larger than the BRIN page range), but it
- * can happen, and the union functions may do all kinds of stuff. So
- * we better reset the context once in a while.
+ * page range (received from the workers). We don't expect a huge number of
+ * overlaps under regular circumstances, because for large tables the
+ * chunk size is likely larger than the BRIN page range, but it can
+ * happen, and the union functions may do all kinds of stuff. So we'd
+ * better reset the context once in a while.
*/
rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
"brin union",
@@ -2513,8 +2515,8 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
/*
* Read the BRIN tuples from the shared tuplesort, sorted by block number.
- * That probably gives us an index that is cheaper to scan, thanks to mostly
- * getting data from the same index page as before.
+ * That probably gives us an index that is cheaper to scan, thanks to
+ * mostly getting data from the same index page as before.
*/
while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
{
@@ -2528,15 +2530,15 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
* the memtuple, and continue with the next one from tuplesort. We
* however may need to insert empty summaries into the index.
*
- * If it's the same block as the last we saw, we simply union the
- * brin tuple into it, and we're done - we don't even need to insert
- * empty ranges, because that was done earlier when we saw the first
- * brin tuple (for this range).
+ * If it's the same block as the last we saw, we simply union the brin
+ * tuple into it, and we're done - we don't even need to insert empty
+ * ranges, because that was done earlier when we saw the first brin
+ * tuple (for this range).
*
* Finally, if it's not the first brin tuple, and it's not the same
- * page range, we need to do the insert and then deform the tuple
- * into the memtuple. Then we'll insert empty ranges before the
- * new brin tuple, if needed.
+ * page range, we need to do the insert and then deform the tuple into
+ * the memtuple. Then we'll insert empty ranges before the new brin
+ * tuple, if needed.
*/
if (prevblkno == InvalidBlockNumber)
{
@@ -2730,7 +2732,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
bool progress)
{
SortCoordinate coordinate;
- TableScanDesc scan;
+ TableScanDesc scan;
double reltuples;
IndexInfo *indexInfo;
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 9b3a70e6ccf..90fc605f1ca 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -965,8 +965,8 @@ tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward)
{
TuplesortPublic *base = TuplesortstateGetPublic(state);
MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext);
- SortTuple stup;
- BrinSortTuple *btup;
+ SortTuple stup;
+ BrinSortTuple *btup;
if (!tuplesort_gettuple_common(state, forward, &stup))
stup.tuple = NULL;
@@ -1708,7 +1708,7 @@ removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count)
for (i = 0; i < count; i++)
{
- BrinSortTuple *tuple;
+ BrinSortTuple *tuple;
tuple = stups[i].tuple;
stups[i].datum1 = tuple->tuple.bt_blkno;
@@ -1735,8 +1735,8 @@ static void
writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
{
TuplesortPublic *base = TuplesortstateGetPublic(state);
- BrinSortTuple *tuple = (BrinSortTuple *) stup->tuple;
- unsigned int tuplen = tuple->tuplen;
+ BrinSortTuple *tuple = (BrinSortTuple *) stup->tuple;
+ unsigned int tuplen = tuple->tuplen;
tuplen = tuplen + sizeof(tuplen);
LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
@@ -1749,6 +1749,7 @@ static void
readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
LogicalTape *tape, unsigned int len)
{
+ BrinSortTuple *tuple;
TuplesortPublic *base = TuplesortstateGetPublic(state);
unsigned int tuplen = len - sizeof(unsigned int);
@@ -1756,9 +1757,8 @@ readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
* Allocate space for the BRIN sort tuple, which is BrinTuple with an
* extra length field.
*/
- BrinSortTuple *tuple
- = (BrinSortTuple *) tuplesort_readtup_alloc(state,
- BRINSORTTUPLE_SIZE(tuplen));
+ tuple = (BrinSortTuple *) tuplesort_readtup_alloc(state,
+ BRINSORTTUPLE_SIZE(tuplen));
tuple->tuplen = tuplen;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 86a9886d4f7..001fef58652 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -297,13 +297,17 @@ BpChar
BrinBuildState
BrinDesc
BrinInsertState
+BrinLeader
BrinMemTuple
BrinMetaPageData
BrinOpaque
BrinOpcInfo
BrinOptions
BrinRevmap
+BrinShared
+BrinSortTuple
BrinSpecialSpace
+BrinSpool
BrinStatsData
BrinTuple
BrinValues
@@ -2879,6 +2883,7 @@ TupleTableSlotOps
TuplesortClusterArg
TuplesortDatumArg
TuplesortIndexArg
+TuplesortIndexBrinArg
TuplesortIndexBTreeArg
TuplesortIndexHashArg
TuplesortInstrumentation
--
2.42.0