On Mon, 14 Mar 2016 08:42:26 -0400
David Steele <da...@pgmasters.net> wrote:
> On 2/18/16 10:10 AM, Constantin S. Pan wrote:
> > On Wed, 17 Feb 2016 23:01:47 +0300
> > Oleg Bartunov <obartu...@gmail.com> wrote:
> >
> >> My feedback is (Mac OS X 10.11.3)
> >>
> >> set gin_parallel_workers=2;
> >> create index message_body_idx on messages using gin(body_tsvector);
> >> LOG: worker process: parallel worker for PID 5689 (PID 6906) was
> >> terminated by signal 11: Segmentation fault
> >
> > Fixed this, try the new patch. The bug was in incorrect handling
> > of some GIN categories.
>
> Oleg, it looks like Constantin has updated the patch to address the
> issue you were seeing. Do you have time to retest and review?
>
> Thanks,
Actually, there has been some progress since then. The updated
patch is attached.
1. Added another GUC parameter for changing the amount of
shared memory available to the parallel GIN workers.
2. Changed the way results are merged: they now go through a
shared memory message queue (see the sketch after the timing
table below).
3. Tested on some real data (a GIN index on email message body
tsvectors). Here are the timings for different values of
'gin_shared_mem' and 'gin_parallel_workers' on a 4-CPU
machine. 'gin_shared_mem' seems to have no visible effect.
wnum  mem(MB)  time(s)
   0       16      247
   1       16      256
   2       16      126
   4       16       89
   0       32      247
   1       32      270
   2       32      123
   4       32       92
   0       64      254
   1       64      272
   2       64      123
   4       64       88
   0      128      250
   1      128      263
   2      128      126
   4      128       85
   0      256      247
   1      256      269
   2      256      130
   4      256       88
   0      512      257
   1      512      275
   2      512      129
   4      512       92
   0     1024      255
   1     1024      273
   2     1024      130
   4     1024       90
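
To illustrate the merge step, here is a simplified, standalone
sketch of the backend-side logic: each worker emits its accumulated
entries in key order, and the backend performs a k-way merge,
combining the posting lists of equal keys before inserting the
result into the index. This is not the patch code -- the Entry and
WorkerStream types and merge_streams() below are illustrative only;
integer keys, plain arrays and simple concatenation stand in for GIN
entries, the shm_mq queues and ginMergeItemPointers().

/*
 * Simplified sketch of the backend-side merge in the parallel GIN build:
 * each "worker" produces a stream of (key, posting list) entries sorted
 * by key; the backend repeatedly takes the smallest key among the streams
 * and combines the posting lists of every stream sitting on that key.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct
{
	int			key;		/* stand-in for (attnum, key, category) */
	const int  *tids;		/* stand-in for the posting list */
	int			ntids;
} Entry;

typedef struct
{
	const Entry *entries;	/* sorted by key, as a worker would dump them */
	int			nentries;
	int			pos;		/* next entry to consume */
} WorkerStream;

static void
merge_streams(WorkerStream *streams, int nstreams)
{
	for (;;)
	{
		int			minkey = 0;
		bool		found = false;
		int			i;

		/* find the smallest key among the streams' current entries */
		for (i = 0; i < nstreams; i++)
		{
			WorkerStream *s = &streams[i];

			if (s->pos >= s->nentries)
				continue;	/* this stream is exhausted */
			if (!found || s->entries[s->pos].key < minkey)
			{
				minkey = s->entries[s->pos].key;
				found = true;
			}
		}
		if (!found)
			break;			/* every stream is exhausted */

		/* combine the posting lists of every stream sitting on minkey */
		printf("key %d:", minkey);
		for (i = 0; i < nstreams; i++)
		{
			WorkerStream *s = &streams[i];

			if (s->pos < s->nentries && s->entries[s->pos].key == minkey)
			{
				const Entry *e = &s->entries[s->pos];
				int			j;

				for (j = 0; j < e->ntids; j++)
					printf(" %d", e->tids[j]);
				s->pos++;	/* entry consumed, move to the next one */
			}
		}
		printf("\n");		/* the real code calls ginEntryInsert() here */
	}
}

int
main(void)
{
	/* two "workers", each with a key-sorted stream of entries */
	static const int t1[] = {1, 4}, t2[] = {7}, t3[] = {2, 5}, t4[] = {8, 9};
	static const Entry w0[] = {{10, t1, 2}, {30, t2, 1}};
	static const Entry w1[] = {{10, t3, 2}, {20, t4, 2}};
	WorkerStream streams[] = {{w0, 2, 0}, {w1, 2, 0}};

	merge_streams(streams, 2);
	return 0;
}
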
On Wed, 17 Feb 2016 12:26:05 -0800
Peter Geoghegan <p...@heroku.com> wrote:
> On Wed, Feb 17, 2016 at 7:55 AM, Constantin S. Pan <kva...@gmail.com>
> wrote:
> > 4. Hit the 8x speedup limit. Made some analysis of the reasons (see
> > the attached plot or the data file).
>
> Did you actually compare this to the master branch? I wouldn't like to
> assume that the one worker case was equivalent. Obviously that's the
> really interesting baseline.
Yes, I compared it with the master branch. The case of 0 workers
is indeed equivalent to the master branch.
Regards,
Constantin
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cd21e0e..ff267b3 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,14 +16,21 @@
#include "access/gin_private.h"
#include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "storage/indexfsm.h"
+#include "storage/spin.h"
+#include "utils/datum.h"
#include "utils/memutils.h"
#include "utils/rel.h"
+/* GUC parameters */
+int gin_parallel_workers = 0;
+int gin_shared_mem = 0;
typedef struct
{
@@ -35,7 +42,6 @@ typedef struct
BuildAccumulator accum;
} GinBuildState;
-
/*
* Adds array of item pointers to tuple's posting list, or
* creates posting tree and tuple pointing to tree in case
@@ -265,6 +271,169 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
MemoryContextReset(buildstate->funcCtx);
}
+#define KEY_TASK 42
+#define KEY_SHM_ORIGIN 43
+#define KEY_SHM_PER_WORKER 44
+#define GIN_MAX_WORKERS 24
+#define GIN_BLOCKS_PER_WORKER 4
+
+/*
+ * The shmem message structure:
+ * Entry, Key, List
+ */
+
+typedef struct {
+ GinNullCategory category;
+ OffsetNumber attnum;
+ int nlist;
+} GinShmemEntry;
+
+typedef struct {
+ void *mq;
+ shm_mq_handle *mqh;
+ bool end_of_tree;
+ bool end_of_forest;
+
+ void *msg_body;
+ Size msg_len;
+ Datum key;
+ int skipkey;
+} WorkerResult;
+
+/*
+ * This structure describes the GIN build task for the parallel workers. We use
+ * OIDs here because workers are separate processes and pointers may become
+ * meaningless for them. The "lock" is used to protect the "scanned" and
+ * "reltuples" fields as the workers modify them.
+ */
+typedef struct {
+ int to_scan;
+ int scanned;
+ slock_t lock;
+ Oid heap_oid;
+ Oid index_oid;
+ double reltuples;
+ WorkerResult results[GIN_MAX_WORKERS];
+} PGinBuildTask;
+
+static volatile PGinBuildTask *task;
+
+static shm_mq *mq;
+static shm_mq_handle *mqh;
+
+static void ginDumpEntry(GinState *ginstate,
+ volatile WorkerResult *r,
+ OffsetNumber attnum,
+ Datum key,
+ GinNullCategory category,
+ ItemPointerData *list,
+ int nlist)
+{
+ int keylen, listlen;
+
+ bool isnull;
+ Form_pg_attribute att;
+ GinShmemEntry e;
+
+ /* The message consists of 2 or 3 parts. An iovec allows us to send them
+ * as one message even though the parts are located at unrelated addresses. */
+ shm_mq_iovec iov[3];
+ int iovlen = 0;
+
+ char *buf = NULL;
+
+ e.category = category;
+ e.attnum = attnum;
+ e.nlist = nlist;
+
+ Assert(nlist > 0);
+
+ isnull = category == GIN_CAT_NULL_KEY;
+ att = ginstate->origTupdesc->attrs[attnum - 1];
+
+ if (e.category == GIN_CAT_NORM_KEY)
+ {
+ keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen);
+ Assert(keylen > 0);
+ listlen = e.nlist * sizeof(ItemPointerData);
+ }
+ else
+ keylen = 0;
+
+ listlen = e.nlist * sizeof(ItemPointerData);
+
+ iov[iovlen].data = (char *)&e;
+ iov[iovlen++].len = sizeof(e);
+
+ if (keylen > 0)
+ {
+ char *cursor;
+ buf = palloc(keylen);
+ cursor = buf;
+ datumSerialize(key, isnull, att->attbyval, att->attlen, &cursor);
+ iov[iovlen].data = buf;
+ iov[iovlen++].len = keylen;
+ }
+
+ iov[iovlen].data = (char *)list;
+ iov[iovlen++].len = listlen;
+
+ shm_mq_sendv(mqh, iov, iovlen, false);
+
+ if (buf)
+ pfree(buf);
+}
+
+static void ginDumpAccumulator(GinBuildState *buildstate)
+{
+ ItemPointerData *list;
+ Datum key;
+ GinNullCategory category;
+ uint32 nlist;
+ OffsetNumber attnum;
+ MemoryContext oldCtx;
+ volatile WorkerResult *r = NULL;
+
+ if (IsParallelWorker())
+ {
+ r = &task->results[ParallelWorkerNumber];
+ }
+ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+ ginBeginBAScan(&buildstate->accum);
+ while ((list = ginGetBAEntry(&buildstate->accum,
+ &attnum, &key, &category, &nlist)) != NULL)
+ {
+ /* there could be many entries, so be willing to abort here */
+ CHECK_FOR_INTERRUPTS();
+
+ if (r)
+ ginDumpEntry(&buildstate->ginstate,
+ r, attnum, key,
+ category, list, nlist);
+ else
+ ginEntryInsert(&buildstate->ginstate,
+ attnum, key, category,
+ list, nlist,
+ &buildstate->buildStats);
+ }
+
+ MemoryContextReset(buildstate->tmpCtx);
+ ginInitBA(&buildstate->accum);
+
+ if (IsParallelWorker())
+ {
+ /* send an empty message as an "end-of-tree" marker */
+ shm_mq_result r = shm_mq_send(mqh, 0, NULL, false);
+ if (r != SHM_MQ_SUCCESS)
+ {
+ elog(ERROR, "failed to send the results from worker to backend");
+ }
+ }
+
+ MemoryContextSwitchTo(oldCtx);
+}
+
static void
ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
bool *isnull, bool tupleIsAlive, void *state)
@@ -283,52 +452,350 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
/* If we've maxed out our available memory, dump everything to the index */
if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)
{
- ItemPointerData *list;
- Datum key;
- GinNullCategory category;
- uint32 nlist;
- OffsetNumber attnum;
-
- ginBeginBAScan(&buildstate->accum);
- while ((list = ginGetBAEntry(&buildstate->accum,
- &attnum, &key, &category, &nlist)) != NULL)
- {
- /* there could be many entries, so be willing to abort here */
- CHECK_FOR_INTERRUPTS();
- ginEntryInsert(&buildstate->ginstate, attnum, key, category,
- list, nlist, &buildstate->buildStats);
- }
-
- MemoryContextReset(buildstate->tmpCtx);
- ginInitBA(&buildstate->accum);
+ ginDumpAccumulator(buildstate);
}
MemoryContextSwitchTo(oldCtx);
}
+/*
+ * Claim "max_blocks" or less blocks. Return the actual number of claimed
+ * blocks and set "first" to point to the first block of the claimed range.
+ * 0 return value means the task has been finished.
+ */
+static int claimSomeBlocks(volatile PGinBuildTask *task, int max_blocks, int *first)
+{
+ int blocks = 0;
+
+ SpinLockAcquire(&task->lock);
+
+ if (task->scanned >= task->to_scan)
+ {
+ SpinLockRelease(&task->lock);
+ return 0;
+ }
+
+ *first = task->scanned;
+ blocks = max_blocks;
+ if (blocks > task->to_scan - task->scanned)
+ blocks = task->to_scan - task->scanned;
+ task->scanned += blocks;
+
+ SpinLockRelease(&task->lock);
+ return blocks;
+}
+
+static void reportReltuples(volatile PGinBuildTask *task, double reltuples)
+{
+ SpinLockAcquire(&task->lock);
+ task->reltuples += reltuples;
+ SpinLockRelease(&task->lock);
+}
+
+static double
+ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
+{
+ double reltuples = 0;
+
+ /*
+ * create a temporary memory context that is used to hold data not yet
+ * dumped out to the index
+ */
+ buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
+ * create a temporary memory context that is used for calling
+ * ginExtractEntries(), and can be reset after each tuple
+ */
+ buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context for user-defined function",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ buildstate->accum.ginstate = &buildstate->ginstate;
+ ginInitBA(&buildstate->accum);
+
+ /*
+ * Do the heap scan. We disallow sync scan here because dataPlaceToPage
+ * prefers to receive tuples in TID order.
+ */
+ if (IsParallelWorker())
+ {
+ while (true)
+ {
+ double subtuples;
+ int first, blocks;
+
+ blocks = claimSomeBlocks(task, GIN_BLOCKS_PER_WORKER, &first);
+ if (blocks == 0)
+ break;
+
+ subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+ first, blocks,
+ ginBuildCallback, (void *)buildstate);
+ reltuples += subtuples;
+ }
+ }
+ else
+ {
+ reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
+ ginBuildCallback, (void *)buildstate);
+ }
+
+ /* dump remaining entries to the index */
+ ginDumpAccumulator(buildstate);
+
+ MemoryContextDelete(buildstate->funcCtx);
+ MemoryContextDelete(buildstate->tmpCtx);
+
+ /*
+ * Update metapage stats
+ */
+ buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
+ ginUpdateStats(index, &buildstate->buildStats);
+
+ return reltuples;
+}
+
+/*
+ * The idea behind parallel GIN build is to let the parallel workers scan the
+ * relation and build their rbtrees in parallel. Each block is scanned by
+ * exactly one of the workers.
+ */
+static void ginbuildWorker(dsm_segment *seg, shm_toc *toc)
+{
+ GinBuildState buildstate;
+
+ Relation heap;
+ Relation index;
+ IndexInfo *indexInfo;
+
+ double reltuples;
+
+ char *shm_origin;
+ int mqsize;
+
+ task = (PGinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+
+ shm_origin = (char *)shm_toc_lookup(toc, KEY_SHM_ORIGIN);
+ mqsize = *(int*)shm_toc_lookup(toc, KEY_SHM_PER_WORKER);
+ mq = (shm_mq *)(shm_origin + ParallelWorkerNumber * mqsize);
+ shm_mq_set_sender(mq, MyProc);
+ mqh = shm_mq_attach(mq, seg, NULL);
+ shm_mq_wait_for_attach(mqh);
+
+ heap = heap_open(task->heap_oid, NoLock);
+ index = index_open(task->index_oid, NoLock);
+ indexInfo = BuildIndexInfo(index);
+
+ initGinState(&buildstate.ginstate, index);
+ buildstate.indtuples = 0;
+ memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
+ reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+
+ index_close(index, NoLock);
+ heap_close(heap, NoLock);
+
+ reportReltuples(task, reltuples);
+
+ shm_mq_detach(mq);
+}
+
+typedef struct GinEntryStack {
+ struct GinEntryStack *next;
+ Datum key;
+ GinNullCategory category;
+ OffsetNumber attnum;
+ ItemPointerData *list;
+ int nlist;
+} GinEntryStack;
+
+static GinEntryStack *pushEntry(GinEntryStack *stack)
+{
+ GinEntryStack *head = palloc(sizeof(GinEntryStack));
+ head->next = stack;
+ head->list = palloc(sizeof(ItemPointerData)); /* make ginMergeItemPointers happy */
+ head->nlist = 0;
+ return head;
+}
+
+static GinEntryStack *popEntry(GinEntryStack *stack)
+{
+ GinEntryStack *head = stack;
+ Assert(stack != NULL);
+ stack = stack->next;
+ pfree(head->list);
+ pfree(head);
+ return stack;
+}
+
+/*
+ * Wait until all unfinished workers start dumping their trees.
+ * Return the number of trees to merge.
+ */
+static int waitNextTree(ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+ int i;
+ int trees = 0;
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ volatile WorkerResult *r = &task->results[i];
+
+ if (shm_mq_receive(r->mqh, (Size *)&r->msg_len, (void *)&r->msg_body, false) == SHM_MQ_SUCCESS)
+ {
+ r->end_of_tree = false;
+ trees++;
+ }
+ else
+ {
+ r->end_of_forest = true;
+ }
+ }
+ return trees;
+}
+
+/* Merge the results from all ready (but unfinished) workers. */
+static void mergeReadyAndUnfinished(GinBuildState *buildstate, ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+ GinEntryStack *entry = NULL;
+ while (true)
+ {
+ int i;
+ bool merged = false;
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ GinShmemEntry *shentry;
+ ItemPointerData *oldlist;
+ int cmp;
+ char *cursor;
+ volatile WorkerResult *r = &task->results[i];
+
+ if (r->end_of_forest || r->end_of_tree)
+ {
+ continue;
+ }
+
+ if (r->msg_len == 0) /* end-of-tree */
+ {
+ r->end_of_tree = true;
+ continue;
+ }
+
+ cursor = r->msg_body;
+ Assert(cursor != NULL);
+ shentry = (GinShmemEntry*)cursor;
+ cursor += sizeof(GinShmemEntry);
+
+ if (r->skipkey)
+ cursor += r->skipkey;
+ else
+ {
+ r->key = 0;
+ if (shentry->category == GIN_CAT_NORM_KEY)
+ {
+ bool isnull;
+ char *oldcursor = cursor;
+ r->key = datumRestore(&cursor, &isnull); /* TODO: check if this leaks memory in a long-lived context */
+ r->skipkey = cursor - oldcursor;
+ Assert(!isnull);
+ Assert(r->skipkey);
+ }
+ }
+
+ cmp = -1;
+ if (entry != NULL)
+ {
+ cmp = ginCompareAttEntries(&buildstate->ginstate,
+ shentry->attnum, r->key, shentry->category,
+ entry->attnum, entry->key, entry->category);
+ }
+
+ if (cmp > 0)
+ {
+ /* The key is greater, skip the worker. */
+ continue;
+ }
+
+ if (cmp < 0)
+ {
+ /*
+ * The key is less than what we have on the stack.
+ * Push a new entry onto the stack.
+ */
+ entry = pushEntry(entry);
+ entry->key = r->key;
+ entry->category = shentry->category;
+ entry->attnum = shentry->attnum;
+ }
+
+ /* The key is less than or equal. Merge the item pointers. */
+ {
+ ItemPointerData *list = (ItemPointerData*)cursor;
+ oldlist = entry->list;
+ entry->list = ginMergeItemPointers(entry->list, entry->nlist,
+ list, shentry->nlist,
+ &entry->nlist);
+
+ pfree(oldlist);
+ }
+
+ /* Message consumed. Receive the next one. */
+ r->skipkey = 0;
+ if (shm_mq_receive(r->mqh, (Size *)&r->msg_len, (void *)&r->msg_body, false) != SHM_MQ_SUCCESS)
+ r->end_of_forest = true;
+ merged = true;
+ }
+
+ if (!merged)
+ {
+ /* Nothing merged. Insert the entry into the index and pop the stack. */
+ if (entry == NULL)
+ {
+ /* Also nothing to dump - we have finished. */
+ break;
+ }
+
+ ginEntryInsert(&buildstate->ginstate,
+ entry->attnum, entry->key, entry->category,
+ entry->list, entry->nlist,
+ &buildstate->buildStats);
+ entry = popEntry(entry);
+ }
+ }
+}
+
+static void mergeResults(GinBuildState *buildstate, ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+ int trees;
+ while ((trees = waitNextTree(pcxt, task)) > 0)
+ {
+ mergeReadyAndUnfinished(buildstate, pcxt, task);
+ }
+}
+
IndexBuildResult *
ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
+ GinBuildState buildstate;
+
IndexBuildResult *result;
- double reltuples;
- GinBuildState buildstate;
Buffer RootBuffer,
MetaBuffer;
- ItemPointerData *list;
- Datum key;
- GinNullCategory category;
- uint32 nlist;
- MemoryContext oldCtx;
- OffsetNumber attnum;
+ double reltuples = 0;
+ bool parallel_workers_helped = false;
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
- initGinState(&buildstate.ginstate, index);
- buildstate.indtuples = 0;
- memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
-
/* initialize the meta page */
MetaBuffer = GinNewBuffer(index);
@@ -363,60 +830,97 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
UnlockReleaseBuffer(RootBuffer);
END_CRIT_SECTION();
+ initGinState(&buildstate.ginstate, index);
+ buildstate.indtuples = 0;
+ memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
/* count the root as first entry page */
buildstate.buildStats.nEntryPages++;
- /*
- * create a temporary memory context that is used to hold data not yet
- * dumped out to the index
- */
- buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
- "Gin build temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- /*
- * create a temporary memory context that is used for calling
- * ginExtractEntries(), and can be reset after each tuple
- */
- buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
- "Gin build temporary context for user-defined function",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- buildstate.accum.ginstate = &buildstate.ginstate;
- ginInitBA(&buildstate.accum);
-
- /*
- * Do the heap scan. We disallow sync scan here because dataPlaceToPage
- * prefers to receive tuples in TID order.
- */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- ginBuildCallback, (void *) &buildstate);
-
- /* dump remaining entries to the index */
- oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
- ginBeginBAScan(&buildstate.accum);
- while ((list = ginGetBAEntry(&buildstate.accum,
- &attnum, &key, &category, &nlist)) != NULL)
+ if (gin_parallel_workers > GIN_MAX_WORKERS)
+ gin_parallel_workers = GIN_MAX_WORKERS;
+
+ if (gin_parallel_workers > 0)
{
- /* there could be many entries, so be willing to abort here */
- CHECK_FOR_INTERRUPTS();
- ginEntryInsert(&buildstate.ginstate, attnum, key, category,
- list, nlist, &buildstate.buildStats);
- }
- MemoryContextSwitchTo(oldCtx);
+ if (RelationUsesLocalBuffers(heap))
+ {
+ elog(WARNING, "not using parallel GIN build on temporary table %s\n", NameStr(heap->rd_rel->relname));
+ }
+ else
+ {
+ EnterParallelMode();
+ {
+ int i;
+
+ PGinBuildTask *task;
+ ParallelContext *pcxt;
+ void *shm;
+ void *ptr;
+
+ int *mqsize;
+
+ int size = 0, keys = 0;
+ keys++; size += sizeof(PGinBuildTask);
+ keys++; size += gin_shared_mem * 1024;
+ keys++; size += sizeof(int);
+
+ pcxt = CreateParallelContext(ginbuildWorker, gin_parallel_workers);
+
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, keys);
+ InitializeParallelDSM(pcxt);
+
+ ptr = shm_toc_allocate(pcxt->toc, sizeof(PGinBuildTask));
+ shm_toc_insert(pcxt->toc, KEY_TASK, ptr);
+ task = (PGinBuildTask*)ptr;
- MemoryContextDelete(buildstate.funcCtx);
- MemoryContextDelete(buildstate.tmpCtx);
+ ptr = shm_toc_allocate(pcxt->toc, gin_shared_mem * 1024);
+ shm_toc_insert(pcxt->toc, KEY_SHM_ORIGIN, ptr);
+ shm = ptr;
- /*
- * Update metapage stats
- */
- buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
- ginUpdateStats(index, &buildstate.buildStats);
+ mqsize = (int *)shm_toc_allocate(pcxt->toc, sizeof(int));
+ *mqsize = gin_shared_mem * 1024 / pcxt->nworkers;
+ shm_toc_insert(pcxt->toc, KEY_SHM_PER_WORKER, mqsize);
+
+ SpinLockInit(&task->lock);
+
+ for (i = 0; i < pcxt->nworkers; i++)
+ {
+ volatile WorkerResult *r = &task->results[i];
+ r->mq = shm_mq_create((char *)shm + i * (*mqsize), *mqsize);
+ shm_mq_set_receiver(r->mq, MyProc);
+ r->mqh = shm_mq_attach(r->mq, pcxt->seg, NULL);
+ r->end_of_tree = false;
+ r->end_of_forest = false;
+ r->msg_body = NULL;
+ r->msg_len = 0;
+ r->skipkey = 0;
+ }
+ task->reltuples = 0;
+ task->to_scan = RelationGetNumberOfBlocks(heap);
+ task->heap_oid = heap->rd_id;
+ task->index_oid = index->rd_id;
+ LaunchParallelWorkers(pcxt);
+ if (pcxt->nworkers_launched > 0)
+ {
+ mergeResults(&buildstate, pcxt, task);
+
+ WaitForParallelWorkersToFinish(pcxt);
+ reltuples = task->reltuples;
+
+ parallel_workers_helped = true;
+ }
+ DestroyParallelContext(pcxt);
+ }
+ ExitParallelMode();
+ }
+ }
+
+ if (!parallel_workers_helped)
+ {
+ /* Do everything myself */
+ reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+ }
/*
* Return statistics
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index edcafce..e53cc93 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2744,6 +2744,31 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"gin_parallel_workers",
+ PGC_USERSET,
+ RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Maximum number of parallel workers for GIN buiding."),
+ NULL,
+ },
+ &gin_parallel_workers,
+ 0, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"gin_shared_mem",
+ PGC_USERSET,
+ RESOURCES_ASYNCHRONOUS,
+ gettext_noop("The size of shared memory segment for parallel GIN buiding."),
+ NULL,
+ GUC_UNIT_KB
+ },
+ &gin_shared_mem,
+ 16 * 1024, 1024, MAX_KILOBYTES,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ee3d378..a756218 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -538,6 +538,8 @@
#xmloption = 'content'
#gin_fuzzy_search_limit = 0
#gin_pending_list_limit = 4MB
+#gin_parallel_workers = 0 # 0 disables parallel GIN build
+#gin_shared_mem = 16MB
# - Locale and Formatting -
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..91e5b27 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -68,6 +68,8 @@ typedef char GinTernaryValue;
/* GUC parameters */
extern PGDLLIMPORT int GinFuzzySearchLimit;
extern int gin_pending_list_limit;
+extern int gin_parallel_workers;
+extern int gin_shared_mem;
/* ginutil.c */
extern void ginGetStats(Relation index, GinStatsData *stats);