On Fri, Aug 6, 2021 at 2:00 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> Experiment #1:
> As part of this experiment, I have modified the sender to keep local
> copies of "mq_bytes_read" and "mq_bytes_written" in the local mqh
> handle so that we don't need to frequently read/write these
> cache-sensitive shared memory variables.  Now we only read/write from
> shared memory under the conditions below:
>
> 1) If the number of available bytes is not enough to send the tuple,
> read the updated value of bytes read, and also inform the reader about
> the new writes.
> 2) After every 4k bytes written, update the shared memory variable and
> inform the reader.
> 3) On detach, to send any remaining data.
...
> Results: (query: EXPLAIN ANALYZE SELECT * FROM t;)
>
> 1) Non-parallel (default)
> Execution Time: 31627.492 ms
>
> 2) Parallel with 4 workers (forced by setting parallel_tuple_cost to 0)
> Execution Time: 37498.672 ms
>
> 3) Same as above (2), but with the patch
> Execution Time: 23649.287 ms
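To restate the scheme concretely: the sender accumulates its write count
locally and publishes it to shared memory only once the unpublished delta
exceeds a threshold.  A minimal standalone sketch of that idea follows
(illustrative only, not the patch itself: it uses C11 atomics instead of
the pg_atomic_* wrappers, and the names note_bytes_written, local_written,
last_flushed, and FLUSH_THRESHOLD are made up for this example):

/*
 * Simplified model of the sender-side batching idea.
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define FLUSH_THRESHOLD 4096    /* plays the role of mq_min_send_size */

/* Shared state: written by the sender, read by the receiver. */
static _Atomic uint64_t shared_bytes_written;

/* Sender-local state: cheap to update, no cache-line ping-pong. */
static uint64_t local_written;
static uint64_t last_flushed;

/* Called for every chunk the sender copies into the ring buffer. */
static void
note_bytes_written(uint64_t n)
{
	local_written += n;

	/* Publish to shared memory only once enough data has accumulated. */
	if (local_written - last_flushed > FLUSH_THRESHOLD)
	{
		atomic_store(&shared_bytes_written, local_written);
		last_flushed = local_written;
		/* In the real code, this is also where SetLatch() would run. */
	}
}

int
main(void)
{
	for (int i = 0; i < 100; i++)
		note_bytes_written(256);

	printf("local=%llu shared=%llu\n",
		   (unsigned long long) local_written,
		   (unsigned long long) atomic_load(&shared_bytes_written));
	return 0;
}

With 256-byte writes, the shared counter is touched only once per ~16
writes instead of on every write, which is where the savings come from.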
Here is the POC patch for the same.  Apart from this extreme case, I can
see improvements with this patch for normal parallel queries as well.
Next, I will perform some more tests with different sets of queries to
measure the improvements and post the results.  I will also try to
optimize the reader along similar lines.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 4d19beb74e91a8259fcfd8da7ac8b1395c5f5c6d Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 4 Aug 2021 16:51:01 +0530
Subject: [PATCH] Optimize shm_mq_send_bytes

Instead of frequently updating the bytes written in shared memory,
only update it when it crosses some limit (4k).  This avoids frequent
cache invalidation, atomic operations and SetLatch calls.
---
 src/backend/access/transam/parallel.c |  4 +-
 src/backend/executor/execParallel.c   |  2 +-
 src/backend/storage/ipc/shm_mq.c      | 87 ++++++++++++++++++++++-------------
 src/include/storage/shm_mq.h          |  2 +-
 src/test/modules/test_shm_mq/setup.c  |  2 +-
 5 files changed, 59 insertions(+), 38 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 3550ef1..2571807 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -431,7 +431,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 			shm_mq	   *mq;
 
 			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE, 0);
 			shm_mq_set_receiver(mq, MyProc);
 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
 		}
@@ -497,7 +497,7 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 			shm_mq	   *mq;
 
 			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE, 0);
 			shm_mq_set_receiver(mq, MyProc);
 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
 		}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f9dd5fc..373f920 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -567,7 +567,7 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 		mq = shm_mq_create(tqueuespace +
 						   ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
-						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
+						   (Size) PARALLEL_TUPLE_QUEUE_SIZE, 4096);
 
 		shm_mq_set_receiver(mq, MyProc);
 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 91a7093..eab7bcc 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -53,6 +53,10 @@
  * mq_ring_size and mq_ring_offset never change after initialization, and
  * can therefore be read without the lock.
  *
+ * After every mq_min_send_size bytes are written, the sender updates the
+ * shared memory value mq_bytes_written, to avoid frequent cache
+ * invalidations and atomic operations.
+ *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
@@ -77,6 +81,7 @@ struct shm_mq
 	pg_atomic_uint64 mq_bytes_read;
 	pg_atomic_uint64 mq_bytes_written;
 	Size		mq_ring_size;
+	Size		mq_min_send_size;
 	bool		mq_detached;
 	uint8		mq_ring_offset;
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
@@ -139,6 +144,9 @@ struct shm_mq_handle
 	Size		mqh_consume_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
+	uint64		mqh_last_sent_bytes;
+	uint64		mqh_bytes_read;
+	uint64		mqh_bytes_written;
 	bool		mqh_length_word_complete;
 	bool		mqh_counterparty_attached;
 	MemoryContext mqh_context;
@@ -155,7 +163,6 @@ static bool shm_mq_counterparty_gone(shm_mq *mq,
 static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
 								 BackgroundWorkerHandle *handle);
 static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
-static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
@@ -168,7 +175,7 @@ MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
  * Initialize a new shared message queue.
  */
 shm_mq *
-shm_mq_create(void *address, Size size)
+shm_mq_create(void *address, Size size, Size min_send_size)
 {
 	shm_mq	   *mq = address;
 	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
@@ -188,6 +195,7 @@ shm_mq_create(void *address, Size size)
 	mq->mq_ring_size = size - data_offset;
 	mq->mq_detached = false;
 	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
+	mq->mq_min_send_size = min_send_size;
 
 	return mq;
 }
@@ -297,6 +305,9 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_length_word_complete = false;
 	mqh->mqh_counterparty_attached = false;
 	mqh->mqh_context = CurrentMemoryContext;
+	mqh->mqh_bytes_read = 0;
+	mqh->mqh_bytes_written = 0;
+	mqh->mqh_last_sent_bytes = 0;
 
 	if (seg != NULL)
 		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -518,8 +529,17 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * Notify the receiver of the newly-written data, but only if we have
+	 * written enough data since the last notification.
+	 */
+	if (mqh->mqh_bytes_written - mqh->mqh_last_sent_bytes > mq->mq_min_send_size)
+	{
+		pg_atomic_write_u64(&mq->mq_bytes_written, mqh->mqh_bytes_written);
+		mqh->mqh_last_sent_bytes = mqh->mqh_bytes_written;
+		SetLatch(&receiver->procLatch);
+	}
+
 	return SHM_MQ_SUCCESS;
 }
@@ -816,6 +836,16 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+	/* Flush any bytes written but not yet published to shared memory. */
+	if (mqh->mqh_queue->mq_min_send_size > 0 &&
+		mqh->mqh_bytes_written > mqh->mqh_last_sent_bytes)
+	{
+		pg_atomic_write_u64(&mqh->mqh_queue->mq_bytes_written,
+							mqh->mqh_bytes_written);
+		mqh->mqh_last_sent_bytes = mqh->mqh_bytes_written;
+		SetLatch(&mqh->mqh_queue->mq_receiver->procLatch);
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
@@ -886,15 +916,24 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 	uint64		used;
 	Size		ringsize = mq->mq_ring_size;
 	Size		available;
-
 	while (sent < nbytes)
 	{
 		uint64		rb;
 		uint64		wb;
 
 		/* Compute number of ring buffer bytes used and available. */
-		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = mqh->mqh_bytes_written;
+		rb = mqh->mqh_bytes_read;
+
+		/*
+		 * If, based on the local value of mqh_bytes_read, we don't have
+		 * enough space in the ring buffer, then read the latest value from
+		 * shared memory.  Avoiding reading from shared memory every time
+		 * reduces the atomic operations as well as the cache misses.
+		 */
+		if ((ringsize - (wb - rb)) < nbytes)
+			mqh->mqh_bytes_read = rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
@@ -957,6 +996,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * Therefore, we can read it without acquiring the spinlock.
 			 */
 			Assert(mqh->mqh_counterparty_attached);
+			pg_atomic_write_u64(&mq->mq_bytes_written, mqh->mqh_bytes_written);
+			mqh->mqh_last_sent_bytes = mqh->mqh_bytes_written;
+
 			SetLatch(&mq->mq_receiver->procLatch);
 
 			/* Skip manipulation of our latch if nowait = true. */
@@ -1009,13 +1051,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here. We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we don't update the bytes written in shared
+			 * memory, and also don't set the reader's latch here.  We'll do
+			 * that only when the buffer fills up or after writing more than
+			 * mq_min_send_size bytes.
 			 */
+			mqh->mqh_bytes_written += MAXALIGN(sendnow);
 		}
 	}
@@ -1253,28 +1296,6 @@ shm_mq_inc_bytes_read(shm_mq *mq, Size n)
 	SetLatch(&sender->procLatch);
 }
 
-/*
- * Increment the number of bytes written.
- */
-static void
-shm_mq_inc_bytes_written(shm_mq *mq, Size n)
-{
-	/*
-	 * Separate prior reads of mq_ring from the write of mq_bytes_written
-	 * which we're about to do.  Pairs with the read barrier found in
-	 * shm_mq_receive_bytes.
-	 */
-	pg_write_barrier();
-
-	/*
-	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
-	 * else can be changing this value.  This method avoids taking the bus
-	 * lock unnecessarily.
-	 */
-	pg_atomic_write_u64(&mq->mq_bytes_written,
-						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
-}
-
 /* Shim for on_dsm_detach callback. */
 static void
 shm_mq_detach_callback(dsm_segment *seg, Datum arg)
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index e693f3f..58a34a9 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -47,7 +47,7 @@ typedef enum
  * or written, but they need not be set by the same process.  Each must be
  * set exactly once.
  */
-extern shm_mq *shm_mq_create(void *address, Size size);
+extern shm_mq *shm_mq_create(void *address, Size size, Size min_send_size);
 extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
 extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c
index e05e97c..0bbc7cb 100644
--- a/src/test/modules/test_shm_mq/setup.c
+++ b/src/test/modules/test_shm_mq/setup.c
@@ -143,7 +143,7 @@ setup_dynamic_shared_memory(int64 queue_size, int nworkers,
 		shm_mq	   *mq;
 
 		mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
-						   (Size) queue_size);
+						   (Size) queue_size, 0);
 		shm_toc_insert(toc, i + 1, mq);
 
 		if (i == 0)
-- 
1.8.3.1
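For reference, with the patched API the caller picks the batching
threshold at queue-creation time.  A sketch of the two call patterns the
patch uses (a fragment only, assuming the usual backend environment; the
*_space buffer variables are illustrative):

	/*
	 * Error queues pass 0, so any newly-written bytes still publish the
	 * counter and set the latch on the next send (the old,
	 * latency-sensitive behavior).  Tuple queues pass 4096, so the
	 * counter is published in 4kB batches.
	 */
	shm_mq *errq = shm_mq_create(error_queue_space,
								 PARALLEL_ERROR_QUEUE_SIZE, 0);
	shm_mq *tupq = shm_mq_create(tuple_queue_space,
								 PARALLEL_TUPLE_QUEUE_SIZE, 4096);
	shm_mq_set_receiver(tupq, MyProc);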