diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e9a5d5a1a5..b4f8386899 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -90,7 +90,10 @@ tqueueShutdownReceiver(DestReceiver *self)
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
 	if (tqueue->queue != NULL)
+	{
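+		/* flush any tuples remaining in the local queue before detaching */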
+		empty_queue(tqueue->queue);
 		shm_mq_detach(tqueue->queue);
+	}
 	tqueue->queue = NULL;
 }
 
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 770559a03e..fae7eb9c98 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -25,6 +25,8 @@
 #include "storage/shm_mq.h"
 #include "storage/spin.h"
 
+/*
+ * Size of each worker's backend-private tuple queue: 100 times the 64kB
+ * PARALLEL_TUPLE_QUEUE_SIZE used for the shared queue.
+ */
+#define LOCAL_TUPLE_QUEUE_SIZE		6553600
+
 /*
  * This structure represents the actual queue, stored in shared memory.
  *
@@ -79,6 +81,25 @@ struct shm_mq
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
 };
 
+/*
+ * This is the structure for the local queue, in which a worker buffers
+ * tuples when its shared queue is full.
+ *
+ * Each worker has its own local queue, used to store tuples while the
+ * master is busy and the worker's shared queue has filled up.  Data is
+ * copied from the local queue into the shared queue with a single
+ * memcpy() per chunk, sized to the space currently available in the
+ * shared queue.  Since the local queue is never shared with the master,
+ * no locking is needed to write to it, which makes local writes cheap.
+ */
+struct local_mq
+{
+	uint64		mq_bytes_read;	/* number of bytes read so far */
+	uint64		mq_bytes_written;	/* number of bytes written so far */
+	Size		mq_ring_size;	/* usable size of the ring buffer */
+	uint8		mq_ring_offset;	/* offset of ring buffer within struct */
+	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
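+
+/*
+ * Illustrative example of the ring accounting used below: with
+ * mq_ring_size = 100, mq_bytes_read = 30 and mq_bytes_written = 80,
+ * 50 bytes are queued and 50 are free.  Both counters only ever grow;
+ * physical positions are derived as counter % mq_ring_size.
+ */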
+
 /*
  * This structure is a backend-private handle for access to a queue.
  *
@@ -128,7 +149,9 @@ struct shm_mq
  */
 struct shm_mq_handle
 {
+	bool		mqh_local;		/* true once we switch to the local queue */
 	shm_mq	   *mqh_queue;
+	local_mq   *mqh_local_queue;	/* backend-private queue, created lazily */
 	dsm_segment *mqh_segment;
 	BackgroundWorkerHandle *mqh_handle;
 	char	   *mqh_buffer;
@@ -150,12 +173,23 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 					 BackgroundWorkerHandle *handle);
+static bool shm_mq_is_detached(volatile shm_mq *mq);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+static Size space_in_shm(shm_mq *mq);
+
+/* Routines required for local queue */
+static local_mq *local_mq_create(void *address, Size size);
+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static Size space_in_local(local_mq *lq);
+static bool read_local_queue(local_mq *lq);
+static shm_mq_result write_in_local_queue(local_mq *lq, shm_mq_iovec *iov);
+static void local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data,
+					 Size *bytes_written);
+static shm_mq_result copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh,
+					 bool nowait);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
 const Size	shm_mq_minimum_size =
@@ -289,6 +323,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
 
 	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+	mqh->mqh_local = false;
+	mqh->mqh_local_queue = NULL;
 	mqh->mqh_queue = mq;
 	mqh->mqh_segment = seg;
 	mqh->mqh_handle = handle;
@@ -319,17 +355,120 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 }
 
 /*
- * Write a message into a shared message queue.
+ * Write a message into a shared or local message queue, depending on the
+ * space available in each.  If the shared queue has enough room, the
+ * message is written there directly; otherwise it is written into the
+ * local queue.  When both queues are full, we wait until some data in the
+ * shared queue has been read, copy data from the local queue into the
+ * shared queue, and then continue writing into the local queue.  After
+ * each local write, if the shared queue has free space, data is copied
+ * from the local to the shared queue right away.
  */
 shm_mq_result
 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 {
 	shm_mq_iovec iov;
+	local_mq   *lq;
+	shm_mq_result res;
+	Size		tuple_size,
+				local_space,
+				shm_space;
 
 	iov.data = data;
 	iov.len = nbytes;
+	shm_space = space_in_shm(mqh->mqh_queue);
+
+	/*
+	 * This is the actual size this tuple will occupy in the queue: shm_mq
+	 * stores a length word followed by the payload, advancing the write
+	 * position in MAXALIGN'd units.
+	 */
+	tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+	/*
+	 * If there is enough space in the shared queue and we have never used
+	 * the local queue, write the tuple directly into the shared queue.
+	 */
+	if (shm_space > tuple_size && !mqh->mqh_local)
+		res = shm_mq_sendv(mqh, &iov, 1, nowait);
+
+	else
+	{
+		/* if the queue has been detached, there is nothing to do */
+		if (shm_mq_is_detached(mqh->mqh_queue))
+			return SHM_MQ_DETACHED;
+
+		/*
+		 * Once we have started using the local queue, tuples must keep
+		 * flowing through it until it is empty, to preserve their order.
+		 */
+		mqh->mqh_local = true;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+		/* create and attach the local queue if it does not yet exist */
+		if (mqh->mqh_local_queue == NULL)
+			mqh = local_mq_attach(mqh);
+
+		lq = mqh->mqh_local_queue;
+		local_space = space_in_local(lq);
+
+		/* write in local queue if there is enough space */
+		if (local_space >= tuple_size)
+		{
+			res = write_in_local_queue(lq, &iov);
+
+			/*
+			 * if enough data has accumulated in the local queue and there is
+			 * some space in the shared queue, copy data to the shared queue
+			 */
+			if (read_local_queue(lq) && shm_space > 0)
+				copy_local_to_shared(lq, mqh, false);
+		}
+		else
+		{
+			/*
+			 * the local queue is full, so copy data to the shared queue
+			 * until enough space becomes available in the local queue
+			 */
+			do
+			{
+				while (shm_space < tuple_size)
+				{
+					/*
+					 * We cannot send to the shared queue until it has
+					 * enough free space, and the local queue is full, so
+					 * wait for the receiver to consume some data.
+					 */
+					WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+					/* Reset the latch so we don't spin. */
+					ResetLatch(MyLatch);
+
+					/* An interrupt may have occurred while we were waiting. */
+					CHECK_FOR_INTERRUPTS();
+
+					/* if the queue has been detached, give up */
+					if (shm_mq_is_detached(mqh->mqh_queue))
+						return SHM_MQ_DETACHED;
+
+					/* reassess free space; the receiver may have read data */
+					shm_space = space_in_shm(mqh->mqh_queue);
+				}
+				if (read_local_queue(lq) && shm_space >= tuple_size)
+					copy_local_to_shared(lq, mqh, false);
+
+				local_space = space_in_local(lq);
+
+			} while (local_space <= tuple_size);
+
+			/*
+			 * Now that space is available, write the tuple.  If the local
+			 * queue has been completely drained, switch back to writing
+			 * directly into the shared queue; otherwise keep writing into
+			 * the local queue to preserve tuple ordering.
+			 */
+			if (local_space < lq->mq_ring_size)
+				res = write_in_local_queue(lq, &iov);
+			else
+			{
+				mqh->mqh_local = false;
+				res = shm_mq_sendv(mqh, &iov, 1, nowait);
+			}
+		}
+	}
+	return res;
 }
 
 /*
@@ -1133,6 +1272,20 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 }
 
 /*
+ * Return true if the shm_mq has been detached.
+ */
+static bool
+shm_mq_is_detached(volatile shm_mq *mq)
+{
+	bool		ret;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	ret = mq->mq_detached;
+	SpinLockRelease(&mq->mq_mutex);
+	return ret;
+}
+
+/*
  * Get the number of bytes read.  The receiver need not use this to access
  * the count of bytes read, but the sender must.
  */
@@ -1224,3 +1377,259 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 
 	shm_mq_detach_internal(mq);
 }
+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue; this closely mirrors shm_mq_create().
+ */
+static local_mq *
+local_mq_create(void *address, Size size)
+{
+	local_mq   *mq = address;
+	Size		data_offset = MAXALIGN(offsetof(local_mq, mq_ring));
+
+	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+	size = MAXALIGN_DOWN(size);
+
+	/* Queue size must be large enough to hold some data. */
+	Assert(size > data_offset);
+
+	/* Initialize queue header. */
+	mq->mq_bytes_read = 0;
+	mq->mq_bytes_written = 0;
+	mq->mq_ring_size = size - data_offset;
+	mq->mq_ring_offset = data_offset - offsetof(local_mq, mq_ring);
+	return mq;
+}
+
+/* Create a local_mq and attach it to the given shm_mq_handle. */
+static shm_mq_handle *
+local_mq_attach(shm_mq_handle *mqh)
+{
+	/*
+	 * create the local queue; its size should be considerably larger than
+	 * PARALLEL_TUPLE_QUEUE_SIZE
+	 */
+	char	   *mq;
+	Size		len;
+
+	len = LOCAL_TUPLE_QUEUE_SIZE;
+	mq = palloc0(len);
+	mqh->mqh_local_queue = local_mq_create(mq, len);
+
+	return mqh;
+}
+
+/* Return the number of free bytes in the local queue. */
+static Size
+space_in_local(local_mq *lq)
+{
+	uint64		read,
+				written;
+	Size		used,
+				available,
+				ringsize;
+
+	ringsize = lq->mq_ring_size;
+	read = lq->mq_bytes_read;
+	written = lq->mq_bytes_written;
+	used = written - read;
+	available = ringsize - used;
+
+	return available;
+}
+
+/* Return the number of free bytes in the shared queue. */
+static Size
+space_in_shm(shm_mq *mq)
+{
+	uint64		read,
+				written;
+	Size		used,
+				available,
+				ringsize;
+	bool		detached = false;
+
+	ringsize = mq->mq_ring_size;
+	read = shm_mq_get_bytes_read(mq, &detached);
+	written = shm_mq_get_bytes_written(mq, &detached);
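+	/*
+	 * The detached flag is not acted on here; callers are expected to
+	 * check the queue state separately with shm_mq_is_detached().
+	 */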
+
+	used = written - read;
+	available = ringsize - used;
+
+	return available;
+}
+
+/*
+ * Check whether reading from the local queue is allowed.  We permit it
+ * once the local queue is at least 5% full (327680 bytes for the default
+ * LOCAL_TUPLE_QUEUE_SIZE of 6553600).
+ */
+static bool
+read_local_queue(local_mq *lq)
+{
+	uint64		written,
+				read;
+
+	written = lq->mq_bytes_written;
+	read = lq->mq_bytes_read;
+
+	return (written - read) >= .05 * lq->mq_ring_size;
+}
+
+/* Write a tuple into the local queue. */
+static shm_mq_result
+write_in_local_queue(local_mq *lq, shm_mq_iovec *iov)
+{
+	Size		bytes_written = 0;
+	Size		nbytes,
+				tuple_size;
+
+	/* Total size of the write: a length word followed by the tuple data. */
+	nbytes = iov[0].len;
+	tuple_size = sizeof(Size) + nbytes;
+
+	/* Write the length word first, then the tuple data itself. */
+	local_mq_send_bytes(lq, sizeof(Size), (char *) &nbytes, &bytes_written);
+	local_mq_send_bytes(lq, nbytes, iov[0].data, &bytes_written);
+
+	Assert(bytes_written == tuple_size);
+	return SHM_MQ_SUCCESS;
+}
+
+/* Pass a batch of tuples from the local to the shared queue in one go. */
+static shm_mq_result
+copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool nowait)
+{
+	Size		to_read,
+				bytes_read = 0,
+				read_offset,
+				available,
+				used;
+	char	   *data;
+	shm_mq_result res;
+
+	if (shm_mq_is_detached(mqh->mqh_queue))
+		return SHM_MQ_DETACHED;
+
+	used = lq->mq_bytes_written - lq->mq_bytes_read;
+	Assert(used <= lq->mq_ring_size);
+	Assert(lq->mq_bytes_read <= lq->mq_bytes_written);
+	read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+	available = space_in_shm(mqh->mqh_queue);
+
+	/* always read data in aligned form */
+	to_read = MAXALIGN_DOWN(Min(used, available));
+
+	/*
+	 * if the data to be sent wraps around the end of the local queue's
+	 * ring, send only the part up to the end of the ring now and send
+	 * the rest later.
+	 */
+	if (read_offset + to_read > lq->mq_ring_size)
+		to_read = lq->mq_ring_size - read_offset;
+
+	data = &(lq->mq_ring[lq->mq_ring_offset + read_offset]);
+	res = shm_mq_send_bytes(mqh, to_read, data, nowait, &bytes_read);
+
+	if (res != SHM_MQ_SUCCESS)
+		return res;
+
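+	/* advance the local queue's read position past the bytes just copied */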
+	Assert(bytes_read == to_read);
+	lq->mq_bytes_read += bytes_read;
+	shm_mq_notify_receiver(mqh->mqh_queue);
+
+	return res;
+}
+
+/*
+ * Write bytes into the local queue; this is the local-queue counterpart
+ * of shm_mq_send_bytes().
+ */
+static void
+local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written)
+{
+	uint64		used;
+	Size		ringsize = mq->mq_ring_size;
+	Size		available,
+				sent = 0,
+				sendnow;
+
+	uint64		rb;
+
+	while (sent < nbytes)
+	{
+		/* Compute number of ring buffer bytes used and available. */
+		rb = mq->mq_bytes_read;
+		Assert(mq->mq_bytes_written >= rb);
+		used = mq->mq_bytes_written - rb;
+		Assert(used <= ringsize);
+		available = Min(ringsize - used, nbytes - sent);
+
+		if (available == 0)
+			elog(ERROR, "local queue full; this should never be reached");
+		else
+		{
+			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
+
+			sendnow = Min(available, ringsize - offset);
+
+			/* Write as much data as we can via a single memcpy(). */
+			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], (char *) data + sent, sendnow);
+			sent += sendnow;
+
+			/*
+			 * Update count of bytes written, with alignment padding.  Note
+			 * that this will never actually insert any padding except at the
+			 * end of a run of bytes, because the buffer size is a multiple of
+			 * MAXIMUM_ALIGNOF, and each read is as well.
+			 */
+			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+			mq->mq_bytes_written += MAXALIGN(sendnow);
+		}
+	}
+	*bytes_written += sent;
+}
+
+/*
+ * Empty the local queue by copying all of its data into the shared queue.
+ * This must be done before the worker shuts down.
+ */
+void
+empty_queue(shm_mq_handle *mqh)
+{
+	local_mq   *lq;
+	uint64		read,
+				written;
+
+	lq = mqh->mqh_local_queue;
+
+	if (lq == NULL || lq->mq_bytes_written == 0)
+		return;
+
+	read = lq->mq_bytes_read;
+	written = lq->mq_bytes_written;
+
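+	/* keep draining until the local queue is empty or the receiver is gone */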
+	while (written > read && !shm_mq_is_detached(mqh->mqh_queue))
+	{
+		copy_local_to_shared(lq, mqh, false);
+		read = lq->mq_bytes_read;
+		written = lq->mq_bytes_written;
+	}
+	/* the local queue is no longer required, so free it */
+	pfree(mqh->mqh_local_queue);
+	mqh->mqh_local_queue = NULL;
+	mqh->mqh_local = false;
+}
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 7709efcc48..80afb435e0 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -82,4 +82,8 @@ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 /* Smallest possible queue. */
 extern PGDLLIMPORT const Size shm_mq_minimum_size;
 
+/* Routines and structures for the local-plus-shared tuple queue architecture */
+struct local_mq;
+typedef struct local_mq local_mq;
+
+extern void empty_queue(shm_mq_handle *mqh);
+
 #endif							/* SHM_MQ_H */
