On Wed, 17 Feb 2016 23:01:47 +0300
Oleg Bartunov <obartu...@gmail.com> wrote:

> My feedback is (Mac OS X 10.11.3)
> 
> set gin_parallel_workers=2;
> create index message_body_idx on messages using gin(body_tsvector);
> LOG:  worker process: parallel worker for PID 5689 (PID 6906) was
> terminated by signal 11: Segmentation fault

Fixed this; please try the new patch. The bug was in the incorrect
handling of some GIN categories (the null/empty item ones).
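
A quick way to exercise the fix is to build the index over data that
contains both regular keys and the special null/empty categories. Here
is a rough test sketch (the table and column names are made up):

    set gin_parallel_workers = 2;
    create table gin_cat_test (body tsvector);
    insert into gin_cat_test
        select to_tsvector('english', 'message number ' || i)
        from generate_series(1, 100000) i;
    -- a NULL tsvector and an empty one exercise the special categories
    insert into gin_cat_test values (NULL), ('');
    create index gin_cat_test_idx on gin_cat_test using gin(body);
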
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cd21e0e..2f6f142 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,14 +16,20 @@
 
 #include "access/gin_private.h"
 #include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
 #include "storage/indexfsm.h"
+#include "storage/spin.h"
+#include "utils/datum.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+/* GUC parameter */
+int gin_parallel_workers = 0;
 
 typedef struct
 {
@@ -265,6 +271,148 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
 	MemoryContextReset(buildstate->funcCtx);
 }
 
+#define KEY_TASK 42
+#define GIN_MAX_WORKERS 24
+#define GIN_BLOCKS_PER_WORKER 4
+#define GIN_RESULT_LEN 1024
+#define GIN_RESULT_KEYLEN 1024
+
+typedef struct {
+	bool ready;
+	bool finished;
+
+	Datum			key;
+	OffsetNumber	attnum;
+	GinNullCategory	category;
+
+	char keycoded[GIN_RESULT_KEYLEN];
+	int keylen;
+
+	ItemPointerData list[GIN_RESULT_LEN];
+	int			nlist;
+
+	Latch	blatch;
+	Latch	wlatch;
+} WorkerResult;
+
+/*
+ * This structure describes the GIN build task for the parallel workers. We use
+ * OIDs here because workers are separate processes and pointers may become
+ * meaningless for them. The "lock" is used to protect the "scanned" and
+ * "reltuples" fields as the workers modify them.
+ */
+typedef struct {
+	int		to_scan;
+	int		scanned;
+	slock_t	lock;
+	Oid		heap_oid;
+	Oid		index_oid;
+	double	reltuples;
+	WorkerResult results[GIN_MAX_WORKERS];
+} PGinBuildTask;
+
+static volatile PGinBuildTask *task;
+
+static void waitBool(volatile bool *actual, bool wanted, volatile Latch *l)
+{
+	if (*actual == wanted) return;
+
+	while (*actual != wanted)
+		WaitLatch(l, WL_LATCH_SET, 0);
+	ResetLatch(l);
+}
+
+static void setBool(volatile bool *actual, bool newvalue, volatile Latch *l)
+{
+	*actual = newvalue;
+	SetLatch(l);
+}
+
+static void ginDumpEntry(GinState *ginstate,
+						 volatile WorkerResult *r,
+						 OffsetNumber attnum,
+						 Datum key,
+						 GinNullCategory category,
+						 ItemPointerData *list,
+						 int nlist)
+{
+	volatile char *addr;
+	bool isnull;
+	Form_pg_attribute att;
+
+	Assert(nlist > 0);
+	waitBool(&r->ready, false, &r->wlatch);
+
+	Assert(r->keylen == 0);
+	addr = r->keycoded;
+	isnull = category == GIN_CAT_NULL_KEY;
+	att = ginstate->origTupdesc->attrs[attnum - 1];
+
+	r->attnum = attnum;
+	r->category = category;
+	if (r->category == GIN_CAT_EMPTY_ITEM || r->category == GIN_CAT_NULL_ITEM)
+	{
+		r->keylen = 1;
+	}
+	else
+	{
+		r->keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen);
+		Assert(r->keylen > 0);
+		Assert(r->keylen <= GIN_RESULT_KEYLEN);
+
+		datumSerialize(key, isnull, att->attbyval, att->attlen, (char**)&addr);
+	}
+
+	while (nlist > 0)
+	{
+		if (nlist > GIN_RESULT_LEN)
+			r->nlist = GIN_RESULT_LEN;
+		else
+			r->nlist = nlist;
+		nlist -= r->nlist;
+
+		memcpy((void*)r->list, list, r->nlist * sizeof(ItemPointerData));
+		setBool(&r->ready, true, &r->blatch);
+		waitBool(&r->ready, false, &r->wlatch);
+	}
+}
+
+static void ginDumpAccumulator(GinBuildState *buildstate)
+{
+	ItemPointerData *list;
+	Datum		key;
+	GinNullCategory category;
+	uint32		nlist;
+	OffsetNumber attnum;
+	MemoryContext oldCtx;
+
+	oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+	ginBeginBAScan(&buildstate->accum);
+	while ((list = ginGetBAEntry(&buildstate->accum,
+							  &attnum, &key, &category, &nlist)) != NULL)
+	{
+		/* there could be many entries, so be willing to abort here */
+		CHECK_FOR_INTERRUPTS();
+
+		if (IsParallelWorker())
+		{
+			volatile WorkerResult *r = &task->results[ParallelWorkerNumber];
+			ginDumpEntry(&buildstate->ginstate, r, attnum, key, category, list, nlist);
+		}
+		else
+			ginEntryInsert(&buildstate->ginstate,
+						   attnum, key, category,
+						   list, nlist,
+						   &buildstate->buildStats);
+	}
+
+	MemoryContextReset(buildstate->tmpCtx);
+	ginInitBA(&buildstate->accum);
+
+	MemoryContextSwitchTo(oldCtx);
+}
+
 static void
 ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 				 bool *isnull, bool tupleIsAlive, void *state)
@@ -283,52 +431,315 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
 	/* If we've maxed out our available memory, dump everything to the index */
 	if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)
 	{
-		ItemPointerData *list;
-		Datum		key;
-		GinNullCategory category;
-		uint32		nlist;
-		OffsetNumber attnum;
-
-		ginBeginBAScan(&buildstate->accum);
-		while ((list = ginGetBAEntry(&buildstate->accum,
-								  &attnum, &key, &category, &nlist)) != NULL)
-		{
-			/* there could be many entries, so be willing to abort here */
-			CHECK_FOR_INTERRUPTS();
-			ginEntryInsert(&buildstate->ginstate, attnum, key, category,
-						   list, nlist, &buildstate->buildStats);
-		}
-
-		MemoryContextReset(buildstate->tmpCtx);
-		ginInitBA(&buildstate->accum);
+		ginDumpAccumulator(buildstate);
 	}
 
 	MemoryContextSwitchTo(oldCtx);
 }
 
+/*
+ * Get the next key from the specified worker. Wait until it is available or
+ * the result is exhausted. Return true if a key was obtained, false if the
+ * result is exhausted. Fill in everything except "list".
+ */
+static bool getKeyFromWorker(volatile WorkerResult *result)
+{
+	if (result->finished) return false;
+
+	if (result->keylen)
+	{
+		if (result->category == GIN_CAT_EMPTY_ITEM || result->category == GIN_CAT_NULL_ITEM)
+		{
+			result->key = 0;
+		}
+		else
+		{
+			bool isnull;
+			volatile char *addr = result->keycoded;
+			result->key = datumRestore((char**)&addr, &isnull);
+			if (isnull)
+				Assert(result->category == GIN_CAT_NULL_KEY);
+			else
+				Assert(result->category == GIN_CAT_NORM_KEY);
+		}
+		result->keylen = 0;
+	}
+
+	Assert(result->nlist > 0);
+	return true;
+}
+
+/*
+ * Claim "max_blocks" or less blocks. Return the actual number of claimed
+ * blocks and set "first" to point to the first block of the claimed range.
+ * 0 return value means the task has been finished.
+ */
+static int claimSomeBlocks(volatile PGinBuildTask *task, int max_blocks, int *first)
+{
+	int blocks = 0;
+
+	SpinLockAcquire(&task->lock);
+
+	if (task->scanned >= task->to_scan)
+	{
+		SpinLockRelease(&task->lock);
+		return 0;
+	}
+
+	*first = task->scanned;
+	blocks = max_blocks;
+	if (blocks > task->to_scan - task->scanned)
+		blocks = task->to_scan - task->scanned;
+	task->scanned += blocks;
+
+	SpinLockRelease(&task->lock);
+	return blocks;
+}
+
+static void reportReltuples(volatile PGinBuildTask *task, double reltuples)
+{
+	SpinLockAcquire(&task->lock);
+	task->reltuples += reltuples;
+	SpinLockRelease(&task->lock);
+}
+
+static double
+ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
+{
+	double reltuples = 0;
+
+	/*
+	 * create a temporary memory context that is used to hold data not yet
+	 * dumped out to the index
+	 */
+	buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+											   "Gin build temporary context",
+											   ALLOCSET_DEFAULT_MINSIZE,
+											   ALLOCSET_DEFAULT_INITSIZE,
+											   ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * create a temporary memory context that is used for calling
+	 * ginExtractEntries(), and can be reset after each tuple
+	 */
+	buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext,
+												"Gin build temporary context for user-defined function",
+												ALLOCSET_DEFAULT_MINSIZE,
+												ALLOCSET_DEFAULT_INITSIZE,
+												ALLOCSET_DEFAULT_MAXSIZE);
+
+	buildstate->accum.ginstate = &buildstate->ginstate;
+	ginInitBA(&buildstate->accum);
+
+	/*
+	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
+	 * prefers to receive tuples in TID order.
+	 */
+	if (IsParallelWorker())
+	{
+		while (true)
+		{
+			double subtuples;
+			int first, blocks;
+
+			blocks = claimSomeBlocks(task, GIN_BLOCKS_PER_WORKER, &first);
+			if (blocks == 0)
+				break;
+
+			subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+												first, blocks,
+												ginBuildCallback, (void *)buildstate);
+			reltuples += subtuples;
+		}
+	}
+	else
+	{
+		reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
+									   ginBuildCallback, (void *)buildstate);
+	}
+
+	/* dump remaining entries to the index */
+	ginDumpAccumulator(buildstate);
+
+	MemoryContextDelete(buildstate->funcCtx);
+	MemoryContextDelete(buildstate->tmpCtx);
+
+	/*
+	 * Update metapage stats
+	 */
+	buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
+	ginUpdateStats(index, &buildstate->buildStats);
+
+	return reltuples;
+}
+
+/*
+ * The idea behind parallel GIN build is to let the parallel workers scan the
+ * relation and build rbtrees in parallel. Each block is scanned by exactly
+ * one of the workers.
+ */
+static void ginbuildWorker(dsm_segment *seg, shm_toc *toc)
+{
+	GinBuildState buildstate;
+
+	Relation heap;
+	Relation index;
+	IndexInfo *indexInfo;
+
+	volatile WorkerResult *r;
+	double reltuples;
+
+	task = (PGinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+	r = &task->results[ParallelWorkerNumber];
+	r->finished = false;
+
+	OwnLatch(&r->wlatch);
+
+	heap = heap_open(task->heap_oid, NoLock);
+	index = index_open(task->index_oid, NoLock);
+	indexInfo = BuildIndexInfo(index);
+
+	initGinState(&buildstate.ginstate, index);
+	buildstate.indtuples = 0;
+	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
+	reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+
+	index_close(index, NoLock);
+	heap_close(heap, NoLock);
+
+	reportReltuples(task, reltuples);
+
+	waitBool(&r->ready, false, &r->wlatch);
+	r->finished = true;
+	setBool(&r->ready, true, &r->blatch);
+}
+
+typedef struct GinEntryStack {
+	struct GinEntryStack *next;
+	Datum			key;
+	GinNullCategory	category;
+	OffsetNumber	attnum;
+	ItemPointerData	*list;
+	int				nlist;
+} GinEntryStack;
+
+static GinEntryStack *pushEntry(GinEntryStack *stack)
+{
+	GinEntryStack *head = palloc(sizeof(GinEntryStack));
+	head->next = stack;
+	head->list = palloc(sizeof(ItemPointerData)); /* make ginMergeItemPointers happy */
+	head->nlist = 0;
+	return head;
+}
+
+static GinEntryStack *popEntry(GinEntryStack *stack)
+{
+	GinEntryStack *head = stack;
+	Assert(stack != NULL);
+	stack = stack->next;
+	pfree(head->list);
+	pfree(head);
+	return stack;
+}
+
+static void mergeResults(GinBuildState *buildstate, ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+	GinEntryStack *entry = NULL;
+
+	while (true)
+	{
+		bool merged = false;
+		int i;
+
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			ItemPointerData *oldlist;
+			bool have_key;
+			int cmp;
+			volatile WorkerResult *r = &task->results[i];
+			if (pcxt->worker[i].error_mqh == NULL) continue;
+			waitBool(&r->ready, true, &r->blatch);
+			if (r->finished) continue;
+
+			/* The worker has something for us. */
+
+			have_key = getKeyFromWorker(r);
+			Assert(have_key);
+
+			cmp = -1;
+			if (entry != NULL)
+			{
+				cmp = ginCompareAttEntries(&buildstate->ginstate,
+										   r->attnum, r->key, r->category,
+										   entry->attnum, entry->key, entry->category);
+			}
+
+			if (cmp > 0)
+			{
+				/* The key is greater, skip the worker. */
+				continue;
+			}
+
+			if (cmp < 0)
+			{
+				/*
+				 * The key is less than what we have on the stack.
+				 * Push a new entry onto the stack.
+				 */
+				entry = pushEntry(entry);
+				entry->key      = r->key;
+				entry->category = r->category;
+				entry->attnum   = r->attnum;
+			}
+
+			/*
+			 * The key is less than or equal. Merge the item pointers.
+			 * FIXME: Should we first copy the list and let the worker continue
+			 * before merging?
+			 */
+			oldlist = entry->list;
+			entry->list = ginMergeItemPointers(entry->list, entry->nlist,
+											   (ItemPointerData*)r->list, r->nlist,
+											   &entry->nlist);
+			pfree(oldlist);
+			setBool(&r->ready, false, &r->wlatch);
+			merged = true;
+		}
+
+		if (!merged)
+		{
+			/* Nothing merged. Insert the entry into the index and pop the stack. */
+			if (entry == NULL)
+			{
+				/* Also nothing to dump - we have finished. */
+				break;
+			}
+
+			ginEntryInsert(&buildstate->ginstate,
+						   entry->attnum, entry->key, entry->category,
+						   entry->list, entry->nlist,
+						   &buildstate->buildStats);
+			entry = popEntry(entry);
+		}
+	}
+}
+
 IndexBuildResult *
 ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 {
+	GinBuildState buildstate;
+
 	IndexBuildResult *result;
-	double		reltuples;
-	GinBuildState buildstate;
 	Buffer		RootBuffer,
 				MetaBuffer;
-	ItemPointerData *list;
-	Datum		key;
-	GinNullCategory category;
-	uint32		nlist;
-	MemoryContext oldCtx;
-	OffsetNumber attnum;
+	double reltuples = 0;
+	bool parallel_workers_helped = false;
 
 	if (RelationGetNumberOfBlocks(index) != 0)
 		elog(ERROR, "index \"%s\" already contains data",
 			 RelationGetRelationName(index));
 
-	initGinState(&buildstate.ginstate, index);
-	buildstate.indtuples = 0;
-	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
-
 	/* initialize the meta page */
 	MetaBuffer = GinNewBuffer(index);
 
@@ -363,60 +774,76 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	UnlockReleaseBuffer(RootBuffer);
 	END_CRIT_SECTION();
 
+	initGinState(&buildstate.ginstate, index);
+	buildstate.indtuples = 0;
+	memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
 	/* count the root as first entry page */
 	buildstate.buildStats.nEntryPages++;
 
-	/*
-	 * create a temporary memory context that is used to hold data not yet
-	 * dumped out to the index
-	 */
-	buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
-											  "Gin build temporary context",
-											  ALLOCSET_DEFAULT_MINSIZE,
-											  ALLOCSET_DEFAULT_INITSIZE,
-											  ALLOCSET_DEFAULT_MAXSIZE);
-
-	/*
-	 * create a temporary memory context that is used for calling
-	 * ginExtractEntries(), and can be reset after each tuple
-	 */
-	buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
-					 "Gin build temporary context for user-defined function",
-											   ALLOCSET_DEFAULT_MINSIZE,
-											   ALLOCSET_DEFAULT_INITSIZE,
-											   ALLOCSET_DEFAULT_MAXSIZE);
-
-	buildstate.accum.ginstate = &buildstate.ginstate;
-	ginInitBA(&buildstate.accum);
-
-	/*
-	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
-	 * prefers to receive tuples in TID order.
-	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   ginBuildCallback, (void *) &buildstate);
-
-	/* dump remaining entries to the index */
-	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
-	ginBeginBAScan(&buildstate.accum);
-	while ((list = ginGetBAEntry(&buildstate.accum,
-								 &attnum, &key, &category, &nlist)) != NULL)
+	if (gin_parallel_workers > GIN_MAX_WORKERS)
+		gin_parallel_workers = GIN_MAX_WORKERS;
+
+	if (gin_parallel_workers > 0)
 	{
-		/* there could be many entries, so be willing to abort here */
-		CHECK_FOR_INTERRUPTS();
-		ginEntryInsert(&buildstate.ginstate, attnum, key, category,
-					   list, nlist, &buildstate.buildStats);
-	}
-	MemoryContextSwitchTo(oldCtx);
+		if (RelationUsesLocalBuffers(heap))
+		{
+			elog(DEBUG1, "not using parallel GIN build on temporary table \"%s\"", NameStr(heap->rd_rel->relname));
+		}
+		else
+		{
+			EnterParallelMode();
+			{
+				int i;
+				int size = sizeof(PGinBuildTask);
+				int keys = 1;
+				PGinBuildTask   *task;
+				ParallelContext *pcxt;
+
+				pcxt = CreateParallelContext(ginbuildWorker, gin_parallel_workers);
+
+				shm_toc_estimate_chunk(&pcxt->estimator, size);
+				shm_toc_estimate_keys(&pcxt->estimator, keys);
+				InitializeParallelDSM(pcxt);
+				task = (PGinBuildTask*)shm_toc_allocate(pcxt->toc, size);
+				shm_toc_insert(pcxt->toc, KEY_TASK, (void*)task);
+				SpinLockInit(&task->lock);
+				for (i = 0; i < pcxt->nworkers; i++)
+				{
+					volatile WorkerResult *r = &task->results[i];
+
+					InitSharedLatch(&r->blatch);
+					OwnLatch(&r->blatch);
+					InitSharedLatch(&r->wlatch);
 
-	MemoryContextDelete(buildstate.funcCtx);
-	MemoryContextDelete(buildstate.tmpCtx);
+					r->keylen = 0;
+					r->ready = false;
+				}
+				task->reltuples = 0;
+				task->to_scan = RelationGetNumberOfBlocks(heap);
+				task->heap_oid = heap->rd_id;
+				task->index_oid = index->rd_id;
+				LaunchParallelWorkers(pcxt);
+				if (pcxt->nworkers_launched > 0)
+				{
+					mergeResults(&buildstate, pcxt, task);
 
-	/*
-	 * Update metapage stats
-	 */
-	buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
-	ginUpdateStats(index, &buildstate.buildStats);
+					WaitForParallelWorkersToFinish(pcxt);
+					reltuples = task->reltuples;
+
+					parallel_workers_helped = true;
+				}
+				DestroyParallelContext(pcxt);
+			}
+			ExitParallelMode();
+		}
+	}
+
+	if (!parallel_workers_helped)
+	{
+		/* Do everything myself */
+		reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+	}
 
 	/*
 	 * Return statistics
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 31a69ca..b26e362 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2697,6 +2697,18 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"gin_parallel_workers",
+			PGC_USERSET,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Maximum number of parallel workers for GIN buiding."),
+			NULL,
+		},
+		&gin_parallel_workers,
+		0, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 09b2003..70cb37a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -537,6 +537,7 @@
 #xmloption = 'content'
 #gin_fuzzy_search_limit = 0
 #gin_pending_list_limit = 4MB
+#gin_parallel_workers = 0		# 0 disables parallel GIN build
 
 # - Locale and Formatting -
 
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..c487677 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -68,6 +68,7 @@ typedef char GinTernaryValue;
 /* GUC parameters */
 extern PGDLLIMPORT int GinFuzzySearchLimit;
 extern int	gin_pending_list_limit;
+extern int	gin_parallel_workers;
 
 /* ginutil.c */
 extern void ginGetStats(Relation index, GinStatsData *stats);
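
For a rough before/after timing comparison on the same data, something
along these lines in psql should work (reusing the sketch table from
above; numbers will of course vary):

    \timing on
    set gin_parallel_workers = 0;
    create index gin_serial_idx on gin_cat_test using gin(body);
    drop index gin_serial_idx;
    set gin_parallel_workers = 4;
    create index gin_parallel_idx on gin_cat_test using gin(body);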