On Tue, Sep 7, 2021 at 8:41 PM Tomas Vondra <tomas.von...@enterprisedb.com>
wrote:

> Hi,
>
> The numbers presented in this thread seem very promising - clearly
> there's significant potential for improvements. I'll run similar
> benchmarks too, to get a better understanding of this.
>

Thanks for showing interest.


>
> Can you share some basic details about the hardware you used?
> Particularly the CPU model - I guess this might explain some of the
> results, e.g. if CPU caches are ~1MB, that'd explain why setting
> tup_queue_size to 1MB improves things, but 4MB is a bit slower.
> Similarly, number of cores might explain why 4 workers perform better
> than 8 or 16 workers.
>

I have attached the output of lscpu.  I think batching the data before
updating shared memory wins because we avoid frequent cache misses, and
IMHO the benefit will be larger on machines with more CPU sockets.
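
To illustrate the batching idea, here is a minimal, self-contained sketch.
It is not the patch itself: demo_queue, demo_sender and the demo_* functions
are hypothetical stand-ins for shm_mq, and only the write counter is
modelled.  The sender accumulates bytes locally and touches the shared
counter only once the pending amount exceeds 1/4th of the ring size, or when
a flush is forced.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for shm_mq: only the write counter. */
typedef struct
{
	_Atomic uint64_t bytes_written;	/* shared: read by the receiver */
	size_t		ring_size;
} demo_queue;

/* Hypothetical stand-in for the sender's shm_mq_handle. */
typedef struct
{
	demo_queue *q;
	size_t		send_pending;	/* written locally, not yet published */
	unsigned	publishes;		/* number of shared-counter updates */
} demo_sender;

/* Publish all pending bytes to the shared counter, if there are any. */
static void
demo_publish(demo_sender *s)
{
	if (s->send_pending == 0)
		return;
	atomic_fetch_add(&s->q->bytes_written, (uint64_t) s->send_pending);
	s->send_pending = 0;
	s->publishes++;
}

/* Account for one message; publish only past the threshold or on request. */
static void
demo_send(demo_sender *s, size_t nbytes, bool force_flush)
{
	s->send_pending += nbytes;
	if (force_flush || s->send_pending > s->q->ring_size / 4)
		demo_publish(s);
}

/*
 * Send nmsgs messages of msg_bytes each, flush once more as a detach
 * would, and return how often the shared counter was updated.
 */
static unsigned
demo_count_publishes(size_t ring_size, size_t nmsgs, size_t msg_bytes,
					 bool force_flush)
{
	demo_queue	q;
	demo_sender s;

	atomic_init(&q.bytes_written, 0);
	q.ring_size = ring_size;
	s.q = &q;
	s.send_pending = 0;
	s.publishes = 0;

	for (size_t i = 0; i < nmsgs; i++)
		demo_send(&s, msg_bytes, force_flush);
	demo_publish(&s);			/* final flush, as at detach time */

	/* Nothing may be lost: every byte must end up in the shared counter. */
	assert(atomic_load(&q.bytes_written) == (uint64_t) nmsgs * msg_bytes);
	return s.publishes;
}
```

Under these assumptions, demo_count_publishes(65536, 1000, 64, false)
updates the shared counter 4 times, whereas passing force_flush = true
updates it 1000 times, once per message.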

> Now, this is mostly expected, but the consequence is that maybe things
> like queue size should be tunable/dynamic, not hard-coded?
>

Actually, my intention with the tuple queue size was just to observe the
behavior: do we really have a problem with workers stalling on the queue
while sending tuples?  The perf report showed some load in WaitLatch on the
worker side, so I did this experiment.  I saw some benefit, but it was not
really huge.  I am not sure whether we want to just increase the tuple
queue size or make it tunable, but if we want to support redistribute
operators sometime in the future, then maybe we should make it grow
dynamically at runtime, maybe using dsa or dsa + shared files.


> As for the patches, I think the proposed changes are sensible, but I
> wonder what queries might get slower. For example with the batching
> (updating the counter only once every 4kB), that pretty much transfers
> data in larger chunks with higher latency. So what if the query needs
> only a small chunk, like a LIMIT query? Similarly, this might mean the
> upper parts of the plan have to wait for the data for longer, and thus
> can't start some async operation (like send them to a FDW, or something
> like that). I do admit those are theoretical queries, I haven't tried
> creating such query.
>

Yeah, I was thinking about such cases.  Basically, this design can increase
the startup cost of the Gather node.  I will also try to come up with such
cases and test them.
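
As a rough way to size that startup cost, the sketch below counts how many
tuples one worker queues before its first update of the shared counter.  It
rests on several assumptions: the helper and macro names are hypothetical,
the ring is taken to be exactly 65536 bytes (the real ring is slightly
smaller because of the queue header), each message is an 8-byte length word
plus a payload padded to 8 bytes, and the flush check runs once per message
using the "more than 1/4th of the ring" rule from the attached patch.

```c
#include <stddef.h>

/* Assumed 8-byte maximum alignment, as on common 64-bit platforms. */
#define DEMO_MAXALIGN(len)	(((size_t) (len) + 7) & ~(size_t) 7)

/* Assumed size of the per-message length word. */
#define DEMO_LENGTH_WORD	8

/*
 * Return the number of tuples of tuple_len bytes that are queued before
 * the pending byte count first exceeds ring_size / 4.
 */
static size_t
demo_tuples_before_first_flush(size_t ring_size, size_t tuple_len)
{
	size_t		pending = 0;
	size_t		ntuples = 0;

	for (;;)
	{
		/* Each message contributes its length word and padded payload. */
		pending += DEMO_LENGTH_WORD + DEMO_MAXALIGN(tuple_len);
		ntuples++;

		/* Same shape as the patch's check: strictly more than 1/4. */
		if (pending > ring_size / 4)
			return ntuples;
	}
}
```

With these assumptions, 100-byte tuples give 147 tuples before the first
flush and 1000-byte tuples give 17, unless the worker finishes and detaches
earlier, which also flushes the pending bytes.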


>
> FWIW I've tried applying both patches at the same time, but there's a
> conflict in shm_mq_sendv - not a complex one, but I'm not sure what's
> the correct solution. Can you share a "combined" patch?
>

Actually, both of these patches are the same;
"v1-0001-Optimize-parallel-tuple-send-shm_mq_send_bytes.patch" is the
cleaner version of the first patch.  I did not send a patch for the
configurable tuple queue size because I used it only for testing and never
intended to propose it.  Most of the latest performance data I sent was
collected with only
"v1-0001-Optimize-parallel-tuple-send-shm_mq_send_bytes.patch" and with
the default tuple queue size.

But I am attaching both the patches in case you want to play around.


-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment: cpuinfo
Description: Binary data

From 84c2e46808b59f6bf7a782f6b0735dafc4e89e13 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 4 Aug 2021 16:51:01 +0530
Subject: [PATCH v1 1/2] Optimize parallel tuple send (shm_mq_send_bytes)

Do not update shm_mq's mq_bytes_written until we have written
an amount of data greater than 1/4th of the ring size.  This
will prevent frequent CPU cache misses, and it will also avoid
frequent SetLatch() calls, which are quite expensive.
---
 src/backend/executor/tqueue.c         |  2 +-
 src/backend/libpq/pqmq.c              |  7 +++-
 src/backend/storage/ipc/shm_mq.c      | 65 +++++++++++++++++++++++++++++------
 src/include/storage/shm_mq.h          |  8 +++--
 src/test/modules/test_shm_mq/test.c   |  7 ++--
 src/test/modules/test_shm_mq/worker.c |  2 +-
 6 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 7af9fbe..eb0cbd7 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the tuple itself. */
 	tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
 
 	if (should_free)
 		pfree(tuple);
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index d1a1f47..846494b 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
 	for (;;)
 	{
-		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+		/*
+		 * Immediately notify the receiver by passing force_flush as true so
+		 * that the shared memory value is updated before we send the parallel
+		 * message signal right after this.
+		 */
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
 
 		if (pq_mq_parallel_leader_pid != 0)
 			SendProcSignal(pq_mq_parallel_leader_pid,
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 91a7093..3e1781c 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -120,6 +120,12 @@ struct shm_mq
  * message itself, and mqh_expected_bytes - which is used only for reads -
  * tracks the expected total size of the payload.
  *
+ * mqh_send_pending is the number of bytes written to the queue but not yet
+ * reflected in shared memory.  We do not update shared memory until the
+ * pending data exceeds 1/4th of the ring size or the queue is full.  This
+ * prevents frequent CPU cache misses, and it also avoids frequent
+ * SetLatch() calls, which are quite expensive.
+ *
  * mqh_counterparty_attached tracks whether we know the counterparty to have
  * attached to the queue at some previous point.  This lets us avoid some
  * mutex acquisitions.
@@ -139,6 +145,7 @@ struct shm_mq_handle
 	Size		mqh_consume_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
+	Size		mqh_send_pending;
 	bool		mqh_length_word_complete;
 	bool		mqh_counterparty_attached;
 	MemoryContext mqh_context;
@@ -294,6 +301,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_consume_pending = 0;
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_expected_bytes = 0;
+	mqh->mqh_send_pending = 0;
 	mqh->mqh_length_word_complete = false;
 	mqh->mqh_counterparty_attached = false;
 	mqh->mqh_context = CurrentMemoryContext;
@@ -317,16 +325,22 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
 /*
  * Write a message into a shared message queue.
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver if it is already attached.  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+			bool force_flush)
 {
 	shm_mq_iovec iov;
 
 	iov.data = data;
 	iov.len = nbytes;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+	return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +357,12 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * For force_flush, refer to the comments atop shm_mq_send.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+			 bool force_flush)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
@@ -518,8 +535,19 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * If we have written more than 1/4 of the ring or the caller has
+	 * requested force flush, mark it as written in shared memory and notify
+	 * the receiver.  For more detail, refer to the comments atop the
+	 * shm_mq_handle structure.
+	 */
+	if (mqh->mqh_send_pending > mq->mq_ring_size / 4 || force_flush)
+	{
+		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+		SetLatch(&receiver->procLatch);
+		mqh->mqh_send_pending = 0;
+	}
+
 	return SHM_MQ_SUCCESS;
 }
 
@@ -816,6 +844,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+	/* Before detaching, notify already written data to the receiver. */
+	if (mqh->mqh_send_pending > 0)
+	{
+		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+		mqh->mqh_send_pending = 0;
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
 
@@ -894,7 +929,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
 		/* Compute number of ring buffer bytes used and available. */
 		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
@@ -951,6 +986,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
+			/* Update the pending send bytes in the shared memory. */
+			shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
 			/*
 			 * Since mq->mqh_counterparty_attached is known to be true at this
 			 * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +997,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			Assert(mqh->mqh_counterparty_attached);
 			SetLatch(&mq->mq_receiver->procLatch);
 
+			/*
+			 * We have just updated the mqh_send_pending bytes in the shared
+			 * memory so reset it.
+			 */
+			mqh->mqh_send_pending = 0;
+
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
 			{
@@ -1009,13 +1053,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here.  We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we don't update the bytes written in the shared
+			 * memory and also don't set the reader's latch here.  Refer to
+			 * the comments atop the shm_mq_handle structure for more
+			 * information.
 			 */
+			mqh->mqh_send_pending += MAXALIGN(sendnow);
 		}
 	}
 
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index e693f3f..cb1c555 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-								 Size nbytes, const void *data, bool nowait);
-extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
-								  shm_mq_iovec *iov, int iovcnt, bool nowait);
+								 Size nbytes, const void *data, bool nowait,
+								 bool force_flush);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
+								  int iovcnt, bool nowait, bool force_flush);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 									Size *nbytesp, void **datap, bool nowait);
+extern void shm_mq_flush(shm_mq_handle *mqh);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c
index 2d8d695..be074f0 100644
--- a/src/test/modules/test_shm_mq/test.c
+++ b/src/test/modules/test_shm_mq/test.c
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 
 	/* Send the initial message. */
-	res = shm_mq_send(outqh, message_size, message_contents, false);
+	res = shm_mq_send(outqh, message_size, message_contents, false, true);
 	if (res != SHM_MQ_SUCCESS)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
 		 */
 		if (send_count < loop_count)
 		{
-			res = shm_mq_send(outqh, message_size, message_contents, true);
+			res = shm_mq_send(outqh, message_size, message_contents, true,
+							  true);
 			if (res == SHM_MQ_SUCCESS)
 			{
 				++send_count;
diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c
index 2180776..9b037b9 100644
--- a/src/test/modules/test_shm_mq/worker.c
+++ b/src/test/modules/test_shm_mq/worker.c
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			break;
 	}
-- 
1.8.3.1

From 455026b0f70eec8acf3565824c03a9e20588e9cc Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Mon, 26 Jul 2021 20:18:48 +0530
Subject: [PATCH v1 2/2] poc-test-parallel_tuple_queue_size

---
 src/backend/executor/execParallel.c |  3 ++-
 src/backend/utils/misc/guc.c        | 10 ++++++++++
 src/include/storage/pg_shmem.h      |  1 +
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f8a4a40..f9dd5fc 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -51,6 +51,7 @@
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 
+int			parallel_tuple_queue_size =	65536;
 /*
  * Magic numbers for parallel executor communication.  We use constants
  * greater than any 32-bit integer here so that values < 2^32 can be used
@@ -67,7 +68,7 @@
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
 
-#define PARALLEL_TUPLE_QUEUE_SIZE		65536
+#define PARALLEL_TUPLE_QUEUE_SIZE		(parallel_tuple_queue_size * 1024L)
 
 /*
  * Fixed-size random stuff that we need to pass to parallel workers.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c339acf..4d5dca5 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3346,6 +3346,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"parallel_tuple_queue_size", PGC_USERSET, RESOURCES_MEM,
+			gettext_noop("Sets the parallel tuple queue size."),
+			GUC_UNIT_KB
+		},
+		&parallel_tuple_queue_size,
+		64, 64, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
 			gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
 			NULL,
diff --git a/src/include/storage/pg_shmem.h b/src/include/storage/pg_shmem.h
index 059df1b..9182a0e 100644
--- a/src/include/storage/pg_shmem.h
+++ b/src/include/storage/pg_shmem.h
@@ -45,6 +45,7 @@ typedef struct PGShmemHeader	/* standard header for all Postgres shmem */
 extern int	shared_memory_type;
 extern int	huge_pages;
 extern int	huge_page_size;
+extern int	parallel_tuple_queue_size;
 
 /* Possible values for huge_pages */
 typedef enum
-- 
1.8.3.1
