I pushed the two smaller parts today.

Here are the remaining two parts, to keep cfbot happy. I don't expect to
get these into PG18, though.


regards

-- 
Tomas Vondra
From bea52f76255830af45b7122b0fa5786997182cf5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 25 Feb 2025 16:12:37 +0100
Subject: [PATCH v20250304 1/2] Use a single GIN tuplesort

The previous approach was to sort the data in a private sort, then read
it back, merge the GinTuples, and write them into the shared sort, to be
processed later by the leader.

The new approach is to use a single sort, merging tuples as we write
them to disk.  This reduces temporary disk space.

An optimization was added to GinBuffer in which we don't deserialize
tuples unless we need access to the itemIds.

This modifies Tuplesort to have a new flushwrites callback. A sort's
writetup can now decide to buffer writes until the next flushwrites()
call.
---
 src/backend/access/gin/gininsert.c         | 411 +++++++++------------
 src/backend/utils/sort/tuplesort.c         |   5 +
 src/backend/utils/sort/tuplesortvariants.c | 102 ++++-
 src/include/access/gin_private.h           |   3 +
 src/include/access/gin_tuple.h             |  10 +
 src/include/utils/tuplesort.h              |  10 +-
 src/tools/pgindent/typedefs.list           |   1 +
 7 files changed, 302 insertions(+), 240 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index b2f89cad880..e873443784a 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -164,14 +164,6 @@ typedef struct
 	 * build callback etc.
 	 */
 	Tuplesortstate *bs_sortstate;
-
-	/*
-	 * The sortstate used only within a single worker for the first merge pass
-	 * happenning there. In principle it doesn't need to be part of the build
-	 * state and we could pass it around directly, but it's more convenient
-	 * this way. And it's part of the build state, after all.
-	 */
-	Tuplesortstate *bs_worker_sort;
 } GinBuildState;
 
 
@@ -195,8 +187,7 @@ static Datum _gin_parse_tuple_key(GinTuple *a);
 
 static GinTuple *_gin_build_tuple(OffsetNumber attrnum, unsigned char category,
 								  Datum key, int16 typlen, bool typbyval,
-								  ItemPointerData *items, uint32 nitems,
-								  Size *len);
+								  ItemPointerData *items, uint32 nitems);
 
 /*
  * Adds array of item pointers to tuple's posting list, or
@@ -499,16 +490,15 @@ ginFlushBuildState(GinBuildState *buildstate, Relation index)
 
 		/* GIN tuple and tuple length */
 		GinTuple   *tup;
-		Size		tuplen;
 
 		/* there could be many entries, so be willing to abort here */
 		CHECK_FOR_INTERRUPTS();
 
 		tup = _gin_build_tuple(attnum, category,
 							   key, attr->attlen, attr->attbyval,
-							   list, nlist, &tuplen);
+							   list, nlist);
 
-		tuplesort_putgintuple(buildstate->bs_worker_sort, tup, tuplen);
+		tuplesort_putgintuple(buildstate->bs_sortstate, tup);
 
 		pfree(tup);
 	}
@@ -1144,8 +1134,14 @@ _gin_parallel_heapscan(GinBuildState *state)
  * during the initial table scan (and detecting when the scan wraps around),
  * and during merging (where we do mergesort).
  */
-typedef struct GinBuffer
+struct GinBuffer
 {
+	/*
+	 * The memory context holds the dynamic allocation of items, key, and any
+	 * produced GinTuples.
+	 */
+	MemoryContext context;
+	GinTuple   *cached;			/* copy of previous GIN tuple */
 	OffsetNumber attnum;
 	GinNullCategory category;
 	Datum		key;			/* 0 if no key (and keylen == 0) */
@@ -1163,7 +1159,7 @@ typedef struct GinBuffer
 	int			nfrozen;
 	SortSupport ssup;			/* for sorting/comparing keys */
 	ItemPointerData *items;
-} GinBuffer;
+};
 
 /*
  * Check that TID array contains valid values, and that it's sorted (if we
@@ -1174,8 +1170,7 @@ AssertCheckItemPointers(GinBuffer *buffer)
 {
 #ifdef USE_ASSERT_CHECKING
 	/* we should not have a buffer with no TIDs to sort */
-	Assert(buffer->items != NULL);
-	Assert(buffer->nitems > 0);
+	Assert(buffer->nitems == 0 || buffer->items != NULL);
 
 	for (int i = 0; i < buffer->nitems; i++)
 	{
@@ -1201,7 +1196,7 @@ AssertCheckGinBuffer(GinBuffer *buffer)
 {
 #ifdef USE_ASSERT_CHECKING
 	/* if we have any items, the array must exist */
-	Assert(!((buffer->nitems > 0) && (buffer->items == NULL)));
+	Assert((buffer->nitems == 0) || (buffer->items != NULL));
 
 	/*
 	 * The buffer may be empty, in which case we must not call the check of
@@ -1225,7 +1220,7 @@ AssertCheckGinBuffer(GinBuffer *buffer)
  *
  * Initializes sort support procedures for all index attributes.
  */
-static GinBuffer *
+GinBuffer *
 GinBufferInit(Relation index)
 {
 	GinBuffer  *buffer = palloc0(sizeof(GinBuffer));
@@ -1288,15 +1283,18 @@ GinBufferInit(Relation index)
 
 		PrepareSortSupportComparisonShim(cmpFunc, sortKey);
 	}
+	buffer->context = GenerationContextCreate(CurrentMemoryContext,
+											  "Gin Buffer",
+											  ALLOCSET_DEFAULT_SIZES);
 
 	return buffer;
 }
 
 /* Is the buffer empty, i.e. has no TID values in the array? */
-static bool
+bool
 GinBufferIsEmpty(GinBuffer *buffer)
 {
-	return (buffer->nitems == 0);
+	return (buffer->nitems == 0 && buffer->cached == NULL);
 }
 
 /*
@@ -1312,37 +1310,71 @@ GinBufferIsEmpty(GinBuffer *buffer)
 static bool
 GinBufferKeyEquals(GinBuffer *buffer, GinTuple *tup)
 {
+	MemoryContext prev;
 	int			r;
+	AttrNumber	attnum;
 	Datum		tupkey;
+	Datum		bufkey;
 
 	AssertCheckGinBuffer(buffer);
+	if (buffer->cached)
+	{
+		GinTuple   *cached = buffer->cached;
 
-	if (tup->attrnum != buffer->attnum)
-		return false;
+		if (tup->attrnum != cached->attrnum)
+			return false;
 
-	/* same attribute should have the same type info */
-	Assert(tup->typbyval == buffer->typbyval);
-	Assert(tup->typlen == buffer->typlen);
+		Assert(tup->typbyval == cached->typbyval);
+		Assert(tup->typlen == cached->typlen);
 
-	if (tup->category != buffer->category)
-		return false;
+		if (tup->category != cached->category)
+			return false;
 
-	/*
-	 * For NULL/empty keys, this means equality, for normal keys we need to
-	 * compare the actual key value.
-	 */
-	if (buffer->category != GIN_CAT_NORM_KEY)
-		return true;
+		/*
+		 * For NULL/empty keys, this means equality, for normal keys we need
+		 * to compare the actual key value.
+		 */
+		if (cached->category != GIN_CAT_NORM_KEY)
+			return true;
+
+		attnum = cached->attrnum;
+		bufkey = _gin_parse_tuple_key(cached);
+	}
+	else
+	{
+		if (tup->attrnum != buffer->attnum)
+			return false;
+
+		/* same attribute should have the same type info */
+		Assert(tup->typbyval == buffer->typbyval);
+		Assert(tup->typlen == buffer->typlen);
+
+		if (tup->category != buffer->category)
+			return false;
+
+		/*
+		 * For NULL/empty keys, this means equality, for normal keys we need
+		 * to compare the actual key value.
+		 */
+		if (buffer->category != GIN_CAT_NORM_KEY)
+			return true;
+		attnum = buffer->attnum;
+		bufkey = buffer->key;
+	}
 
 	/*
 	 * For the tuple, get either the first sizeof(Datum) bytes for byval
 	 * types, or a pointer to the beginning of the data array.
 	 */
-	tupkey = (buffer->typbyval) ? *(Datum *) tup->data : PointerGetDatum(tup->data);
+	tupkey = _gin_parse_tuple_key(tup);
+
+	prev = MemoryContextSwitchTo(buffer->context);
 
-	r = ApplySortComparator(buffer->key, false,
+	r = ApplySortComparator(bufkey, false,
 							tupkey, false,
-							&buffer->ssup[buffer->attnum - 1]);
+							&buffer->ssup[attnum - 1]);
+
+	MemoryContextSwitchTo(prev);
 
 	return (r == 0);
 }
@@ -1389,6 +1421,56 @@ GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
 	return true;
 }
 
+static void
+GinBufferUnpackCached(GinBuffer *buffer, int reserve_space)
+{
+	Datum		key;
+	ItemPointer items;
+	GinTuple   *cached;
+	int			totitems;
+
+	cached = buffer->cached;
+	totitems = cached->nitems + reserve_space;
+	key = _gin_parse_tuple_key(cached);
+
+	buffer->category = cached->category;
+	buffer->keylen = cached->keylen;
+	buffer->attnum = cached->attrnum;
+
+	buffer->typlen = cached->typlen;
+	buffer->typbyval = cached->typbyval;
+
+	if (cached->category == GIN_CAT_NORM_KEY)
+		buffer->key = datumCopy(key, buffer->typbyval, buffer->typlen);
+	else
+		buffer->key = (Datum) 0;
+
+	items = _gin_parse_tuple_items(cached);
+
+	if (buffer->items == NULL)
+	{
+		buffer->items = palloc0(totitems * sizeof(ItemPointerData));
+		buffer->maxitems = totitems;
+	}
+	else if (buffer->maxitems < totitems)
+	{
+		buffer->items = repalloc(buffer->items,
+								 totitems * sizeof(ItemPointerData));
+		buffer->maxitems = totitems;
+	}
+	else
+	{
+		Assert(PointerIsValid(buffer->items) &&
+			   buffer->maxitems >= totitems);
+	}
+	memcpy(buffer->items, items, buffer->nitems * sizeof(ItemPointerData));
+	buffer->nitems = cached->nitems;
+
+	buffer->cached = NULL;
+	pfree(cached);
+	pfree(items);
+}
+
 /*
  * GinBufferStoreTuple
  *		Add data (especially TID list) from a GIN tuple to the buffer.
@@ -1412,32 +1494,29 @@ GinBufferShouldTrim(GinBuffer *buffer, GinTuple *tup)
  * workers. But the workers merge the items as much as possible, so there
  * should not be too many.
  */
-static void
-GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
+void
+GinBufferMergeTuple(GinBuffer *buffer, GinTuple *tup)
 {
+	MemoryContext prev;
 	ItemPointerData *items;
-	Datum		key;
 
+	prev = MemoryContextSwitchTo(buffer->context);
 	AssertCheckGinBuffer(buffer);
 
-	key = _gin_parse_tuple_key(tup);
-	items = _gin_parse_tuple_items(tup);
-
 	/* if the buffer is empty, set the fields (and copy the key) */
 	if (GinBufferIsEmpty(buffer))
 	{
-		buffer->category = tup->category;
-		buffer->keylen = tup->keylen;
-		buffer->attnum = tup->attrnum;
-
-		buffer->typlen = tup->typlen;
-		buffer->typbyval = tup->typbyval;
+		GinTuple   *tuple = palloc(tup->tuplen);
 
-		if (tup->category == GIN_CAT_NORM_KEY)
-			buffer->key = datumCopy(key, buffer->typbyval, buffer->typlen);
-		else
-			buffer->key = (Datum) 0;
+		memcpy(tuple, tup, tup->tuplen);
+		buffer->cached = tuple;
 	}
+	else if (buffer->cached != NULL)
+	{
+		GinBufferUnpackCached(buffer, tup->nitems);
+	}
+
+	items = _gin_parse_tuple_items(tup);
 
 	/*
 	 * Try freeze TIDs at the beginning of the list, i.e. exclude them from
@@ -1515,20 +1594,54 @@ GinBufferStoreTuple(GinBuffer *buffer, GinTuple *tup)
 
 	/* free the decompressed TID list */
 	pfree(items);
+
+	MemoryContextSwitchTo(prev);
+}
+
+GinTuple *
+GinBufferBuildTuple(GinBuffer *buffer)
+{
+	MemoryContext prev = MemoryContextSwitchTo(buffer->context);
+	GinTuple   *result;
+
+	if (buffer->cached)
+	{
+		result = buffer->cached;
+		buffer->cached = NULL;
+	}
+	else
+	{
+		result = _gin_build_tuple(buffer->attnum, buffer->category,
+								  buffer->key, buffer->typlen,
+								  buffer->typbyval, buffer->items,
+								  buffer->nitems);
+	}
+
+	GinBufferReset(buffer);
+
+	MemoryContextSwitchTo(prev);
+	return result;
 }
 
 /*
  * GinBufferReset
  *		Reset the buffer into a state as if it contains no data.
  */
-static void
+void
 GinBufferReset(GinBuffer *buffer)
 {
 	Assert(!GinBufferIsEmpty(buffer));
 
-	/* release byref values, do nothing for by-val ones */
-	if ((buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
-		pfree(DatumGetPointer(buffer->key));
+	/* release cached buffer tuple, if present */
+	if (buffer->cached)
+		pfree(buffer->cached);
+	else
+	{
+		/* release byref values, do nothing for by-val ones */
+		if ((buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval
+			&& PointerIsValid(DatumGetPointer(buffer->key)))
+			pfree(DatumGetPointer(buffer->key));
+	}
 
 	/*
 	 * Not required, but makes it more likely to trigger NULL derefefence if
@@ -1544,6 +1657,7 @@ GinBufferReset(GinBuffer *buffer)
 
 	buffer->typlen = 0;
 	buffer->typbyval = 0;
+	/* Note that we don't reset the memory context, this is deliberate */
 }
 
 /*
@@ -1567,7 +1681,7 @@ GinBufferTrim(GinBuffer *buffer)
  * GinBufferFree
  *		Release memory associated with the GinBuffer (including TID array).
  */
-static void
+void
 GinBufferFree(GinBuffer *buffer)
 {
 	if (buffer->items)
@@ -1578,6 +1692,7 @@ GinBufferFree(GinBuffer *buffer)
 		(buffer->category == GIN_CAT_NORM_KEY) && !buffer->typbyval)
 		pfree(DatumGetPointer(buffer->key));
 
+	MemoryContextDelete(buffer->context);
 	pfree(buffer);
 }
 
@@ -1587,7 +1702,7 @@ GinBufferFree(GinBuffer *buffer)
  *
  * Returns true if the buffer is either empty or for the same index key.
  */
-static bool
+bool
 GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup)
 {
 	/* empty buffer can accept data for any key */
@@ -1684,6 +1799,7 @@ _gin_parallel_merge(GinBuildState *state)
 			 * GinTuple.
 			 */
 			AssertCheckItemPointers(buffer);
+			Assert(!PointerIsValid(buffer->cached));
 
 			ginEntryInsert(&state->ginstate,
 						   buffer->attnum, buffer->key, buffer->category,
@@ -1710,6 +1826,7 @@ _gin_parallel_merge(GinBuildState *state)
 			 * GinTuple.
 			 */
 			AssertCheckItemPointers(buffer);
+			Assert(!PointerIsValid(buffer->cached));
 
 			ginEntryInsert(&state->ginstate,
 						   buffer->attnum, buffer->key, buffer->category,
@@ -1723,7 +1840,10 @@ _gin_parallel_merge(GinBuildState *state)
 		 * Remember data for the current tuple (either remember the new key,
 		 * or append if to the existing data).
 		 */
-		GinBufferStoreTuple(buffer, tup);
+		GinBufferMergeTuple(buffer, tup);
+
+		if (buffer->cached)
+			GinBufferUnpackCached(buffer, 0);
 
 		/* Report progress */
 		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
@@ -1734,6 +1854,7 @@ _gin_parallel_merge(GinBuildState *state)
 	if (!GinBufferIsEmpty(buffer))
 	{
 		AssertCheckItemPointers(buffer);
+		Assert(!PointerIsValid(buffer->cached));
 
 		ginEntryInsert(&state->ginstate,
 					   buffer->attnum, buffer->key, buffer->category,
@@ -1789,158 +1910,6 @@ _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Rela
 								 sortmem, true);
 }
 
-/*
- * _gin_process_worker_data
- *		First phase of the key merging, happening in the worker.
- *
- * Depending on the number of distinct keys, the TID lists produced by the
- * callback may be very short (due to frequent evictions in the callback).
- * But combining many tiny lists is expensive, so we try to do as much as
- * possible in the workers and only then pass the results to the leader.
- *
- * We read the tuples sorted by the key, and merge them into larger lists.
- * At the moment there's no memory limit, so this will just produce one
- * huge (sorted) list per key in each worker. Which means the leader will
- * do a very limited number of mergesorts, which is good.
- */
-static void
-_gin_process_worker_data(GinBuildState *state, Tuplesortstate *worker_sort,
-						 bool progress)
-{
-	GinTuple   *tup;
-	Size		tuplen;
-
-	GinBuffer  *buffer;
-
-	/*
-	 * Initialize buffer to combine entries for the same key.
-	 *
-	 * The workers are limited to the same amount of memory as during the sort
-	 * in ginBuildCallbackParallel. But this probably should be the 32MB used
-	 * during planning, just like there.
-	 */
-	buffer = GinBufferInit(state->ginstate.index);
-
-	/* sort the raw per-worker data */
-	if (progress)
-		pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
-									 PROGRESS_GIN_PHASE_PERFORMSORT_1);
-
-	tuplesort_performsort(state->bs_worker_sort);
-
-	/* reset the number of GIN tuples produced by this worker */
-	state->bs_numtuples = 0;
-
-	if (progress)
-		pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
-									 PROGRESS_GIN_PHASE_MERGE_1);
-
-	/*
-	 * Read the GIN tuples from the shared tuplesort, sorted by the key, and
-	 * merge them into larger chunks for the leader to combine.
-	 */
-	while ((tup = tuplesort_getgintuple(worker_sort, &tuplen, true)) != NULL)
-	{
-
-		CHECK_FOR_INTERRUPTS();
-
-		/*
-		 * If the buffer can accept the new GIN tuple, just store it there and
-		 * we're done. If it's a different key (or maybe too much data) flush
-		 * the current contents into the index first.
-		 */
-		if (!GinBufferCanAddKey(buffer, tup))
-		{
-			GinTuple   *ntup;
-			Size		ntuplen;
-
-			/*
-			 * Buffer is not empty and it's storing a different key - flush
-			 * the data into the insert, and start a new entry for current
-			 * GinTuple.
-			 */
-			AssertCheckItemPointers(buffer);
-
-			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
-									buffer->key, buffer->typlen, buffer->typbyval,
-									buffer->items, buffer->nitems, &ntuplen);
-
-			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
-			state->bs_numtuples++;
-
-			pfree(ntup);
-
-			/* discard the existing data */
-			GinBufferReset(buffer);
-		}
-
-		/*
-		 * We're about to add a GIN tuple to the buffer - check the memory
-		 * limit first, and maybe write out some of the data into the index
-		 * first, if needed (and possible). We only flush the part of the TID
-		 * list that we know won't change, and only if there's enough data for
-		 * compression to work well.
-		 */
-		if (GinBufferShouldTrim(buffer, tup))
-		{
-			GinTuple   *ntup;
-			Size		ntuplen;
-
-			Assert(buffer->nfrozen > 0);
-
-			/*
-			 * Buffer is not empty and it's storing a different key - flush
-			 * the data into the insert, and start a new entry for current
-			 * GinTuple.
-			 */
-			AssertCheckItemPointers(buffer);
-
-			ntup = _gin_build_tuple(buffer->attnum, buffer->category,
-									buffer->key, buffer->typlen, buffer->typbyval,
-									buffer->items, buffer->nfrozen, &ntuplen);
-
-			tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
-
-			pfree(ntup);
-
-			/* truncate the data we've just discarded */
-			GinBufferTrim(buffer);
-		}
-
-		/*
-		 * Remember data for the current tuple (either remember the new key,
-		 * or append if to the existing data).
-		 */
-		GinBufferStoreTuple(buffer, tup);
-	}
-
-	/* flush data remaining in the buffer (for the last key) */
-	if (!GinBufferIsEmpty(buffer))
-	{
-		GinTuple   *ntup;
-		Size		ntuplen;
-
-		AssertCheckItemPointers(buffer);
-
-		ntup = _gin_build_tuple(buffer->attnum, buffer->category,
-								buffer->key, buffer->typlen, buffer->typbyval,
-								buffer->items, buffer->nitems, &ntuplen);
-
-		tuplesort_putgintuple(state->bs_sortstate, ntup, ntuplen);
-		state->bs_numtuples++;
-
-		pfree(ntup);
-
-		/* discard the existing data */
-		GinBufferReset(buffer);
-	}
-
-	/* relase all the memory */
-	GinBufferFree(buffer);
-
-	tuplesort_end(worker_sort);
-}
-
 /*
  * Perform a worker's portion of a parallel GIN index build sort.
  *
@@ -2007,12 +1976,6 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 													coordinate,
 													TUPLESORT_NONE);
 
-	/* Local per-worker sort of raw-data */
-	state->bs_worker_sort = tuplesort_begin_index_gin(heap, index,
-													  state->work_mem,
-													  NULL,
-													  TUPLESORT_NONE);
-
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(index);
 	indexInfo->ii_Concurrent = ginshared->isconcurrent;
@@ -2026,13 +1989,6 @@ _gin_parallel_scan_and_build(GinBuildState *state,
 	/* write remaining accumulated entries */
 	ginFlushBuildState(state, index);
 
-	/*
-	 * Do the first phase of in-worker processing - sort the data produced by
-	 * the callback, and combine them into much larger chunks and place that
-	 * into the shared tuplestore for leader to process.
-	 */
-	_gin_process_worker_data(state, state->bs_worker_sort, progress);
-
 	/* sort the GIN tuples built by this worker */
 	tuplesort_performsort(state->bs_sortstate);
 
@@ -2187,8 +2143,7 @@ typedef struct
 static GinTuple *
 _gin_build_tuple(OffsetNumber attrnum, unsigned char category,
 				 Datum key, int16 typlen, bool typbyval,
-				 ItemPointerData *items, uint32 nitems,
-				 Size *len)
+				 ItemPointerData *items, uint32 nitems)
 {
 	GinTuple   *tuple;
 	char	   *ptr;
@@ -2256,8 +2211,6 @@ _gin_build_tuple(OffsetNumber attrnum, unsigned char category,
 	 */
 	tuplen = SHORTALIGN(offsetof(GinTuple, data) + keylen) + compresslen;
 
-	*len = tuplen;
-
 	/*
 	 * Allocate space for the whole GIN tuple.
 	 *
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 2ef32d53a43..7f346325678 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -395,6 +395,7 @@ struct Sharedsort
 #define REMOVEABBREV(state,stup,count)	((*(state)->base.removeabbrev) (state, stup, count))
 #define COMPARETUP(state,a,b)	((*(state)->base.comparetup) (a, b, state))
 #define WRITETUP(state,tape,stup)	((*(state)->base.writetup) (state, tape, stup))
+#define FLUSHWRITES(state,tape)	((state)->base.flushwrites ? (*(state)->base.flushwrites) (state, tape) : (void) 0)
 #define READTUP(state,stup,tape,len) ((*(state)->base.readtup) (state, stup, tape, len))
 #define FREESTATE(state)	((state)->base.freestate ? (*(state)->base.freestate) (state) : (void) 0)
 #define LACKMEM(state)		((state)->availMem < 0 && !(state)->slabAllocatorUsed)
@@ -2244,6 +2245,8 @@ mergeonerun(Tuplesortstate *state)
 		}
 	}
 
+	FLUSHWRITES(state, state->destTape);
+
 	/*
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape.
@@ -2369,6 +2372,8 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 		WRITETUP(state, state->destTape, stup);
 	}
 
+	FLUSHWRITES(state, state->destTape);
+
 	state->memtupcount = 0;
 
 	/*
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index eb8601e2257..a106cc79efd 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -32,6 +32,7 @@
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/tuplesort.h"
+#include "access/gin.h"
 
 
 /* sort-type codes for sort__start probes */
@@ -90,6 +91,7 @@ static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
 							   LogicalTape *tape, unsigned int len);
 static void writetup_index_gin(Tuplesortstate *state, LogicalTape *tape,
 							   SortTuple *stup);
+static void flushwrites_index_gin(Tuplesortstate *state, LogicalTape *tape);
 static void readtup_index_gin(Tuplesortstate *state, SortTuple *stup,
 							  LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
@@ -101,6 +103,7 @@ static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
 						  LogicalTape *tape, unsigned int len);
 static void freestate_cluster(Tuplesortstate *state);
+static void freestate_index_gin(Tuplesortstate *state);
 
 /*
  * Data structure pointed by "TuplesortPublic.arg" for the CLUSTER case.  Set by
@@ -135,6 +138,16 @@ typedef struct
 	bool		uniqueNullsNotDistinct; /* unique constraint null treatment */
 } TuplesortIndexBTreeArg;
 
+/*
+ * Data structure pointed by "TuplesortPublic.arg" for the index_gin subcase.
+ */
+typedef struct
+{
+	TuplesortIndexArg index;
+	GinBuffer  *buffer;
+} TuplesortIndexGinArg;
+
+
 /*
  * Data structure pointed by "TuplesortPublic.arg" for the index_hash subcase.
  */
@@ -209,6 +222,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 	base->comparetup = comparetup_heap;
 	base->comparetup_tiebreak = comparetup_heap_tiebreak;
 	base->writetup = writetup_heap;
+	base->flushwrites = NULL;
 	base->readtup = readtup_heap;
 	base->haveDatum1 = true;
 	base->arg = tupDesc;		/* assume we need not copy tupDesc */
@@ -285,6 +299,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 	base->comparetup = comparetup_cluster;
 	base->comparetup_tiebreak = comparetup_cluster_tiebreak;
 	base->writetup = writetup_cluster;
+	base->flushwrites = NULL;
 	base->readtup = readtup_cluster;
 	base->freestate = freestate_cluster;
 	base->arg = arg;
@@ -393,6 +408,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	base->comparetup = comparetup_index_btree;
 	base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
 	base->writetup = writetup_index;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -472,6 +488,7 @@ tuplesort_begin_index_hash(Relation heapRel,
 	base->comparetup = comparetup_index_hash;
 	base->comparetup_tiebreak = comparetup_index_hash_tiebreak;
 	base->writetup = writetup_index;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -516,6 +533,7 @@ tuplesort_begin_index_gist(Relation heapRel,
 	base->comparetup = comparetup_index_btree;
 	base->comparetup_tiebreak = comparetup_index_btree_tiebreak;
 	base->writetup = writetup_index;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -571,6 +589,7 @@ tuplesort_begin_index_brin(int workMem,
 	base->removeabbrev = removeabbrev_index_brin;
 	base->comparetup = comparetup_index_brin;
 	base->writetup = writetup_index_brin;
+	base->flushwrites = NULL;
 	base->readtup = readtup_index_brin;
 	base->haveDatum1 = true;
 	base->arg = NULL;
@@ -587,6 +606,7 @@ tuplesort_begin_index_gin(Relation heapRel,
 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
 												   sortopt);
 	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	TuplesortIndexGinArg *arg;
 	MemoryContext oldcontext;
 	int			i;
 	TupleDesc	desc = RelationGetDescr(indexRel);
@@ -611,6 +631,10 @@ tuplesort_begin_index_gin(Relation heapRel,
 	/* Prepare SortSupport data for each column */
 	base->sortKeys = (SortSupport) palloc0(base->nKeys *
 										   sizeof(SortSupportData));
+	arg = palloc0(sizeof(TuplesortIndexGinArg));
+	arg->index.indexRel = indexRel;
+	arg->index.heapRel = heapRel;
+	arg->buffer = GinBufferInit(indexRel);
 
 	for (i = 0; i < base->nKeys; i++)
 	{
@@ -640,9 +664,11 @@ tuplesort_begin_index_gin(Relation heapRel,
 	base->removeabbrev = removeabbrev_index_gin;
 	base->comparetup = comparetup_index_gin;
 	base->writetup = writetup_index_gin;
+	base->flushwrites = flushwrites_index_gin;
 	base->readtup = readtup_index_gin;
+	base->freestate = freestate_index_gin;
 	base->haveDatum1 = false;
-	base->arg = NULL;
+	base->arg = arg;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -683,6 +709,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	base->comparetup = comparetup_datum;
 	base->comparetup_tiebreak = comparetup_datum_tiebreak;
 	base->writetup = writetup_datum;
+	base->flushwrites = NULL;
 	base->readtup = readtup_datum;
 	base->haveDatum1 = true;
 	base->arg = arg;
@@ -885,17 +912,17 @@ tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size)
 }
 
 void
-tuplesort_putgintuple(Tuplesortstate *state, GinTuple *tuple, Size size)
+tuplesort_putgintuple(Tuplesortstate *state, GinTuple *tuple)
 {
 	SortTuple	stup;
 	GinTuple   *ctup;
 	TuplesortPublic *base = TuplesortstateGetPublic(state);
 	MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext);
-	Size		tuplen;
+	Size		tuplen = tuple->tuplen;
 
 	/* copy the GinTuple into the right memory context */
-	ctup = palloc(size);
-	memcpy(ctup, tuple, size);
+	ctup = palloc(tuplen);
+	memcpy(ctup, tuple, tuplen);
 
 	stup.tuple = ctup;
 	stup.datum1 = (Datum) 0;
@@ -903,7 +930,7 @@ tuplesort_putgintuple(Tuplesortstate *state, GinTuple *tuple, Size size)
 
 	/* GetMemoryChunkSpace is not supported for bump contexts */
 	if (TupleSortUseBumpTupleCxt(base->sortopt))
-		tuplen = MAXALIGN(size);
+		tuplen = MAXALIGN(tuplen);
 	else
 		tuplen = GetMemoryChunkSpace(ctup);
 
@@ -1923,19 +1950,63 @@ comparetup_index_gin(const SortTuple *a, const SortTuple *b,
 }
 
 static void
-writetup_index_gin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+_writetup_index_gin(Tuplesortstate *state, LogicalTape *tape, GinTuple *tup)
 {
 	TuplesortPublic *base = TuplesortstateGetPublic(state);
-	GinTuple   *tuple = (GinTuple *) stup->tuple;
-	unsigned int tuplen = tuple->tuplen;
+	unsigned int tuplen = tup->tuplen;
 
 	tuplen = tuplen + sizeof(tuplen);
+
 	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(tape, tuple, tuple->tuplen);
+	LogicalTapeWrite(tape, tup, tup->tuplen);
+
 	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
 		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
 }
 
+static void
+writetup_index_gin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	GinTuple   *otup;
+	GinTuple   *ntup = (GinTuple *) stup->tuple;
+	TuplesortIndexGinArg *arg = (TuplesortIndexGinArg *) base->arg;
+
+	Assert(PointerIsValid(arg));
+
+	if (GinBufferCanAddKey(arg->buffer, ntup))
+	{
+		GinBufferMergeTuple(arg->buffer, ntup);
+		return;
+	}
+
+	otup = GinBufferBuildTuple(arg->buffer);
+
+	_writetup_index_gin(state, tape, otup);
+
+	pfree(otup);
+
+	Assert(GinBufferCanAddKey(arg->buffer, ntup));
+
+	GinBufferMergeTuple(arg->buffer, ntup);
+}
+
+static void
+flushwrites_index_gin(Tuplesortstate *state, LogicalTape *tape)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	TuplesortIndexGinArg *arg = (TuplesortIndexGinArg *) base->arg;
+
+	if (!GinBufferIsEmpty(arg->buffer))
+	{
+		GinTuple   *tuple = GinBufferBuildTuple(arg->buffer);
+
+		_writetup_index_gin(state, tape, tuple);
+		pfree(tuple);
+		Assert(GinBufferIsEmpty(arg->buffer));
+	}
+}
+
 static void
 readtup_index_gin(Tuplesortstate *state, SortTuple *stup,
 				  LogicalTape *tape, unsigned int len)
@@ -1961,6 +2032,17 @@ readtup_index_gin(Tuplesortstate *state, SortTuple *stup,
 	stup->datum1 = (Datum) 0;
 }
 
+static void
+freestate_index_gin(Tuplesortstate *state)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	TuplesortIndexGinArg *arg = (TuplesortIndexGinArg *) base->arg;
+
+	Assert(arg != NULL);
+	Assert(GinBufferIsEmpty(arg->buffer));
+	GinBufferFree(arg->buffer);
+}
+
 /*
  * Routines specialized for DatumTuple case
  */
diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index 95d8805b66f..da4351c3d3d 100644
--- a/src/include/access/gin_private.h
+++ b/src/include/access/gin_private.h
@@ -478,6 +478,9 @@ extern int	ginPostingListDecodeAllSegmentsToTbm(GinPostingList *ptr, int len, TI
 
 extern ItemPointer ginPostingListDecodeAllSegments(GinPostingList *segment, int len,
 												   int *ndecoded_out);
+extern bool ginPostingListDecodeAllSegmentsInto(GinPostingList *segment, int len,
+												ItemPointer into, int capacity,
+												int *ndecoded_out);
 extern ItemPointer ginPostingListDecode(GinPostingList *plist, int *ndecoded_out);
 extern ItemPointer ginMergeItemPointers(ItemPointerData *a, uint32 na,
 										ItemPointerData *b, uint32 nb,
diff --git a/src/include/access/gin_tuple.h b/src/include/access/gin_tuple.h
index ce555031335..4de7b5c32b5 100644
--- a/src/include/access/gin_tuple.h
+++ b/src/include/access/gin_tuple.h
@@ -39,6 +39,16 @@ GinTupleGetFirst(GinTuple *tup)
 	return &list->first;
 }
 
+typedef struct GinBuffer GinBuffer;
+
 extern int	_gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup);
 
+extern GinBuffer *GinBufferInit(Relation index);
+extern bool GinBufferIsEmpty(GinBuffer *buffer);
+extern bool GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup);
+extern void GinBufferReset(GinBuffer *buffer);
+extern void GinBufferFree(GinBuffer *buffer);
+extern void GinBufferMergeTuple(GinBuffer *buffer, GinTuple *tup);
+extern GinTuple *GinBufferBuildTuple(GinBuffer *buffer);
+
 #endif							/* GIN_TUPLE_H */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index ef79f259f93..64176b23cbe 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -195,6 +195,14 @@ typedef struct
 	void		(*writetup) (Tuplesortstate *state, LogicalTape *tape,
 							 SortTuple *stup);
 
+	/*
+	 * Flush any buffered writetup() writes.
+	 *
+	 * This is useful when writetup() buffers writes for more efficient use of
+	 * the tape's resources, e.g. when deduplicating or merging values.
+	 */
+	void		(*flushwrites) (Tuplesortstate *state, LogicalTape *tape);
+
 	/*
 	 * Function to read a stored tuple from tape back into memory. 'len' is
 	 * the already-read length of the stored tuple.  The tuple is allocated
@@ -461,7 +469,7 @@ extern void tuplesort_putindextuplevalues(Tuplesortstate *state,
 										  Relation rel, ItemPointer self,
 										  const Datum *values, const bool *isnull);
 extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size);
-extern void tuplesort_putgintuple(Tuplesortstate *state, GinTuple *tuple, Size size);
+extern void tuplesort_putgintuple(Tuplesortstate *state, struct GinTuple *tuple);
 extern void tuplesort_putdatum(Tuplesortstate *state, Datum val,
 							   bool isNull);
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9840060997f..522e98109ae 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3037,6 +3037,7 @@ TuplesortClusterArg
 TuplesortDatumArg
 TuplesortIndexArg
 TuplesortIndexBTreeArg
+TuplesortIndexGinArg
 TuplesortIndexHashArg
 TuplesortInstrumentation
 TuplesortMethod
-- 
2.48.1

From fb3d0d4f319a8e548a792a4ebbd47142d94f96af Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 25 Feb 2025 16:16:24 +0100
Subject: [PATCH v20250304 2/2] WIP: parallel inserts into GIN index

---
 src/backend/access/gin/gininsert.c            | 450 +++++++++++-------
 .../utils/activity/wait_event_names.txt       |   2 +
 2 files changed, 286 insertions(+), 166 deletions(-)

diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index e873443784a..750c0c3270d 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -26,7 +26,9 @@
 #include "miscadmin.h"
 #include "nodes/execnodes.h"
 #include "pgstat.h"
+#include "storage/barrier.h"
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/predicate.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/datum.h"
@@ -42,6 +44,11 @@
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xB000000000000004)
 #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xB000000000000005)
 
+/* The phases for parallel builds, used by build_barrier. */
+#define GIN_BUILD_INIT					0
+#define GIN_BUILD_SCAN					1
+#define GIN_BUILD_PARTITION				2
+
 /*
  * Status for index builds performed in parallel.  This is allocated in a
  * dynamic shared memory segment.
@@ -88,6 +95,9 @@ typedef struct GinBuildShared
 	double		reltuples;
 	double		indtuples;
 
+	Barrier		build_barrier;
+	SharedFileSet fileset;		/* space for shared temporary files */
+
 	/*
 	 * ParallelTableScanDescData data follows. Can't directly embed here, as
 	 * implementations of the parallel table scan desc interface might need
@@ -173,7 +183,6 @@ static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relati
 static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state);
 static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
 static double _gin_parallel_heapscan(GinBuildState *buildstate);
-static double _gin_parallel_merge(GinBuildState *buildstate);
 static void _gin_leader_participate_as_worker(GinBuildState *buildstate,
 											  Relation heap, Relation index);
 static void _gin_parallel_scan_and_build(GinBuildState *buildstate,
@@ -189,6 +198,12 @@ static GinTuple *_gin_build_tuple(OffsetNumber attrnum, unsigned char category,
 								  Datum key, int16 typlen, bool typbyval,
 								  ItemPointerData *items, uint32 nitems);
 
+static double _gin_partition_sorted_data(GinBuildState *state);
+static void _gin_parallel_insert(GinBuildState *state,
+								 GinBuildShared *ginshared,
+								 Relation heap, Relation index,
+								 bool progress);
+
 /*
  * Adds array of item pointers to tuple's posting list, or
  * creates posting tree and tuple pointing to tree in case
@@ -699,8 +714,12 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 									  maintenance_work_mem, coordinate,
 									  TUPLESORT_NONE);
 
-		/* scan the relation in parallel and merge per-worker results */
-		reltuples = _gin_parallel_merge(state);
+		/* partition the sorted data */
+		reltuples = _gin_partition_sorted_data(state);
+
+		/* do the insert for the leader's partition */
+		_gin_parallel_insert(state, state->bs_leader->ginshared,
+							 heap, index, true);
 
 		_gin_end_parallel(state->bs_leader, state);
 	}
@@ -989,6 +1008,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	ginshared->reltuples = 0.0;
 	ginshared->indtuples = 0.0;
 
+	/* used to wait for data to insert */
+	BarrierInit(&ginshared->build_barrier, scantuplesortstates);
+
+	/* Set up the space we'll use for shared temporary files. */
+	SharedFileSetInit(&ginshared->fileset, pcxt->seg);
+
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromGinBuildShared(ginshared),
 								  snapshot);
@@ -1056,6 +1081,11 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * sure that the failure-to-start case will not hang forever.
 	 */
 	WaitForParallelWorkersToAttach(pcxt);
+
+	/* wait for workers to read the data and add them to tuplesort */
+	if (BarrierArriveAndWait(&ginshared->build_barrier,
+							 WAIT_EVENT_GIN_BUILD_SCAN))
+		elog(LOG, "data scanned, leader continues");
 }
 
 /*
@@ -1069,6 +1099,8 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(ginleader->pcxt);
 
+	SharedFileSetDeleteAll(&ginleader->ginshared->fileset);
+
 	/*
 	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
 	 * or we might get incomplete data.)
@@ -1713,169 +1745,6 @@ GinBufferCanAddKey(GinBuffer *buffer, GinTuple *tup)
 	return GinBufferKeyEquals(buffer, tup);
 }
 
-/*
- * Within leader, wait for end of heap scan and merge per-worker results.
- *
- * After waiting for all workers to finish, merge the per-worker results into
- * the complete index. The results from each worker are sorted by block number
- * (start of the page range). While combinig the per-worker results we merge
- * summaries for the same page range, and also fill-in empty summaries for
- * ranges without any tuples.
- *
- * Returns the total number of heap tuples scanned.
- */
-static double
-_gin_parallel_merge(GinBuildState *state)
-{
-	GinTuple   *tup;
-	Size		tuplen;
-	double		reltuples = 0;
-	GinBuffer  *buffer;
-
-	/* GIN tuples from workers, merged by leader */
-	double		numtuples = 0;
-
-	/* wait for workers to scan table and produce partial results */
-	reltuples = _gin_parallel_heapscan(state);
-
-	/* Execute the sort */
-	pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
-								 PROGRESS_GIN_PHASE_PERFORMSORT_2);
-
-	/* do the actual sort in the leader */
-	tuplesort_performsort(state->bs_sortstate);
-
-	/*
-	 * Initialize buffer to combine entries for the same key.
-	 *
-	 * The leader is allowed to use the whole maintenance_work_mem buffer to
-	 * combine data. The parallel workers already completed.
-	 */
-	buffer = GinBufferInit(state->ginstate.index);
-
-	/*
-	 * Set the progress target for the next phase.  Reset the block number
-	 * values set by table_index_build_scan
-	 */
-	{
-		const int	progress_index[] = {
-			PROGRESS_CREATEIDX_SUBPHASE,
-			PROGRESS_CREATEIDX_TUPLES_TOTAL,
-			PROGRESS_SCAN_BLOCKS_TOTAL,
-			PROGRESS_SCAN_BLOCKS_DONE
-		};
-		const int64 progress_vals[] = {
-			PROGRESS_GIN_PHASE_MERGE_2,
-			state->bs_numtuples,
-			0, 0
-		};
-
-		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
-	}
-
-	/*
-	 * Read the GIN tuples from the shared tuplesort, sorted by category and
-	 * key. That probably gives us order matching how data is organized in the
-	 * index.
-	 *
-	 * We don't insert the GIN tuples right away, but instead accumulate as
-	 * many TIDs for the same key as possible, and then insert that at once.
-	 * This way we don't need to decompress/recompress the posting lists, etc.
-	 */
-	while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL)
-	{
-		CHECK_FOR_INTERRUPTS();
-
-		/*
-		 * If the buffer can accept the new GIN tuple, just store it there and
-		 * we're done. If it's a different key (or maybe too much data) flush
-		 * the current contents into the index first.
-		 */
-		if (!GinBufferCanAddKey(buffer, tup))
-		{
-			/*
-			 * Buffer is not empty and it's storing a different key - flush
-			 * the data into the insert, and start a new entry for current
-			 * GinTuple.
-			 */
-			AssertCheckItemPointers(buffer);
-			Assert(!PointerIsValid(buffer->cached));
-
-			ginEntryInsert(&state->ginstate,
-						   buffer->attnum, buffer->key, buffer->category,
-						   buffer->items, buffer->nitems, &state->buildStats);
-
-			/* discard the existing data */
-			GinBufferReset(buffer);
-		}
-
-		/*
-		 * We're about to add a GIN tuple to the buffer - check the memory
-		 * limit first, and maybe write out some of the data into the index
-		 * first, if needed (and possible). We only flush the part of the TID
-		 * list that we know won't change, and only if there's enough data for
-		 * compression to work well.
-		 */
-		if (GinBufferShouldTrim(buffer, tup))
-		{
-			Assert(buffer->nfrozen > 0);
-
-			/*
-			 * Buffer is not empty and it's storing a different key - flush
-			 * the data into the insert, and start a new entry for current
-			 * GinTuple.
-			 */
-			AssertCheckItemPointers(buffer);
-			Assert(!PointerIsValid(buffer->cached));
-
-			ginEntryInsert(&state->ginstate,
-						   buffer->attnum, buffer->key, buffer->category,
-						   buffer->items, buffer->nfrozen, &state->buildStats);
-
-			/* truncate the data we've just discarded */
-			GinBufferTrim(buffer);
-		}
-
-		/*
-		 * Remember data for the current tuple (either remember the new key,
-		 * or append if to the existing data).
-		 */
-		GinBufferMergeTuple(buffer, tup);
-
-		if (buffer->cached)
-			GinBufferUnpackCached(buffer, 0);
-
-		/* Report progress */
-		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
-									 ++numtuples);
-	}
-
-	/* flush data remaining in the buffer (for the last key) */
-	if (!GinBufferIsEmpty(buffer))
-	{
-		AssertCheckItemPointers(buffer);
-		Assert(!PointerIsValid(buffer->cached));
-
-		ginEntryInsert(&state->ginstate,
-					   buffer->attnum, buffer->key, buffer->category,
-					   buffer->items, buffer->nitems, &state->buildStats);
-
-		/* discard the existing data */
-		GinBufferReset(buffer);
-
-		/* Report progress */
-		pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
-									 ++numtuples);
-	}
-
-	/* relase all the memory */
-	GinBufferFree(buffer);
-
-	tuplesort_end(state->bs_sortstate);
-
-	return reltuples;
-}
-
 /*
  * Returns size of shared memory required to store state for a parallel
  * gin index build based on the snapshot its parallel scan will use.
@@ -2093,6 +1962,9 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	/* Prepare to track buffer usage during parallel execution */
 	InstrStartParallelQuery();
 
+	/* attach to the fileset too */
+	SharedFileSetAttach(&ginshared->fileset, seg);
+
 	/*
 	 * Might as well use reliable figure when doling out maintenance_work_mem
 	 * (when requested number of workers were not launched, this will be
@@ -2103,6 +1975,20 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	_gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort,
 								 heapRel, indexRel, sortmem, false);
 
+	/* wait for workers to read the data and add them to tuplesort */
+	if (BarrierArriveAndWait(&ginshared->build_barrier,
+							 WAIT_EVENT_GIN_BUILD_SCAN))
+		elog(LOG, "data scanned by workers, leader continues");
+
+	/* leader sorts and partitions the data */
+
+	/* wait for the leader to partition the data */
+	if (BarrierArriveAndWait(&ginshared->build_barrier,
+							 WAIT_EVENT_GIN_BUILD_PARTITION))
+		elog(LOG, "data partitioned by leader, worker continues");
+
+	_gin_parallel_insert(&buildstate, ginshared, heapRel, indexRel, false);
+
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
@@ -2375,3 +2261,235 @@ _gin_compare_tuples(GinTuple *a, GinTuple *b, SortSupport ssup)
 	return ItemPointerCompare(GinTupleGetFirst(a),
 							  GinTupleGetFirst(b));
 }
+
+/*
+ * Within leader, sort the shared tuplesort and spread the sorted GIN tuples
+ * over per-participant BufFiles ("worker-N") in the shared fileset, moving
+ * to the next file after at most 1000 tuples.  Each record is the tuple
+ * length (a Size) followed by the tuple itself, which is the layout
+ * _gin_parallel_insert() reads back.
+ *
+ * Returns the reltuples value reported by _gin_parallel_heapscan().
+ */
+static double
+_gin_partition_sorted_data(GinBuildState *state)
+{
+	GinTuple   *tup;
+	Size		tuplen;
+	GinBuildShared *shared = state->bs_leader->ginshared;
+	BufFile   **files;
+	int64		fileidx = 0;
+	double		reltuples;
+
+	/*
+	 * Tuples written to a file before moving on.  NOTE(review): this reads
+	 * state->indtuples before the wait below - confirm it is populated.
+	 */
+	int64		worker_tuples = (state->indtuples / shared->scantuplesortstates) + 1;
+	int64		remaining = Min(worker_tuples, 1000);
+
+	/* wait for workers to scan table and produce partial results */
+	reltuples = _gin_parallel_heapscan(state);
+
+	/* do the actual sort in the leader */
+	tuplesort_performsort(state->bs_sortstate);
+
+	/* Allocate BufFiles, one for each participant. */
+	files = palloc0_array(BufFile *, shared->scantuplesortstates);
+
+	for (int i = 0; i < shared->scantuplesortstates; i++)
+	{
+		char		fname[MAXPGPATH];
+
+		snprintf(fname, sizeof(fname), "worker-%d", i);
+		files[i] = BufFileCreateFileSet(&shared->fileset.fs, fname);
+	}
+
+	/*
+	 * Read the GIN tuples from the shared tuplesort, sorted by category and
+	 * key, and append each one to the current partition file.
+	 *
+	 * XXX Maybe we should sort by key first, then by category? The idea is
+	 * that if this matches the order of the keys in the index, we'd insert
+	 * the entries in order better matching the index.
+	 */
+	while ((tup = tuplesort_getgintuple(state->bs_sortstate, &tuplen, true)) != NULL)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * FIXME Maybe move to next partition only when the index key changes?
+		 * Otherwise we might have issues with 'could not fit onto page' when
+		 * adding overlapping TID lists to the index. But maybe it can't with
+		 * the merging of data in the tuplesort?
+		 */
+		BufFileWrite(files[fileidx], &tuplen, sizeof(tuplen));
+		BufFileWrite(files[fileidx], tup, tuplen);
+
+		remaining--;
+
+		/* move to the next file */
+		if (remaining == 0)
+		{
+			remaining = Min(worker_tuples, 1000);
+			fileidx = (fileidx + 1) % shared->scantuplesortstates;
+		}
+	}
+
+	/* close the files, and release the array that tracked them */
+	for (int i = 0; i < shared->scantuplesortstates; i++)
+		BufFileClose(files[i]);
+	pfree(files);
+
+	/* and also close the tuplesort */
+	tuplesort_end(state->bs_sortstate);
+
+	/* partitioning is done; meet the workers waiting at the barrier */
+	if (BarrierArriveAndWait(&shared->build_barrier,
+							 WAIT_EVENT_GIN_BUILD_PARTITION))
+		elog(LOG, "data partitioned, leader continues");
+
+	return reltuples;
+}
+
+/*
+ * Insert this participant's share of the sorted GIN tuples into the index.
+ *
+ * Reads length-prefixed GinTuples back from the "worker-N" BufFile in the
+ * shared fileset (N is ParallelWorkerNumber + 1), combines consecutive
+ * tuples for the same key in a GinBuffer, and passes the accumulated items
+ * to ginEntryInsert().
+ *
+ * The heap, index and progress arguments are currently unused.
+ */
+static void
+_gin_parallel_insert(GinBuildState *state, GinBuildShared *ginshared,
+					 Relation heap, Relation index, bool progress)
+{
+	GinBuffer  *buffer;
+	GinTuple   *tup;
+	Size		len;
+	BufFile    *file;
+	char		fname[MAXPGPATH];
+	char	   *buff;
+	int64		ntuples = 0;
+	Size		maxlen;
+
+	/* Initialize buffer to combine entries for the same key. */
+	buffer = GinBufferInit(state->ginstate.index);
+
+	snprintf(fname, sizeof(fname), "worker-%d", ParallelWorkerNumber + 1);
+	file = BufFileOpenFileSet(&ginshared->fileset.fs, fname, O_RDONLY, false);
+
+	/* 8kB seems like a reasonable starting point */
+	maxlen = 8192;
+	buff = palloc(maxlen);
+
+	while (true)
+	{
+		size_t		ret;
+
+		ret = BufFileRead(file, &len, sizeof(len));
+		if (ret == 0)
+			break;
+		if (ret != sizeof(len))
+			elog(ERROR, "incorrect data %zu %zu", ret, sizeof(len));
+
+		/* maybe resize the buffer */
+		if (maxlen < len)
+		{
+			while (maxlen < len)
+				maxlen *= 2;
+
+			buff = repalloc(buff, maxlen);
+		}
+
+		tup = (GinTuple *) buff;
+		BufFileReadExact(file, tup, len);
+
+		ntuples++;
+
+		if (ntuples % 100000 == 0)
+			elog(LOG, "inserted " INT64_FORMAT " tuples", ntuples);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * If the buffer can accept the new GIN tuple, just store it there and
+		 * we're done. If it's a different key (or maybe too much data) flush
+		 * the current contents into the index first.
+		 */
+		if (!GinBufferCanAddKey(buffer, tup))
+		{
+			/*
+			 * Buffer is not empty and it's storing a different key - flush
+			 * the data into the index, and start a new entry for current
+			 * GinTuple.
+			 */
+			AssertCheckItemPointers(buffer);
+			Assert(!PointerIsValid(buffer->cached));
+
+			ginEntryInsert(&state->ginstate,
+						   buffer->attnum, buffer->key, buffer->category,
+						   buffer->items, buffer->nitems, &state->buildStats);
+
+			/* discard the existing data */
+			GinBufferReset(buffer);
+		}
+
+		/*
+		 * We're about to add a GIN tuple to the buffer - check the memory
+		 * limit first, and maybe write out some of the data into the index
+		 * first, if needed (and possible). We only flush the part of the TID
+		 * list that we know won't change, and only if there's enough data for
+		 * compression to work well.
+		 */
+		if (GinBufferShouldTrim(buffer, tup))
+		{
+			Assert(buffer->nfrozen > 0);
+
+			/*
+			 * Same key, but too much data - write out only the first
+			 * nfrozen items, i.e. the part of the list that won't change.
+			 */
+			AssertCheckItemPointers(buffer);
+			Assert(!PointerIsValid(buffer->cached));
+
+			ginEntryInsert(&state->ginstate,
+						   buffer->attnum, buffer->key, buffer->category,
+						   buffer->items, buffer->nfrozen, &state->buildStats);
+
+			/* truncate the data we've just discarded */
+			GinBufferTrim(buffer);
+		}
+
+		/*
+		 * Remember data for the current tuple (either remember the new key,
+		 * or append it to the existing data).
+		 */
+		GinBufferMergeTuple(buffer, tup);
+
+		if (buffer->cached)
+			GinBufferUnpackCached(buffer, 0);
+	}
+
+	/* flush data remaining in the buffer (for the last key) */
+	if (!GinBufferIsEmpty(buffer))
+	{
+		AssertCheckItemPointers(buffer);
+		Assert(!PointerIsValid(buffer->cached));
+
+		ginEntryInsert(&state->ginstate,
+					   buffer->attnum, buffer->key, buffer->category,
+					   buffer->items, buffer->nitems, &state->buildStats);
+
+		/* discard the existing data */
+		GinBufferReset(buffer);
+	}
+
+	/* release all the memory */
+	GinBufferFree(buffer);
+	pfree(buff);
+
+	BufFileClose(file);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index e199f071628..afb9be848a0 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -116,6 +116,8 @@ CHECKPOINT_DELAY_START	"Waiting for a backend that blocks a checkpoint from star
 CHECKPOINT_DONE	"Waiting for a checkpoint to complete."
 CHECKPOINT_START	"Waiting for a checkpoint to start."
 EXECUTE_GATHER	"Waiting for activity from a child process while executing a <literal>Gather</literal> plan node."
+GIN_BUILD_SCAN	"Waiting for the scan of data during a parallel GIN index build."
+GIN_BUILD_PARTITION	"Waiting for the partitioning of data during a parallel GIN index build."
 HASH_BATCH_ALLOCATE	"Waiting for an elected Parallel Hash participant to allocate a hash table."
 HASH_BATCH_ELECT	"Waiting to elect a Parallel Hash participant to allocate a hash table."
 HASH_BATCH_LOAD	"Waiting for other Parallel Hash participants to finish loading a hash table."
-- 
2.48.1

Reply via email to