On Tue, Oct 6, 2020 at 10:38 PM Greg Nancarrow <gregn4...@gmail.com> wrote:
+            if (estate->es_plannedstmt->commandType == CMD_INSERT)
...
+    if ((XactReadOnly || (IsInParallelMode() &&
queryDesc->plannedstmt->commandType != CMD_INSERT)) &&
...
+        isParallelInsertLeader = nodeModifyTableState->operation == CMD_INSERT;
...

One thing I noticed is that you have logic, variable names and
assertions all over the tree that assume that we can only do parallel
*inserts*.  I agree 100% with your plan to make Parallel Insert work
first, it is an excellent goal and if we get it in it'll be a headline
feature of PG14 (along with COPY etc).  That said, I wonder if it
would make sense to use more general naming (isParallelModifyLeader?),
be more liberal where you really mean "is it DML", and find a way to
centralise the logic about which DML command types are currently
allowed (i.e. insert only for now) for assertions and error checks etc.,
so that in future we don't have to go around and change all these
places and rename things again and again.

While contemplating that, I couldn't resist taking a swing at the main
(?) show stopper for Parallel Update and Parallel Delete, judging by
various clues left in code comments by Robert: combo command IDs
created by other processes.  Here's a rapid prototype to make that
work (though perhaps not as efficiently as we'd want, not sure).  With
that in place, I wonder what else we'd need to extend your patch to
cover all three operations... it can't be much!  Of course I don't
want to derail your work on Parallel Insert, I'm just providing some
motivation for my comments on the (IMHO) shortsightedness of some of
the coding.

PS Why not use git format-patch to create patches?
From ad2b5e07a09603b09859dfcbde6addd51096cbb0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 9 Oct 2020 00:27:07 +1300
Subject: [PATCH] Coordinate combo command IDs with parallel workers.

Previously, we would serialize the leader's combo command IDs and
restore a read-only copy of them in worker processes, not allowing
updates.  Instead, migrate them into shared memory, in preparation for
parallel update/delete queries where new combo command IDs might need to
be created in any process and be visible to others.

XXX This design causes every backend to maintain its own deduplication
hash table, and requires a shared lock to look up any combocid.  Both
policies could be reconsidered, basically a memory size vs locking
tradeoff.  Need some experience/profiling of real work to see how much
any of this really matters.

Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV%3DqpFJrR3AcrTS3g%40mail.gmail.com
---
 src/backend/access/common/session.c        |  28 +-
 src/backend/access/heap/heapam.c           |  10 -
 src/backend/access/transam/README.parallel |   6 -
 src/backend/access/transam/parallel.c      |  14 -
 src/backend/storage/lmgr/lwlock.c          |   2 +
 src/backend/utils/time/combocid.c          | 290 ++++++++++++++++-----
 src/include/access/session.h               |  20 +-
 src/include/storage/lwlock.h               |   1 +
 src/include/utils/combocid.h               |   8 +-
 9 files changed, 275 insertions(+), 104 deletions(-)

diff --git a/src/backend/access/common/session.c b/src/backend/access/common/session.c
index 0ec61d48a2..7e1bffb680 100644
--- a/src/backend/access/common/session.c
+++ b/src/backend/access/common/session.c
@@ -23,6 +23,7 @@
 #include "access/session.h"
 #include "storage/lwlock.h"
 #include "storage/shm_toc.h"
+#include "utils/combocid.h"
 #include "utils/memutils.h"
 #include "utils/typcache.h"
 
@@ -43,6 +44,7 @@
  */
 #define SESSION_KEY_DSA						UINT64CONST(0xFFFFFFFFFFFF0001)
 #define SESSION_KEY_RECORD_TYPMOD_REGISTRY	UINT64CONST(0xFFFFFFFFFFFF0002)
+#define SESSION_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0003)
 
 /* This backend's current session. */
 Session    *CurrentSession = NULL;
@@ -74,8 +76,10 @@ GetSessionDsmHandle(void)
 	dsm_segment *seg;
 	size_t		typmod_registry_size;
 	size_t		size;
+	void	   *fixed_space;
 	void	   *dsa_space;
 	void	   *typmod_registry_space;
+	SessionFixed *fixed;
 	dsa_area   *dsa;
 	MemoryContext old_context;
 
@@ -91,6 +95,10 @@ GetSessionDsmHandle(void)
 	old_context = MemoryContextSwitchTo(TopMemoryContext);
 	shm_toc_initialize_estimator(&estimator);
 
+	/* Estimate size for the fixed-sized per-session state. */
+	shm_toc_estimate_keys(&estimator, 1);
+	shm_toc_estimate_chunk(&estimator, sizeof(SessionFixed));
+
 	/* Estimate space for the per-session DSA area. */
 	shm_toc_estimate_keys(&estimator, 1);
 	shm_toc_estimate_chunk(&estimator, SESSION_DSA_SIZE);
@@ -113,6 +121,14 @@ GetSessionDsmHandle(void)
 						 dsm_segment_address(seg),
 						 size);
 
+	/* Create the simple fixed-sized session state. */
+	fixed_space = shm_toc_allocate(toc, sizeof(SessionFixed));
+	fixed = (SessionFixed *) fixed_space;
+	memset(fixed, 0, sizeof(*fixed));
+	LWLockInitialize(&fixed->shared_combocid_lock, LWTRANCHE_SHARED_COMBOCID);
+	shm_toc_insert(toc, SESSION_KEY_FIXED, fixed_space);
+	CurrentSession->fixed = fixed;
+
 	/* Create per-session DSA area. */
 	dsa_space = shm_toc_allocate(toc, SESSION_DSA_SIZE);
 	dsa = dsa_create_in_place(dsa_space,
@@ -121,7 +137,6 @@ GetSessionDsmHandle(void)
 							  seg);
 	shm_toc_insert(toc, SESSION_KEY_DSA, dsa_space);
 
-
 	/* Create session-scoped shared record typmod registry. */
 	typmod_registry_space = shm_toc_allocate(toc, typmod_registry_size);
 	SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
@@ -129,6 +144,9 @@ GetSessionDsmHandle(void)
 	shm_toc_insert(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY,
 				   typmod_registry_space);
 
+	/* Initialize the shared combo CID registry. */
+	SharedComboCidRegistryInit(seg, dsa);
+
 	/*
 	 * If we got this far, we can pin the shared memory so it stays mapped for
 	 * the rest of this backend's life.  If we don't make it this far, cleanup
@@ -156,6 +174,7 @@ AttachSession(dsm_handle handle)
 {
 	dsm_segment *seg;
 	shm_toc    *toc;
+	void	   *fixed_space;
 	void	   *dsa_space;
 	void	   *typmod_registry_space;
 	dsa_area   *dsa;
@@ -177,12 +196,19 @@ AttachSession(dsm_handle handle)
 	CurrentSession->segment = seg;
 	CurrentSession->area = dsa;
 
+	/* Attach to the "fixed sized" data region. */
+	fixed_space = shm_toc_lookup(toc, SESSION_KEY_FIXED, false);
+	CurrentSession->fixed = (SessionFixed *) fixed_space;
+
 	/* Attach to the shared record typmod registry. */
 	typmod_registry_space =
 		shm_toc_lookup(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, false);
 	SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
 									 typmod_registry_space);
 
+	/* Attach to the shared combo CID registry. */
+	SharedComboCidRegistryAttach();
+
 	/* Remain attached until end of backend or DetachSession(). */
 	dsm_pin_mapping(seg);
 	dsa_pin_mapping(dsa);
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861a02..416ff00510 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2462,11 +2462,6 @@ heap_delete(Relation relation, ItemPointer tid,
 
 	Assert(ItemPointerIsValid(tid));
 
-	/*
-	 * Forbid this during a parallel operation, lest it allocate a combocid.
-	 * Other workers might need that combocid for visibility checks, and we
-	 * have no provision for broadcasting it to them.
-	 */
 	if (IsInParallelMode())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
@@ -2934,11 +2929,6 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 
 	Assert(ItemPointerIsValid(otid));
 
-	/*
-	 * Forbid this during a parallel operation, lest it allocate a combocid.
-	 * Other workers might need that combocid for visibility checks, and we
-	 * have no provision for broadcasting it to them.
-	 */
 	if (IsInParallelMode())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index 99c588d6dc..d78249bd8a 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -105,12 +105,6 @@ worker.  This includes:
     checks return the same results in the worker as they do in the
     initiating backend.  See also the section Transaction Integration, below.
 
-  - The combo CID mappings.  This is needed to ensure consistent answers to
-    tuple visibility checks.  The need to synchronize this data structure is
-    a major reason why we can't support writes in parallel mode: such writes
-    might create new combo CIDs, and we have no way to let other workers
-    (or the initiating backend) know about them.
-
   - The transaction snapshot.
 
   - The active snapshot, which might be different from the transaction
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index b0426960c7..f643cc28d3 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -204,7 +204,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	MemoryContext oldcontext;
 	Size		library_len = 0;
 	Size		guc_len = 0;
-	Size		combocidlen = 0;
 	Size		tsnaplen = 0;
 	Size		asnaplen = 0;
 	Size		tstatelen = 0;
@@ -252,8 +251,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
 		guc_len = EstimateGUCStateSpace();
 		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
-		combocidlen = EstimateComboCIDStateSpace();
-		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
 		tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
 		shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
 		asnaplen = EstimateSnapshotSpace(active_snapshot);
@@ -338,7 +335,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	{
 		char	   *libraryspace;
 		char	   *gucspace;
-		char	   *combocidspace;
 		char	   *tsnapspace;
 		char	   *asnapspace;
 		char	   *tstatespace;
@@ -361,11 +357,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		SerializeGUCState(guc_len, gucspace);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
 
-		/* Serialize combo CID state. */
-		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
-		SerializeComboCIDState(combocidlen, combocidspace);
-		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
-
 		/* Serialize transaction snapshot and active snapshot. */
 		tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
 		SerializeSnapshot(transaction_snapshot, tsnapspace);
@@ -1250,7 +1241,6 @@ ParallelWorkerMain(Datum main_arg)
 	char	   *function_name;
 	parallel_worker_main_type entrypt;
 	char	   *gucspace;
-	char	   *combocidspace;
 	char	   *tsnapspace;
 	char	   *asnapspace;
 	char	   *tstatespace;
@@ -1398,10 +1388,6 @@ ParallelWorkerMain(Datum main_arg)
 	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
 	StartParallelWorkerTransaction(tstatespace);
 
-	/* Restore combo CID state. */
-	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
-	RestoreComboCIDState(combocidspace);
-
 	/* Attach to the per-session DSM segment and contained objects. */
 	session_dsm_handle_space =
 		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 2fa90cc095..1a254d1af8 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -170,6 +170,8 @@ static const char *const BuiltinTrancheNames[] = {
 	"PerSessionRecordType",
 	/* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
 	"PerSessionRecordTypmod",
+	/* LWTRANCHE_SHARED_COMBOCID: */
+	"SharedComboCid",
 	/* LWTRANCHE_SHARED_TUPLESTORE: */
 	"SharedTupleStore",
 	/* LWTRANCHE_SHARED_TIDBITMAP: */
diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c
index 4ee9ef0ffe..e39a179f76 100644
--- a/src/backend/utils/time/combocid.c
+++ b/src/backend/utils/time/combocid.c
@@ -42,6 +42,8 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/parallel.h"
+#include "access/session.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/shmem.h"
@@ -61,6 +63,19 @@ typedef struct
 
 typedef ComboCidKeyData *ComboCidKey;
 
+/*
+ * Shared memory version of the array for use in parallel queries.  For now we
+ * don't have a shared memory hash table, we just let each backend deduplicate
+ * as much as it can, but all participating backends can decode each other's
+ * combo CIDs through this structure.  It is protected by shared_combocid_lock.
+ */
+typedef struct SharedComboCidRegistry
+{
+	size_t		size;			/* allocated length of combocids[] */
+	size_t		used;			/* number of entries currently in use */
+	ComboCidKeyData	combocids[FLEXIBLE_ARRAY_MEMBER];	/* indexed by combo CID */
+} SharedComboCidRegistry;
+
 typedef struct
 {
 	ComboCidKeyData key;
@@ -69,6 +84,7 @@ typedef struct
 
 typedef ComboCidEntryData *ComboCidEntry;
 
+
 /* Initial size of the hash table */
 #define CCID_HASH_SIZE			100
 
@@ -81,12 +97,19 @@ static ComboCidKey comboCids = NULL;
 static int	usedComboCids = 0;	/* number of elements in comboCids */
 static int	sizeComboCids = 0;	/* allocated size of array */
 
+/*
+ * For the shared memory version of the above, used for parallel queries, see
+ * session.h.
+ */
+
 /* Initial size of the array */
 #define CCID_ARRAY_SIZE			100
 
 
 /* prototypes for internal functions */
 static CommandId GetComboCommandId(CommandId cmin, CommandId cmax);
+static CommandId GetSharedComboCommandId(CommandId cmin, CommandId cmax);
+static CommandId GetLocalComboCommandId(CommandId cmin, CommandId cmax);
 static CommandId GetRealCmin(CommandId combocid);
 static CommandId GetRealCmax(CommandId combocid);
 
@@ -190,6 +213,15 @@ AtEOXact_ComboCid(void)
 	comboCids = NULL;
 	usedComboCids = 0;
 	sizeComboCids = 0;
+
+	/*
+	 * If we're attached to a shared registry, the leader marks it empty, but
+	 * we'll keep the memory around for use by future transactions.
+	 */
+	if (!IsParallelWorker() &&
+		CurrentSession &&
+		CurrentSession->shared_combocid_registry)
+		CurrentSession->shared_combocid_registry->used = 0;
 }
 
 
@@ -198,16 +230,15 @@ AtEOXact_ComboCid(void)
 /*
  * Get a combo command id that maps to cmin and cmax.
  *
- * We try to reuse old combo command ids when possible.
+ * We try to reuse old combo command ids when possible, but for now we only
+ * consider combos created by this backend.  Another process in the same
+ * parallel query could generate a different combo command ID for the same
+ * cmin/cmax pair, but all processes will be able to understand that combo
+ * command ID.
  */
 static CommandId
 GetComboCommandId(CommandId cmin, CommandId cmax)
 {
-	CommandId	combocid;
-	ComboCidKeyData key;
-	ComboCidEntry entry;
-	bool		found;
-
 	/*
 	 * Create the hash table and array the first time we need to use combo
 	 * cids in the transaction.
@@ -234,6 +265,99 @@ GetComboCommandId(CommandId cmin, CommandId cmax)
 								HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 	}
 
+	if (CurrentSession->shared_combocid_registry)
+		return GetSharedComboCommandId(cmin, cmax);
+	else
+		return GetLocalComboCommandId(cmin, cmax);
+}
+
+/*
+ * Shared-registry variant of GetComboCommandId(): reuse this backend's own
+ * combo CID for (cmin, cmax), else append one to the shared array under lock.
+ */
+static CommandId
+GetSharedComboCommandId(CommandId cmin, CommandId cmax)
+{
+	SharedComboCidRegistry *registry;
+	CommandId	combocid;
+	ComboCidKeyData key;
+	ComboCidEntry entry;
+	bool		found;
+
+	/* Check if we already have it before taking any locks. */
+	key.cmin = cmin;
+	key.cmax = cmax;
+	entry = (ComboCidEntry) hash_search(comboHash, &key, HASH_ENTER, &found);
+	if (found)
+		return entry->combocid;
+
+	/* We'll create a new one in shared memory. */
+	LWLockAcquire(&CurrentSession->fixed->shared_combocid_lock, LW_EXCLUSIVE);
+
+	/* Another backend may have replaced and freed the array; re-resolve it. */
+	registry = (SharedComboCidRegistry *)
+		dsa_get_address(CurrentSession->area,
+						CurrentSession->fixed->shared_combocid_registry_dsa);
+	CurrentSession->shared_combocid_registry = registry;
+
+	/* If the shared memory array is already full, we'll have to expand it. */
+	if (registry->used == registry->size)
+	{
+		SharedComboCidRegistry *new_data;
+		dsa_pointer	new_data_dsa;
+		size_t		new_size = registry->size * 2;	/* double the array */
+
+		new_data_dsa = dsa_allocate_extended(CurrentSession->area,
+											 offsetof(SharedComboCidRegistry,
+													  combocids) +
+											 sizeof(ComboCidKeyData) * new_size,
+											 DSA_ALLOC_NO_OOM);
+		if (new_data_dsa == InvalidDsaPointer)
+		{
+			/* Undo the new hash table entry. */
+			hash_search(comboHash, &key, HASH_REMOVE, &found);
+			LWLockRelease(&CurrentSession->fixed->shared_combocid_lock);
+			elog(ERROR, "out of memory");
+		}
+
+		/* Copy the old header and used entries into the new array. */
+		new_data = (SharedComboCidRegistry *)
+			dsa_get_address(CurrentSession->area, new_data_dsa);
+		memcpy(new_data, registry,
+			   offsetof(SharedComboCidRegistry, combocids) +
+			   sizeof(ComboCidKeyData) * registry->used);
+		new_data->size = new_size;
+
+		/* Free the old array. */
+		dsa_free(CurrentSession->area,
+				 CurrentSession->fixed->shared_combocid_registry_dsa);
+
+		/* Advertise the new array for other backends to notice. */
+		CurrentSession->fixed->shared_combocid_registry_dsa = new_data_dsa;
+		CurrentSession->fixed->shared_combocid_change++;
+
+		/* Set our own local pointer so we can access it. */
+		CurrentSession->shared_combocid_registry = registry = new_data;
+	}
+
+	/* Now we can add a new entry. */
+	combocid = registry->used++;
+	registry->combocids[combocid].cmin = cmin;
+	registry->combocids[combocid].cmax = cmax;
+	entry->combocid = combocid;
+	LWLockRelease(&CurrentSession->fixed->shared_combocid_lock);
+
+	return combocid;
+}
+
+static CommandId
+GetLocalComboCommandId(CommandId cmin, CommandId cmax)
+{
+	CommandId	combocid;
+	ComboCidKeyData key;
+	ComboCidEntry entry;
+	bool		found;
+
 	/*
 	 * Grow the array if there's not at least one free slot.  We must do this
 	 * before possibly entering a new hashtable entry, else failure to
@@ -276,90 +400,118 @@ GetComboCommandId(CommandId cmin, CommandId cmax)
 	return combocid;
 }
 
+/*
+ * Refresh CurrentSession->shared_combocid_registry if the shared change
+ * counter shows that the advertised array changed since we last looked.
+ */
+static inline void
+ensure_shared_combocid_registry(void)
+{
+	SessionFixed *shared = CurrentSession->fixed;
+	int			latest = shared->shared_combocid_change;
+
+	if (unlikely(latest != CurrentSession->shared_combocid_change))
+	{
+		CurrentSession->shared_combocid_registry = (SharedComboCidRegistry *)
+			dsa_get_address(CurrentSession->area,
+							shared->shared_combocid_registry_dsa);
+		CurrentSession->shared_combocid_change = latest;
+	}
+}
+
 static CommandId
 GetRealCmin(CommandId combocid)
 {
-	Assert(combocid < usedComboCids);
-	return comboCids[combocid].cmin;
+	if (CurrentSession->shared_combocid_registry)
+	{
+		CommandId	result;
+
+		/*
+		 * XXX Use local comboCids array as a cache, to avoid acquiring the
+		 * lock?
+		 */
+
+		LWLockAcquire(&CurrentSession->fixed->shared_combocid_lock, LW_SHARED);
+		ensure_shared_combocid_registry();
+		result = CurrentSession->shared_combocid_registry->combocids[combocid].cmin;
+		LWLockRelease(&CurrentSession->fixed->shared_combocid_lock);
+
+		return result;
+
+	}
+	else
+	{
+		Assert(combocid < usedComboCids);
+		return comboCids[combocid].cmin;
+	}
 }
 
 static CommandId
 GetRealCmax(CommandId combocid)
 {
-	Assert(combocid < usedComboCids);
-	return comboCids[combocid].cmax;
+	if (CurrentSession->shared_combocid_registry)
+	{
+		CommandId	result;
+
+		LWLockAcquire(&CurrentSession->fixed->shared_combocid_lock, LW_SHARED);
+		ensure_shared_combocid_registry();
+		result = CurrentSession->shared_combocid_registry->combocids[combocid].cmax;
+		LWLockRelease(&CurrentSession->fixed->shared_combocid_lock);
+
+		return result;
+	}
+	else
+	{
+		Assert(combocid < usedComboCids);
+		return comboCids[combocid].cmax;
+	}
 }
 
-/*
- * Estimate the amount of space required to serialize the current ComboCID
- * state.
- */
-Size
-EstimateComboCIDStateSpace(void)
+/* Create the shared combo CID array, seeded with this backend's combos. */
+void
+SharedComboCidRegistryInit(dsm_segment *seg, dsa_area *area)
 {
-	Size		size;
+	SharedComboCidRegistry *new_data;
+	dsa_pointer		new_data_dsa;
+	size_t			new_size;
 
-	/* Add space required for saving usedComboCids */
-	size = sizeof(int);
+	/* No lock needed: during initialization no workers are running yet. */
 
-	/* Add space required for saving the combocids key */
-	size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids));
+	/* Leave room for every existing combo, and never start out empty. */
+	new_size = Max(usedComboCids, 16);
 
-	return size;
-}
+	new_data_dsa = dsa_allocate(area,
+								offsetof(SharedComboCidRegistry,
+										 combocids) +
+								sizeof(ComboCidKeyData) * new_size);
+	new_data = (SharedComboCidRegistry *) dsa_get_address(area, new_data_dsa);
 
-/*
- * Serialize the ComboCID state into the memory, beginning at start_address.
- * maxsize should be at least as large as the value returned by
- * EstimateComboCIDStateSpace.
- */
-void
-SerializeComboCIDState(Size maxsize, char *start_address)
-{
-	char	   *endptr;
+	/*
+	 * Copy all existing combos into shared memory; from now on this session
+	 * uses shared memory for this.  comboCids may be NULL if there are none.
+	 */
+	new_data->size = new_size;
+	new_data->used = usedComboCids;
+	if (usedComboCids > 0)
+		memcpy(&new_data->combocids, comboCids,
+			   sizeof(ComboCidKeyData) * usedComboCids);
 
-	/* First, we store the number of currently-existing ComboCIDs. */
-	*(int *) start_address = usedComboCids;
+	/* Advertise the new array for other backends to notice. */
+	CurrentSession->fixed->shared_combocid_registry_dsa = new_data_dsa;
+	CurrentSession->fixed->shared_combocid_change = 1;
 
-	/* If maxsize is too small, throw an error. */
-	endptr = start_address + sizeof(int) +
-		(sizeof(ComboCidKeyData) * usedComboCids);
-	if (endptr < start_address || endptr > start_address + maxsize)
-		elog(ERROR, "not enough space to serialize ComboCID state");
+	/* Set our own local pointer so we can access it. */
+	CurrentSession->shared_combocid_registry = new_data;
 
-	/* Now, copy the actual cmin/cmax pairs. */
-	if (usedComboCids > 0)
-		memcpy(start_address + sizeof(int), comboCids,
-			   (sizeof(ComboCidKeyData) * usedComboCids));
+	/* XXX install cleanup callback? */
 }
 
-/*
- * Read the ComboCID state at the specified address and initialize this
- * backend with the same ComboCIDs.  This is only valid in a backend that
- * currently has no ComboCIDs (and only makes sense if the transaction state
- * is serialized and restored as well).
- */
 void
-RestoreComboCIDState(char *comboCIDstate)
+SharedComboCidRegistryAttach(void)
 {
-	int			num_elements;
-	ComboCidKeyData *keydata;
-	int			i;
-	CommandId	cid;
+	LWLockAcquire(&CurrentSession->fixed->shared_combocid_lock, LW_SHARED);
+	ensure_shared_combocid_registry();	/* resolve the advertised array */
+	LWLockRelease(&CurrentSession->fixed->shared_combocid_lock);
 
-	Assert(!comboCids && !comboHash);
-
-	/* First, we retrieve the number of ComboCIDs that were serialized. */
-	num_elements = *(int *) comboCIDstate;
-	keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
-
-	/* Use GetComboCommandId to restore each ComboCID. */
-	for (i = 0; i < num_elements; i++)
-	{
-		cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
-
-		/* Verify that we got the expected answer. */
-		if (cid != i)
-			elog(ERROR, "unexpected command ID while restoring combo CIDs");
-	}
+	/* XXX install cleanup callback? */
 }
diff --git a/src/include/access/session.h b/src/include/access/session.h
index 4c1f6ffd40..9d42546098 100644
--- a/src/include/access/session.h
+++ b/src/include/access/session.h
@@ -13,10 +13,22 @@
 #define SESSION_H
 
 #include "lib/dshash.h"
+#include "storage/lwlock.h"
 
-/* Avoid including typcache.h */
+struct SharedComboCidRegistry;
 struct SharedRecordTypmodRegistry;
 
+/*
+ * Part of the session object that is of fixed size in shared memory.
+ */
+typedef struct SessionFixed
+{
+	/* State managed by combocid.c. */
+	LWLock		shared_combocid_lock;	/* guards the combo CID registry */
+	dsa_pointer	shared_combocid_registry_dsa;	/* current registry array */
+	int			shared_combocid_change; /* bumped when the array is replaced */
+} SessionFixed;
+
 /*
  * A struct encapsulating some elements of a user's session.  For now this
  * manages state that applies to parallel query, but in principle it could
@@ -27,6 +39,12 @@ typedef struct Session
 	dsm_segment *segment;		/* The session-scoped DSM segment. */
 	dsa_area   *area;			/* The session-scoped DSA area. */
 
+	SessionFixed *fixed;
+
+	/* State managed by combocid.c. */
+	struct SharedComboCidRegistry *shared_combocid_registry;
+	int			shared_combocid_change;
+
 	/* State managed by typcache.c. */
 	struct SharedRecordTypmodRegistry *shared_typmod_registry;
 	dshash_table *shared_record_table;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index af9b41795d..780eb44b5b 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -214,6 +214,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PER_SESSION_DSA,
 	LWTRANCHE_PER_SESSION_RECORD_TYPE,
 	LWTRANCHE_PER_SESSION_RECORD_TYPMOD,
+	LWTRANCHE_SHARED_COMBOCID,
 	LWTRANCHE_SHARED_TUPLESTORE,
 	LWTRANCHE_SHARED_TIDBITMAP,
 	LWTRANCHE_PARALLEL_APPEND,
diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h
index b39166a685..b13e44e1fc 100644
--- a/src/include/utils/combocid.h
+++ b/src/include/utils/combocid.h
@@ -14,6 +14,9 @@
 #ifndef COMBOCID_H
 #define COMBOCID_H
 
+#include "storage/dsm.h"
+#include "utils/dsa.h"
+
 /*
  * HeapTupleHeaderGetCmin and HeapTupleHeaderGetCmax function prototypes
  * are in access/htup.h, because that's where the macro definitions that
@@ -21,8 +24,7 @@
  */
 
 extern void AtEOXact_ComboCid(void);
-extern void RestoreComboCIDState(char *comboCIDstate);
-extern void SerializeComboCIDState(Size maxsize, char *start_address);
-extern Size EstimateComboCIDStateSpace(void);
+extern void SharedComboCidRegistryInit(dsm_segment *seg, dsa_area *area);
+extern void SharedComboCidRegistryAttach(void);
 
 #endif							/* COMBOCID_H */
-- 
2.20.1

Reply via email to