Hi, I've done a number of experiments with the GiST parallel builds, both with the sorted and unsorted cases, so let me share some of the results and conclusions from that.
In the first post I did some benchmarks using btree_gist, but that did not seem very realistic - there certainly are much more widely used GiST indexes in the GIS world. So this time I used OpenStreetMap data, loaded using osm2pgsql, with two dataset sizes:

- small - "north america" (121GB without indexes)
- large - "planet" (688GB without indexes)

And then I created indexes using either gist_geometry_ops_2d (with sort) or gist_geometry_ops_nd (no sorting).

On 6/7/24 19:41, Tomas Vondra wrote:
> Hi,
>
> After looking into parallel builds for BRIN and GIN indexes, I was
> wondering if there's a way to do parallel builds for GiST too. I knew
> next to nothing about how GiST works, but I gave it a shot and here's
> what I have - the attached patch allows parallel GiST builds for the
> "unsorted" case (i.e. when the opclass does not include sortsupport),
> and does not support buffered builds.
>
>
> unsorted builds only
> --------------------
>
> Addressing only the unsorted case may seem a bit weird, but I did it
> this way for two reasons - parallel sort is a solved problem, and adding
> this to the patch seems quite straightforward. It's what btree does, for
> example. But I also was not very sure how common this is - we do have
> sort for points, but I have no idea if the PostGIS indexes define
> sorting etc. My guess was "no" but I've been told that's no longer true,
> so I guess sorted builds are more widely applicable than I thought.

For sorted builds, I made the claim that parallelizing sorted builds is a "solved problem" because we can use a parallel tuplesort. I was thinking that maybe it'd be better to do that in the initial patch, and only then introduce the more complex stuff in the unsorted case, so I gave this a try, and it turned out to be rather pointless. Yes, parallel tuplesort does improve the duration, but it's not a very significant improvement - maybe 10% or so.
Most of the build time is spent in gist_indexsortbuild(), so this is the part that would need to be parallelized for any substantial improvement. Only then is it useful to improve the tuplesort, I think. And parallelizing gist_indexsortbuild() is not trivial - most of the time is spent in gist_indexsortbuild_levelstate_flush() / gistSplit(), so ISTM a successful parallel implementation would need to divide this work between multiple workers. I don't have a clear idea how, though.

I do have a PoC/WIP patch doing the parallel tuplesort in my github branch at [1] (and then also some ugly experiments on top of that), but I'm not going to attach it here for the reasons I just explained. It'd be just a pointless distraction.

> In any case, I'm not in a rush to parallelize sorted builds. It can be
> added later, as an improvement, IMHO. In fact, it's a well isolated part
> of the patch, which might make it a good choice for someone looking for
> an idea for their first patch ...
>

I still think this assessment is correct - it's fine to not parallelize sorted builds. It can be improved in the future, or even not at all.

>
> buffered builds
> ---------------
>
> The lack of support for buffered builds is a very different thing. The
> basic idea is that we don't push the index entries all the way to the
> leaf pages right away, but accumulate them in buffers half-way through.
> This combines writes and reduces random I/O, which is nice.
>
> Unfortunately, the way it's implemented does not work with parallel
> builds at all - all the state is in private memory, and it assumes the
> worker is the only possible backend that can split the page (at which
> point the buffers need to be split too, etc.). But for parallel builds
> this is obviously not true.
>
> I'm not saying parallel builds can't do similar buffering, but it
> requires moving the buffers into shared memory, and introducing locking
> to coordinate accesses to the buffers.
> (Or perhaps it might be enough to
> only "notify" the workers about page splits, with buffers still in
> private memory?). Anyway, it seems far too complicated for v1.
>
> In fact, I'm not sure the buffering is entirely necessary - maybe the
> increase in amount of RAM makes this less of an issue? If the index can
> fit into shared buffers (or at least page cache), maybe the amount of
> extra I/O is not that bad? I'm sure there may be cases really affected
> by this, but maybe it's OK to tell people to disable parallel builds in
> those cases?
>

For unsorted builds, here are the results from one of the machines for the duration of CREATE INDEX with the requested number of workers (0 means serial build) for different tables in the OSM databases:

db     type     size (MB) |     0      1      2      3      4
--------------------------|----------------------------------
small  line          4889 |   811    429    294    223    186
       point         2625 |   485    262    179    141    125
       polygon       7644 |  1230    623    418    318    261
       roads          273 |    40     22     16     14     12
--------------------------|----------------------------------
large  line         20592 |  3916   2157   1479   1137    948
       point        13080 |  2636   1442    981    770    667
       polygon      50598 | 10990   5648   3860   2991   2504
       roads         1322 |   228    123     85     67     56

The "size" column is the size of the resulting index. If we calculate the speedup compared to the serial build, we get this:

db     type    |    1     2     3     4
---------------|------------------------
small  line    |  1.9   2.8   3.6   4.4
       point   |  1.9   2.7   3.4   3.9
       polygon |  2.0   2.9   3.9   4.7
       roads   |  1.8   2.5   2.9   3.3
---------------|------------------------
large  line    |  1.8   2.6   3.4   4.1
       point   |  1.8   2.7   3.4   4.0
       polygon |  1.9   2.8   3.7   4.4
       roads   |  1.9   2.7   3.4   4.1

Remember, the leader participates in the build, so K workers means K+1 processes are doing the work. And the speedup is pretty close to the ideal speedup: with 4 workers (5 processes) it is 3.9-4.7x for everything except the tiny "roads" index in the small database.

There's the question about buffering, though - as mentioned in the first patch, the parallel builds do not support buffering, so the question is how bad the impact of that is.
Clearly, the duration improves a lot, so that's good, but maybe it did write out far more buffers and the NVMe drive handled it well? So I used pg_stat_statements to track the number of buffer writes (shared_blks_written) for the CREATE INDEX, and for the large data set it looks like this (this is in MBs written):

         size (MB) |      0       1       2       3       4
-------------------|----------------------------------------
line         20592 |  43577   47580   49574   50388   50734
point        13080 |  23331   25721   26399   26745   26889
polygon      50598 | 113108  125095  129599  130170  131249
roads         1322 |   1322    1310    1305    1300    1295

The serial builds (0 workers) are buffered, but buffering only kicks in for indexes that exceed effective_cache_size (4GB). This means the "roads" index is too small to activate buffering, so there should be very little difference - which is the case (but the index also fits into shared buffers in this case).

The other indexes do activate buffering, so the question is how many more buffers get written out compared to serial builds (with buffering). The comparison (writes relative to the serial build) looks like this:

            1      2      3      4
----------------------------------
line     109%   114%   116%   116%
point    110%   113%   115%   115%
polygon  111%   115%   115%   116%
roads     99%    99%    98%    98%

So it writes about 9-16% more buffers during the index build (13-16% with two or more workers), which is not that much IMHO. I was wondering if this might change with smaller shared buffers, so I tried building indexes on the smaller data set with 128MB shared buffers, but the difference remained ~15-20%.

My conclusion from this is that it's OK to have parallel builds without buffering, and then maybe improve that later.

The thing I'm not sure about is how this should interact with the "buffering" option. Right now we just ignore that entirely if we decide to do a parallel build. But maybe it'd be better to disable parallel builds when the user specifies "buffering=on" (and only allow parallel builds with off/auto)?
I did check how parallelism affects the amount of WAL produced, but that's pretty much exactly as I described it in the initial message, including the "strange" decrease with more workers due to reusing the fake LSN etc.

regards

[1] https://github.com/tvondra/postgres/tree/parallel-gist-20240625

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 4a00f2505e333f1c160040478ed81a0450514b11 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <t...@fuzzy.cz>
Date: Sun, 26 May 2024 21:44:27 +0200
Subject: [PATCH v20240701] WIP parallel GiST build

Implements parallel GiST index build for the unsorted case. The build
simply starts parallel workers that insert values into the index the
usual way (as if there were multiple clients doing INSERT).

The basic infrastructure is copied from parallel BRIN builds (and also
from the nearby parallel GIN build). There's nothing particularly
special or interesting, except for the gistBuildParallelCallback()
callback. The two significant changes in the callback are:

1) disabling buffering

Buffered builds assume the worker is the only backend that can split
index pages etc. With a serial build that is trivially true, but with
parallel workers this leads to confusion.

In principle this is solvable by moving the buffers into shared memory
and coordinating the workers (locking etc.). But the patch does not do
that yet - it's clearly non-trivial, and I'm not really convinced it's
worth it.

2) generating "proper" fake LSNs

The serial builds disable all WAL-logging for the index, until the very
end when the whole index is WAL-logged. This however also means we
don't set page LSNs on the index pages - but page LSNs are used to
detect concurrent changes to the index structure (e.g. page splits).
For serial builds this does not matter, because only the build worker
can modify the index, so it just sets the same LSN "1" for all pages.
Both of these (disabling WAL-logging and using bogus page LSNs) are
controlled by the same flag is_build.

Having the same page LSN does not work for parallel builds, as it would
mean workers won't notice splits done by other workers, etc.

One option would be to set is_build=false, which enables WAL-logging,
as if during regular inserts, and also assigns proper page LSNs. But we
don't want to WAL-log everything, that's unnecessary.
We want to only start WAL-logging the index once the build completes, just like for serial builds. And only do the fake LSNs, as for unlogged indexes etc. So this introduces a separate flag is_parallel, which forces generating the "proper" fake LSN. But we can still do is_build=true, and only log the index at the end of the build. --- src/backend/access/gist/gist.c | 37 +- src/backend/access/gist/gistbuild.c | 713 +++++++++++++++++++++++++- src/backend/access/gist/gistutil.c | 10 +- src/backend/access/gist/gistvacuum.c | 6 +- src/backend/access/transam/parallel.c | 4 + src/include/access/gist_private.h | 12 +- src/tools/pgindent/typedefs.list | 2 + 7 files changed, 739 insertions(+), 45 deletions(-) diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index ed4ffa63a77..f5f56fb2503 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -75,7 +75,7 @@ gisthandler(PG_FUNCTION_ARGS) amroutine->amclusterable = true; amroutine->ampredlocks = true; amroutine->amcanparallel = false; - amroutine->amcanbuildparallel = false; + amroutine->amcanbuildparallel = true; amroutine->amcaninclude = true; amroutine->amusemaintenanceworkmem = false; amroutine->amsummarizing = false; @@ -182,7 +182,7 @@ gistinsert(Relation r, Datum *values, bool *isnull, values, isnull, true /* size is currently bogus */ ); itup->t_tid = *ht_ctid; - gistdoinsert(r, itup, 0, giststate, heapRel, false); + gistdoinsert(r, itup, 0, giststate, heapRel, false, false); /* cleanup */ MemoryContextSwitchTo(oldCxt); @@ -230,7 +230,8 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, List **splitinfo, bool markfollowright, Relation heapRel, - bool is_build) + bool is_build, + bool is_parallel) { BlockNumber blkno = BufferGetBlockNumber(buffer); Page page = BufferGetPage(buffer); @@ -501,9 +502,17 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, * smaller than any real or fake unlogged LSN that might be generated * 
later. (There can't be any concurrent scans during index build, so * we don't need to be able to detect concurrent splits yet.) + * + * However, with a parallel index build, we need to assign valid LSN, + * as it's used to detect concurrent index modifications. */ if (is_build) - recptr = GistBuildLSN; + { + if (is_parallel) + recptr = gistGetFakeLSN(rel, is_parallel); + else + recptr = GistBuildLSN; + } else { if (RelationNeedsWAL(rel)) @@ -511,7 +520,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, dist, oldrlink, oldnsn, leftchildbuf, markfollowright); else - recptr = gistGetFakeLSN(rel); + recptr = gistGetFakeLSN(rel, false); } for (ptr = dist; ptr; ptr = ptr->next) @@ -570,7 +579,12 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, MarkBufferDirty(leftchildbuf); if (is_build) - recptr = GistBuildLSN; + { + if (is_parallel) + recptr = gistGetFakeLSN(rel, is_parallel); + else + recptr = GistBuildLSN; + } else { if (RelationNeedsWAL(rel)) @@ -589,7 +603,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, leftchildbuf); } else - recptr = gistGetFakeLSN(rel); + recptr = gistGetFakeLSN(rel, false); } PageSetLSN(page, recptr); @@ -632,7 +646,8 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, */ void gistdoinsert(Relation r, IndexTuple itup, Size freespace, - GISTSTATE *giststate, Relation heapRel, bool is_build) + GISTSTATE *giststate, Relation heapRel, bool is_build, + bool is_parallel) { ItemId iid; IndexTuple idxtuple; @@ -646,6 +661,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, state.r = r; state.heapRel = heapRel; state.is_build = is_build; + state.is_parallel = is_parallel; /* Start from the root */ firststack.blkno = GIST_ROOT_BLKNO; @@ -1303,7 +1319,8 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, &splitinfo, true, state->heapRel, - state->is_build); + state->is_build, + state->is_parallel); /* * Before recursing up in case the page 
was split, release locks on the @@ -1722,7 +1739,7 @@ gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel) PageSetLSN(page, recptr); } else - PageSetLSN(page, gistGetFakeLSN(rel)); + PageSetLSN(page, gistGetFakeLSN(rel, false)); END_CRIT_SECTION(); } diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c index ba06df30faf..c8fa67beebb 100644 --- a/src/backend/access/gist/gistbuild.c +++ b/src/backend/access/gist/gistbuild.c @@ -36,18 +36,28 @@ #include "access/genam.h" #include "access/gist_private.h" +#include "access/table.h" #include "access/tableam.h" #include "access/xloginsert.h" +#include "catalog/index.h" +#include "commands/vacuum.h" #include "miscadmin.h" #include "nodes/execnodes.h" +#include "pgstat.h" #include "optimizer/optimizer.h" #include "storage/bufmgr.h" #include "storage/bulk_write.h" - +#include "tcop/tcopprot.h" /* pgrminclude ignore */ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/tuplesort.h" +/* Magic numbers for parallel state sharing */ +#define PARALLEL_KEY_GIST_SHARED UINT64CONST(0xB000000000000001) +#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000002) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000003) +#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000004) + /* Step of index tuples for check whether to switch to buffering build mode */ #define BUFFERING_MODE_SWITCH_CHECK_STEP 256 @@ -78,6 +88,106 @@ typedef enum GIST_BUFFERING_ACTIVE, /* in buffering build mode */ } GistBuildMode; +/* + * Status for index builds performed in parallel. This is allocated in a + * dynamic shared memory segment. + */ +typedef struct GISTShared +{ + /* + * These fields are not modified during the build. They primarily exist + * for the benefit of worker processes that need to create state + * corresponding to that used by the leader. 
+ * + * XXX nparticipants is the number or workers we expect to participage in + * the build, possibly including the leader process. + */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + int nparticipants; + + /* Parameters determined by the leader, passed to the workers. */ + GistBuildMode buildMode; + int freespace; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before leader can finish + * building the index. + */ + ConditionVariable workersdonecv; + + /* + * mutex protects all fields before heapdesc. + * + * These fields contain status information of interest to GIST index + * builds that must work just the same when an index is built in parallel. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers, and reported back to + * leader at end of the scans. + * + * nparticipantsdone is number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * ParallelTableScanDescData data follows. Can't directly embed here, as + * implementations of the parallel table scan desc interface might need + * stronger alignment. + */ +} GISTShared; + +/* + * Return pointer to a GISTShared's parallel table scan. + * + * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just + * MAXALIGN. + */ +#define ParallelTableScanFromGistShared(shared) \ + (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(GISTShared))) + +/* + * Status for leader in parallel index build. 
+ */ +typedef struct GISTLeader +{ + /* parallel context itself */ + ParallelContext *pcxt; + + /* + * nparticipants is the exact number of worker processes successfully + * launched, plus one leader process if it participates as a worker (only + * DISABLE_LEADER_PARTICIPATION builds avoid leader participating as a + * worker). + * + * XXX Seems a bit redundant with nparticipants in GISTShared. Although + * that is the expected number, this is what we actually got. + */ + int nparticipants; + + /* + * Leader process convenience pointers to shared state (leader avoids TOC + * lookups). + * + * GISTShared is the shared state for entire build. snapshot is the + * snapshot used by the scan iff an MVCC snapshot is required. + */ + GISTShared *gistshared; + Snapshot snapshot; + WalUsage *walusage; + BufferUsage *bufferusage; +} GISTLeader; + /* Working state for gistbuild and its callback */ typedef struct { @@ -100,6 +210,14 @@ typedef struct GISTBuildBuffers *gfbb; HTAB *parentMap; + /* + * gist_leader is only present when a parallel index build is performed, + * and only in the leader process. (Actually, only the leader process has + * a GISTBuildState.) + */ + bool is_parallel; + GISTLeader *gist_leader; + /* * Extra data structures used during a sorting build. 
*/ @@ -148,6 +266,12 @@ static void gistBuildCallback(Relation index, bool *isnull, bool tupleIsAlive, void *state); +static void gistBuildParallelCallback(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *state); static void gistBufferingBuildInsert(GISTBuildState *buildstate, IndexTuple itup); static bool gistProcessItup(GISTBuildState *buildstate, IndexTuple itup, @@ -171,6 +295,18 @@ static void gistMemorizeAllDownlinks(GISTBuildState *buildstate, Buffer parentbuf); static BlockNumber gistGetParent(GISTBuildState *buildstate, BlockNumber child); +/* parallel index builds */ +static void _gist_begin_parallel(GISTBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request); +static void _gist_end_parallel(GISTLeader *gistleader, GISTBuildState *state); +static Size _gist_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static double _gist_parallel_heapscan(GISTBuildState *buildstate); +static void _gist_leader_participate_as_worker(GISTBuildState *buildstate, + Relation heap, Relation index); +static void _gist_parallel_scan_and_build(GISTBuildState *buildstate, + GISTShared *gistshared, + Relation heap, Relation index, + int workmem, bool progress); /* * Main entry point to GiST index build. @@ -199,6 +335,10 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo) buildstate.sortstate = NULL; buildstate.giststate = initGISTstate(index); + /* assume serial build */ + buildstate.is_parallel = false; + buildstate.gist_leader = NULL; + /* * Create a temporary memory context that is reset once for each tuple * processed. (Note: we don't bother to make this a child of the @@ -309,37 +449,79 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo) END_CRIT_SECTION(); - /* Scan the table, inserting all the tuples to the index. 
*/ - reltuples = table_index_build_scan(heap, index, indexInfo, true, true, - gistBuildCallback, - (void *) &buildstate, NULL); - /* - * If buffering was used, flush out all the tuples that are still in - * the buffers. + * Attempt to launch parallel worker scan when required + * + * XXX plan_create_index_workers makes the number of workers dependent + * on maintenance_work_mem, requiring 32MB for each worker. That makes + * sense for btree, but maybe not for GIST (at least when not using + * buffering)? So maybe make that somehow less strict, optionally? */ - if (buildstate.buildMode == GIST_BUFFERING_ACTIVE) - { - elog(DEBUG1, "all tuples processed, emptying buffers"); - gistEmptyAllBuffers(&buildstate); - gistFreeBuildBuffers(buildstate.gfbb); - } + if (indexInfo->ii_ParallelWorkers > 0) + _gist_begin_parallel(&buildstate, heap, + index, indexInfo->ii_Concurrent, + indexInfo->ii_ParallelWorkers); /* - * We didn't write WAL records as we built the index, so if - * WAL-logging is required, write all pages to the WAL now. + * If parallel build requested and at least one worker process was + * successfully launched, set up coordination state, wait for workers + * to complete and end the parallel build. + * + * In serial mode, simply scan the table and build the index one index + * tuple at a time. */ - if (RelationNeedsWAL(index)) + if (buildstate.gist_leader) { - log_newpage_range(index, MAIN_FORKNUM, - 0, RelationGetNumberOfBlocks(index), - true); + /* scan the relation and wait for parallel workers to finish */ + reltuples = _gist_parallel_heapscan(&buildstate); + + _gist_end_parallel(buildstate.gist_leader, &buildstate); + + /* + * We didn't write WAL records as we built the index, so if WAL-logging is + * required, write all pages to the WAL now. + */ + if (RelationNeedsWAL(index)) + { + log_newpage_range(index, MAIN_FORKNUM, + 0, RelationGetNumberOfBlocks(index), + true); + } } - } + else + { + /* Scan the table, inserting all the tuples to the index. 
*/ + reltuples = table_index_build_scan(heap, index, indexInfo, true, true, + gistBuildCallback, + (void *) &buildstate, NULL); - /* okay, all heap tuples are indexed */ - MemoryContextSwitchTo(oldcxt); - MemoryContextDelete(buildstate.giststate->tempCxt); + /* + * If buffering was used, flush out all the tuples that are still + * in the buffers. + */ + if (buildstate.buildMode == GIST_BUFFERING_ACTIVE) + { + elog(DEBUG1, "all tuples processed, emptying buffers"); + gistEmptyAllBuffers(&buildstate); + gistFreeBuildBuffers(buildstate.gfbb); + } + + /* + * We didn't write WAL records as we built the index, so if + * WAL-logging is required, write all pages to the WAL now. + */ + if (RelationNeedsWAL(index)) + { + log_newpage_range(index, MAIN_FORKNUM, + 0, RelationGetNumberOfBlocks(index), + true); + } + + /* okay, all heap tuples are indexed */ + MemoryContextSwitchTo(oldcxt); + MemoryContextDelete(buildstate.giststate->tempCxt); + } + } freeGISTstate(buildstate.giststate); @@ -861,7 +1043,7 @@ gistBuildCallback(Relation index, * locked, we call gistdoinsert directly. */ gistdoinsert(index, itup, buildstate->freespace, - buildstate->giststate, buildstate->heaprel, true); + buildstate->giststate, buildstate->heaprel, true, false); } MemoryContextSwitchTo(oldCtx); @@ -900,6 +1082,48 @@ gistBuildCallback(Relation index, } } +/* + * Per-tuple callback for table_index_build_scan. + * + * XXX Almost the same as gistBuildCallback, but with is_build=false when + * calling gistdoinsert. Otherwise we get assert failures due to workers + * modifying the index concurrently. 
+ */ +static void +gistBuildParallelCallback(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *state) +{ + GISTBuildState *buildstate = (GISTBuildState *) state; + IndexTuple itup; + MemoryContext oldCtx; + + oldCtx = MemoryContextSwitchTo(buildstate->giststate->tempCxt); + + /* form an index tuple and point it at the heap tuple */ + itup = gistFormTuple(buildstate->giststate, index, + values, isnull, + true); + itup->t_tid = *tid; + + /* Update tuple count and total size. */ + buildstate->indtuples += 1; + buildstate->indtuplesSize += IndexTupleSize(itup); + + /* + * There's no buffers (yet). Since we already have the index relation + * locked, we call gistdoinsert directly. + */ + gistdoinsert(index, itup, buildstate->freespace, + buildstate->giststate, buildstate->heaprel, true, true); + + MemoryContextSwitchTo(oldCtx); + MemoryContextReset(buildstate->giststate->tempCxt); +} + /* * Insert function for buffering index build. */ @@ -1068,7 +1292,8 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level, InvalidBuffer, &splitinfo, false, - buildstate->heaprel, true); + buildstate->heaprel, true, + buildstate->is_parallel); /* * If this is a root split, update the root path item kept in memory. This @@ -1577,3 +1802,439 @@ gistGetParent(GISTBuildState *buildstate, BlockNumber child) return entry->parentblkno; } + +/* + * Create parallel context, and launch workers for leader. + * + * buildstate argument should be initialized + * + * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY. + * + * request is the target number of parallel worker processes to launch. + * + * Sets buildstate's gistLeader, which caller must use to shut down parallel + * mode by passing it to _gist_end_parallel() at the very end of its index + * build. If not even a single worker process can be launched, this is + * never set, and caller should proceed with a serial index build. 
+ */ +static void +_gist_begin_parallel(GISTBuildState *buildstate, Relation heap, Relation index, + bool isconcurrent, int request) +{ + ParallelContext *pcxt; + int nparticipants; + Snapshot snapshot; + Size estgistshared; + GISTShared *gistshared; + GISTLeader *gistleader = (GISTLeader *) palloc0(sizeof(GISTLeader)); + WalUsage *walusage; + BufferUsage *bufferusage; + bool leaderparticipates = true; + int querylen; + +#ifdef DISABLE_LEADER_PARTICIPATION + leaderparticipates = false; +#endif + + /* + * Enter parallel mode, and create context for parallel build of GIST + * index + */ + EnterParallelMode(); + Assert(request > 0); + pcxt = CreateParallelContext("postgres", "_gist_parallel_build_main", + request); + + nparticipants = leaderparticipates ? request + 1 : request; + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, we take a regular MVCC snapshot and index whatever's + * live according to that. + */ + if (!isconcurrent) + snapshot = SnapshotAny; + else + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + /* + * Estimate size for our own PARALLEL_KEY_GIST_SHARED workspace. + */ + estgistshared = _gist_parallel_estimate_shared(heap, snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, estgistshared); + + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE + * and PARALLEL_KEY_BUFFER_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgWalUsage or + * pgBufferUsage, so do it unconditionally. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + /* Everyone's had a chance to ask for space, so now create the DSM */ + InitializeParallelDSM(pcxt); + + /* If no DSM segment was available, back out (do serial build) */ + if (pcxt->seg == NULL) + { + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); + return; + } + + /* Store shared build state, for which we reserved space */ + gistshared = (GISTShared *) shm_toc_allocate(pcxt->toc, estgistshared); + /* Initialize immutable state */ + gistshared->heaprelid = RelationGetRelid(heap); + gistshared->indexrelid = RelationGetRelid(index); + gistshared->isconcurrent = isconcurrent; + gistshared->nparticipants = nparticipants; + + /* */ + gistshared->buildMode = buildstate->buildMode; + gistshared->freespace = buildstate->freespace; + + ConditionVariableInit(&gistshared->workersdonecv); + SpinLockInit(&gistshared->mutex); + + /* Initialize mutable state */ + gistshared->nparticipantsdone = 0; + gistshared->reltuples = 0.0; + gistshared->indtuples = 0.0; + + table_parallelscan_initialize(heap, + ParallelTableScanFromGistShared(gistshared), + snapshot); + + /* Store shared state, for which we reserved space. 
*/ + shm_toc_insert(pcxt->toc, PARALLEL_KEY_GIST_SHARED, gistshared); + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); + } + + /* + * Allocate space for each worker's WalUsage and BufferUsage; no need to + * initialize. + */ + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); + bufferusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + + /* Launch workers, saving status for leader/caller */ + LaunchParallelWorkers(pcxt); + gistleader->pcxt = pcxt; + gistleader->nparticipants = pcxt->nworkers_launched; + if (leaderparticipates) + gistleader->nparticipants++; + gistleader->gistshared = gistshared; + gistleader->snapshot = snapshot; + gistleader->walusage = walusage; + gistleader->bufferusage = bufferusage; + + /* If no workers were successfully launched, back out (do serial build) */ + if (pcxt->nworkers_launched == 0) + { + _gist_end_parallel(gistleader, NULL); + return; + } + + /* Save leader state now that it's clear build will be parallel */ + buildstate->is_parallel = true; + buildstate->gist_leader = gistleader; + + /* Join heap scan ourselves */ + if (leaderparticipates) + _gist_leader_participate_as_worker(buildstate, heap, index); + + /* + * Caller needs to wait for all launched workers when we return. Make + * sure that the failure-to-start case will not hang forever. + */ + WaitForParallelWorkersToAttach(pcxt); +} + +/* + * Shut down workers, destroy parallel context, and end parallel mode. 
+ */ +static void +_gist_end_parallel(GISTLeader *gistleader, GISTBuildState *state) +{ + int i; + + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(gistleader->pcxt); + + /* + * Next, accumulate WAL usage. (This must wait for the workers to finish, + * or we might get incomplete data.) + */ + for (i = 0; i < gistleader->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&gistleader->bufferusage[i], &gistleader->walusage[i]); + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(gistleader->snapshot)) + UnregisterSnapshot(gistleader->snapshot); + DestroyParallelContext(gistleader->pcxt); + ExitParallelMode(); +} + +/* + * Within leader, wait for end of heap scan. + * + * When called, parallel heap scan started by _gist_begin_parallel() will + * already be underway within worker processes (when leader participates + * as a worker, we should end up here just as workers are finishing). + * + * Returns the total number of heap tuples scanned. + * + * FIXME Maybe needs to flush data if GIST_BUFFERING_ACTIVE, a bit like in + * the serial build? + */ +static double +_gist_parallel_heapscan(GISTBuildState *state) +{ + GISTShared *gistshared = state->gist_leader->gistshared; + int nparticipants; + + nparticipants = state->gist_leader->nparticipants; + for (;;) + { + SpinLockAcquire(&gistshared->mutex); + if (gistshared->nparticipantsdone == nparticipants) + { + /* copy the data into leader state */ + state->indtuples = gistshared->indtuples; + + SpinLockRelease(&gistshared->mutex); + break; + } + SpinLockRelease(&gistshared->mutex); + + ConditionVariableSleep(&gistshared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); + + return state->indtuples; +} + + + +/* + * Returns size of shared memory required to store state for a parallel + * gist index build based on the snapshot its parallel scan will use. 
+ */
+static Size
+_gist_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(GISTShared)),
+					table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_gist_leader_participate_as_worker(GISTBuildState *buildstate,
+								   Relation heap, Relation index)
+{
+	GISTLeader *gistleader = buildstate->gist_leader;
+	int workmem;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	workmem = maintenance_work_mem / gistleader->nparticipants;
+
+	/* Perform work common to all participants */
+	_gist_parallel_scan_and_build(buildstate, gistleader->gistshared,
+								  heap, index, workmem, true);
+}
+
+/*
+ * Perform a worker's portion of a parallel scan and insert.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_gist_parallel_scan_and_build(GISTBuildState *state,
+							  GISTShared *gistshared,
+							  Relation heap, Relation index,
+							  int workmem, bool progress)
+{
+	TableScanDesc scan;
+	double reltuples;
+	IndexInfo *indexInfo;
+	MemoryContext oldcxt = CurrentMemoryContext;
+
+	/* Join parallel scan */
+	indexInfo = BuildIndexInfo(index);
+	indexInfo->ii_Concurrent = gistshared->isconcurrent;
+
+	scan = table_beginscan_parallel(heap,
+									ParallelTableScanFromGistShared(gistshared));
+
+	reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+									   gistBuildParallelCallback, state, scan);
+
+	/*
+	 * If buffering was used, flush out all the tuples that are still in the
+	 * buffers.
+	 */
+	if (state->buildMode == GIST_BUFFERING_ACTIVE)
+	{
+		elog(DEBUG1, "all tuples processed, emptying buffers");
+		gistEmptyAllBuffers(state);
+		gistFreeBuildBuffers(state->gfbb);
+	}
+
+	/* okay, all heap tuples are indexed */
+	MemoryContextSwitchTo(oldcxt);
+	MemoryContextDelete(state->giststate->tempCxt);
+
+	/* FIXME Do we need to do something else with active buffering? */
+
+	/*
+	 * Done.  Record ambuild statistics.
+	 */
+	SpinLockAcquire(&gistshared->mutex);
+	gistshared->nparticipantsdone++;
+	gistshared->reltuples += reltuples;
+	gistshared->indtuples += state->indtuples;
+	SpinLockRelease(&gistshared->mutex);
+
+	/* Notify leader */
+	ConditionVariableSignal(&gistshared->workersdonecv);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_gist_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+	char *sharedquery;
+	GISTShared *gistshared;
+	GISTBuildState buildstate;
+	Relation heapRel;
+	Relation indexRel;
+	LOCKMODE heapLockmode;
+	LOCKMODE indexLockmode;
+	WalUsage *walusage;
+	BufferUsage *bufferusage;
+	int workmem;
+
+	/*
+	 * The only possible status flag that can be set to the parallel worker is
+	 * PROC_IN_SAFE_IC.
+	 */
+	Assert((MyProc->statusFlags == 0) ||
+		   (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+	/* Set debug_query_string for individual workers first */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+
+	/* Report the query string from leader */
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Look up GIST shared state */
+	gistshared = shm_toc_lookup(toc, PARALLEL_KEY_GIST_SHARED, false);
+
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!gistshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+
+	/* Open relations within worker */
+	heapRel = table_open(gistshared->heaprelid, heapLockmode);
+	indexRel = index_open(gistshared->indexrelid, indexLockmode);
+
+	buildstate.indexrel = indexRel;
+	buildstate.heaprel = heapRel;
+	buildstate.sortstate = NULL;
+	buildstate.giststate = initGISTstate(indexRel);
+
+	buildstate.is_parallel = true;
+	buildstate.gist_leader = NULL;
+
+	/*
+	 * Create a temporary memory context that is reset once for each tuple
+	 * processed.  (Note: we don't bother to make this a child of the
+	 * giststate's scanCxt, so we have to delete it separately at the end.)
+	 */
+	buildstate.giststate->tempCxt = createTempGistContext();
+
+	/* FIXME */
+	buildstate.buildMode = gistshared->buildMode;
+	buildstate.freespace = gistshared->freespace;
+
+	buildstate.indtuples = 0;
+	buildstate.indtuplesSize = 0;
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	workmem = maintenance_work_mem / gistshared->nparticipants;
+
+	_gist_parallel_scan_and_build(&buildstate, gistshared,
+								  heapRel, indexRel, workmem, false);
+
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
+
+	index_close(indexRel, indexLockmode);
+	table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 78e98d68b15..733d5849317 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -1012,7 +1012,7 @@ gistproperty(Oid index_oid, int attno,
  * purpose.
  */
 XLogRecPtr
-gistGetFakeLSN(Relation rel)
+gistGetFakeLSN(Relation rel, bool is_parallel)
 {
 	if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
 	{
@@ -1035,8 +1035,12 @@ gistGetFakeLSN(Relation rel)
 		static XLogRecPtr lastlsn = InvalidXLogRecPtr;
 		XLogRecPtr	currlsn = GetXLogInsertRecPtr();
 
-		/* Shouldn't be called for WAL-logging relations */
-		Assert(!RelationNeedsWAL(rel));
+		/*
+		 * Shouldn't be called for WAL-logging relations, but parallel
+		 * builds are an exception - we need the fake LSN to detect
+		 * concurrent changes.
+		 */
+		Assert(is_parallel || !RelationNeedsWAL(rel));
 
 		/* No need for an actual record if we already have a distinct LSN */
 		if (!XLogRecPtrIsInvalid(lastlsn) && lastlsn == currlsn)
diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c
index 24fb94f473e..082804e9c7d 100644
--- a/src/backend/access/gist/gistvacuum.c
+++ b/src/backend/access/gist/gistvacuum.c
@@ -181,7 +181,7 @@ gistvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	if (RelationNeedsWAL(rel))
 		vstate.startNSN = GetInsertRecPtr();
 	else
-		vstate.startNSN = gistGetFakeLSN(rel);
+		vstate.startNSN = gistGetFakeLSN(rel, false);
 
 	/*
 	 * The outer loop iterates over all index pages, in physical order (we
@@ -376,7 +376,7 @@ restart:
 				PageSetLSN(page, recptr);
 			}
 			else
-				PageSetLSN(page, gistGetFakeLSN(rel));
+				PageSetLSN(page, gistGetFakeLSN(rel, false));
 
 			END_CRIT_SECTION();
 
@@ -664,7 +664,7 @@ gistdeletepage(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	if (RelationNeedsWAL(info->index))
 		recptr = gistXLogPageDelete(leafBuffer, txid, parentBuffer, downlink);
 	else
-		recptr = gistGetFakeLSN(info->index);
+		recptr = gistGetFakeLSN(info->index, false);
 
 	PageSetLSN(parentPage, recptr);
 	PageSetLSN(leafPage, recptr);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 8613fc6fb54..7e09fc79c30 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "access/brin.h"
+#include "access/gist_private.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -146,6 +147,9 @@ static const struct
 	{
 		"_brin_parallel_build_main", _brin_parallel_build_main
 	},
+	{
+		"_gist_parallel_build_main", _gist_parallel_build_main
+	},
 	{
 		"parallel_vacuum_main", parallel_vacuum_main
 	}
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 7b8749c8db0..d5b22bc1018 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -20,6 +20,7 @@
 #include "lib/pairingheap.h"
 #include "storage/bufmgr.h"
 #include "storage/buffile.h"
+#include "storage/shm_toc.h"
 #include "utils/hsearch.h"
 
 #include "access/genam.h"
@@ -254,6 +255,7 @@ typedef struct
 	Relation	heapRel;
 	Size		freespace;		/* free space to be left */
 	bool		is_build;
+	bool		is_parallel;
 
 	GISTInsertStack *stack;
 } GISTInsertState;
@@ -413,7 +415,8 @@ extern void gistdoinsert(Relation r,
 						 Size freespace,
 						 GISTSTATE *giststate,
 						 Relation heapRel,
-						 bool is_build);
+						 bool is_build,
+						 bool is_parallel);
 
 /* A List of these is returned from gistplacetopage() in *splitinfo */
 typedef struct
@@ -430,7 +433,8 @@ extern bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 							List **splitinfo,
 							bool markfollowright,
 							Relation heapRel,
-							bool is_build);
+							bool is_build,
+							bool is_parallel);
 
 extern SplitPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
 								  int len, GISTSTATE *giststate);
@@ -531,7 +535,7 @@ extern void gistMakeUnionKey(GISTSTATE *giststate, int attno,
 							 GISTENTRY *entry2, bool isnull2,
 							 Datum *dst, bool *dstisnull);
 
-extern XLogRecPtr gistGetFakeLSN(Relation rel);
+extern XLogRecPtr gistGetFakeLSN(Relation rel, bool is_parallel);
 
 /* gistvacuum.c */
 extern IndexBulkDeleteResult *gistbulkdelete(IndexVacuumInfo *info,
@@ -568,4 +572,6 @@ extern void gistRelocateBuildBuffersOnSplit(GISTBuildBuffers *gfbb,
 											List *splitinfo);
 extern void gistUnloadNodeBuffers(GISTBuildBuffers *gfbb);
 
+extern void _gist_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
 #endif							/* GIST_PRIVATE_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6c1caf6498..54376940b74 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -971,6 +971,7 @@ GISTInsertStack
 GISTInsertState
 GISTIntArrayBigOptions
 GISTIntArrayOptions
+GISTLeader
 GISTNodeBuffer
 GISTNodeBufferPage
 GISTPageOpaque
@@ -981,6 +982,7 @@ GISTScanOpaque
 GISTScanOpaqueData
 GISTSearchHeapItem
 GISTSearchItem
+GISTShared
 GISTTYPE
 GIST_SPLITVEC
 GMReaderTupleBuffer
-- 
2.45.2
gist-i5.tgz
gist-xeon.tgz