Hi,
After looking into parallel builds for BRIN and GIN indexes, I was
wondering if there's a way to do parallel builds for GiST too. I knew
next to nothing about how GiST works, but I gave it a shot and here's
what I have - the attached patch allows parallel GiST builds for the
"unsorted" case (i.e. when the opclass does not include sortsupport),
and does not support buffered builds.
unsorted builds only
--------------------
Addressing only the unsorted case may seem a bit weird, but I did it
this way for two reasons - parallel sort is a solved problem, and adding
this to the patch seems quite straightforward. It's what btree does, for
example. But I also was not very sure how common this is - we do have
sort for points, but I have no idea if the PostGIS indexes define
sorting etc. My guess was "no" but I've been told that's no longer true,
so I guess sorted builds are more widely applicable than I thought.
In any case, I'm not in a rush to parallelize sorted builds. It can be
added later, as an improvement, IMHO. In fact, it's a well isolated part
of the patch, which might make it a good choice for someone looking for
an idea for their first patch ...
buffered builds
---------------
The lack of support for buffered builds is a very different thing. The
basic idea is that we don't push the index entries all the way to the
leaf pages right away, but accumulate them in buffers half-way through.
This combines writes and reduces random I/O, which is nice.
Unfortunately, the way it's implemented does not work with parallel
builds at all - all the state is in private memory, and it assumes the
worker is the only possible backend that can split the page (at which
point the buffers need to be split too, etc.). But for parallel builds
this is obviously not true.
I'm not saying parallel builds can't do similar buffering, but it
requires moving the buffers into shared memory, and introducing locking
to coordinate accesses to the buffers. (Or perhaps it might be enough to
only "notify" the workers about page splits, with buffers still in
private memory?). Anyway, it seems far too complicated for v1.
In fact, I'm not sure the buffering is entirely necessary - maybe the
increase in amount of RAM makes this less of an issue? If the index can
fit into shared buffers (or at least page cache), maybe the amount of
extra I/O is not that bad? I'm sure there may be cases really affected
by this, but maybe it's OK to tell people to disable parallel builds in
those cases?
gistGetFakeLSN
--------------
One more thing - GiST disables WAL-logging during the build, and only
logs it once at the end. For serial builds this is fine, because there
are no concurrent splits, and so we don't need to rely on page LSNs to
detect these cases (in fact, is uses a bogus value).
But for parallel builds this would not work - we need page LSNs that
actually change, otherwise we'd miss page splits, and the index build
would either fail or produce a broken index. But the existing is_build
flag affects both things, so I had to introduce a new "is_parallel" flag
which only affects the page LSN part, using the gistGetFakeLSN()
function, previously used only for unlogged indexes.
This means we'll produce WAL during the index build (because
gistGetFakeLSN() writes a trivial message into WAL). Compared to the
serial builds this produces maybe 25-75% more WAL, but it's an order of
magnitude less than with "full" WAL logging (is_build=false).
For example, serial build of 5GB index needs ~5GB of WAL. A parallel
build may need ~7GB, while a parallel build with "full" logging would
use 50GB. I think this is a reasonable trade off.
There's one "strange" thing, though - the amount of WAL decreases with
the number of parallel workers. Consider for example an index on a
numeric field, where the index is ~9GB, but the amount of WAL changes
like this (0 workers means serial builds):
parallel workers 0 1 3 5 7
WAL (GB) 5.7 9.2 7.6 7.0 6.8
The explanation for this is fairly simple (AFAIK) - gistGetFakeLSN
determines if it needs to actually assign a new LSN (and write stuff to
WAL) by comparing the last LSN assigned (in a given worker) to the
current insert LSN. But the current insert LSN might have been updated
by some other worker, in which case we simply use that. Which means that
multiple workers may use the same fake LSN, and the likelihood increases
with the number of workers - and this is consistent with the observed
behavior of the WAL decreasing as the number of workers increases
(because more workers use the same LSN).
I'm not entirely sure if this is OK or a problem. I was worried two
workers might end up using the same LSN for the same page, leading to
other workers not noticing the split. But after a week of pretty
intensive stress testing, I haven't seen a single such failure ...
If this turns out to be a problem, the fix is IMHO quite simple - it
should be enough to force gistGetFakeLSN() to produce a new fake LSN
every time when is_parallel=true.
performance
-----------
Obviously, the primary goal of the patch is to speed up the builds, so
does it actually do that? For indexes of different sizes I got this
timings (in seconds):
scale type 0 1 3 5 7
------------------------------------------------------------------
small inet 13 7 4 4 2
numeric 239 122 67 46 36
oid 15 8 5 3 2
text 71 35 19 13 10
medium inet 207 111 59 42 32
numeric 3409 1714 885 618 490
oid 214 114 60 43 33
text 940 479 247 180 134
large inet 2167 1459 865 632 468
numeric 38125 20256 10454 7487 5846
oid 2647 1490 808 594 475
text 10987 6298 3376 2462 1961
Here small is ~100-200MB index, medium is 1-2GB and large 10-20GB index,
depending on the data type.
The raw duration is not particularly easy to interpret, so let's look at
the "actual speedup" which is calculated as
(serial duration) / (parallel duration)
and the table looks like this:
scale type 1 3 5 7
--------------------------------------------------------------
small inet 1.9 3.3 3.3 6.5
numeric 2.0 3.6 5.2 6.6
oid 1.9 3.0 5.0 7.5
text 2.0 3.7 5.5 7.1
medium inet 1.9 3.5 4.9 6.5
numeric 2.0 3.9 5.5 7.0
oid 1.9 3.6 5.0 6.5
text 2.0 3.8 5.2 7.0
large inet 1.5 2.5 3.4 4.6
numeric 1.9 3.6 5.1 6.5
oid 1.8 3.3 4.5 5.6
text 1.7 3.3 4.5 5.6
Ideally (if the build scaled linearly with the number of workers), we'd
get the number of workers + 1 (because the leader participates too).
Obviously, it's not that great - for example for text with 3 workers we
get 3.3 instead of 4.0, and 5.6 vs. 8 with 7 workers.
But I think those numbers are actually pretty good - I'd definitely not
complain if my index builds got 5x faster.
But those are synthetic tests on random data, using the btree_gist
opclasses. It'd be interesting if people could do their own testing on
real-world data sets.
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From f684556a910f566a8c6a6ea0dd588173ab94a245 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <t...@fuzzy.cz>
Date: Sun, 26 May 2024 21:44:27 +0200
Subject: [PATCH v20240607] WIP parallel GiST build
Implements parallel GiST index build for the unsorted case. The build
simply starts parallel workers that insert values into the index the
usual way (as if there were multiple clients doing INSERT).
The basic infrastructure is copied from parallel BRIN builts (and also
from the nearby parallel GIN build). There's nothing particularly
special or interesting, except for the gistBuildParallelCallback()
callback. The two significant changes in the callback are:
1) disabling buffering
Buffered builds assume the worker is the only backend that can split
index pages etc. With serial workers that is trivially true, but with
parallel workers this leads to confusion.
In principle this is solvable by moving the buffers into shared memory
and coordinating the workers (locking etc.). But the patch does not do
that yet - it's clearly non-trivial, and I'm not really convinced it's
worth it.
2) generating "proper" fake LSNs
The serial builds disable all WAL-logging for the index, until the very
end when the whole index is WAL-logged. This however also means we don't
set page LSNs on the index pages - but page LSNs are used to detect
concurrent changes to the index structure (e.g. page splits). For serial
builds this does not matter, because only the build worker can modify
the index, so it just sets the same LSN "1" for all pages. Both of this
(disabling WAL-logging and using bogus page LSNs) is controlled by the
same flag is_build.
Having the same page LSN does not work for parallel builds, as it would
mean workers won't notice splits done by other workers, etc.
One option would be to set is_bild=false, which enables WAL-logging, as
if during regular inserts, and also assigns proper page LSNs. But we
don't want to WAL-log everything, that's unnecessary. We want to only
start WAL-logging the index once the build completes, just like for
serial builds. And only do the fake LSNs, as for unlogged indexes etc.
So this introduces a separate flag is_parallel, which forces generating
the "proper" fake LSN. But we can still do is_build=true, and only log
the index at the end of the build.
---
src/backend/access/gist/gist.c | 37 +-
src/backend/access/gist/gistbuild.c | 713 +++++++++++++++++++++++++-
src/backend/access/gist/gistutil.c | 10 +-
src/backend/access/gist/gistvacuum.c | 6 +-
src/backend/access/transam/parallel.c | 4 +
src/include/access/gist_private.h | 12 +-
src/tools/pgindent/typedefs.list | 2 +
7 files changed, 739 insertions(+), 45 deletions(-)
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index ed4ffa63a77..f5f56fb2503 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -75,7 +75,7 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
- amroutine->amcanbuildparallel = false;
+ amroutine->amcanbuildparallel = true;
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amsummarizing = false;
@@ -182,7 +182,7 @@ gistinsert(Relation r, Datum *values, bool *isnull,
values, isnull, true /* size is currently bogus */ );
itup->t_tid = *ht_ctid;
- gistdoinsert(r, itup, 0, giststate, heapRel, false);
+ gistdoinsert(r, itup, 0, giststate, heapRel, false, false);
/* cleanup */
MemoryContextSwitchTo(oldCxt);
@@ -230,7 +230,8 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
List **splitinfo,
bool markfollowright,
Relation heapRel,
- bool is_build)
+ bool is_build,
+ bool is_parallel)
{
BlockNumber blkno = BufferGetBlockNumber(buffer);
Page page = BufferGetPage(buffer);
@@ -501,9 +502,17 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
* smaller than any real or fake unlogged LSN that might be generated
* later. (There can't be any concurrent scans during index build, so
* we don't need to be able to detect concurrent splits yet.)
+ *
+ * However, with a parallel index build, we need to assign valid LSN,
+ * as it's used to detect concurrent index modifications.
*/
if (is_build)
- recptr = GistBuildLSN;
+ {
+ if (is_parallel)
+ recptr = gistGetFakeLSN(rel, is_parallel);
+ else
+ recptr = GistBuildLSN;
+ }
else
{
if (RelationNeedsWAL(rel))
@@ -511,7 +520,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
dist, oldrlink, oldnsn, leftchildbuf,
markfollowright);
else
- recptr = gistGetFakeLSN(rel);
+ recptr = gistGetFakeLSN(rel, false);
}
for (ptr = dist; ptr; ptr = ptr->next)
@@ -570,7 +579,12 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
MarkBufferDirty(leftchildbuf);
if (is_build)
- recptr = GistBuildLSN;
+ {
+ if (is_parallel)
+ recptr = gistGetFakeLSN(rel, is_parallel);
+ else
+ recptr = GistBuildLSN;
+ }
else
{
if (RelationNeedsWAL(rel))
@@ -589,7 +603,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
leftchildbuf);
}
else
- recptr = gistGetFakeLSN(rel);
+ recptr = gistGetFakeLSN(rel, false);
}
PageSetLSN(page, recptr);
@@ -632,7 +646,8 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
*/
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
- GISTSTATE *giststate, Relation heapRel, bool is_build)
+ GISTSTATE *giststate, Relation heapRel, bool is_build,
+ bool is_parallel)
{
ItemId iid;
IndexTuple idxtuple;
@@ -646,6 +661,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace,
state.r = r;
state.heapRel = heapRel;
state.is_build = is_build;
+ state.is_parallel = is_parallel;
/* Start from the root */
firststack.blkno = GIST_ROOT_BLKNO;
@@ -1303,7 +1319,8 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
&splitinfo,
true,
state->heapRel,
- state->is_build);
+ state->is_build,
+ state->is_parallel);
/*
* Before recursing up in case the page was split, release locks on the
@@ -1722,7 +1739,7 @@ gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
PageSetLSN(page, recptr);
}
else
- PageSetLSN(page, gistGetFakeLSN(rel));
+ PageSetLSN(page, gistGetFakeLSN(rel, false));
END_CRIT_SECTION();
}
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index ba06df30faf..c8fa67beebb 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -36,18 +36,28 @@
#include "access/genam.h"
#include "access/gist_private.h"
+#include "access/table.h"
#include "access/tableam.h"
#include "access/xloginsert.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
+#include "pgstat.h"
#include "optimizer/optimizer.h"
#include "storage/bufmgr.h"
#include "storage/bulk_write.h"
-
+#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tuplesort.h"
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_GIST_SHARED UINT64CONST(0xB000000000000001)
+#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000002)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000004)
+
/* Step of index tuples for check whether to switch to buffering build mode */
#define BUFFERING_MODE_SWITCH_CHECK_STEP 256
@@ -78,6 +88,106 @@ typedef enum
GIST_BUFFERING_ACTIVE, /* in buffering build mode */
} GistBuildMode;
+/*
+ * Status for index builds performed in parallel. This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct GISTShared
+{
+ /*
+ * These fields are not modified during the build. They primarily exist
+ * for the benefit of worker processes that need to create state
+ * corresponding to that used by the leader.
+ *
+ * XXX nparticipants is the number or workers we expect to participage in
+ * the build, possibly including the leader process.
+ */
+ Oid heaprelid;
+ Oid indexrelid;
+ bool isconcurrent;
+ int nparticipants;
+
+ /* Parameters determined by the leader, passed to the workers. */
+ GistBuildMode buildMode;
+ int freespace;
+
+ /*
+ * workersdonecv is used to monitor the progress of workers. All parallel
+ * participants must indicate that they are done before leader can finish
+ * building the index.
+ */
+ ConditionVariable workersdonecv;
+
+ /*
+ * mutex protects all fields before heapdesc.
+ *
+ * These fields contain status information of interest to GIST index
+ * builds that must work just the same when an index is built in parallel.
+ */
+ slock_t mutex;
+
+ /*
+ * Mutable state that is maintained by workers, and reported back to
+ * leader at end of the scans.
+ *
+ * nparticipantsdone is number of worker processes finished.
+ *
+ * reltuples is the total number of input heap tuples.
+ *
+ * indtuples is the total number of tuples that made it into the index.
+ */
+ int nparticipantsdone;
+ double reltuples;
+ double indtuples;
+
+ /*
+ * ParallelTableScanDescData data follows. Can't directly embed here, as
+ * implementations of the parallel table scan desc interface might need
+ * stronger alignment.
+ */
+} GISTShared;
+
+/*
+ * Return pointer to a GISTShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromGistShared(shared) \
+ (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(GISTShared)))
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct GISTLeader
+{
+ /* parallel context itself */
+ ParallelContext *pcxt;
+
+ /*
+ * nparticipants is the exact number of worker processes successfully
+ * launched, plus one leader process if it participates as a worker (only
+ * DISABLE_LEADER_PARTICIPATION builds avoid leader participating as a
+ * worker).
+ *
+ * XXX Seems a bit redundant with nparticipants in GISTShared. Although
+ * that is the expected number, this is what we actually got.
+ */
+ int nparticipants;
+
+ /*
+ * Leader process convenience pointers to shared state (leader avoids TOC
+ * lookups).
+ *
+ * GISTShared is the shared state for entire build. snapshot is the
+ * snapshot used by the scan iff an MVCC snapshot is required.
+ */
+ GISTShared *gistshared;
+ Snapshot snapshot;
+ WalUsage *walusage;
+ BufferUsage *bufferusage;
+} GISTLeader;
+
/* Working state for gistbuild and its callback */
typedef struct
{
@@ -100,6 +210,14 @@ typedef struct
GISTBuildBuffers *gfbb;
HTAB *parentMap;
+ /*
+ * gist_leader is only present when a parallel index build is performed,
+ * and only in the leader process. (Actually, only the leader process has
+ * a GISTBuildState.)
+ */
+ bool is_parallel;
+ GISTLeader *gist_leader;
+
/*
* Extra data structures used during a sorting build.
*/
@@ -148,6 +266,12 @@ static void gistBuildCallback(Relation index,
bool *isnull,
bool tupleIsAlive,
void *state);
+static void gistBuildParallelCallback(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *state);
static void gistBufferingBuildInsert(GISTBuildState *buildstate,
IndexTuple itup);
static bool gistProcessItup(GISTBuildState *buildstate, IndexTuple itup,
@@ -171,6 +295,18 @@ static void gistMemorizeAllDownlinks(GISTBuildState *buildstate,
Buffer parentbuf);
static BlockNumber gistGetParent(GISTBuildState *buildstate, BlockNumber child);
+/* parallel index builds */
+static void _gist_begin_parallel(GISTBuildState *buildstate, Relation heap, Relation index,
+ bool isconcurrent, int request);
+static void _gist_end_parallel(GISTLeader *gistleader, GISTBuildState *state);
+static Size _gist_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static double _gist_parallel_heapscan(GISTBuildState *buildstate);
+static void _gist_leader_participate_as_worker(GISTBuildState *buildstate,
+ Relation heap, Relation index);
+static void _gist_parallel_scan_and_build(GISTBuildState *buildstate,
+ GISTShared *gistshared,
+ Relation heap, Relation index,
+ int workmem, bool progress);
/*
* Main entry point to GiST index build.
@@ -199,6 +335,10 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
buildstate.sortstate = NULL;
buildstate.giststate = initGISTstate(index);
+ /* assume serial build */
+ buildstate.is_parallel = false;
+ buildstate.gist_leader = NULL;
+
/*
* Create a temporary memory context that is reset once for each tuple
* processed. (Note: we don't bother to make this a child of the
@@ -309,37 +449,79 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
END_CRIT_SECTION();
- /* Scan the table, inserting all the tuples to the index. */
- reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
- gistBuildCallback,
- (void *) &buildstate, NULL);
-
/*
- * If buffering was used, flush out all the tuples that are still in
- * the buffers.
+ * Attempt to launch parallel worker scan when required
+ *
+ * XXX plan_create_index_workers makes the number of workers dependent
+ * on maintenance_work_mem, requiring 32MB for each worker. That makes
+ * sense for btree, but maybe not for GIST (at least when not using
+ * buffering)? So maybe make that somehow less strict, optionally?
*/
- if (buildstate.buildMode == GIST_BUFFERING_ACTIVE)
- {
- elog(DEBUG1, "all tuples processed, emptying buffers");
- gistEmptyAllBuffers(&buildstate);
- gistFreeBuildBuffers(buildstate.gfbb);
- }
+ if (indexInfo->ii_ParallelWorkers > 0)
+ _gist_begin_parallel(&buildstate, heap,
+ index, indexInfo->ii_Concurrent,
+ indexInfo->ii_ParallelWorkers);
/*
- * We didn't write WAL records as we built the index, so if
- * WAL-logging is required, write all pages to the WAL now.
+ * If parallel build requested and at least one worker process was
+ * successfully launched, set up coordination state, wait for workers
+ * to complete and end the parallel build.
+ *
+ * In serial mode, simply scan the table and build the index one index
+ * tuple at a time.
*/
- if (RelationNeedsWAL(index))
+ if (buildstate.gist_leader)
{
- log_newpage_range(index, MAIN_FORKNUM,
- 0, RelationGetNumberOfBlocks(index),
- true);
+ /* scan the relation and wait for parallel workers to finish */
+ reltuples = _gist_parallel_heapscan(&buildstate);
+
+ _gist_end_parallel(buildstate.gist_leader, &buildstate);
+
+ /*
+ * We didn't write WAL records as we built the index, so if WAL-logging is
+ * required, write all pages to the WAL now.
+ */
+ if (RelationNeedsWAL(index))
+ {
+ log_newpage_range(index, MAIN_FORKNUM,
+ 0, RelationGetNumberOfBlocks(index),
+ true);
+ }
}
- }
+ else
+ {
+ /* Scan the table, inserting all the tuples to the index. */
+ reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+ gistBuildCallback,
+ (void *) &buildstate, NULL);
- /* okay, all heap tuples are indexed */
- MemoryContextSwitchTo(oldcxt);
- MemoryContextDelete(buildstate.giststate->tempCxt);
+ /*
+ * If buffering was used, flush out all the tuples that are still
+ * in the buffers.
+ */
+ if (buildstate.buildMode == GIST_BUFFERING_ACTIVE)
+ {
+ elog(DEBUG1, "all tuples processed, emptying buffers");
+ gistEmptyAllBuffers(&buildstate);
+ gistFreeBuildBuffers(buildstate.gfbb);
+ }
+
+ /*
+ * We didn't write WAL records as we built the index, so if
+ * WAL-logging is required, write all pages to the WAL now.
+ */
+ if (RelationNeedsWAL(index))
+ {
+ log_newpage_range(index, MAIN_FORKNUM,
+ 0, RelationGetNumberOfBlocks(index),
+ true);
+ }
+
+ /* okay, all heap tuples are indexed */
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(buildstate.giststate->tempCxt);
+ }
+ }
freeGISTstate(buildstate.giststate);
@@ -861,7 +1043,7 @@ gistBuildCallback(Relation index,
* locked, we call gistdoinsert directly.
*/
gistdoinsert(index, itup, buildstate->freespace,
- buildstate->giststate, buildstate->heaprel, true);
+ buildstate->giststate, buildstate->heaprel, true, false);
}
MemoryContextSwitchTo(oldCtx);
@@ -900,6 +1082,48 @@ gistBuildCallback(Relation index,
}
}
+/*
+ * Per-tuple callback for table_index_build_scan.
+ *
+ * XXX Almost the same as gistBuildCallback, but with is_build=false when
+ * calling gistdoinsert. Otherwise we get assert failures due to workers
+ * modifying the index concurrently.
+ */
+static void
+gistBuildParallelCallback(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *state)
+{
+ GISTBuildState *buildstate = (GISTBuildState *) state;
+ IndexTuple itup;
+ MemoryContext oldCtx;
+
+ oldCtx = MemoryContextSwitchTo(buildstate->giststate->tempCxt);
+
+ /* form an index tuple and point it at the heap tuple */
+ itup = gistFormTuple(buildstate->giststate, index,
+ values, isnull,
+ true);
+ itup->t_tid = *tid;
+
+ /* Update tuple count and total size. */
+ buildstate->indtuples += 1;
+ buildstate->indtuplesSize += IndexTupleSize(itup);
+
+ /*
+ * There's no buffers (yet). Since we already have the index relation
+ * locked, we call gistdoinsert directly.
+ */
+ gistdoinsert(index, itup, buildstate->freespace,
+ buildstate->giststate, buildstate->heaprel, true, true);
+
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextReset(buildstate->giststate->tempCxt);
+}
+
/*
* Insert function for buffering index build.
*/
@@ -1068,7 +1292,8 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
InvalidBuffer,
&splitinfo,
false,
- buildstate->heaprel, true);
+ buildstate->heaprel, true,
+ buildstate->is_parallel);
/*
* If this is a root split, update the root path item kept in memory. This
@@ -1577,3 +1802,439 @@ gistGetParent(GISTBuildState *buildstate, BlockNumber child)
return entry->parentblkno;
}
+
+/*
+ * Create parallel context, and launch workers for leader.
+ *
+ * buildstate argument should be initialized
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
+ *
+ * request is the target number of parallel worker processes to launch.
+ *
+ * Sets buildstate's gistLeader, which caller must use to shut down parallel
+ * mode by passing it to _gist_end_parallel() at the very end of its index
+ * build. If not even a single worker process can be launched, this is
+ * never set, and caller should proceed with a serial index build.
+ */
+static void
+_gist_begin_parallel(GISTBuildState *buildstate, Relation heap, Relation index,
+ bool isconcurrent, int request)
+{
+ ParallelContext *pcxt;
+ int nparticipants;
+ Snapshot snapshot;
+ Size estgistshared;
+ GISTShared *gistshared;
+ GISTLeader *gistleader = (GISTLeader *) palloc0(sizeof(GISTLeader));
+ WalUsage *walusage;
+ BufferUsage *bufferusage;
+ bool leaderparticipates = true;
+ int querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+ leaderparticipates = false;
+#endif
+
+ /*
+ * Enter parallel mode, and create context for parallel build of GIST
+ * index
+ */
+ EnterParallelMode();
+ Assert(request > 0);
+ pcxt = CreateParallelContext("postgres", "_gist_parallel_build_main",
+ request);
+
+ nparticipants = leaderparticipates ? request + 1 : request;
+
+ /*
+ * Prepare for scan of the base relation. In a normal index build, we use
+ * SnapshotAny because we must retrieve all tuples and do our own time
+ * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+ * concurrent build, we take a regular MVCC snapshot and index whatever's
+ * live according to that.
+ */
+ if (!isconcurrent)
+ snapshot = SnapshotAny;
+ else
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+ /*
+ * Estimate size for our own PARALLEL_KEY_GIST_SHARED workspace.
+ */
+ estgistshared = _gist_parallel_estimate_shared(heap, snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, estgistshared);
+
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /*
+ * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+ * and PARALLEL_KEY_BUFFER_USAGE.
+ *
+ * If there are no extensions loaded that care, we could skip this. We
+ * have no way of knowing whether anyone's looking at pgWalUsage or
+ * pgBufferUsage, so do it unconditionally.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+ if (debug_query_string)
+ {
+ querylen = strlen(debug_query_string);
+ shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+ else
+ querylen = 0; /* keep compiler quiet */
+
+ /* Everyone's had a chance to ask for space, so now create the DSM */
+ InitializeParallelDSM(pcxt);
+
+ /* If no DSM segment was available, back out (do serial build) */
+ if (pcxt->seg == NULL)
+ {
+ if (IsMVCCSnapshot(snapshot))
+ UnregisterSnapshot(snapshot);
+ DestroyParallelContext(pcxt);
+ ExitParallelMode();
+ return;
+ }
+
+ /* Store shared build state, for which we reserved space */
+ gistshared = (GISTShared *) shm_toc_allocate(pcxt->toc, estgistshared);
+ /* Initialize immutable state */
+ gistshared->heaprelid = RelationGetRelid(heap);
+ gistshared->indexrelid = RelationGetRelid(index);
+ gistshared->isconcurrent = isconcurrent;
+ gistshared->nparticipants = nparticipants;
+
+ /* */
+ gistshared->buildMode = buildstate->buildMode;
+ gistshared->freespace = buildstate->freespace;
+
+ ConditionVariableInit(&gistshared->workersdonecv);
+ SpinLockInit(&gistshared->mutex);
+
+ /* Initialize mutable state */
+ gistshared->nparticipantsdone = 0;
+ gistshared->reltuples = 0.0;
+ gistshared->indtuples = 0.0;
+
+ table_parallelscan_initialize(heap,
+ ParallelTableScanFromGistShared(gistshared),
+ snapshot);
+
+ /* Store shared state, for which we reserved space. */
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_GIST_SHARED, gistshared);
+
+ /* Store query string for workers */
+ if (debug_query_string)
+ {
+ char *sharedquery;
+
+ sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+ memcpy(sharedquery, debug_query_string, querylen + 1);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+ }
+
+ /*
+ * Allocate space for each worker's WalUsage and BufferUsage; no need to
+ * initialize.
+ */
+ walusage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+ bufferusage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+ /* Launch workers, saving status for leader/caller */
+ LaunchParallelWorkers(pcxt);
+ gistleader->pcxt = pcxt;
+ gistleader->nparticipants = pcxt->nworkers_launched;
+ if (leaderparticipates)
+ gistleader->nparticipants++;
+ gistleader->gistshared = gistshared;
+ gistleader->snapshot = snapshot;
+ gistleader->walusage = walusage;
+ gistleader->bufferusage = bufferusage;
+
+ /* If no workers were successfully launched, back out (do serial build) */
+ if (pcxt->nworkers_launched == 0)
+ {
+ _gist_end_parallel(gistleader, NULL);
+ return;
+ }
+
+ /* Save leader state now that it's clear build will be parallel */
+ buildstate->is_parallel = true;
+ buildstate->gist_leader = gistleader;
+
+ /* Join heap scan ourselves */
+ if (leaderparticipates)
+ _gist_leader_participate_as_worker(buildstate, heap, index);
+
+ /*
+ * Caller needs to wait for all launched workers when we return. Make
+ * sure that the failure-to-start case will not hang forever.
+ */
+ WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_gist_end_parallel(GISTLeader *gistleader, GISTBuildState *state)
+{
+ int i;
+
+ /* Shutdown worker processes */
+ WaitForParallelWorkersToFinish(gistleader->pcxt);
+
+ /*
+ * Next, accumulate WAL usage. (This must wait for the workers to finish,
+ * or we might get incomplete data.)
+ */
+ for (i = 0; i < gistleader->pcxt->nworkers_launched; i++)
+ InstrAccumParallelQuery(&gistleader->bufferusage[i], &gistleader->walusage[i]);
+
+ /* Free last reference to MVCC snapshot, if one was used */
+ if (IsMVCCSnapshot(gistleader->snapshot))
+ UnregisterSnapshot(gistleader->snapshot);
+ DestroyParallelContext(gistleader->pcxt);
+ ExitParallelMode();
+}
+
+/*
+ * Within leader, wait for end of heap scan.
+ *
+ * When called, parallel heap scan started by _gist_begin_parallel() will
+ * already be underway within worker processes (when leader participates
+ * as a worker, we should end up here just as workers are finishing).
+ *
+ * Returns the total number of heap tuples scanned.
+ *
+ * FIXME Maybe needs to flush data if GIST_BUFFERING_ACTIVE, a bit like in
+ * the serial build?
+ */
+static double
+_gist_parallel_heapscan(GISTBuildState *state)
+{
+ GISTShared *gistshared = state->gist_leader->gistshared;
+ int nparticipants;
+
+ nparticipants = state->gist_leader->nparticipants;
+ for (;;)
+ {
+ SpinLockAcquire(&gistshared->mutex);
+ if (gistshared->nparticipantsdone == nparticipants)
+ {
+ /* copy the data into leader state */
+ state->indtuples = gistshared->indtuples;
+
+ SpinLockRelease(&gistshared->mutex);
+ break;
+ }
+ SpinLockRelease(&gistshared->mutex);
+
+ ConditionVariableSleep(&gistshared->workersdonecv,
+ WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
+ }
+
+ ConditionVariableCancelSleep();
+
+ return state->indtuples;
+}
+
+
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * gist index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_gist_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+ /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+ return add_size(BUFFERALIGN(sizeof(GISTShared)),
+ table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_gist_leader_participate_as_worker(GISTBuildState *buildstate,
+ Relation heap, Relation index)
+{
+ GISTLeader *gistleader = buildstate->gist_leader;
+ int workmem;
+
+ /*
+ * Might as well use reliable figure when doling out maintenance_work_mem
+ * (when requested number of workers were not launched, this will be
+ * somewhat higher than it is for other workers).
+ */
+ workmem = maintenance_work_mem / gistleader->nparticipants;
+
+ /* Perform work common to all participants */
+ _gist_parallel_scan_and_build(buildstate, gistleader->gistshared,
+ heap, index, workmem, true);
+}
+
+/*
+ * Perform a worker's portion of a parallel scan and insert.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_gist_parallel_scan_and_build(GISTBuildState *state,
+ GISTShared *gistshared,
+ Relation heap, Relation index,
+ int workmem, bool progress)
+{
+ TableScanDesc scan;
+ double reltuples;
+ IndexInfo *indexInfo;
+ MemoryContext oldcxt = CurrentMemoryContext;
+
+ /* Join parallel scan */
+ indexInfo = BuildIndexInfo(index);
+ indexInfo->ii_Concurrent = gistshared->isconcurrent;
+
+ scan = table_beginscan_parallel(heap,
+ ParallelTableScanFromGistShared(gistshared));
+
+ reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+ gistBuildParallelCallback, state, scan);
+
+ /*
+ * If buffering was used, flush out all the tuples that are still in the
+ * buffers.
+ */
+ if (state->buildMode == GIST_BUFFERING_ACTIVE)
+ {
+ elog(DEBUG1, "all tuples processed, emptying buffers");
+ gistEmptyAllBuffers(state);
+ gistFreeBuildBuffers(state->gfbb);
+ }
+
+ /* okay, all heap tuples are indexed */
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(state->giststate->tempCxt);
+
+ /* FIXME Do we need to do something else with active buffering? */
+
+ /*
+ * Done. Record ambuild statistics.
+ */
+ SpinLockAcquire(&gistshared->mutex);
+ gistshared->nparticipantsdone++;
+ gistshared->reltuples += reltuples;
+ gistshared->indtuples += state->indtuples;
+ SpinLockRelease(&gistshared->mutex);
+
+ /* Notify leader */
+ ConditionVariableSignal(&gistshared->workersdonecv);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_gist_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+ char *sharedquery;
+ GISTShared *gistshared;
+ GISTBuildState buildstate;
+ Relation heapRel;
+ Relation indexRel;
+ LOCKMODE heapLockmode;
+ LOCKMODE indexLockmode;
+ WalUsage *walusage;
+ BufferUsage *bufferusage;
+ int workmem;
+
+ /*
+ * The only possible status flag that can be set to the parallel worker is
+ * PROC_IN_SAFE_IC.
+ */
+ Assert((MyProc->statusFlags == 0) ||
+ (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+ /* Set debug_query_string for individual workers first */
+ sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+ debug_query_string = sharedquery;
+
+ /* Report the query string from leader */
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+ /* Look up GIST shared state */
+ gistshared = shm_toc_lookup(toc, PARALLEL_KEY_GIST_SHARED, false);
+
+ /* Open relations using lock modes known to be obtained by index.c */
+ if (!gistshared->isconcurrent)
+ {
+ heapLockmode = ShareLock;
+ indexLockmode = AccessExclusiveLock;
+ }
+ else
+ {
+ heapLockmode = ShareUpdateExclusiveLock;
+ indexLockmode = RowExclusiveLock;
+ }
+
+ /* Open relations within worker */
+ heapRel = table_open(gistshared->heaprelid, heapLockmode);
+ indexRel = index_open(gistshared->indexrelid, indexLockmode);
+
+ buildstate.indexrel = indexRel;
+ buildstate.heaprel = heapRel;
+ buildstate.sortstate = NULL;
+ buildstate.giststate = initGISTstate(indexRel);
+
+ buildstate.is_parallel = true;
+ buildstate.gist_leader = NULL;
+
+ /*
+ * Create a temporary memory context that is reset once for each tuple
+ * processed. (Note: we don't bother to make this a child of the
+ * giststate's scanCxt, so we have to delete it separately at the end.)
+ */
+ buildstate.giststate->tempCxt = createTempGistContext();
+
+ /* FIXME */
+ buildstate.buildMode = gistshared->buildMode;
+ buildstate.freespace = gistshared->freespace;
+
+ buildstate.indtuples = 0;
+ buildstate.indtuplesSize = 0;
+
+ /* Prepare to track buffer usage during parallel execution */
+ InstrStartParallelQuery();
+
+ /*
+ * Might as well use reliable figure when doling out maintenance_work_mem
+ * (when requested number of workers were not launched, this will be
+ * somewhat higher than it is for other workers).
+ */
+ workmem = maintenance_work_mem / gistshared->nparticipants;
+
+ _gist_parallel_scan_and_build(&buildstate, gistshared,
+ heapRel, indexRel, workmem, false);
+
+ /* Report WAL/buffer usage during parallel execution */
+ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+ walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+ &walusage[ParallelWorkerNumber]);
+
+ index_close(indexRel, indexLockmode);
+ table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 78e98d68b15..733d5849317 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -1012,7 +1012,7 @@ gistproperty(Oid index_oid, int attno,
* purpose.
*/
XLogRecPtr
-gistGetFakeLSN(Relation rel)
+gistGetFakeLSN(Relation rel, bool is_parallel)
{
if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
{
@@ -1035,8 +1035,12 @@ gistGetFakeLSN(Relation rel)
static XLogRecPtr lastlsn = InvalidXLogRecPtr;
XLogRecPtr currlsn = GetXLogInsertRecPtr();
- /* Shouldn't be called for WAL-logging relations */
- Assert(!RelationNeedsWAL(rel));
+ /*
+ * Shouldn't be called for WAL-logging relations, but parallell
+ * builds are an exception - we need the fake LSN to detect
+ * concurrent changes.
+ */
+ Assert(is_parallel || !RelationNeedsWAL(rel));
/* No need for an actual record if we already have a distinct LSN */
if (!XLogRecPtrIsInvalid(lastlsn) && lastlsn == currlsn)
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 24fb94f473e..082804e9c7d 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -181,7 +181,7 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
if (RelationNeedsWAL(rel))
vstate.startNSN = GetInsertRecPtr();
else
- vstate.startNSN = gistGetFakeLSN(rel);
+ vstate.startNSN = gistGetFakeLSN(rel, false);
/*
* The outer loop iterates over all index pages, in physical order (we
@@ -376,7 +376,7 @@ restart:
PageSetLSN(page, recptr);
}
else
- PageSetLSN(page, gistGetFakeLSN(rel));
+ PageSetLSN(page, gistGetFakeLSN(rel, false));
END_CRIT_SECTION();
@@ -664,7 +664,7 @@ gistdeletepage(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
if (RelationNeedsWAL(info->index))
recptr = gistXLogPageDelete(leafBuffer, txid, parentBuffer, downlink);
else
- recptr = gistGetFakeLSN(info->index);
+ recptr = gistGetFakeLSN(info->index, false);
PageSetLSN(parentPage, recptr);
PageSetLSN(leafPage, recptr);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 8613fc6fb54..7e09fc79c30 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/brin.h"
+#include "access/gist_private.h"
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
@@ -146,6 +147,9 @@ static const struct
{
"_brin_parallel_build_main", _brin_parallel_build_main
},
+ {
+ "_gist_parallel_build_main", _gist_parallel_build_main
+ },
{
"parallel_vacuum_main", parallel_vacuum_main
}
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 7b8749c8db0..d5b22bc1018 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -20,6 +20,7 @@
#include "lib/pairingheap.h"
#include "storage/bufmgr.h"
#include "storage/buffile.h"
+#include "storage/shm_toc.h"
#include "utils/hsearch.h"
#include "access/genam.h"
@@ -254,6 +255,7 @@ typedef struct
Relation heapRel;
Size freespace; /* free space to be left */
bool is_build;
+ bool is_parallel;
GISTInsertStack *stack;
} GISTInsertState;
@@ -413,7 +415,8 @@ extern void gistdoinsert(Relation r,
Size freespace,
GISTSTATE *giststate,
Relation heapRel,
- bool is_build);
+ bool is_build,
+ bool is_parallel);
/* A List of these is returned from gistplacetopage() in *splitinfo */
typedef struct
@@ -430,7 +433,8 @@ extern bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
List **splitinfo,
bool markfollowright,
Relation heapRel,
- bool is_build);
+ bool is_build,
+ bool is_parallel);
extern SplitPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
int len, GISTSTATE *giststate);
@@ -531,7 +535,7 @@ extern void gistMakeUnionKey(GISTSTATE *giststate, int attno,
GISTENTRY *entry2, bool isnull2,
Datum *dst, bool *dstisnull);
-extern XLogRecPtr gistGetFakeLSN(Relation rel);
+extern XLogRecPtr gistGetFakeLSN(Relation rel, bool is_parallel);
/* gistvacuum.c */
extern IndexBulkDeleteResult *gistbulkdelete(IndexVacuumInfo *info,
@@ -568,4 +572,6 @@ extern void gistRelocateBuildBuffersOnSplit(GISTBuildBuffers *gfbb,
List *splitinfo);
extern void gistUnloadNodeBuffers(GISTBuildBuffers *gfbb);
+extern void _gist_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
#endif /* GIST_PRIVATE_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 4f57078d133..a4985e98585 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -971,6 +971,7 @@ GISTInsertStack
GISTInsertState
GISTIntArrayBigOptions
GISTIntArrayOptions
+GISTLeader
GISTNodeBuffer
GISTNodeBufferPage
GISTPageOpaque
@@ -981,6 +982,7 @@ GISTScanOpaque
GISTScanOpaqueData
GISTSearchHeapItem
GISTSearchItem
+GISTShared
GISTTYPE
GIST_SPLITVEC
GMReaderTupleBuffer
--
2.45.0