On Mon, 14 Mar 2016 08:42:26 -0400
David Steele <da...@pgmasters.net> wrote:

> On 2/18/16 10:10 AM, Constantin S. Pan wrote:
> > On Wed, 17 Feb 2016 23:01:47 +0300
> > Oleg Bartunov <obartu...@gmail.com> wrote:
> >
> >> My feedback is (Mac OS X 10.11.3)
> >>
> >> set gin_parallel_workers=2;
> >> create index message_body_idx on messages using gin(body_tsvector);
> >> LOG:  worker process: parallel worker for PID 5689 (PID 6906) was
> >> terminated by signal 11: Segmentation fault
> >
> > Fixed this, try the new patch. The bug was in incorrect handling
> > of some GIN categories.
> 
> Oleg, it looks like Constantin has updated to patch to address the
> issue you were seeing.  Do you have time to retest and review?
> 
> Thanks,

Actually, there was some progress since. The patch is
attached.

1. Added another GUC parameter for changing the amount of
shared memory for parallel GIN workers.

2. Changed the way results are merged. It uses shared memory
message queue now.

3. Tested on some real data (GIN index on email message body
tsvectors). Here are the timings for different values of
'gin_shared_mem' and 'gin_parallel_workers' on a 4-CPU
machine. Seems 'gin_shared_mem' has no visible effect.

wnum mem(MB) time(s)
   0      16     247
   1      16     256
   2      16     126
   4      16      89
   0      32     247
   1      32     270
   2      32     123
   4      32      92
   0      64     254
   1      64     272
   2      64     123
   4      64      88
   0     128     250
   1     128     263
   2     128     126
   4     128      85
   0     256     247
   1     256     269
   2     256     130
   4     256      88
   0     512     257
   1     512     275
   2     512     129
   4     512      92
   0    1024     255
   1    1024     273
   2    1024     130
   4    1024      90

On Wed, 17 Feb 2016 12:26:05 -0800
Peter Geoghegan <p...@heroku.com> wrote:

> On Wed, Feb 17, 2016 at 7:55 AM, Constantin S. Pan <kva...@gmail.com>
> wrote:
> > 4. Hit the 8x speedup limit. Made some analysis of the reasons (see
> > the attached plot or the data file).  
> 
> Did you actually compare this to the master branch? I wouldn't like to
> assume that the one worker case was equivalent. Obviously that's the
> really interesting baseline.

Compared with the master branch. The case of 0 workers is
indeed equivalent to the master branch.

Regards,
Constantin
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cd21e0e..ff267b3 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,14 +16,21 @@
 
 #include "access/gin_private.h"
 #include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
 #include "storage/indexfsm.h"
+#include "storage/spin.h"
+#include "utils/datum.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+/* GUC parameter */
+int gin_parallel_workers = 0;
+int gin_shared_mem = 0;
 
 typedef struct
 {
@@ -35,7 +42,6 @@ typedef struct
 	BuildAccumulator accum;
 } GinBuildState;
 
-
 /*
  * Adds array of item pointers to tuple's posting list, or
  * creates posting tree and tuple pointing to tree in case
@@ -265,6 +271,169 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
 	MemoryContextReset(buildstate->funcCtx);
 }
 
+#define KEY_TASK 42
+#define KEY_SHM_ORIGIN 43
+#define KEY_SHM_PER_WORKER 44
+#define GIN_MAX_WORKERS 24
+#define GIN_BLOCKS_PER_WORKER 4
+
+/*
+ * The shmem message structure:
+ * Entry, Key, List
+ */
+
+typedef struct {
+	GinNullCategory category;
+	OffsetNumber attnum;
+	int nlist;
+} GinShmemEntry;
+
+typedef struct {
+	void *mq;
+	shm_mq_handle *mqh;
+	bool end_of_tree;
+	bool end_of_forest;
+
+	void *msg_body;
+	Size msg_len;
+	Datum key;
+	int skipkey;
+} WorkerResult;
+
+/*
+ * This structure describes the GIN build task for the parallel workers. We use
+ * OIDs here because workers are separate processes and pointers may become
+ * meaningless for them. The "lock" is used to protect the "scanned" and
+ * "reltuples" fields as the workers modify them.
+ */
+typedef struct {
+	int		to_scan;
+	int		scanned;
+	slock_t	lock;
+	Oid		heap_oid;
+	Oid		index_oid;
+	double	reltuples;
+	WorkerResult results[GIN_MAX_WORKERS];
+} PGinBuildTask;
+
+static volatile PGinBuildTask *task;
+
+static shm_mq *mq;
+static shm_mq_handle *mqh;
+
+static void ginDumpEntry(GinState *ginstate,
+						 volatile WorkerResult *r,
+						 OffsetNumber attnum,
+						 Datum key,
+						 GinNullCategory category,
+						 ItemPointerData *list,
+						 int nlist)
+{
+	int keylen, listlen;
+
+	bool isnull;
+	Form_pg_attribute att;
+	GinShmemEntry e;
+
+	// The message consists of 2 or 3 parts. iovec allows us to send them as
+	// one message though the parts are located at unrelated addresses.
+	shm_mq_iovec iov[3];
+	int iovlen = 0;
+
+	char *buf = NULL;
+
+	e.category = category;
+	e.attnum = attnum;
+	e.nlist = nlist;
+
+	Assert(nlist > 0);
+
+	isnull = category == GIN_CAT_NULL_KEY;
+	att = ginstate->origTupdesc->attrs[attnum - 1];
+
+	if (e.category == GIN_CAT_NORM_KEY)
+	{
+		keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen);
+		Assert(keylen > 0);
+		listlen = e.nlist * sizeof(ItemPointerData);
+	}
+	else
+		keylen = 0;
+
+	listlen = e.nlist * sizeof(ItemPointerData);
+
+	iov[iovlen].data = (char *)&e;
+	iov[iovlen++].len = sizeof(e);
+
+	if (keylen > 0)
+	{
+		char *cursor;
+		buf = palloc(keylen);
+		cursor = buf;
+		datumSerialize(key, isnull, att->attbyval, att->attlen, &cursor);
+		iov[iovlen].data = buf;
+		iov[iovlen++].len = keylen;
+	}
+
+	iov[iovlen].data = (char *)list;
+	iov[iovlen++].len = listlen;
+
+	shm_mq_sendv(mqh, iov, iovlen, false);
+
+	if (buf)
+		pfree(buf);
+}
+
+static void ginDumpAccumulator(GinBuildState *buildstate)
+{
+	ItemPointerData *list;
+	Datum		key;
+	GinNullCategory category;
+	uint32		nlist;
+	OffsetNumber attnum;
+	MemoryContext oldCtx;
+	volatile WorkerResult *r = NULL;
+
+	if (IsParallelWorker())
+	{
+		r = &task->results[ParallelWorkerNumber];
+	}
+	oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+	ginBeginBAScan(&buildstate->accum);
+	while ((list = ginGetBAEntry(&buildstate->accum,
+							  &attnum, &key, &category, &nlist)) != NULL)
+	{
+		/* there could be many entries, so be willing to abort here */
+		CHECK_FOR_INTERRUPTS();
+
+		if (r)
+			ginDumpEntry(&buildstate->ginstate,
+						 r, attnum, key,
+						 category, list, nlist);
+		else
+			ginEntryInsert(&buildstate->ginstate,
+						   attnum, key, category,
+						   list, nlist,
+						   &buildstate->buildStats);
+	}
+
+	MemoryContextReset(buildstate->tmpCtx);
+	ginInitBA(&buildstate->accum);
+
+	if (IsParallelWorker())
+	{
+		// send empty message as an "end-of-tree" marker
+		shm_mq_result r = shm_mq_send(mqh, 0, NULL, false);
+		if (r != SHM_MQ_SUCCESS)
+		{
+			elog(ERROR, "failed to send the results from worker to backend");
+		}
+	}
+
+	MemoryContextSwitchTo(oldCtx);
+}
+
 static void
 ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 				 bool *isnull, bool tupleIsAlive, void *state)
@@ -283,52 +452,350 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 	/* If we've maxed out our available memory, dump everything to the index */
 	if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)
 	{
-		ItemPointerData *list;
-		Datum		key;
-		GinNullCategory category;
-		uint32		nlist;
-		OffsetNumber attnum;
-
-		ginBeginBAScan(&buildstate->accum);
-		while ((list = ginGetBAEntry(&buildstate->accum,
-								  &attnum, &key, &category, &nlist)) != NULL)
-		{
-			/* there could be many entries, so be willing to abort here */
-			CHECK_FOR_INTERRUPTS();
-			ginEntryInsert(&buildstate->ginstate, attnum, key, category,
-						   list, nlist, &buildstate->buildStats);
-		}
-
-		MemoryContextReset(buildstate->tmpCtx);
-		ginInitBA(&buildstate->accum);
+		ginDumpAccumulator(buildstate);
 	}
 
 	MemoryContextSwitchTo(oldCtx);
 }
 
+/*
+ * Claim "max_blocks" or less blocks. Return the actual number of claimed
+ * blocks and set "first" to point to the first block of the claimed range.
+ * 0 return value means the task has been finished.
+ */
+static int claimSomeBlocks(volatile PGinBuildTask *task, int max_blocks, int *first)
+{
+	int blocks = 0;
+
+	SpinLockAcquire(&task->lock);
+
+	if (task->scanned >= task->to_scan)
+	{
+		SpinLockRelease(&task->lock);
+		return 0;
+	}
+
+	*first = task->scanned;
+	blocks = max_blocks;
+	if (blocks > task->to_scan - task->scanned)
+		blocks = task->to_scan - task->scanned;
+	task->scanned += blocks;
+
+	SpinLockRelease(&task->lock);
+	return blocks;
+}
+
+static void reportReltuples(volatile PGinBuildTask *task, double reltuples)
+{
+	SpinLockAcquire(&task->lock);
+	task->reltuples += reltuples;
+	SpinLockRelease(&task->lock);
+}
+
+static double
+ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
+{
+	double reltuples = 0;
+
+	/*
+	 * create a temporary memory context that is used to hold data not yet
+	 * dumped out to the index
+	 */
+	buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+											   "Gin build temporary context",
+											   ALLOCSET_DEFAULT_MINSIZE,
+											   ALLOCSET_DEFAULT_INITSIZE,
+											   ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * create a temporary memory context that is used for calling
+	 * ginExtractEntries(), and can be reset after each tuple
+	 */
+	buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext,
+												"Gin build temporary context for user-defined function",
+												ALLOCSET_DEFAULT_MINSIZE,
+												ALLOCSET_DEFAULT_INITSIZE,
+												ALLOCSET_DEFAULT_MAXSIZE);
+
+	buildstate->accum.ginstate = &buildstate->ginstate;
+	ginInitBA(&buildstate->accum);
+
+	/*
+	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
+	 * prefers to receive tuples in TID order.
+	 */
+	if (IsParallelWorker())
+	{
+		while (true)
+		{
+			double subtuples;
+			int first, blocks;
+
+			blocks = claimSomeBlocks(task, GIN_BLOCKS_PER_WORKER, &first);
+			if (blocks == 0)
+				break;
+
+			subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+												first, blocks,
+												ginBuildCallback, (void *)buildstate);
+			reltuples += subtuples;
+		}
+	}
+	else
+	{
+		reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
+									   ginBuildCallback, (void *)buildstate);
+	}
+
+	/* dump remaining entries to the index */
+	ginDumpAccumulator(buildstate);
+
+	MemoryContextDelete(buildstate->funcCtx);
+	MemoryContextDelete(buildstate->tmpCtx);
+
+	/*
+	 * Update metapage stats
+	 */
+	buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
+	ginUpdateStats(index, &buildstate->buildStats);
+
+	return reltuples;
+}
+
+/*
+ * The idea behind parallel GIN build is letting the parallel workers scan the
+ * relation and build rbtree in parallel. Each block is scanned by one of the
+ * workers.
+ */
+static void ginbuildWorker(dsm_segment *seg, shm_toc *toc)
+{
+	GinBuildState buildstate;
+
+	Relation heap;
+	Relation index;
+	IndexInfo *indexInfo;
+
+	double reltuples;
+
+	char *shm_origin;
+	int mqsize;
+
+	task = (PGinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+
+	shm_origin = (char *)shm_toc_lookup(toc, KEY_SHM_ORIGIN);
+	mqsize = *(int*)shm_toc_lookup(toc, KEY_SHM_PER_WORKER);
+	mq = (shm_mq *)(shm_origin + ParallelWorkerNumber * mqsize);
+	shm_mq_set_sender(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+	shm_mq_wait_for_attach(mqh);
+
+	heap = heap_open(task->heap_oid, NoLock);
+	index = index_open(task->index_oid, NoLock);
+	indexInfo = BuildIndexInfo(index);
+
+	initGinState(&buildstate.ginstate, index);
+	buildstate.indtuples = 0;
+	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
+	reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+
+	index_close(index, NoLock);
+	heap_close(heap, NoLock);
+
+	reportReltuples(task, reltuples);
+
+	shm_mq_detach(mq);
+}
+
+typedef struct GinEntryStack {
+	struct GinEntryStack *next;
+	Datum			key;
+	GinNullCategory	category;
+	OffsetNumber	attnum;
+	ItemPointerData	*list;
+	int				nlist;
+} GinEntryStack;
+
+static GinEntryStack *pushEntry(GinEntryStack *stack)
+{
+	GinEntryStack *head = palloc(sizeof(GinEntryStack));
+	head->next = stack;
+	head->list = palloc(sizeof(ItemPointerData)); /* make ginMergeItemPointers happy */
+	head->nlist = 0;
+	return head;
+}
+
+static GinEntryStack *popEntry(GinEntryStack *stack)
+{
+	GinEntryStack *head = stack;
+	Assert(stack != NULL);
+	stack = stack->next;
+	pfree(head->list);
+	pfree(head);
+	return stack;
+}
+
+/*
+ * Wait until all unfinished workers start dumping their trees.
+ * Return the number of trees to merge.
+ */
+static int waitNextTree(ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+	int i;
+	int trees = 0;
+
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		volatile WorkerResult *r = &task->results[i];
+
+		if (shm_mq_receive(r->mqh, (Size *)&r->msg_len, (void *)&r->msg_body, false) == SHM_MQ_SUCCESS)
+		{
+			r->end_of_tree = false;
+			trees++;
+		}
+		else
+		{
+			r->end_of_forest = true;
+		}
+	}
+	return trees;
+}
+
+/* Merge the results from all ready (but unfinished) workers. */
+static void mergeReadyAndUnfinished(GinBuildState *buildstate, ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+	GinEntryStack *entry = NULL;
+	while (true)
+	{
+		int i;
+		bool merged = false;
+
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			GinShmemEntry *shentry;
+			ItemPointerData *oldlist;
+			int cmp;
+			char *cursor;
+			volatile WorkerResult *r = &task->results[i];
+
+			if (r->end_of_forest || r->end_of_tree)
+			{
+				continue;
+			}
+
+			if (r->msg_len == 0) /* end-of-tree */
+			{
+				r->end_of_tree = true;
+				continue;
+			}
+
+			cursor = r->msg_body;
+			Assert(cursor != NULL);
+			shentry = (GinShmemEntry*)cursor;
+			cursor += sizeof(shentry);
+
+			if (r->skipkey)
+				cursor += r->skipkey;
+			else
+			{
+				r->key = 0;
+				if (shentry->category == GIN_CAT_NORM_KEY)
+				{
+					bool isnull;
+					char *oldcursor = cursor;
+					r->key = datumRestore(&cursor, &isnull); // TODO: check if this leaks memory in a long-living context
+					r->skipkey = cursor - oldcursor;
+					Assert(!isnull);
+					Assert(r->skipkey);
+				}
+			}
+
+			cmp = -1;
+			if (entry != NULL)
+			{
+				cmp = ginCompareAttEntries(&buildstate->ginstate,
+										   shentry->attnum, r->key, shentry->category,
+										   entry->attnum, entry->key, entry->category);
+			}
+
+			if (cmp > 0)
+			{
+				/* The key is greater, skip the worker. */
+				continue;
+			}
+
+			if (cmp < 0)
+			{
+				/*
+				 * The key is less than what we have on the stack.
+				 * Push a new entry onto the stack.
+				 */
+				entry = pushEntry(entry);
+				entry->key      = r->key;
+				entry->category = shentry->category;
+				entry->attnum   = shentry->attnum;
+			}
+
+			/* The key is less than or equal. Merge the item pointers. */
+			{
+				ItemPointerData *list = (ItemPointerData*)cursor;
+				oldlist = entry->list;
+				entry->list = ginMergeItemPointers(entry->list, entry->nlist,
+												   list, shentry->nlist,
+												   &entry->nlist);
+
+				pfree(oldlist);
+			}
+
+			/* Message consumed. Receive the next one. */
+			r->skipkey = 0;
+			if (shm_mq_receive(r->mqh, (Size *)&r->msg_len, (void *)&r->msg_body, false) != SHM_MQ_SUCCESS)
+				r->end_of_forest = true;
+			merged = true;
+		}
+
+		if (!merged)
+		{
+			/* Nothing merged. Insert the entry into the index and pop the stack. */
+			if (entry == NULL)
+			{
+				/* Also nothing to dump - we have finished. */
+				break;
+			}
+
+			ginEntryInsert(&buildstate->ginstate,
+						   entry->attnum, entry->key, entry->category,
+						   entry->list, entry->nlist,
+						   &buildstate->buildStats);
+			entry = popEntry(entry);
+		}
+	}
+}
+
+static void mergeResults(GinBuildState *buildstate, ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+	int trees;
+	while ((trees = waitNextTree(pcxt, task)) > 0)
+	{
+		mergeReadyAndUnfinished(buildstate, pcxt, task);
+	}
+}
+
 IndexBuildResult *
 ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 {
+	GinBuildState buildstate;
+
 	IndexBuildResult *result;
-	double		reltuples;
-	GinBuildState buildstate;
 	Buffer		RootBuffer,
 				MetaBuffer;
-	ItemPointerData *list;
-	Datum		key;
-	GinNullCategory category;
-	uint32		nlist;
-	MemoryContext oldCtx;
-	OffsetNumber attnum;
+	double reltuples = 0;
+	bool parallel_workers_helped = false;
 
 	if (RelationGetNumberOfBlocks(index) != 0)
 		elog(ERROR, "index \"%s\" already contains data",
 			 RelationGetRelationName(index));
 
-	initGinState(&buildstate.ginstate, index);
-	buildstate.indtuples = 0;
-	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
-
 	/* initialize the meta page */
 	MetaBuffer = GinNewBuffer(index);
 
@@ -363,60 +830,97 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	UnlockReleaseBuffer(RootBuffer);
 	END_CRIT_SECTION();
 
+	initGinState(&buildstate.ginstate, index);
+	buildstate.indtuples = 0;
+	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
 	/* count the root as first entry page */
 	buildstate.buildStats.nEntryPages++;
 
-	/*
-	 * create a temporary memory context that is used to hold data not yet
-	 * dumped out to the index
-	 */
-	buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
-											  "Gin build temporary context",
-											  ALLOCSET_DEFAULT_MINSIZE,
-											  ALLOCSET_DEFAULT_INITSIZE,
-											  ALLOCSET_DEFAULT_MAXSIZE);
-
-	/*
-	 * create a temporary memory context that is used for calling
-	 * ginExtractEntries(), and can be reset after each tuple
-	 */
-	buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
-					 "Gin build temporary context for user-defined function",
-											   ALLOCSET_DEFAULT_MINSIZE,
-											   ALLOCSET_DEFAULT_INITSIZE,
-											   ALLOCSET_DEFAULT_MAXSIZE);
-
-	buildstate.accum.ginstate = &buildstate.ginstate;
-	ginInitBA(&buildstate.accum);
-
-	/*
-	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
-	 * prefers to receive tuples in TID order.
-	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   ginBuildCallback, (void *) &buildstate);
-
-	/* dump remaining entries to the index */
-	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
-	ginBeginBAScan(&buildstate.accum);
-	while ((list = ginGetBAEntry(&buildstate.accum,
-								 &attnum, &key, &category, &nlist)) != NULL)
+	if (gin_parallel_workers > GIN_MAX_WORKERS)
+		gin_parallel_workers = GIN_MAX_WORKERS;
+
+	if (gin_parallel_workers > 0)
 	{
-		/* there could be many entries, so be willing to abort here */
-		CHECK_FOR_INTERRUPTS();
-		ginEntryInsert(&buildstate.ginstate, attnum, key, category,
-					   list, nlist, &buildstate.buildStats);
-	}
-	MemoryContextSwitchTo(oldCtx);
+		if (RelationUsesLocalBuffers(heap))
+		{
+			elog(WARNING, "not using parallel GIN build on temporary table %s\n", NameStr(heap->rd_rel->relname));
+		}
+		else
+		{
+			EnterParallelMode();
+			{
+				int i;
+
+				PGinBuildTask   *task;
+				ParallelContext *pcxt;
+				void *shm;
+				void *ptr;
+
+				int *mqsize;
+
+				int size = 0, keys = 0;
+				keys++; size += sizeof(PGinBuildTask);
+				keys++; size += gin_shared_mem * 1024;
+				keys++; size += sizeof(int);
+
+				pcxt = CreateParallelContext(ginbuildWorker, gin_parallel_workers);
+
+				shm_toc_estimate_chunk(&pcxt->estimator, size);
+				shm_toc_estimate_keys(&pcxt->estimator, keys);
+				InitializeParallelDSM(pcxt);
+
+				ptr = shm_toc_allocate(pcxt->toc, sizeof(PGinBuildTask));
+				shm_toc_insert(pcxt->toc, KEY_TASK, ptr);
+				task = (PGinBuildTask*)ptr;
 
-	MemoryContextDelete(buildstate.funcCtx);
-	MemoryContextDelete(buildstate.tmpCtx);
+				ptr = shm_toc_allocate(pcxt->toc, gin_shared_mem * 1024);
+				shm_toc_insert(pcxt->toc, KEY_SHM_ORIGIN, ptr);
+				shm = ptr;
 
-	/*
-	 * Update metapage stats
-	 */
-	buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
-	ginUpdateStats(index, &buildstate.buildStats);
+				mqsize = (int *)shm_toc_allocate(pcxt->toc, sizeof(int));
+				*mqsize = gin_shared_mem * 1024 / pcxt->nworkers;
+				shm_toc_insert(pcxt->toc, KEY_SHM_PER_WORKER, mqsize);
+
+				SpinLockInit(&task->lock);
+
+				for (i = 0; i < pcxt->nworkers; i++)
+				{
+					volatile WorkerResult *r = &task->results[i];
+					r->mq = shm_mq_create((char *)shm + i * (*mqsize), *mqsize);
+					shm_mq_set_receiver(r->mq, MyProc);
+					r->mqh = shm_mq_attach(r->mq, pcxt->seg, NULL);
+					r->end_of_tree = false;
+					r->end_of_forest = false;
+					r->msg_body = NULL;
+					r->msg_len = 0;
+					r->skipkey = 0;
+				}
+				task->reltuples = 0;
+				task->to_scan = RelationGetNumberOfBlocks(heap);
+				task->heap_oid = heap->rd_id;
+				task->index_oid = index->rd_id;
+				LaunchParallelWorkers(pcxt);
+				if (pcxt->nworkers_launched > 0)
+				{
+					mergeResults(&buildstate, pcxt, task);
+
+					WaitForParallelWorkersToFinish(pcxt);
+					reltuples = task->reltuples;
+
+					parallel_workers_helped = true;
+				}
+				DestroyParallelContext(pcxt);
+			}
+			ExitParallelMode();
+		}
+	}
+
+	if (!parallel_workers_helped)
+	{
+		/* Do everything myself */
+		reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+	}
 
 	/*
 	 * Return statistics
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index edcafce..e53cc93 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2744,6 +2744,31 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"gin_parallel_workers",
+			PGC_USERSET,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Maximum number of parallel workers for GIN buiding."),
+			NULL,
+		},
+		&gin_parallel_workers,
+		0, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"gin_shared_mem",
+			PGC_USERSET,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("The size of shared memory segment for parallel GIN buiding."),
+			NULL,
+			GUC_UNIT_KB
+		},
+		&gin_shared_mem,
+		16 * 1024, 1024, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ee3d378..a756218 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -538,6 +538,8 @@
 #xmloption = 'content'
 #gin_fuzzy_search_limit = 0
 #gin_pending_list_limit = 4MB
+#gin_parallel_workers = 0		# 0 disables parallel gin build
+#gin_shared_mem = 16MB
 
 # - Locale and Formatting -
 
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..91e5b27 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -68,6 +68,8 @@ typedef char GinTernaryValue;
 /* GUC parameters */
 extern PGDLLIMPORT int GinFuzzySearchLimit;
 extern int	gin_pending_list_limit;
+extern int	gin_parallel_workers;
+extern int	gin_shared_mem;
 
 /* ginutil.c */
 extern void ginGetStats(Relation index, GinStatsData *stats);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to