On Fri, Aug 6, 2021 at 2:00 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> Experiment #1:
> As part of this experiment, I have modified the sender to keep the
> local copy of "mq_bytes_read" and "mq_bytes_written" in the local mqh
> handle so that we don't need to frequently read/write cache sensitive
> shared memory variables.  So now we only read/write from the shared
> memory in the below conditions
>
> 1) If the number of available bytes is not enough to send the tuple,
> read the updated value of bytes read and also inform the reader about
> the new writes.
> 2) After every 4k bytes written, update the shared memory variable and
> inform the reader.
> 3) on detach for sending any remaining data.
...
> Results: (query EXPLAIN ANALYZE SELECT * FROM t;)
> 1) Non-parallel (default)
>  Execution Time: 31627.492 ms
>
> 2) Parallel with 4 workers (force by setting parallel_tuple_cost to 0)
>  Execution Time: 37498.672 ms
>
> 3) Same as above (2) but with the patch.
> Execution Time: 23649.287 ms

Here is the POC patch for the same, apart from this extreme case I am
able to see improvement with this patch for normal parallel queries as
well.

Next, I will perform some more tests with different sets of queries to
see the improvements and post the results.  I will also try to
optimize the reader on the similar line.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 4d19beb74e91a8259fcfd8da7ac8b1395c5f5c6d Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 4 Aug 2021 16:51:01 +0530
Subject: [PATCH] Optimize shm_mq_send_bytes

Instead of freqnetly updating the bytes written in the shared memory,
only update when it crosses some limit (4k), this will avoid frequent
cache invalidation, atomic operations and SetLatch.
---
 src/backend/access/transam/parallel.c |  4 +-
 src/backend/executor/execParallel.c   |  2 +-
 src/backend/storage/ipc/shm_mq.c      | 87 ++++++++++++++++++++++-------------
 src/include/storage/shm_mq.h          |  2 +-
 src/test/modules/test_shm_mq/setup.c  |  2 +-
 5 files changed, 59 insertions(+), 38 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 3550ef1..2571807 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -431,7 +431,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 			shm_mq	   *mq;
 
 			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE, 0);
 			shm_mq_set_receiver(mq, MyProc);
 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
 		}
@@ -497,7 +497,7 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 			shm_mq	   *mq;
 
 			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE, 0);
 			shm_mq_set_receiver(mq, MyProc);
 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
 		}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f9dd5fc..373f920 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -567,7 +567,7 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 
 		mq = shm_mq_create(tqueuespace +
 						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
-						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
+						   (Size) PARALLEL_TUPLE_QUEUE_SIZE, 4096);
 
 		shm_mq_set_receiver(mq, MyProc);
 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 91a7093..eab7bcc 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -53,6 +53,10 @@
  * mq_ring_size and mq_ring_offset never change after initialization, and
  * can therefore be read without the lock.
  *
+ * After every mq_min_send_size bytes are written the sender will update the
+ * shared memory value mq_bytes_written to avoid frequent cache invalidations
+ * and atomic operations.
+ *
  * Importantly, mq_ring can be safely read and written without a lock.
  * At any given time, the difference between mq_bytes_read and
  * mq_bytes_written defines the number of bytes within mq_ring that contain
@@ -77,6 +81,7 @@ struct shm_mq
 	pg_atomic_uint64 mq_bytes_read;
 	pg_atomic_uint64 mq_bytes_written;
 	Size		mq_ring_size;
+	Size		mq_min_send_size;
 	bool		mq_detached;
 	uint8		mq_ring_offset;
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
@@ -139,6 +144,9 @@ struct shm_mq_handle
 	Size		mqh_consume_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
+	uint64		mqh_last_sent_bytes;
+	uint64		mqh_bytes_read;
+	uint64		mqh_bytes_written;
 	bool		mqh_length_word_complete;
 	bool		mqh_counterparty_attached;
 	MemoryContext mqh_context;
@@ -155,7 +163,6 @@ static bool shm_mq_counterparty_gone(shm_mq *mq,
 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
 								 BackgroundWorkerHandle *handle);
 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
-static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
@@ -168,7 +175,7 @@ MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
  * Initialize a new shared message queue.
  */
 shm_mq *
-shm_mq_create(void *address, Size size)
+shm_mq_create(void *address, Size size, Size min_send_size)
 {
 	shm_mq	   *mq = address;
 	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
@@ -188,6 +195,7 @@ shm_mq_create(void *address, Size size)
 	mq->mq_ring_size = size - data_offset;
 	mq->mq_detached = false;
 	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
+	mq->mq_min_send_size = min_send_size;
 
 	return mq;
 }
@@ -297,6 +305,9 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_length_word_complete = false;
 	mqh->mqh_counterparty_attached = false;
 	mqh->mqh_context = CurrentMemoryContext;
+	mqh->mqh_bytes_read = 0;
+	mqh->mqh_bytes_written = 0;
+	mqh->mqh_last_sent_bytes = 0;
 
 	if (seg != NULL)
 		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -518,8 +529,17 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * Notify receiver of the newly-written data, only if we have written
+	 * enough data.
+	 */
+	if (mqh->mqh_bytes_written - mqh->mqh_last_sent_bytes > mq->mq_min_send_size)
+	{
+		pg_atomic_write_u64(&mq->mq_bytes_written, mqh->mqh_bytes_written);
+		mqh->mqh_last_sent_bytes = mqh->mqh_bytes_written;
+		SetLatch(&receiver->procLatch);
+	}
+
 	return SHM_MQ_SUCCESS;
 }
 
@@ -816,6 +836,16 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+
+	if (mqh->mqh_queue->mq_min_send_size > 0 &&
+		mqh->mqh_bytes_written > mqh->mqh_last_sent_bytes)
+	{
+		pg_atomic_write_u64(&mqh->mqh_queue->mq_bytes_written,
+							mqh->mqh_bytes_written);
+		mqh->mqh_last_sent_bytes = mqh->mqh_bytes_written;
+		SetLatch(&mqh->mqh_queue->mq_receiver->procLatch);
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
 
@@ -886,15 +916,24 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 	uint64		used;
 	Size		ringsize = mq->mq_ring_size;
 	Size		available;
-
 	while (sent < nbytes)
 	{
 		uint64		rb;
 		uint64		wb;
 
 		/* Compute number of ring buffer bytes used and available. */
-		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = mqh->mqh_bytes_written;
+		rb = mqh->mqh_bytes_read;
+
+		/*
+		 * If based on the local values of the mqh_bytes_read, we don't have
+		 * enough size in the ring buffer then read the latest value from the
+		 * shared memory.  Avoid reading everytime from the shared memory will
+		 * reduce the atomic operations as well as cache misses.
+		 */
+		if ((ringsize - (wb-rb)) < nbytes)
+			mqh->mqh_bytes_read = rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
@@ -957,6 +996,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * Therefore, we can read it without acquiring the spinlock.
 			 */
 			Assert(mqh->mqh_counterparty_attached);
+			pg_atomic_write_u64(&mq->mq_bytes_written, mqh->mqh_bytes_written);
+			mqh->mqh_last_sent_bytes = mqh->mqh_bytes_written;
+
 			SetLatch(&mq->mq_receiver->procLatch);
 
 			/* Skip manipulation of our latch if nowait = true. */
@@ -1009,13 +1051,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here.  We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we don't update the bytes written in the shared
+			 * memory and also don't set the reader's latch here.  We'll do
+			 * that only when the buffer fills up or after writing more than
+			 * writing 4k data.
 			 */
+			mqh->mqh_bytes_written += MAXALIGN(sendnow);
 		}
 	}
 
@@ -1253,28 +1296,6 @@ shm_mq_inc_bytes_read(shm_mq *mq, Size n)
 	SetLatch(&sender->procLatch);
 }
 
-/*
- * Increment the number of bytes written.
- */
-static void
-shm_mq_inc_bytes_written(shm_mq *mq, Size n)
-{
-	/*
-	 * Separate prior reads of mq_ring from the write of mq_bytes_written
-	 * which we're about to do.  Pairs with the read barrier found in
-	 * shm_mq_receive_bytes.
-	 */
-	pg_write_barrier();
-
-	/*
-	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
-	 * else can be changing this value.  This method avoids taking the bus
-	 * lock unnecessarily.
-	 */
-	pg_atomic_write_u64(&mq->mq_bytes_written,
-						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
-}
-
 /* Shim for on_dsm_detach callback. */
 static void
 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index e693f3f..58a34a9 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -47,7 +47,7 @@ typedef enum
  * or written, but they need not be set by the same process.  Each must be
  * set exactly once.
  */
-extern shm_mq *shm_mq_create(void *address, Size size);
+extern shm_mq *shm_mq_create(void *address, Size size, Size min_send_size);
 extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
 extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
 
diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c
index e05e97c..0bbc7cb 100644
--- a/src/test/modules/test_shm_mq/setup.c
+++ b/src/test/modules/test_shm_mq/setup.c
@@ -143,7 +143,7 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
 		shm_mq	   *mq;
 
 		mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
-						   (Size) queue_size);
+						   (Size) queue_size, 0);
 		shm_toc_insert(toc, i + 1, mq);
 
 		if (i == 0)
-- 
1.8.3.1

Reply via email to