diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 8d7e711b3b..e3405b255d 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -59,6 +59,7 @@
  */
 #define TUPLE_QUEUE_MODE_CONTROL	'c' /* mode-switch message contents */
 #define TUPLE_QUEUE_MODE_DATA		'd'
+#define LOCAL_TUPLE_QUEUE_SIZE		32768	/* size in bytes of the local tuple staging buffer */
 
 /*
  * Both the sender and receiver build trees of TupleRemapInfo nodes to help
@@ -145,6 +146,10 @@ typedef struct TQueueDestReceiver
 	char		mode;			/* current message mode */
 	TupleDesc	tupledesc;		/* current top-level tuple descriptor */
 	TupleRemapInfo **field_remapinfo;	/* current top-level remap info */
+	char *iovec;				/* local staging buffer: shm_mq_iovec headers, each followed by its tuple bytes */
+	int         length;			/* number of bytes of iovec currently in use */
+	int         count;			/* number of tuples currently staged in iovec */
+
 } TQueueDestReceiver;
 
 /*
@@ -213,6 +218,7 @@ static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
 					MemoryContext mycontext);
 static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
 					MemoryContext mycontext);
+static void empty_tqueue(TQueueDestReceiver *tqueue);
 
 
 /*
@@ -304,10 +310,70 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 		}
 	}
 
-	/* Send the tuple itself. */
+	/*
+	 * Stage the tuple in the sender-local buffer instead of sending it right
+	 * away.  The buffer holds shm_mq_iovec headers, each immediately followed
+	 * by the tuple bytes it describes.  Headers are copied in and out with
+	 * memcpy() because nothing keeps them aligned inside the char buffer.
+	 */
 	tuple = ExecMaterializeSlot(slot);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+	result = SHM_MQ_SUCCESS;
+	if (TupIsNull(slot))
+	{
+		/* Nothing to stage: just push out whatever is buffered. */
+		empty_tqueue(tqueue);
+		tqueue->count = 0;
+		tqueue->length = 0;
+	}
+	else
+	{
+		Size		needed = sizeof(shm_mq_iovec) + tuple->t_len;
+
+		/*
+		 * If the new tuple, header included, does not fit behind the staged
+		 * ones, move the staged tuples to the shared queue first.  Stop at
+		 * the first failed send so that it reaches the check below.
+		 */
+		if (tqueue->count > 0 &&
+			(Size) tqueue->length + needed > LOCAL_TUPLE_QUEUE_SIZE)
+		{
+			int			offset = 0;
+
+			while (tqueue->count > 0 && result == SHM_MQ_SUCCESS)
+			{
+				shm_mq_iovec staged;
+
+				memcpy(&staged, tqueue->iovec + offset, sizeof(staged));
+				tqueue->count--;
+				/* wake the receiver only once the last tuple is queued */
+				result = local_mq_send(tqueue->queue, staged.len, staged.data,
+									   false, tqueue->count == 0);
+				offset += sizeof(shm_mq_iovec) + staged.len;
+			}
+			tqueue->count = 0;
+			tqueue->length = 0;
+		}
+
+		if (result == SHM_MQ_SUCCESS && needed > LOCAL_TUPLE_QUEUE_SIZE)
+		{
+			/* Too big to stage; the buffer is empty here, so send directly. */
+			result = local_mq_send(tqueue->queue, tuple->t_len, tuple->t_data,
+								   false, true);
+		}
+		else if (result == SHM_MQ_SUCCESS)
+		{
+			char	   *dest = tqueue->iovec + tqueue->length;
+			shm_mq_iovec header;
+
+			header.len = tuple->t_len;
+			header.data = dest + sizeof(shm_mq_iovec);
+			memcpy(dest, &header, sizeof(header));
+			memcpy(dest + sizeof(shm_mq_iovec), tuple->t_data, tuple->t_len);
+			tqueue->length += (int) needed;
+			tqueue->count++;
+		}
+	}
 
 	/* Check for failure. */
 	if (result == SHM_MQ_DETACHED)
 		return false;
@@ -318,6 +367,39 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
 	return true;
 }
+
+/*
+ * Flush every tuple staged in the local buffer to the shared queue.
+ *
+ * The buffer holds 'count' entries, each a shm_mq_iovec header immediately
+ * followed by the tuple bytes.  The receiver is notified only when the last
+ * entry is sent.  Sending stops early if a send does not succeed; either way
+ * the local buffer is left empty (count and length reset to zero).
+ */
+static void
+empty_tqueue(TQueueDestReceiver *tqueue)
+{
+	int			offset = 0;
+	int			remaining = tqueue->count;
+
+	while (remaining > 0)
+	{
+		shm_mq_iovec staged;
+		shm_mq_result result;
+
+		/* The header may be unaligned inside the char buffer, so copy it. */
+		memcpy(&staged, tqueue->iovec + offset, sizeof(staged));
+		remaining--;
+		result = local_mq_send(tqueue->queue, staged.len, staged.data,
+							   false, remaining == 0);
+		if (result != SHM_MQ_SUCCESS)
+			break;
+		offset += sizeof(shm_mq_iovec) + staged.len;
+	}
+
+	tqueue->count = 0;
+	tqueue->length = 0;
+}
 
 /*
  * Examine the given datum and send any necessary control messages for
@@ -577,7 +642,7 @@ static void
 tqueueShutdownReceiver(DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
-
+	empty_tqueue(tqueue);
 	shm_mq_detach(shm_mq_get_queue(tqueue->queue));
 }
 
@@ -622,6 +687,10 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
 	/* Top-level tupledesc is not known yet */
 	self->tupledesc = NULL;
 	self->field_remapinfo = NULL;
+	/* Local staging buffer starts out empty */
+	self->iovec = palloc0(LOCAL_TUPLE_QUEUE_SIZE);
+	self->length = 0;
+	self->count = 0;
 
 	return (DestReceiver *) self;
 }
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f5bf807cd6..3e5f17756e 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -141,6 +141,8 @@ struct shm_mq_handle
 
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
 				  const void *data, bool nowait, Size *bytes_written);
+static shm_mq_result local_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+				  const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
 					 bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
@@ -327,6 +329,16 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 
 	return shm_mq_sendv(mqh, &iov, 1, nowait);
 }
+/* Convenience wrapper: send one contiguous buffer through local_mq_sendv(). */
+shm_mq_result
+local_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool notify)
+{
+	shm_mq_iovec single;
+
+	single.len = nbytes;
+	single.data = data;
+	return local_mq_sendv(mqh, &single, 1, nowait, notify);
+}
 
 /*
  * Write a message into a shared message queue, gathered from multiple
@@ -491,6 +503,174 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 	/* Notify receiver of the newly-written data, and return. */
 	return shm_mq_notify_receiver(mq);
 }
+
+/*
+ * Write a message, gathered from multiple addresses, into a shared message
+ * queue, letting the caller choose whether the receiver is notified.
+ *
+ * The message is a Size length word followed by the bytes of the iovcnt
+ * buffers in iov.  Progress is tracked in mqh (mqh_partial_bytes and
+ * mqh_length_word_complete): it is kept when a write returns a non-success
+ * result other than SHM_MQ_DETACHED, and reset on SHM_MQ_DETACHED and after
+ * the whole message has been written.
+ *
+ * With notify = true the call ends with shm_mq_notify_receiver().  With
+ * notify = false that final notification is skipped and the result of the
+ * last write is returned; local_mq_send_bytes() still notifies the receiver
+ * by itself when it finds the ring full.
+ */
+shm_mq_result
+local_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool notify)
+{
+	shm_mq_result res;
+	shm_mq	   *mq = mqh->mqh_queue;
+	Size		nbytes = 0;
+	Size		bytes_written;
+	int			i;
+	int			which_iov = 0;
+	Size		offset;
+
+	Assert(mq->mq_sender == MyProc);
+
+	/* Compute total size of write. */
+	for (i = 0; i < iovcnt; ++i)
+		nbytes += iov[i].len;
+
+	/* Try to write, or finish writing, the length word into the buffer. */
+	while (!mqh->mqh_length_word_complete)
+	{
+		Assert(mqh->mqh_partial_bytes < sizeof(Size));
+		res = local_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+								((char *) &nbytes) +mqh->mqh_partial_bytes,
+								nowait, &bytes_written);
+
+		if (res == SHM_MQ_DETACHED)
+		{
+			/* Reset state in case caller tries to send another message. */
+			mqh->mqh_partial_bytes = 0;
+			mqh->mqh_length_word_complete = false;
+			return res;
+		}
+		mqh->mqh_partial_bytes += bytes_written;
+
+		if (mqh->mqh_partial_bytes >= sizeof(Size))
+		{
+			Assert(mqh->mqh_partial_bytes == sizeof(Size));
+
+			mqh->mqh_partial_bytes = 0;
+			mqh->mqh_length_word_complete = true;
+		}
+
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+
+		/* Length word can't be split unless bigger than required alignment. */
+		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
+	}
+
+	/* Write the actual data bytes into the buffer. */
+	Assert(mqh->mqh_partial_bytes <= nbytes);
+	offset = mqh->mqh_partial_bytes;
+	do
+	{
+		Size		chunksize;
+
+		/* Figure out which bytes need to be sent next. */
+		if (offset >= iov[which_iov].len)
+		{
+			offset -= iov[which_iov].len;
+			++which_iov;
+			if (which_iov >= iovcnt)
+				break;
+			continue;
+		}
+
+		/*
+		 * We want to avoid copying the data if at all possible, but every
+		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
+		 * the last.  Thus, if a chunk other than the last one ends on a
+		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
+		 * data with data from one or more following chunks until we either
+		 * reach the last chunk or accumulate a number of bytes which is
+		 * MAXALIGN'd.
+		 */
+		if (which_iov + 1 < iovcnt &&
+			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
+		{
+			char		tmpbuf[MAXIMUM_ALIGNOF];
+			int			j = 0;
+
+			for (;;)
+			{
+				if (offset < iov[which_iov].len)
+				{
+					tmpbuf[j] = iov[which_iov].data[offset];
+					j++;
+					offset++;
+					if (j == MAXIMUM_ALIGNOF)
+						break;
+				}
+				else
+				{
+					offset -= iov[which_iov].len;
+					which_iov++;
+					if (which_iov >= iovcnt)
+						break;
+				}
+			}
+
+			res = local_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+
+			if (res == SHM_MQ_DETACHED)
+			{
+				/* Reset state in case caller tries to send another message. */
+				mqh->mqh_partial_bytes = 0;
+				mqh->mqh_length_word_complete = false;
+				return res;
+			}
+
+			mqh->mqh_partial_bytes += bytes_written;
+			if (res != SHM_MQ_SUCCESS)
+				return res;
+			continue;
+		}
+
+		/*
+		 * If this is the last chunk, we can write all the data, even if it
+		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
+		 * MAXALIGN_DOWN the write size.
+		 */
+		chunksize = iov[which_iov].len - offset;
+		if (which_iov + 1 < iovcnt)
+			chunksize = MAXALIGN_DOWN(chunksize);
+		res = local_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
+								nowait, &bytes_written);
+
+		if (res == SHM_MQ_DETACHED)
+		{
+			/* Reset state in case caller tries to send another message. */
+			mqh->mqh_length_word_complete = false;
+			mqh->mqh_partial_bytes = 0;
+			return res;
+		}
+
+		mqh->mqh_partial_bytes += bytes_written;
+		offset += bytes_written;
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+	} while (mqh->mqh_partial_bytes < nbytes);
+
+	/* Reset for next message. */
+	mqh->mqh_partial_bytes = 0;
+	mqh->mqh_length_word_complete = false;
+
+	/* Notify receiver of the newly-written data if requested, and return. */
+	if(notify)
+	{
+		return shm_mq_notify_receiver(mq);
+	}
+	else return res;
+}
 
 /*
  * Receive a message from a shared message queue.
@@ -933,6 +1097,142 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 	*bytes_written = sent;
 	return SHM_MQ_SUCCESS;
 }
+
+/*
+ * Copy nbytes bytes from data into the queue's ring buffer, reporting the
+ * number of bytes actually copied in *bytes_written.
+ *
+ * Returns SHM_MQ_SUCCESS once all bytes have been copied.  Whenever the ring
+ * has no free space and the receiver is known to be attached, the receiver
+ * is notified; any non-success result of that notification is returned as
+ * is.  After that, with nowait set we return SHM_MQ_WOULD_BLOCK, otherwise
+ * we wait on our latch and retry.  If the ring is full and the receiver is
+ * not yet known to be attached, SHM_MQ_DETACHED is returned when
+ * shm_mq_counterparty_gone() reports true (nowait) or the wait for the
+ * receiver fails (!nowait).
+ *
+ * NOTE(review): mq_bytes_read and mq_bytes_written are read and updated
+ * directly on the shared queue struct, with no locking visible in this
+ * function -- confirm that this is safe against the concurrent receiver.
+ */
+static shm_mq_result
+local_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
+				  bool nowait, Size *bytes_written)
+{
+	shm_mq	   *mq = mqh->mqh_queue;
+	Size		sent = 0;
+	uint64		used;
+	Size		ringsize = mq->mq_ring_size;
+	Size		available;
+
+	while (sent < nbytes)
+	{
+		uint64		rb;
+
+		/* Compute number of ring buffer bytes used and available. */
+		rb = mq->mq_bytes_read;
+		Assert(mq->mq_bytes_written >= rb);
+		used = mq->mq_bytes_written - rb;
+		Assert(used <= ringsize);
+		available = Min(ringsize - used, nbytes - sent);
+
+		if (available == 0 && !mqh->mqh_counterparty_attached)
+		{
+			/*
+			 * The queue is full, so if the receiver isn't yet known to be
+			 * attached, we must wait for that to happen.
+			 */
+			if (nowait)
+			{
+				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
+				{
+					*bytes_written = sent;
+					return SHM_MQ_DETACHED;
+				}
+				if (shm_mq_get_receiver(mq) == NULL)
+				{
+					*bytes_written = sent;
+					return SHM_MQ_WOULD_BLOCK;
+				}
+			}
+			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
+										   mqh->mqh_handle))
+			{
+				mq->mq_detached = true;
+				*bytes_written = sent;
+				return SHM_MQ_DETACHED;
+			}
+
+			mqh->mqh_counterparty_attached = true;
+
+			/*
+			 * The receiver may have read some data after attaching, so we
+			 * must not wait without rechecking the queue state.
+			 */
+		}
+		else if (available == 0)
+		{
+			shm_mq_result res;
+			/* Let the receiver know that we need them to read some data. */
+			res = shm_mq_notify_receiver(mq);
+
+			if (res != SHM_MQ_SUCCESS)
+			{
+				*bytes_written = sent;
+				return res;
+			}
+
+			/* Skip manipulation of our latch if nowait = true. */
+			if (nowait)
+			{
+				*bytes_written = sent;
+				return SHM_MQ_WOULD_BLOCK;
+			}
+
+			/*
+			 * Wait for our latch to be set.  It might already be set for some
+			 * unrelated reason, but that'll just result in one extra trip
+			 * through the loop.  It's worth it to avoid resetting the latch
+			 * at top of loop, because setting an already-set latch is much
+			 * cheaper than setting one that has been reset.
+			 */
+			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+			/* Reset the latch so we don't spin. */
+			ResetLatch(MyLatch);
+
+			/* An interrupt may have occurred while we were waiting. */
+			CHECK_FOR_INTERRUPTS();
+		}
+		else
+		{
+			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
+			Size		sendnow = Min(available, ringsize - offset);
+
+			/* Write as much data as we can via a single memcpy(). */
+			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
+				   (char *) data + sent, sendnow);
+			sent += sendnow;
+
+			/*
+			 * Update count of bytes written, with alignment padding.  Note
+			 * that this will never actually insert any padding except at the
+			 * end of a run of bytes, because the buffer size is a multiple of
+			 * MAXIMUM_ALIGNOF, and each read is as well.
+			 */
+			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+			mq->mq_bytes_written += MAXALIGN(sendnow);
+
+			/*
+			 * For efficiency, we don't set the reader's latch here.  We'll do
+			 * that only when the buffer fills up or after writing an entire
+			 * message.
+			 */
+		}
+	}
+	*bytes_written = sent;
+	return SHM_MQ_SUCCESS;
+}
 
 /*
  * Wait until at least *nbytesp bytes are available to be read from the
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 7a37535ab3..74bb681717 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -76,6 +76,11 @@ extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 			   Size *nbytesp, void **datap, bool nowait);
 
+extern shm_mq_result local_mq_send(shm_mq_handle *mqh,
+			Size nbytes, const void *data, bool nowait, bool notify);
+extern shm_mq_result local_mq_sendv(shm_mq_handle *mqh,
+			 shm_mq_iovec *iov, int iovcnt, bool nowait, bool notify);
+
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 
