On Wed, 17 Feb 2016 23:01:47 +0300
Oleg Bartunov <obartu...@gmail.com> wrote:
> My feedback is (Mac OS X 10.11.3)
>
> set gin_parallel_workers=2;
> create index message_body_idx on messages using gin(body_tsvector);
> LOG: worker process: parallel worker for PID 5689 (PID 6906) was
> terminated by signal 11: Segmentation fault
Fixed this; please try the new patch. The bug was in the incorrect handling
of some GIN categories.
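
For reference, a minimal way to exercise the parallel path once the patch is
applied, reusing the table and column from the quoted report (names are from
that example; gin_parallel_workers is the new GUC added below, default 0 means
a serial build, and the value is capped at GIN_MAX_WORKERS):

    set gin_parallel_workers = 2;    -- request up to 2 parallel workers
    create index message_body_idx on messages using gin(body_tsvector);
    reset gin_parallel_workers;      -- back to the default serial build
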
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index cd21e0e..2f6f142 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -16,14 +16,20 @@
#include "access/gin_private.h"
#include "access/xloginsert.h"
+#include "access/parallel.h"
+#include "access/xact.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "storage/indexfsm.h"
+#include "storage/spin.h"
+#include "utils/datum.h"
#include "utils/memutils.h"
#include "utils/rel.h"
+/* GUC parameter */
+int gin_parallel_workers = 0;
typedef struct
{
@@ -265,6 +271,148 @@ ginHeapTupleBulkInsert(GinBuildState *buildstate, OffsetNumber attnum,
MemoryContextReset(buildstate->funcCtx);
}
+#define KEY_TASK 42
+#define GIN_MAX_WORKERS 24
+#define GIN_BLOCKS_PER_WORKER 4
+#define GIN_RESULT_LEN 1024
+#define GIN_RESULT_KEYLEN 1024
+
+typedef struct {
+ bool ready;
+ bool finished;
+
+ Datum key;
+ OffsetNumber attnum;
+ GinNullCategory category;
+
+ char keycoded[GIN_RESULT_KEYLEN];
+ int keylen;
+
+ ItemPointerData list[GIN_RESULT_LEN];
+ int nlist;
+
+ Latch blatch;
+ Latch wlatch;
+} WorkerResult;
+
+/*
+ * This structure describes the GIN build task for the parallel workers. We use
+ * OIDs here because workers are separate processes and pointers may become
+ * meaningless for them. The "lock" is used to protect the "scanned" and
+ * "reltuples" fields as the workers modify them.
+ */
+typedef struct {
+ int to_scan;
+ int scanned;
+ slock_t lock;
+ Oid heap_oid;
+ Oid index_oid;
+ double reltuples;
+ WorkerResult results[GIN_MAX_WORKERS];
+} PGinBuildTask;
+
+static volatile PGinBuildTask *task;
+
+static void waitBool(volatile bool *actual, bool wanted, volatile Latch *l)
+{
+ if (*actual == wanted) return;
+
+ while (*actual != wanted)
+ WaitLatch(l, WL_LATCH_SET, 0);
+ ResetLatch(l);
+}
+
+static void setBool(volatile bool *actual, bool newvalue, volatile Latch *l)
+{
+ *actual = newvalue;
+ SetLatch(l);
+}
+
+static void ginDumpEntry(GinState *ginstate,
+ volatile WorkerResult *r,
+ OffsetNumber attnum,
+ Datum key,
+ GinNullCategory category,
+ ItemPointerData *list,
+ int nlist)
+{
+ volatile char *addr;
+ bool isnull;
+ Form_pg_attribute att;
+
+ Assert(nlist > 0);
+ waitBool(&r->ready, false, &r->wlatch);
+
+ Assert(r->keylen == 0);
+ addr = r->keycoded;
+ isnull = category == GIN_CAT_NULL_KEY;
+ att = ginstate->origTupdesc->attrs[attnum - 1];
+
+ r->attnum = attnum;
+ r->category = category;
+ if (r->category == GIN_CAT_EMPTY_ITEM || r->category == GIN_CAT_NULL_ITEM)
+ {
+ r->keylen = 1;
+ }
+ else
+ {
+ r->keylen = datumEstimateSpace(key, isnull, att->attbyval, att->attlen);
+ Assert(r->keylen > 0);
+ Assert(r->keylen <= GIN_RESULT_KEYLEN);
+
+ datumSerialize(key, isnull, att->attbyval, att->attlen, (char**)&addr);
+ }
+
+ while (nlist > 0)
+ {
+ if (nlist > GIN_RESULT_LEN)
+ r->nlist = GIN_RESULT_LEN;
+ else
+ r->nlist = nlist;
+ nlist -= r->nlist;
+
+ memcpy((void*)r->list, list, r->nlist * sizeof(ItemPointerData));
+ setBool(&r->ready, true, &r->blatch);
+ waitBool(&r->ready, false, &r->wlatch);
+ }
+}
+
+static void ginDumpAccumulator(GinBuildState *buildstate)
+{
+ ItemPointerData *list;
+ Datum key;
+ GinNullCategory category;
+ uint32 nlist;
+ OffsetNumber attnum;
+ MemoryContext oldCtx;
+
+ oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
+
+ ginBeginBAScan(&buildstate->accum);
+ while ((list = ginGetBAEntry(&buildstate->accum,
+ &attnum, &key, &category, &nlist)) != NULL)
+ {
+ /* there could be many entries, so be willing to abort here */
+ CHECK_FOR_INTERRUPTS();
+
+ if (IsParallelWorker())
+ {
+ volatile WorkerResult *r = &task->results[ParallelWorkerNumber];
+ ginDumpEntry(&buildstate->ginstate, r, attnum, key, category, list, nlist);
+ }
+ else
+ ginEntryInsert(&buildstate->ginstate,
+ attnum, key, category,
+ list, nlist,
+ &buildstate->buildStats);
+ }
+
+ MemoryContextReset(buildstate->tmpCtx);
+ ginInitBA(&buildstate->accum);
+
+ MemoryContextSwitchTo(oldCtx);
+}
+
static void
ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
bool *isnull, bool tupleIsAlive, void *state)
@@ -283,52 +431,315 @@ ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
/* If we've maxed out our available memory, dump everything to the index */
if (buildstate->accum.allocatedMemory >= (Size)maintenance_work_mem * 1024L)
{
- ItemPointerData *list;
- Datum key;
- GinNullCategory category;
- uint32 nlist;
- OffsetNumber attnum;
-
- ginBeginBAScan(&buildstate->accum);
- while ((list = ginGetBAEntry(&buildstate->accum,
- &attnum, &key, &category, &nlist)) != NULL)
- {
- /* there could be many entries, so be willing to abort here */
- CHECK_FOR_INTERRUPTS();
- ginEntryInsert(&buildstate->ginstate, attnum, key, category,
- list, nlist, &buildstate->buildStats);
- }
-
- MemoryContextReset(buildstate->tmpCtx);
- ginInitBA(&buildstate->accum);
+ ginDumpAccumulator(buildstate);
}
MemoryContextSwitchTo(oldCtx);
}
+/*
+ * Get the next key from the specified worker. Wait until one is available or
+ * the result is exhausted. Return true if we got a key, false if the result
+ * is exhausted. Fill in everything except "list".
+ */
+static bool getKeyFromWorker(volatile WorkerResult *result)
+{
+ if (result->finished) return false;
+
+ if (result->keylen)
+ {
+ if (result->category == GIN_CAT_EMPTY_ITEM || result->category == GIN_CAT_NULL_ITEM)
+ {
+ result->key = 0;
+ }
+ else
+ {
+ bool isnull;
+ volatile char *addr = result->keycoded;
+ result->key = datumRestore((char**)&addr, &isnull);
+ if (isnull)
+ Assert(result->category == GIN_CAT_NULL_KEY);
+ else
+ Assert(result->category == GIN_CAT_NORM_KEY);
+ }
+ result->keylen = 0;
+ }
+
+ Assert(result->nlist > 0);
+ return true;
+}
+
+/*
+ * Claim "max_blocks" or fewer blocks. Return the actual number of claimed
+ * blocks and set "first" to point to the first block of the claimed range.
+ * A return value of 0 means the task has been finished.
+ */
+static int claimSomeBlocks(volatile PGinBuildTask *task, int max_blocks, int *first)
+{
+ int blocks = 0;
+
+ SpinLockAcquire(&task->lock);
+
+ if (task->scanned >= task->to_scan)
+ {
+ SpinLockRelease(&task->lock);
+ return 0;
+ }
+
+ *first = task->scanned;
+ blocks = max_blocks;
+ if (blocks > task->to_scan - task->scanned)
+ blocks = task->to_scan - task->scanned;
+ task->scanned += blocks;
+
+ SpinLockRelease(&task->lock);
+ return blocks;
+}
+
+static void reportReltuples(volatile PGinBuildTask *task, double reltuples)
+{
+ SpinLockAcquire(&task->lock);
+ task->reltuples += reltuples;
+ SpinLockRelease(&task->lock);
+}
+
+static double
+ginbuildCommon(GinBuildState *buildstate, Relation heap, Relation index, IndexInfo *indexInfo)
+{
+ double reltuples = 0;
+
+ /*
+ * create a temporary memory context that is used to hold data not yet
+ * dumped out to the index
+ */
+ buildstate->tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
+ * create a temporary memory context that is used for calling
+ * ginExtractEntries(), and can be reset after each tuple
+ */
+ buildstate->funcCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "Gin build temporary context for user-defined function",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ buildstate->accum.ginstate = &buildstate->ginstate;
+ ginInitBA(&buildstate->accum);
+
+ /*
+ * Do the heap scan. We disallow sync scan here because dataPlaceToPage
+ * prefers to receive tuples in TID order.
+ */
+ if (IsParallelWorker())
+ {
+ while (true)
+ {
+ double subtuples;
+ int first, blocks;
+
+ blocks = claimSomeBlocks(task, GIN_BLOCKS_PER_WORKER, &first);
+ if (blocks == 0)
+ break;
+
+ subtuples = IndexBuildHeapRangeScan(heap, index, indexInfo, false, false,
+ first, blocks,
+ ginBuildCallback, (void *)buildstate);
+ reltuples += subtuples;
+ }
+ }
+ else
+ {
+ reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
+ ginBuildCallback, (void *)buildstate);
+ }
+
+ /* dump remaining entries to the index */
+ ginDumpAccumulator(buildstate);
+
+ MemoryContextDelete(buildstate->funcCtx);
+ MemoryContextDelete(buildstate->tmpCtx);
+
+ /*
+ * Update metapage stats
+ */
+ buildstate->buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
+ ginUpdateStats(index, &buildstate->buildStats);
+
+ return reltuples;
+}
+
+/*
+ * The idea behind parallel GIN build is to let the parallel workers scan the
+ * relation and build their rbtrees in parallel. Each block is scanned by one
+ * of the workers.
+ */
+static void ginbuildWorker(dsm_segment *seg, shm_toc *toc)
+{
+ GinBuildState buildstate;
+
+ Relation heap;
+ Relation index;
+ IndexInfo *indexInfo;
+
+ volatile WorkerResult *r;
+ double reltuples;
+
+ task = (PGinBuildTask*)shm_toc_lookup(toc, KEY_TASK);
+ r = &task->results[ParallelWorkerNumber];
+ r->finished = false;
+
+ OwnLatch(&r->wlatch);
+
+ heap = heap_open(task->heap_oid, NoLock);
+ index = index_open(task->index_oid, NoLock);
+ indexInfo = BuildIndexInfo(index);
+
+ initGinState(&buildstate.ginstate, index);
+ buildstate.indtuples = 0;
+ memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
+ reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+
+ index_close(index, NoLock);
+ heap_close(heap, NoLock);
+
+ reportReltuples(task, reltuples);
+
+ waitBool(&r->ready, false, &r->wlatch);
+ r->finished = true;
+ setBool(&r->ready, true, &r->blatch);
+}
+
+typedef struct GinEntryStack {
+ struct GinEntryStack *next;
+ Datum key;
+ GinNullCategory category;
+ OffsetNumber attnum;
+ ItemPointerData *list;
+ int nlist;
+} GinEntryStack;
+
+static GinEntryStack *pushEntry(GinEntryStack *stack)
+{
+ GinEntryStack *head = palloc(sizeof(GinEntryStack));
+ head->next = stack;
+ head->list = palloc(sizeof(ItemPointerData)); /* make ginMergeItemPointers happy */
+ head->nlist = 0;
+ return head;
+}
+
+static GinEntryStack *popEntry(GinEntryStack *stack)
+{
+ GinEntryStack *head = stack;
+ Assert(stack != NULL);
+ stack = stack->next;
+ pfree(head->list);
+ pfree(head);
+ return stack;
+}
+
+static void mergeResults(GinBuildState *buildstate, ParallelContext *pcxt, volatile PGinBuildTask *task)
+{
+ GinEntryStack *entry = NULL;
+
+ while (true)
+ {
+ bool merged = false;
+ int i;
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ ItemPointerData *oldlist;
+ bool have_key;
+ int cmp;
+ volatile WorkerResult *r = &task->results[i];
+ if (pcxt->worker[i].error_mqh == NULL) continue;
+ waitBool(&r->ready, true, &r->blatch);
+ if (r->finished) continue;
+
+ /* The worker has something for us. */
+
+ have_key = getKeyFromWorker(r);
+ Assert(have_key);
+
+ cmp = -1;
+ if (entry != NULL)
+ {
+ cmp = ginCompareAttEntries(&buildstate->ginstate,
+ r->attnum, r->key, r->category,
+ entry->attnum, entry->key, entry->category);
+ }
+
+ if (cmp > 0)
+ {
+ /* The key is greater, skip the worker. */
+ continue;
+ }
+
+ if (cmp < 0)
+ {
+ /*
+ * The key is less than what we have on the stack.
+ * Push a new entry onto the stack.
+ */
+ entry = pushEntry(entry);
+ entry->key = r->key;
+ entry->category = r->category;
+ entry->attnum = r->attnum;
+ }
+
+ /*
+ * The key is less than or equal. Merge the item pointers.
+ * FIXME: Should we first copy the list and let the worker continue
+ * before merging?
+ */
+ oldlist = entry->list;
+ entry->list = ginMergeItemPointers(entry->list, entry->nlist,
+ (ItemPointerData*)r->list, r->nlist,
+ &entry->nlist);
+ pfree(oldlist);
+ setBool(&r->ready, false, &r->wlatch);
+ merged = true;
+ }
+
+ if (!merged)
+ {
+ /* Nothing merged. Insert the entry into the index and pop the stack. */
+ if (entry == NULL)
+ {
+ /* Also nothing to dump - we have finished. */
+ break;
+ }
+
+ ginEntryInsert(&buildstate->ginstate,
+ entry->attnum, entry->key, entry->category,
+ entry->list, entry->nlist,
+ &buildstate->buildStats);
+ entry = popEntry(entry);
+ }
+ }
+}
+
IndexBuildResult *
ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
+ GinBuildState buildstate;
+
IndexBuildResult *result;
- double reltuples;
- GinBuildState buildstate;
Buffer RootBuffer,
MetaBuffer;
- ItemPointerData *list;
- Datum key;
- GinNullCategory category;
- uint32 nlist;
- MemoryContext oldCtx;
- OffsetNumber attnum;
+ double reltuples = 0;
+ bool parallel_workers_helped = false;
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
- initGinState(&buildstate.ginstate, index);
- buildstate.indtuples = 0;
- memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
-
/* initialize the meta page */
MetaBuffer = GinNewBuffer(index);
@@ -363,60 +774,76 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
UnlockReleaseBuffer(RootBuffer);
END_CRIT_SECTION();
+ initGinState(&buildstate.ginstate, index);
+ buildstate.indtuples = 0;
+ memset(&buildstate.buildStats, 0, sizeof(GinStatsData));
+
/* count the root as first entry page */
buildstate.buildStats.nEntryPages++;
- /*
- * create a temporary memory context that is used to hold data not yet
- * dumped out to the index
- */
- buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
- "Gin build temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- /*
- * create a temporary memory context that is used for calling
- * ginExtractEntries(), and can be reset after each tuple
- */
- buildstate.funcCtx = AllocSetContextCreate(CurrentMemoryContext,
- "Gin build temporary context for user-defined function",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- buildstate.accum.ginstate = &buildstate.ginstate;
- ginInitBA(&buildstate.accum);
-
- /*
- * Do the heap scan. We disallow sync scan here because dataPlaceToPage
- * prefers to receive tuples in TID order.
- */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
- ginBuildCallback, (void *) &buildstate);
-
- /* dump remaining entries to the index */
- oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
- ginBeginBAScan(&buildstate.accum);
- while ((list = ginGetBAEntry(&buildstate.accum,
- &attnum, &key, &category, &nlist)) != NULL)
+ if (gin_parallel_workers > GIN_MAX_WORKERS)
+ gin_parallel_workers = GIN_MAX_WORKERS;
+
+ if (gin_parallel_workers > 0)
{
- /* there could be many entries, so be willing to abort here */
- CHECK_FOR_INTERRUPTS();
- ginEntryInsert(&buildstate.ginstate, attnum, key, category,
- list, nlist, &buildstate.buildStats);
- }
- MemoryContextSwitchTo(oldCtx);
+ if (RelationUsesLocalBuffers(heap))
+ {
+ elog(DEBUG1, "not using parallel GIN build on temporary table \"%s\"", RelationGetRelationName(heap));
+ }
+ else
+ {
+ EnterParallelMode();
+ {
+ int i;
+ int size = sizeof(PGinBuildTask);
+ int keys = 1;
+ PGinBuildTask *task;
+ ParallelContext *pcxt;
+
+ pcxt = CreateParallelContext(ginbuildWorker, gin_parallel_workers);
+
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, keys);
+ InitializeParallelDSM(pcxt);
+ task = (PGinBuildTask*)shm_toc_allocate(pcxt->toc, size);
+ shm_toc_insert(pcxt->toc, KEY_TASK, (void*)task);
+ SpinLockInit(&task->lock);
+ for (i = 0; i < pcxt->nworkers; i++)
+ {
+ volatile WorkerResult *r = &task->results[i];
+
+ InitSharedLatch(&r->blatch);
+ OwnLatch(&r->blatch);
+ InitSharedLatch(&r->wlatch);
- MemoryContextDelete(buildstate.funcCtx);
- MemoryContextDelete(buildstate.tmpCtx);
+ r->keylen = 0;
+ r->ready = false;
+ }
+ task->reltuples = 0;
+ task->to_scan = RelationGetNumberOfBlocks(heap);
+ task->heap_oid = heap->rd_id;
+ task->index_oid = index->rd_id;
+ LaunchParallelWorkers(pcxt);
+ if (pcxt->nworkers_launched > 0)
+ {
+ mergeResults(&buildstate, pcxt, task);
- /*
- * Update metapage stats
- */
- buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
- ginUpdateStats(index, &buildstate.buildStats);
+ WaitForParallelWorkersToFinish(pcxt);
+ reltuples = task->reltuples;
+
+ parallel_workers_helped = true;
+ }
+ DestroyParallelContext(pcxt);
+ }
+ ExitParallelMode();
+ }
+ }
+
+ if (!parallel_workers_helped)
+ {
+ /* Do everything myself */
+ reltuples = ginbuildCommon(&buildstate, heap, index, indexInfo);
+ }
/*
* Return statistics
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 31a69ca..b26e362 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2697,6 +2697,19 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"gin_parallel_workers",
+ PGC_USERSET,
+ RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Maximum number of parallel workers for GIN building."),
+ NULL,
+ },
+ &gin_parallel_workers,
+ 0, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
+
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 09b2003..70cb37a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -537,6 +537,7 @@
#xmloption = 'content'
#gin_fuzzy_search_limit = 0
#gin_pending_list_limit = 4MB
+#gin_parallel_workers = 0 # 0 disables parallel gin build
# - Locale and Formatting -
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..c487677 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -68,6 +68,7 @@ typedef char GinTernaryValue;
/* GUC parameters */
extern PGDLLIMPORT int GinFuzzySearchLimit;
extern int gin_pending_list_limit;
+extern int gin_parallel_workers;
/* ginutil.c */
extern void ginGetStats(Relation index, GinStatsData *stats);