On Tue, Mar 28, 2017 at 10:00 AM, Tomas Vondra <tomas.von...@2ndquadrant.com
> wrote:

>
>
> On 03/27/2017 01:40 PM, Rushabh Lathia wrote:
>
>>
>> ...
>> I was doing more testing with the patch and I found one more server
>> crash with the patch around same area, when we forced the gather
>> merge for the scan having zero rows.
>>
>> create table dept ( deptno numeric, dname varchar(20));
>> set parallel_tuple_cost =0;
>> set parallel_setup_cost =0;
>> set min_parallel_table_scan_size =0;
>> set min_parallel_index_scan_size =0;
>> set force_parallel_mode=regress;
>> explain analyze select * from dept order by deptno;
>>
>> This is because we don't initialize the leader's slot in gm_slots. So
>> in the case where no workers are launched and the table has zero rows, we
>> end up with a NULL slot in the gm_slots array.
>>
>> Currently gather_merge_clear_slots() clears out the tuple table slots for
>> each gather merge input and returns a cleared slot. In the patch I modified
>> gather_merge_clear_slots() to just clear out the tuple table slots, and
>> the caller now always returns NULL when all the queues and the heap are
>> exhausted.
>>
>>
> Isn't that just another sign the code might be a bit too confusing? I see
> two main issues in the code:
>
> 1) allocating 'slots' as 'nreaders+1' elements, which seems like a good
> way to cause off-by-one errors
>
> 2) mixing objects with different life spans (slots for readers vs. slot
> for the leader) seems like a bad idea too
>
> I wonder how much we gain by reusing the slot from the leader (I'd be
> surprised if it was at all measurable). David posted a patch reworking
> this, and significantly simplifying the GatherMerge node. Why not to accept
> that?
>
>
>
I think we all agree that we should get rid of nreaders from
GatherMergeState and do some code refactoring. But if I understood
correctly, Robert's concern was that the refactoring should be done as a
separate commit.

I picked David's patch and started reviewing the changes. I applied that
patch
on top of my v2 patch (which does the re-factor of
gather_merge_clear_slots).

In David's patch, in gather_merge_init(), the loop that allocates the tuple
arrays needs to run only up to nworkers_launched, because we don't hold a
tuple array for the leader. I changed that and made some other simple
changes based on my v2 patch. I also performed manual testing with
the changes.

Please find attached the re-factor patch, which applies on top of the v2
patch submitted for the server crash fix. (Attaching both patches here
again, for ease of access.)

Thanks,


-- 
Rushabh Lathia
www.EnterpriseDB.com
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 62c399e..2d7eb71 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -195,9 +195,9 @@ ExecGatherMerge(GatherMergeState *node)
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->nreaders = 0;
-				node->reader = palloc(pcxt->nworkers_launched *
-									  sizeof(TupleQueueReader *));
+				node->reader = (TupleQueueReader **)
+											palloc(pcxt->nworkers_launched *
+												   sizeof(TupleQueueReader *));
 
 				Assert(gm->numCols);
 
@@ -205,7 +205,7 @@ ExecGatherMerge(GatherMergeState *node)
 				{
 					shm_mq_set_handle(node->pei->tqueue[i],
 									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
+					node->reader[i] =
 						CreateTupleQueueReader(node->pei->tqueue[i],
 											   node->tupDesc);
 				}
@@ -298,7 +298,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 	{
 		int			i;
 
-		for (i = 0; i < node->nreaders; ++i)
+		for (i = 0; i < node->nworkers_launched; ++i)
 			if (node->reader[i])
 				DestroyTupleQueueReader(node->reader[i]);
 
@@ -344,28 +344,26 @@ ExecReScanGatherMerge(GatherMergeState *node)
 static void
 gather_merge_init(GatherMergeState *gm_state)
 {
-	int			nreaders = gm_state->nreaders;
+	int			nslots = gm_state->nworkers_launched + 1;
 	bool		initialize = true;
 	int			i;
 
 	/*
 	 * Allocate gm_slots for the number of worker + one more slot for leader.
-	 * Last slot is always for leader. Leader always calls ExecProcNode() to
-	 * read the tuple which will return the TupleTableSlot. Later it will
-	 * directly get assigned to gm_slot. So just initialize leader gm_slot
-	 * with NULL. For other slots below code will call
-	 * ExecInitExtraTupleSlot() which will do the initialization of worker
-	 * slots.
+	 * The final slot in the array is reserved for the leader process. This
+	 * slot is always populated via ExecProcNode(). This can be set to NULL
+	 * for now.  The remaining slots we'll initialize with a call to
+	 * ExecInitExtraTupleSlot().
 	 */
-	gm_state->gm_slots =
-		palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
-	gm_state->gm_slots[gm_state->nreaders] = NULL;
+	gm_state->gm_slots = (TupleTableSlot **)
+								palloc(nslots * sizeof(TupleTableSlot *));
+	gm_state->gm_slots[nslots - 1] = NULL;	/* nullify leader's slot */
 
-	/* Initialize the tuple slot and tuple array for each worker */
+	/* Initialize the tuple slot and tuple array for each reader */
 	gm_state->gm_tuple_buffers =
-		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-										(gm_state->nreaders + 1));
-	for (i = 0; i < gm_state->nreaders; i++)
+		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * nslots);
+
+	for (i = 0; i < gm_state->nworkers_launched; i++)
 	{
 		/* Allocate the tuple array with MAX_TUPLE_STORE size */
 		gm_state->gm_tuple_buffers[i].tuple =
@@ -378,7 +376,7 @@ gather_merge_init(GatherMergeState *gm_state)
 	}
 
 	/* Allocate the resources for the merge */
-	gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+	gm_state->gm_heap = binaryheap_allocate(nslots,
 											heap_compare_slots,
 											gm_state);
 
@@ -388,10 +386,10 @@ gather_merge_init(GatherMergeState *gm_state)
 	 * leader. After this, if all active workers are unable to produce a
 	 * tuple, then re-read and this time use wait mode. For workers that were
 	 * able to produce a tuple in the earlier loop and are still active, just
-	 * try to fill the tuple array if more tuples are avaiable.
+	 * try to fill the tuple array if more tuples are available.
 	 */
 reread:
-	for (i = 0; i < nreaders + 1; i++)
+	for (i = 0; i < nslots; i++)
 	{
 		if (!gm_state->gm_tuple_buffers[i].done &&
 			(TupIsNull(gm_state->gm_slots[i]) ||
@@ -408,7 +406,7 @@ reread:
 	}
 	initialize = false;
 
-	for (i = 0; i < nreaders; i++)
+	for (i = 0; i < nslots; i++)
 		if (!gm_state->gm_tuple_buffers[i].done &&
 			(TupIsNull(gm_state->gm_slots[i]) ||
 			 gm_state->gm_slots[i]->tts_isempty))
@@ -419,14 +417,14 @@ reread:
 }
 
 /*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slots for each gather merge worker.
  */
 static void
 gather_merge_clear_slots(GatherMergeState *gm_state)
 {
 	int			i;
 
-	for (i = 0; i < gm_state->nreaders; i++)
+	for (i = 0; i < gm_state->nworkers_launched; i++)
 	{
 		pfree(gm_state->gm_tuple_buffers[i].tuple);
 		gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
@@ -492,13 +490,15 @@ gather_merge_getnext(GatherMergeState *gm_state)
 static void
 form_tuple_array(GatherMergeState *gm_state, int reader)
 {
-	GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+	GMReaderTupleBuffer *tuple_buffer;
 	int			i;
 
 	/* Last slot is for leader and we don't build tuple array for leader */
-	if (reader == gm_state->nreaders)
+	if (reader == gm_state->nworkers_launched)
 		return;
 
+	tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
 	/*
 	 * We here because we already read all the tuples from the tuple array, so
 	 * initialize the counter to zero.
@@ -537,7 +537,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	 * If we're being asked to generate a tuple from the leader, then we
 	 * just call ExecProcNode as normal to produce one.
 	 */
-	if (gm_state->nreaders == reader)
+	if (gm_state->nworkers_launched == reader)
 	{
 		if (gm_state->need_to_scan_locally)
 		{
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 11a6850..e8c08c6 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1864,18 +1864,20 @@ typedef struct GatherMergeState
 	PlanState	ps;				/* its first field is NodeTag */
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
-	int			nreaders;
-	int			nworkers_launched;
-	struct TupleQueueReader **reader;
+	int			nworkers_launched;	/* number of parallel workers launched */
+	struct TupleQueueReader **reader; /* array of readers, nworkers_launched
+									   * long */
 	TupleDesc	tupDesc;
-	TupleTableSlot **gm_slots;
-	struct binaryheap *gm_heap; /* binary heap of slot indices */
+	TupleTableSlot **gm_slots;	/* array of Tuple slots, nworkers_launched + 1
+								 * long */
+	struct binaryheap *gm_heap;		/* binary heap of slot indices */
 	bool		gm_initialized; /* gather merge initilized ? */
 	bool		need_to_scan_locally;
 	int			gm_nkeys;
 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
-	struct GMReaderTupleBuffer *gm_tuple_buffers;		/* tuple buffer per
-														 * reader */
+	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* array of tuple buffers,
+													 * nworkers_launched + 1
+													 * long */
 } GatherMergeState;
 
 /* ----------------
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 3f0c3ee..62c399e 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -419,10 +419,9 @@ reread:
 }
 
 /*
- * Clear out the tuple table slots for each gather merge input,
- * and return a cleared slot.
+ * Clear out the tuple table slots for each gather merge input.
  */
-static TupleTableSlot *
+static void
 gather_merge_clear_slots(GatherMergeState *gm_state)
 {
 	int			i;
@@ -437,9 +436,6 @@ gather_merge_clear_slots(GatherMergeState *gm_state)
 	pfree(gm_state->gm_tuple_buffers);
 	/* Free the binaryheap, which was created for sort */
 	binaryheap_free(gm_state->gm_heap);
-
-	/* return any clear slot */
-	return gm_state->gm_slots[0];
 }
 
 /*
@@ -479,7 +475,8 @@ gather_merge_getnext(GatherMergeState *gm_state)
 	if (binaryheap_empty(gm_state->gm_heap))
 	{
 		/* All the queues are exhausted, and so is the heap */
-		return gather_merge_clear_slots(gm_state);
+		gather_merge_clear_slots(gm_state);
+		return NULL;
 	}
 	else
 	{
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to