Hi.

At Thu, 21 Jan 2016 19:09:19 +0900, Amit Langote <langote_amit...@lab.ntt.co.jp> wrote in <56a0ae4f.9000...@lab.ntt.co.jp>
>
> Hi!
>
> On 2016/01/21 18:26, Kyotaro HORIGUCHI wrote:
> >>> Then, suppose we add a function bool ExecStartAsync(PlanState *target,
> >>> ExecCallback callback, PlanState *cb_planstate, void *cb_context).
> >>> For non-async-aware plan nodes, this just returns false. async-aware
> >>> plan nodes should initiate some work, register some callbacks, and
> >>> return. The callback that get registered should arrange in turn to
> >>> register the callback passed as an argument when a tuple becomes
> >>> available, passing the planstate and context provided by
> >>> ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
> >>
> >> Although I don't imagine clearly about the case of
> >> async-aware-nodes under non-aware-nodes, it seems to have a high
> >> affinity with (true) parallel execution framework.
> >
> > The ExecStartAsync is similar to ExecStartNode of my old
> > patch. One of the most annoying things of that is that it needs
> > to walk down to their descendents and in turn it needs garbageous
> > corresponding additional codes for all type of nodes which can
> > have children.
>
> Unless I am missing something, I wonder if this is where
> planstate_tree_walker() introduced by commit 8dd401aa is useful. For
> example, I see that it's used in ExecShutdownNode() in a manner that looks
> interesting for this discussion.

Oh, that's part of the parallel execution stuff. Thanks for
letting me know about it. The walker approach also suits kick
functions, since most node types have nothing to do with them.
Only one (or two, including ForeignScan) node types are involved.

The attached patches have the same functionality but use
planstate_tree_walker instead of callbacks. This seems even
simpler than the callbacks.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
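
The walker-based dispatch described above can be pictured with a toy tree. This is only a sketch under stated assumptions: ToyNode, toy_tree_walker and the other toy_* names are hypothetical stand-ins invented for illustration, not the executor's PlanState or planstate_tree_walker(). The point it tries to show is that only the interested node type needs a start hook, while every other node type is handled by the generic recursion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for executor node types. */
typedef enum ToyTag { TOY_APPEND, TOY_SEQSCAN, TOY_OTHER } ToyTag;

typedef struct ToyNode
{
	ToyTag		tag;
	bool		early_start;	/* set at init time when async was requested */
	int			started;		/* how many times the start hook really ran */
	struct ToyNode *children[4];
	int			nchildren;
} ToyNode;

typedef bool (*toy_walker) (ToyNode *node, void *context);

/* Visit each direct child; stop early if the callback returns true. */
static bool
toy_tree_walker(ToyNode *node, toy_walker walker, void *context)
{
	for (int i = 0; i < node->nchildren; i++)
		if (walker(node->children[i], context))
			return true;
	return false;
}

/* Start hook for the one node type that cares about early start. */
static bool
toy_start_seqscan(ToyNode *node)
{
	if (node->early_start)
	{
		node->started++;
		node->early_start = false;	/* run at most once */
	}
	return false;				/* false means "keep walking" */
}

/* Dispatch on the tag; all other node types just recurse via the walker. */
static bool
toy_start_node(ToyNode *node, void *context)
{
	if (node == NULL)
		return false;

	switch (node->tag)
	{
		case TOY_SEQSCAN:
			return toy_start_seqscan(node);
		default:
			break;
	}
	return toy_tree_walker(node, toy_start_node, context);
}

/* Build Append(SeqScan, Other(SeqScan)); return how many scans were started. */
static int
toy_demo(void)
{
	ToyNode		scan1 = {TOY_SEQSCAN, true, 0, {NULL}, 0};
	ToyNode		scan2 = {TOY_SEQSCAN, true, 0, {NULL}, 0};
	ToyNode		mid = {TOY_OTHER, false, 0, {&scan2}, 1};
	ToyNode		top = {TOY_APPEND, false, 0, {&scan1, &mid}, 2};

	toy_start_node(&top, NULL);
	toy_start_node(&top, NULL);	/* second pass finds nothing left to start */
	return scan1.started + scan2.started;
}
```

In this toy, the intermediate TOY_OTHER node needs no code of its own, which is the property that makes the walker attractive compared with adding per-node-type descent code.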

From a7f0f1f9077b474dd212db1fb690413dd7c4ef79 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor using planstate_tree_walker

This patch allows async-capable nodes to run the node before
ExecProcNode(). eflags has new bit EXEC_FLAG_ASYNC to request
asynchronous execution to children on ExecInit phase.

As an example, nodeSeqscan registers dummy callback if requested, and
nodeAppend unconditionally requests to its children. So a plan
Append(SeqScan, SeqScan) runs the callback and yields LOG messages.
---
 src/backend/executor/execMain.c        |   2 +
 src/backend/executor/execProcnode.c    |  24 +++++
 src/backend/executor/nodeAppend.c      |   2 +
 src/backend/executor/nodeGather.c      | 167 ++++++++++++++++++++-------------
 src/backend/executor/nodeMergeAppend.c |   3 +
 src/backend/executor/nodeNestloop.c    |  13 +++
 src/backend/executor/nodeSeqscan.c     |  16 ++++
 src/include/executor/executor.h        |   2 +
 src/include/executor/nodeGather.h      |   1 +
 src/include/executor/nodeSeqscan.h     |   1 +
 src/include/nodes/execnodes.h          |   2 +
 11 files changed, 167 insertions(+), 66 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..32b7bc3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
 	if (use_parallel_mode)
 		EnterParallelMode();
 
+	ExecStartNode(planstate);
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..2107ced 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,30 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * ExecStartNode - execute registered early-startup callbacks
+ */
+bool
+ExecStartNode(PlanState *node)
+{
+	if (node == NULL)
+		return false;
+
+	switch (nodeTag(node))
+	{
+		case T_GatherState:
+			return ExecStartGather((GatherState *)node);
+			break;
+		case T_SeqScanState:
+			return ExecStartSeqScan((SeqScanState *)node);
+			break;
+		default:
+			break;
+	}
+
+	return planstate_tree_walker(node, ExecStartNode, NULL);
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async-execition for children */
+		eflags |= EXEC_FLAG_ASYNC;
 		appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..097f4bb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,6 +46,88 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
+/* ----------------------------------------------------------------
+ *		StartGather
+ *
+ *		Gather node can have an advantage from asynchronous execution in most
+ *		cases because of its startup cost.
+ * ----------------------------------------------------------------
+ */
+bool
+ExecStartGather(GatherState *node)
+{
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
+	TupleTableSlot *fslot = node->funnel_slot;
+	int i;
+
+	/* Don't start if already started or explicitly inhibited by the upper */
+	if (node->initialized || !node->early_start)
+		return false;
+
+	/*
+	 * Initialize the parallel context and workers on first execution. We do
+	 * this on first execution rather than during node initialization, as it
+	 * needs to allocate large dynamic segment, so it is better to do if it
+	 * is really needed.
+	 */
+
+	/*
+	 * Sometimes we might have to run without parallelism; but if
+	 * parallel mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
+	{
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
+
+		/*
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
+		 */
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
+
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+			for (i = 0; i < pcxt->nworkers; ++i)
+			{
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
+			}
+		}
+
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
+	}
+
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	node->early_start = false;
+	node->initialized = true;
+	return false;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -58,6 +140,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	Plan	   *outerNode;
 	bool		hasoid;
 	TupleDesc	tupDesc;
+	int			child_eflags;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -97,7 +180,12 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * now initialize outer plan
 	 */
 	outerNode = outerPlan(node);
-	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+	/*
+	 * This outer plan is executed in another process so don't start
+	 * asynchronously in this process
+	 */
+	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
@@ -115,6 +203,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
 	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+	/*
+	 * Register asynchronous execution callback for this node. Backend workers
+	 * needs to allocate large dynamic segment, and it is better to execute
+	 * them at the time of first execution from this aspect. So asynchronous
+	 * execution should be decided considering that but we omit the aspect for
+	 * now.
+	 */
+	if (eflags & EXEC_FLAG_ASYNC)
+		gatherstate->early_start = true;
+
 	return gatherstate;
 }
 
@@ -128,77 +226,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
-	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	TupleTableSlot *resultSlot;
 	ExprDoneCond isDone;
 	ExprContext *econtext;
 
-	/*
-	 * Initialize the parallel context and workers on first execution. We do
-	 * this on first execution rather than during node initialization, as it
-	 * needs to allocate large dynamic segment, so it is better to do if it
-	 * is really needed.
-	 */
+	/* Initialize workers if not yet. */
 	if (!node->initialized)
-	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
-
-		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
-		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
-
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
-
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
-			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
-			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
-		}
-
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-		node->initialized = true;
-	}
+		ExecStartGather(node);
 
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for now */
+		eflags = eflags | EXEC_FLAG_ASYNC;
+
 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	 * inner child, because it will always be rescanned with fresh parameter
 	 * values.
 	 */
+
+	/*
+	 * async execution of outer plan is benetifical if this join is requested
+	 * as async
+	 */
 	outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
 	if (node->nestParams == NIL)
 		eflags |= EXEC_FLAG_REWIND;
 	else
 		eflags &= ~EXEC_FLAG_REWIND;
+
+	/*
+	 * Async execution of the inner is inhibited if parameterized by the
+	 * outer
+	 */
+	if (list_length(node->nestParams) > 0)
+		eflags &= ~ EXEC_FLAG_ASYNC;
+
 	innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..3ee678d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,18 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
  * ----------------------------------------------------------------
  */
 
+bool
+ExecStartSeqScan(SeqScanState *node)
+{
+	if (node->early_start)
+	{
+		elog(LOG, "dummy_async_cb is called for %p", node);
+		node->early_start = false;
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		SeqNext
  *
@@ -214,6 +226,10 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&scanstate->ss.ps);
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
+	/* Do early-start when requested */
+	if (eflags & EXEC_FLAG_ASYNC)
+		scanstate->early_start = true;
+
 	return scanstate;
 }
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..3d13217 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
 #define EXEC_FLAG_WITH_OIDS		0x0020	/* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS	0x0040	/* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA	0x0080	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0100	/* request asynchronous execution */
 
 
 /*
@@ -224,6 +225,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
 extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
+extern bool ExecStartNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index f76d9be..0a48a03 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -18,6 +18,7 @@
 
 extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecGather(GatherState *node);
+extern bool ExecStartGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
 extern void ExecReScanGather(GatherState *node);
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index f2e61ff..daf54ac 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -19,6 +19,7 @@
 
 extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
+extern bool ExecStartSeqScan(SeqScanState *node);
 extern void ExecEndSeqScan(SeqScanState *node);
 extern void ExecReScanSeqScan(SeqScanState *node);
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..4ffc2a8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1257,6 +1257,7 @@ typedef struct SeqScanState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	Size		pscan_len;		/* size of parallel heap scan descriptor */
+	bool		early_start;
 } SeqScanState;
 
 /* ----------------
@@ -1968,6 +1969,7 @@ typedef struct UniqueState
 typedef struct GatherState
 {
 	PlanState	ps;				/* its first field is NodeTag */
+	bool		early_start;
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	int			nreaders;
-- 
1.8.3.1
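
The way the first patch passes the async request down through eflags can be condensed into a few pure functions. This is a rough sketch, not executor code: the toy_* function names and TOY_FLAG_ASYNC are hypothetical (only the value 0x0100 mirrors EXEC_FLAG_ASYNC in the patch), and the EXEC_FLAG_REWIND handling that ExecInitNestLoop also performs is left out on purpose.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag bit; the value mirrors EXEC_FLAG_ASYNC from the patch. */
#define TOY_FLAG_ASYNC	0x0100

/* Append and MergeAppend: always request async start for every child. */
static int
toy_append_child_flags(int eflags)
{
	return eflags | TOY_FLAG_ASYNC;
}

/* Gather: the child runs in worker processes, so never start it early here. */
static int
toy_gather_child_flags(int eflags)
{
	return eflags & ~TOY_FLAG_ASYNC;
}

/* NestLoop inner side: inhibited when parameterized by the outer side. */
static int
toy_nestloop_inner_flags(int eflags, int nparams)
{
	if (nparams > 0)
		eflags &= ~TOY_FLAG_ASYNC;
	return eflags;
}

/* Leaf node (SeqScan in the patch): remember whether a start was requested. */
static bool
toy_wants_early_start(int eflags)
{
	return (eflags & TOY_FLAG_ASYNC) != 0;
}
```

Read this way, the flag is a request that flows downward at ExecInit time, and each parent decides per child whether to forward, force, or clear it. The outer side of a nested loop simply inherits the parent's flags unchanged.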

From ee2e01b8edfec09584822f94663e9bb2e03b7e95 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implement of asynchronous tuple passing

Aside from early node execution, tuples from multiple children of a
node can be received asynchronously. This patch makes ExecProcNode to
return the third status EXEC_NOT_READY using estate addition to that
previously returned via result. It means that the node may have more
tuple to return but not available for the time.

As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY by certain probability and nodeAppend skips to the next
child if it is returned.

Conflicts:
	src/backend/executor/nodeGather.c
	src/backend/executor/nodeSeqscan.c
	src/include/nodes/execnodes.h
---
 src/backend/executor/execProcnode.c |  6 ++++
 src/backend/executor/nodeAppend.c   | 64 ++++++++++++++++++++++---------------
 src/backend/executor/nodeSeqscan.c  |  8 ++++-
 src/include/nodes/execnodes.h       | 11 +++++++
 4 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 2107ced..5398ca0 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
+	node->state->exec_status = EXEC_READY;
+
 	switch (nodeTag(node))
 	{
 			/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
+	if (TupIsNull(result) &&
+		node->state->exec_status == EXEC_READY)
+		node->state->exec_status = EXEC_EOT;
+
 	return result;
 }
 
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 {
 	AppendState *appendstate = makeNode(AppendState);
 	PlanState **appendplanstates;
+	bool	   *stopped;
 	int			nplans;
 	int			i;
 	ListCell   *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	nplans = list_length(node->appendplans);
 
 	appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+	stopped = (bool *) palloc0(nplans * sizeof(bool));
 
 	/*
 	 * create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
 	appendstate->appendplans = appendplanstates;
+	appendstate->stopped = stopped;
 	appendstate->as_nplans = nplans;
 
 	/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-	for (;;)
+	bool all_eot = false;
+	EState *estate = node->ps.state;
+	TupleTableSlot *result;
+
+	/*!!!! This node currently works only for monotonic-forwarding scan */
+	while (!all_eot)
 	{
 		PlanState  *subnode;
-		TupleTableSlot *result;
+		int i;
 
-		/*
-		 * figure out which subplan we are currently processing
-		 */
-		subnode = node->appendplans[node->as_whichplan];
+		all_eot = true;
+		/* Scan the children in registered order. */
+		for (i = node->as_whichplan ; i < node->as_nplans ; i++)
+		{
+			if (node->stopped[i])
+				continue;
 
-		/*
-		 * get a tuple from the subplan
-		 */
-		result = ExecProcNode(subnode);
+			subnode = node->appendplans[i];
+
+			result = ExecProcNode(subnode);
 
-		if (!TupIsNull(result))
-		{
 			/*
 			 * If the subplan gave us something then return it as-is. We do
 			 * NOT make use of the result slot that was set up in
 			 * ExecInitAppend; there's no need for it.
 			 */
-			return result;
+			switch (estate->exec_status)
+			{
+				case EXEC_READY:
+					return result;
+
+				case EXEC_NOT_READY:
+					all_eot = false;
+					break;
+
+				case EXEC_EOT:
+					node->stopped[i] = true;
+					break;
+
+				default:
+					elog(ERROR, "Unkown node status: %d", estate->exec_status);
+			}
 		}
 
-		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
-		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
-		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-		/* Else loop back and try to get a tuple from the new subplan */
+		/* XXXXX: some waiting measure is needed to wait new tuple */
 	}
+
+	return NULL;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 3ee678d..61916bf 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -136,6 +136,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
+	/* Make the caller wait by some probability */
+	if (random() < RAND_MAX / 10)
+	{
+		node->ss.ps.state->exec_status = EXEC_NOT_READY;
+		return NULL;
+	}
+
 	return ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
 					(ExecScanRecheckMtd) SeqRecheck);
@@ -166,7 +173,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 	ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitSeqScan
  * ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ffc2a8..45c6fba 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,14 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+/* Enum for async awareness */
+typedef enum NodeStatus
+{
+	EXEC_NOT_READY,
+	EXEC_READY,
+	EXEC_EOT
+} NodeStatus;
+
 /* ----------------
  *	  EState information
  *
@@ -419,6 +427,8 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+	NodeStatus exec_status;
 } EState;
 
 
@@ -1147,6 +1157,7 @@ typedef struct AppendState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
+	bool	   *stopped;
 	int			as_nplans;
 	int			as_whichplan;
 } AppendState;
-- 
1.8.3.1
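
The three-state protocol of the second patch (ready, not ready, end of tuples) and the way an Append-like parent polls its children can be tried out in isolation. What follows is a minimal sketch under assumptions: ToyChild, ToyAppend and the toy_* functions are hypothetical, tuples are plain ints, and each child stalls on every other call instead of at random. Unlike the patch, the status is returned directly rather than stored in a shared EState field.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum ToyStatus { TOY_NOT_READY, TOY_READY, TOY_EOT } ToyStatus;

/* Hypothetical child that yields `remaining` tuples, stalling between them. */
typedef struct ToyChild
{
	int			remaining;
	bool		stall_next;		/* report "not ready" on the next call */
} ToyChild;

static ToyStatus
toy_child_next(ToyChild *c, int *tuple)
{
	if (c->remaining == 0)
		return TOY_EOT;
	if (c->stall_next)
	{
		c->stall_next = false;
		return TOY_NOT_READY;	/* more tuples exist, but not right now */
	}
	c->stall_next = true;
	*tuple = c->remaining--;
	return TOY_READY;
}

typedef struct ToyAppend
{
	ToyChild   *children;
	bool	   *stopped;		/* per-child "reached end of tuples" marks */
	int			nchildren;
} ToyAppend;

/* Returns true with *tuple set, or false once every child is exhausted. */
static bool
toy_append_next(ToyAppend *a, int *tuple)
{
	bool		all_eot = false;

	while (!all_eot)
	{
		all_eot = true;
		for (int i = 0; i < a->nchildren; i++)
		{
			if (a->stopped[i])
				continue;

			switch (toy_child_next(&a->children[i], tuple))
			{
				case TOY_READY:
					return true;
				case TOY_NOT_READY:
					all_eot = false;	/* must come back to this child */
					break;
				case TOY_EOT:
					a->stopped[i] = true;
					break;
			}
		}
		/* A real implementation would wait here instead of spinning. */
	}
	return false;
}

/* Drain two children holding 2 and 3 tuples; return the total received. */
static int
toy_demo_count(void)
{
	ToyChild	kids[2] = {{2, false}, {3, true}};
	bool		stopped[2] = {false, false};
	ToyAppend	a = {kids, stopped, 2};
	int			tuple;
	int			n = 0;

	while (toy_append_next(&a, &tuple))
		n++;
	return n;
}
```

The busy loop in toy_append_next corresponds to the spot the patch marks with its XXXXX comment: without some way to wait for a child to become ready, the parent just keeps polling.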