On Tue, Mar 28, 2017 at 10:00 AM, Tomas Vondra <tomas.von...@2ndquadrant.com> wrote:
> On 03/27/2017 01:40 PM, Rushabh Lathia wrote:
>>
>> ...
>>
>> I was doing more testing with the patch and I found one more server
>> crash with the patch around the same area, when we forced the gather
>> merge for a scan having zero rows.
>>
>> create table dept ( deptno numeric, dname varchar(20));
>> set parallel_tuple_cost = 0;
>> set parallel_setup_cost = 0;
>> set min_parallel_table_scan_size = 0;
>> set min_parallel_index_scan_size = 0;
>> set force_parallel_mode = regress;
>> explain analyze select * from dept order by deptno;
>>
>> This is because we don't initialize the leader's slot in gm_slots. So in
>> the case where no workers are launched and the table has zero rows, we
>> end up with a NULL slot in the gm_slots array.
>>
>> Currently gather_merge_clear_slots() clears out the tuple table slots
>> for each gather merge input and returns a cleared slot. In the patch I
>> modified gather_merge_clear_slots() to just clear out the tuple table
>> slots and to always return NULL once all the queues and the heap are
>> exhausted.
>>
>
> Isn't that just another sign the code might be a bit too confusing? I see
> two main issues in the code:
>
> 1) allocating 'slots' as 'nreaders+1' elements, which seems like a good
> way to cause off-by-one errors
>
> 2) mixing objects with different life spans (slots for readers vs. slot
> for the leader) seems like a bad idea too
>
> I wonder how much we gain by reusing the slot from the leader (I'd be
> surprised if it was at all measurable). David posted a patch reworking
> this, and significantly simplifying the GatherMerge node. Why not accept
> that?
>

I think we all agree that we should get rid of nreaders from
GatherMergeState, and that this needs some refactoring of the code. But if
I understood correctly, Robert's concern was that the refactoring should be
done as a separate commit.

I picked up David's patch and started reviewing the changes. I applied it
on top of my v2 patch (which does the refactoring of
gather_merge_clear_slots). In David's patch, the loop in gather_merge_init()
that allocates the tuple arrays needs to run only up to nworkers_launched,
because we don't keep a tuple array for the leader. I changed that and made
a few other small adjustments based on my v2 patch. I also did some manual
testing of the changes.

Please find attached the refactoring patch, along with the v2 patch
submitted earlier for the server crash fix. (Attaching both patches here
again, for ease of access.) A short sketch of the pre-patch
gather_merge_clear_slots() is also included below, before the patches, to
show how the zero-worker case ends up handing back a NULL slot.

Thanks,

--
Rushabh Lathia
www.EnterpriseDB.com
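For reference, here is the pre-patch shape of gather_merge_clear_slots(),
reassembled from the hunks of the crash-fix patch (the second diff below).
Indentation is approximate, so treat this as an illustrative sketch rather
than the exact tree contents. In the failing case there are no worker
readers to clean up, so the loop does nothing, and the "clear slot" handed
back is the leader's entry in gm_slots, which is still NULL in the zero-row
scenario described above:

static TupleTableSlot *
gather_merge_clear_slots(GatherMergeState *gm_state)
{
    int         i;

    /* In the zero-worker case there are no readers, so this clears nothing */
    for (i = 0; i < gm_state->nreaders; i++)
    {
        pfree(gm_state->gm_tuple_buffers[i].tuple);
        gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
    }

    pfree(gm_state->gm_tuple_buffers);
    /* Free the binaryheap, which was created for sort */
    binaryheap_free(gm_state->gm_heap);

    /*
     * "return any clear slot" -- but with no readers, gm_slots[0] is the
     * leader's slot, which can still be NULL here, and that is what the
     * caller gets back.
     */
    return gm_state->gm_slots[0];
}

The two patches follow.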
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 62c399e..2d7eb71 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -195,9 +195,9 @@ ExecGatherMerge(GatherMergeState *node)
 		/* Set up tuple queue readers to read the results. */
 		if (pcxt->nworkers_launched > 0)
 		{
-			node->nreaders = 0;
-			node->reader = palloc(pcxt->nworkers_launched *
-								  sizeof(TupleQueueReader *));
+			node->reader = (TupleQueueReader **)
+				palloc(pcxt->nworkers_launched *
+					   sizeof(TupleQueueReader *));
 
 			Assert(gm->numCols);
 
@@ -205,7 +205,7 @@ ExecGatherMerge(GatherMergeState *node)
 			{
 				shm_mq_set_handle(node->pei->tqueue[i],
 								  pcxt->worker[i].bgwhandle);
-				node->reader[node->nreaders++] =
+				node->reader[i] =
 					CreateTupleQueueReader(node->pei->tqueue[i],
 										   node->tupDesc);
 			}
@@ -298,7 +298,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 	{
 		int			i;
 
-		for (i = 0; i < node->nreaders; ++i)
+		for (i = 0; i < node->nworkers_launched; ++i)
 			if (node->reader[i])
 				DestroyTupleQueueReader(node->reader[i]);
 
@@ -344,28 +344,26 @@ ExecReScanGatherMerge(GatherMergeState *node)
 static void
 gather_merge_init(GatherMergeState *gm_state)
 {
-	int			nreaders = gm_state->nreaders;
+	int			nslots = gm_state->nworkers_launched + 1;
 	bool		initialize = true;
 	int			i;
 
 	/*
 	 * Allocate gm_slots for the number of worker + one more slot for leader.
-	 * Last slot is always for leader. Leader always calls ExecProcNode() to
-	 * read the tuple which will return the TupleTableSlot. Later it will
-	 * directly get assigned to gm_slot. So just initialize leader gm_slot
-	 * with NULL. For other slots below code will call
-	 * ExecInitExtraTupleSlot() which will do the initialization of worker
-	 * slots.
+	 * The final slot in the array is reserved for the leader process. This
+	 * slot is always populated via ExecProcNode(). This can be set to NULL
+	 * for now. The remaining slots we'll initialize with a call to
+	 * ExecInitExtraTupleSlot().
 	 */
-	gm_state->gm_slots =
-		palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
-	gm_state->gm_slots[gm_state->nreaders] = NULL;
+	gm_state->gm_slots = (TupleTableSlot **)
+		palloc(nslots * sizeof(TupleTableSlot *));
+	gm_state->gm_slots[nslots - 1] = NULL;	/* nullify leader's slot */
 
-	/* Initialize the tuple slot and tuple array for each worker */
+	/* Initialize the tuple slot and tuple array for each reader */
 	gm_state->gm_tuple_buffers =
-		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-										(gm_state->nreaders + 1));
-	for (i = 0; i < gm_state->nreaders; i++)
+		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * nslots);
+
+	for (i = 0; i < gm_state->nworkers_launched; i++)
 	{
 		/* Allocate the tuple array with MAX_TUPLE_STORE size */
 		gm_state->gm_tuple_buffers[i].tuple =
@@ -378,7 +376,7 @@ gather_merge_init(GatherMergeState *gm_state)
 	}
 
 	/* Allocate the resources for the merge */
-	gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+	gm_state->gm_heap = binaryheap_allocate(nslots,
 											heap_compare_slots,
 											gm_state);
 
@@ -388,10 +386,10 @@ gather_merge_init(GatherMergeState *gm_state)
 	 * leader. After this, if all active workers are unable to produce a
 	 * tuple, then re-read and this time use wait mode. For workers that were
 	 * able to produce a tuple in the earlier loop and are still active, just
-	 * try to fill the tuple array if more tuples are avaiable.
+	 * try to fill the tuple array if more tuples are available.
 	 */
reread:
-	for (i = 0; i < nreaders + 1; i++)
+	for (i = 0; i < nslots; i++)
 	{
 		if (!gm_state->gm_tuple_buffers[i].done &&
 			(TupIsNull(gm_state->gm_slots[i]) ||
@@ -408,7 +406,7 @@ reread:
 	}
 
 	initialize = false;
-	for (i = 0; i < nreaders; i++)
+	for (i = 0; i < nslots; i++)
 		if (!gm_state->gm_tuple_buffers[i].done &&
 			(TupIsNull(gm_state->gm_slots[i]) ||
 			 gm_state->gm_slots[i]->tts_isempty))
@@ -419,14 +417,14 @@ reread:
 }
 
 /*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slots for each gather merge workers.
  */
 static void
 gather_merge_clear_slots(GatherMergeState *gm_state)
 {
 	int			i;
 
-	for (i = 0; i < gm_state->nreaders; i++)
+	for (i = 0; i < gm_state->nworkers_launched; i++)
 	{
 		pfree(gm_state->gm_tuple_buffers[i].tuple);
 		gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
@@ -492,13 +490,15 @@ gather_merge_getnext(GatherMergeState *gm_state)
 static void
 form_tuple_array(GatherMergeState *gm_state, int reader)
 {
-	GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+	GMReaderTupleBuffer *tuple_buffer;
 	int			i;
 
 	/* Last slot is for leader and we don't build tuple array for leader */
-	if (reader == gm_state->nreaders)
+	if (reader == gm_state->nworkers_launched)
 		return;
 
+	tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
 	/*
 	 * We here because we already read all the tuples from the tuple array, so
 	 * initialize the counter to zero.
@@ -537,7 +537,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	 * If we're being asked to generate a tuple from the leader, then we
 	 * just call ExecProcNode as normal to produce one.
 	 */
-	if (gm_state->nreaders == reader)
+	if (gm_state->nworkers_launched == reader)
 	{
 		if (gm_state->need_to_scan_locally)
 		{
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 11a6850..e8c08c6 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1864,18 +1864,20 @@ typedef struct GatherMergeState
 	PlanState	ps;				/* its first field is NodeTag */
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
-	int			nreaders;
-	int			nworkers_launched;
-	struct TupleQueueReader **reader;
+	int			nworkers_launched;	/* number of parallel workers launched */
+	struct TupleQueueReader **reader;	/* array of readers, nworkers_launched
+										 * long */
 	TupleDesc	tupDesc;
-	TupleTableSlot **gm_slots;
-	struct binaryheap *gm_heap; /* binary heap of slot indices */
+	TupleTableSlot **gm_slots;	/* array of Tuple slots, nworkers_launched + 1
+								 * long */
+	struct binaryheap *gm_heap; /* binary heap of slot indices */
 	bool		gm_initialized; /* gather merge initilized ? */
 	bool		need_to_scan_locally;
 	int			gm_nkeys;
 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
-	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per
-													 * reader */
+	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* array of tuple buffers,
+													 * nworkers_launched + 1
+													 * long */
 } GatherMergeState;
 
 /* ----------------
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 3f0c3ee..62c399e 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -419,10 +419,9 @@ reread:
 }
 
 /*
- * Clear out the tuple table slots for each gather merge input,
- * and return a cleared slot.
+ * Clear out the tuple table slots for each gather merge input.
  */
-static TupleTableSlot *
+static void
 gather_merge_clear_slots(GatherMergeState *gm_state)
 {
 	int			i;
@@ -437,9 +436,6 @@ gather_merge_clear_slots(GatherMergeState *gm_state)
 	pfree(gm_state->gm_tuple_buffers);
 	/* Free the binaryheap, which was created for sort */
 	binaryheap_free(gm_state->gm_heap);
-
-	/* return any clear slot */
-	return gm_state->gm_slots[0];
 }
 
 /*
@@ -479,7 +475,8 @@ gather_merge_getnext(GatherMergeState *gm_state)
 	if (binaryheap_empty(gm_state->gm_heap))
 	{
 		/* All the queues are exhausted, and so is the heap */
-		return gather_merge_clear_slots(gm_state);
+		gather_merge_clear_slots(gm_state);
+		return NULL;
 	}
 	else
 	{