On Thu, Sep 21, 2017 at 4:50 PM, Rafia Sabih <rafia.sa...@enterprisedb.com> wrote:
> On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbal...@gmail.com> wrote:
>> On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
>> <rafia.sa...@enterprisedb.com> wrote:
>>
>
> Please find the attached file for the revised version.
Thanks for the updated patch. I have some more comments.

+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static uint64 space_in_local(local_mq * lq, Size tuple_size);
+static bool read_local_queue(local_mq * lq, bool shm_mq_full);

local_mq * lq -> local_mq *lq
Same for the other places as well.
---

+static uint64 space_in_shm(shm_mq *mq);
+
+static uint64 space_in_local(local_mq * lq, Size tuple_size);

We had better use Size here instead of uint64.
---

+	available = ringsize - used;
+
+	ringsize = lq->mq_ring_size;
+	writer_offset = lq->mq_bytes_written % ringsize;
+	reader_offset = lq->mq_bytes_read % ringsize;
+
+	if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
+		available = (ringsize - writer_offset);

Even when there is enough space in the queue, if the tuple needs to wrap around the end of the ring, we do not allow it to be written into the local queue. If there is a strong reason for that, please add a comment explaining it.
---

+	if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size)
+		return true;
+
+	else
+		return true;
+}

It seems you want to return 'false' in the else case.
----

+	read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+	available = space_in_shm(mqh->mqh_queue);
+
+	/* always read data in the aligned form */
+	to_read = MAXALIGN_DOWN(Min(used, available));
+
+	/*
+	 * if the amount of data to be send from local queue involves wrapping of
+	 * local queue, then send only the data till the end of queue right now
+	 * and rest later.
+	 */
+	if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)

You can use "read_offset" directly instead of recalculating lq->mq_bytes_read % lq->mq_ring_size.
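To make the space_in_local and read_local_queue comments above concrete, here is a minimal standalone sketch. It assumes a simplified ring with monotonically increasing read/write counters, similar in spirit to mq_bytes_read/mq_bytes_written; the type sketch_ring and the functions ring_free_space, ring_write and should_flush are hypothetical names, not code from the patch, and MAXALIGN handling is ignored. It shows one possible approach: count total free space in Size, split a write at the end of the buffer instead of refusing it, and return false from the threshold check when neither condition holds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* PostgreSQL's c.h defines Size this way; repeated here so the sketch stands alone. */
typedef size_t Size;

/* Hypothetical simplified ring buffer; NOT the patch's local_mq. */
typedef struct sketch_ring
{
	Size		ring_size;		/* capacity in bytes */
	uint64_t	bytes_read;		/* total bytes ever consumed */
	uint64_t	bytes_written;	/* total bytes ever produced */
	char	   *data;			/* ring_size bytes of storage */
} sketch_ring;

/* Total free space, whether or not the next write would wrap. */
static Size
ring_free_space(const sketch_ring *r)
{
	Size		used = (Size) (r->bytes_written - r->bytes_read);

	assert(used <= r->ring_size);	/* writer never laps the reader */
	return r->ring_size - used;
}

/*
 * Append len bytes. If the write crosses the end of the buffer, copy up to
 * the end first and put the remainder at the start, instead of refusing it.
 * Returns false only when total free space is insufficient.
 */
static bool
ring_write(sketch_ring *r, const char *src, Size len)
{
	Size		offset;
	Size		first;

	if (len > ring_free_space(r))
		return false;

	offset = (Size) (r->bytes_written % r->ring_size);
	first = r->ring_size - offset;	/* contiguous bytes before the end */
	if (first > len)
		first = len;

	memcpy(r->data + offset, src, first);
	memcpy(r->data, src + first, len - first);	/* wrapped part, may be 0 bytes */
	r->bytes_written += len;
	return true;
}

/* True if the shared queue is full or the ring is at least 5% used. */
static bool
should_flush(const sketch_ring *r, bool shm_mq_full)
{
	Size		used = (Size) (r->bytes_written - r->bytes_read);

	if (shm_mq_full || (double) used >= 0.05 * (double) r->ring_size)
		return true;
	else
		return false;			/* the case the patch currently gets wrong */
}
```

For example, in an 8-byte ring with the writer at offset 6 and 6 bytes free, a 5-byte write is accepted and split into 2 bytes at the end plus 3 bytes at the start. Whether the real local queue should do this also depends on how the reader side handles a tuple that straddles the boundary, which the sketch does not model.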
----

+	do
+	{
+		if (shm_mq_is_detached(mqh->mqh_queue))
+			return SHM_MQ_DETACHED;
+
+		shm_space = space_in_shm(mqh->mqh_queue);
+
+		/*
+		 * cannot send data to shared queue, unless there is required
+		 * space, so wait till we get some space, since we cannot
+		 * write anymore in local queue as of now
+		 */
+		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+		/* Reset the latch so we don't spin. */
+		ResetLatch(MyLatch);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+
+		shm_space = space_in_shm(mqh->mqh_queue);
+
+		if (read_local_queue(lq, true) && shm_space > 0)
+			copy_local_to_shared(lq, mqh, false);
+
+		local_space = space_in_local(lq, tuple_size);
+
+	} while (local_space <= tuple_size);
+

1. Just after getting shm_space, you call WaitLatch without even checking whether that space is already sufficient to send the tuple.

2. Every time the latch is set (maybe some space was freed in the shared queue), you call copy_local_to_shared to send as much data as possible from the local queue to the shared queue, but that doesn't guarantee that we will have sufficient space in the local queue to accommodate the current tuple.

I think that, rather than calling copy_local_to_shared multiple times (each call internally acquires the mutex), after the latch is set you can check the shared queue space and not attempt copy_local_to_shared unless shm_space >= tuple_size.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)