On Tue, Aug 27, 2019 at 6:35 AM Andres Freund <and...@anarazel.de> wrote:
> On 2019-08-26 14:09:45 -0400, Robert Haas wrote:
> > There's a comment in htup.h which says:
> >
> >  * * Separately allocated tuple: t_data points to a palloc'd chunk that
> >  *       is not adjacent to the HeapTupleData.  (This case is deprecated 
> > since
> >  *       it's difficult to tell apart from case #1.  It should be used only 
> > in
> >  *       limited contexts where the code knows that case #1 will never 
> > apply.)
> >
> > I got scared and ran away.
>
> Perhaps this'd could be sidestepped by funneling through MinimalTuples
> instead of HeapTuples. Afaict that should always be sufficient, because
> all system column accesses ought to happen below (including being
> projected into a separate column, if needed above). With the added
> benefit of needing less space, of course.

I tried that out (attached).  That makes various simple tests like
this go 10%+ faster on my development machine:

  create table s as select generate_series(1, 50000000)::int i,
                           'hello world' a,
                           'this is a message' b,
                           42 c;
  select pg_prewarm('s');
  set force_parallel_mode = on;

  explain analyze select * from s;

PS  It looks like the following load of mq_ring_size might be running
a little hot due to false sharing with the atomic counters:

       if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
From 0ba21ee67c9e8f2be404fa7e16d30a815310c52a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 14 May 2020 19:08:50 +1200
Subject: [PATCH] Use MinimalTuple for tuple queues.

This saves 8 bytes per tuple in the queue, and avoids the need to copy
and free tuples on the receiving side.

Gather can emit the returned MinimalTuple directly, but GatherMerge now
needs to make an explicit copy because it buffers multiple tuples at a
time.

Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
---
 src/backend/executor/nodeGather.c      | 16 +++++------
 src/backend/executor/nodeGatherMerge.c | 40 ++++++++++++++------------
 src/backend/executor/tqueue.c          | 30 +++++++++----------
 src/include/executor/tqueue.h          |  4 +--
 4 files changed, 46 insertions(+), 44 deletions(-)

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 6b8ed867d5..a01b46af14 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,7 +46,7 @@
 
 static TupleTableSlot *ExecGather(PlanState *pstate);
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
 
@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * Initialize funnel slot to same tuple descriptor as outer plan.
 	 */
 	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
-													  &TTSOpsHeapTuple);
+													  &TTSOpsMinimalTuple);
 
 	/*
 	 * Gather doesn't support checking a qual (it's always more efficient to
@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
 	PlanState  *outerPlan = outerPlanState(gatherstate);
 	TupleTableSlot *outerTupleSlot;
 	TupleTableSlot *fslot = gatherstate->funnel_slot;
-	HeapTuple	tup;
+	MinimalTuple	tup;
 
 	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
 	{
@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate)
 
 			if (HeapTupleIsValid(tup))
 			{
-				ExecStoreHeapTuple(tup, /* tuple to store */
-								   fslot,	/* slot to store the tuple */
-								   true);	/* pfree tuple when done with it */
+				ExecStoreMinimalTuple(tup, /* tuple to store */
+									  fslot,	/* slot to store the tuple */
+									  false);	/* don't pfree tuple  */
 				return fslot;
 			}
 		}
@@ -308,7 +308,7 @@ gather_getnext(GatherState *gatherstate)
 /*
  * Attempt to read a tuple from one of our parallel workers.
  */
-static HeapTuple
+static MinimalTuple
 gather_readnext(GatherState *gatherstate)
 {
 	int			nvisited = 0;
@@ -316,7 +316,7 @@ gather_readnext(GatherState *gatherstate)
 	for (;;)
 	{
 		TupleQueueReader *reader;
-		HeapTuple	tup;
+		MinimalTuple tup;
 		bool		readerdone;
 
 		/* Check for async events, particularly messages from workers. */
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 317ddb4ae2..47129344f3 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -45,7 +45,7 @@
  */
 typedef struct GMReaderTupleBuffer
 {
-	HeapTuple  *tuple;			/* array of length MAX_TUPLE_STORE */
+	MinimalTuple *tuple;		/* array of length MAX_TUPLE_STORE */
 	int			nTuples;		/* number of tuples currently stored */
 	int			readCounter;	/* index of next tuple to extract */
 	bool		done;			/* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
 static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
-								   bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+									  bool nowait, bool *done);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
 static void gather_merge_setup(GatherMergeState *gm_state);
 static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
 	{
 		/* Allocate the tuple array with length MAX_TUPLE_STORE */
 		gm_state->gm_tuple_buffers[i].tuple =
-			(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+			(MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
 
 		/* Initialize tuple slot for worker */
 		gm_state->gm_slots[i + 1] =
 			ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
-								   &TTSOpsHeapTuple);
+								   &TTSOpsMinimalTuple);
 	}
 
 	/* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
 		GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
 
 		while (tuple_buffer->readCounter < tuple_buffer->nTuples)
-			heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+			pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
 
 		ExecClearTuple(gm_state->gm_slots[i + 1]);
 	}
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
 	/* Try to fill additional slots in the array. */
 	for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
 	{
-		HeapTuple	tuple;
+		MinimalTuple tuple;
 
 		tuple = gm_readnext_tuple(gm_state,
 								  reader,
 								  true,
 								  &tuple_buffer->done);
-		if (!HeapTupleIsValid(tuple))
+		if (!tuple)
 			break;
 		tuple_buffer->tuple[i] = tuple;
 		tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
 gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 {
 	GMReaderTupleBuffer *tuple_buffer;
-	HeapTuple	tup;
+	MinimalTuple tup;
 
 	/*
 	 * If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 								reader,
 								nowait,
 								&tuple_buffer->done);
-		if (!HeapTupleIsValid(tup))
+		if (!tup)
 			return false;
 
 		/*
@@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 		load_tuple_array(gm_state, reader);
 	}
 
-	Assert(HeapTupleIsValid(tup));
+	Assert(tup);
 
 	/* Build the TupleTableSlot for the given tuple */
-	ExecStoreHeapTuple(tup,		/* tuple to store */
-					   gm_state->gm_slots[reader],	/* slot in which to store
-													 * the tuple */
-					   true);	/* pfree tuple when done with it */
+	ExecStoreMinimalTuple(tup,		/* tuple to store */
+						  gm_state->gm_slots[reader],	/* slot in which to store
+														 * the tuple */
+						  true);	/* pfree tuple when done with it */
 
 	return true;
 }
@@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 /*
  * Attempt to read a tuple from given worker.
  */
-static HeapTuple
+static MinimalTuple
 gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
 				  bool *done)
 {
 	TupleQueueReader *reader;
-	HeapTuple	tup;
+	MinimalTuple tup;
 
 	/* Check for async events, particularly messages from workers. */
 	CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
 	reader = gm_state->reader[nreader - 1];
 	tup = TupleQueueReaderNext(reader, nowait, done);
 
-	return tup;
+	/*
+	 * Since we'll be buffering these across multiple calls, we need to make a
+	 * copy.
+	 */
+	return tup ? heap_copy_minimal_tuple(tup) : NULL;
 }
 
 /*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e5656fbfac..30a264ebea 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -54,16 +54,16 @@ static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
-	HeapTuple	tuple;
+	MinimalTuple tuple;
 	shm_mq_result result;
 	bool		should_free;
 
 	/* Send the tuple itself. */
-	tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+	tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
 
 	if (should_free)
-		heap_freetuple(tuple);
+		pfree(tuple);
 
 	/* Check for failure. */
 	if (result == SHM_MQ_DETACHED)
@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
  * nowait = true and no tuple is ready to return.  *done, if not NULL,
  * is set to true when there are no remaining tuples and otherwise to false.
  *
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory!  (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed.  The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
  *
  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
  * accumulate bytes from a partially-read message, so it's useful to call
  * this with nowait = true even if nothing is returned.
  */
-HeapTuple
+MinimalTuple
 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
 {
-	HeapTupleData htup;
+	MinimalTuple tuple;
 	shm_mq_result result;
 	Size		nbytes;
 	void	   *data;
@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
 	Assert(result == SHM_MQ_SUCCESS);
 
 	/*
-	 * Set up a dummy HeapTupleData pointing to the data from the shm_mq
-	 * (which had better be sufficiently aligned).
+	 * Return a pointer to the queue memory directly (which had better be
+	 * sufficiently aligned).
 	 */
-	ItemPointerSetInvalid(&htup.t_self);
-	htup.t_tableOid = InvalidOid;
-	htup.t_len = nbytes;
-	htup.t_data = data;
+	tuple = (MinimalTuple) data;
+	Assert(tuple->t_len == nbytes);
 
-	return heap_copytuple(&htup);
+	return tuple;
 }
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 93655ef6bd..264eb56641 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
 /* Use these to receive tuples from a shm_mq. */
 extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
 extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
-									  bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+										 bool nowait, bool *done);
 
 #endif							/* TQUEUE_H */
-- 
2.20.1

Reply via email to