Amit's parallel sequential scan assumes that we can enter parallel mode when the parallel sequential scan is initialized and exit parallel mode when the scan ends and all the code that runs in between will be happy with that. Unfortunately, that's not necessarily the case. There are two ways it can fail:
1. Some other part of the query can contain functions that are not safe to run in parallel-mode; e.g. a PL/pgsql function that writes data or uses subtransactions. 2. The user can run partially execute the query and then, while execution is suspended, go do something not parallel-safe with the results before resuming query execution. To properly assess whether a query is parallel-safe, we need to inspect the entire query for non-parallel-safe functions. We also need the code that's going to execute the plan to tell us whether or not they might want to do not-parallel-safe things between the time we start running the query and the time we finish running it. So I tried writing some code to address this; a first cut is attached. Here's what it does: 1. As we parse each query, it sets a flag in the parse-state if we see a non-immutable function. For the time being, I'm assuming immutable == parallel-safe, although that's probably not correct in detail. It also sets the flag if it sees a data-modifying operation, meaning an insert, update, delete, or locking clause. The point of this is to avoid making an extra pass over the query just to assess parallel-safety; we want to accumulate that information as we go along. 2. When parsing is complete, the parse-state flag is copied into the Query, similar to what we already do for flags like hasModifyingCTE. 3. When the query is planned, planner() sets a flag in the PlannerGlobal called parallelModeOK if the Query is not marked as parallel-mode unsafe. There's also a new cursor option, CURSOR_OPT_NO_PARALLEL, with forces parallelModeOK to false regardless of what the Query says. It initializes another flag parallelModeNeeded to false as well. The idea here is that before generating a parallel path, the planner should examine parallelModeOK and skip it if that's false. If we end up creating a plan from a parallel path, then the plan-generation function should set parallelModeNeeded. 4. At the conclusion of planning, the parallelModeNeeded flag is copied from the PlannerGlobal to the PlannedStmt. 5. ExecutorStart() calls EnterParallelMode() if parallelModeNeeded is set and we're not already in parallel mode. ExecutorEnd() calls ExitParallelMode() if EnterParallelMode() was called in ExecutorStart(). There are a few problems with this design that I don't immediately know how to solve: 1. I'm concerned that the query-rewrite step could substitute a query that is not parallel-safe for one that is. The upper Query might still be flagged as safe, and that's all that planner() looks at. 2. Interleaving the execution of two parallel queries by firing up two copies of the executor simultaneously can result in leaving parallel mode at the wrong time. 3. Any code using SPI has to think hard about whether to pass OPT_CURSOR_NO_PARALLEL. For example, PL/pgsql doesn't need to pass this flag when caching a plan for a query that will be run to completion each time it's executed. But it DOES need to pass the flag for a FOR loop over an SQL statement, because the code inside the FOR loop might do parallel-unsafe things while the query is suspended. Thoughts, either on the general approach or on what to do about the problems? -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
commit 0896fe0115bd591bde415c8c6e18d711cc988808 Author: Robert Haas <rhaas@postgresql.org> Date: Sat Feb 7 09:06:54 2015 -0500 Assess parallel safety and necessity and have executor switch on parallel mode when required. diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml index c099fcf..f368f00 100644 --- a/doc/src/sgml/spi.sgml +++ b/doc/src/sgml/spi.sgml @@ -1111,7 +1111,8 @@ SPIPlanPtr SPI_prepare_cursor(const char * <parameter>command</parameter>, int < <symbol>CURSOR_OPT_NO_SCROLL</symbol>, <symbol>CURSOR_OPT_FAST_PLAN</symbol>, <symbol>CURSOR_OPT_GENERIC_PLAN</symbol>, and - <symbol>CURSOR_OPT_CUSTOM_PLAN</symbol>. Note in particular that + <symbol>CURSOR_OPT_CUSTOM_PLAN</symbol>, and + <symbol>CURSOR_OPT_NO_PARALLEL</symbol>. Note in particular that <symbol>CURSOR_OPT_HOLD</symbol> is ignored. </para> </refsect1> diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index b56cf28..5257e46 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -674,6 +674,7 @@ lookup_agg_function(List *fnName, { Oid fnOid; bool retset; + bool parallelsafe; int nvargs; Oid vatype; Oid *true_oid_array; @@ -690,7 +691,7 @@ lookup_agg_function(List *fnName, */ fdresult = func_get_detail(fnName, NIL, NIL, nargs, input_types, false, false, - &fnOid, rettype, &retset, + &fnOid, rettype, &retset, ¶llelsafe, &nvargs, &vatype, &true_oid_array, NULL); diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index c961429..48e3852 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -161,8 +161,8 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString, query = (Query *) linitial(rewritten); Assert(query->commandType == CMD_SELECT); - /* plan the query */ - plan = pg_plan_query(query, 0, params); + /* plan the query - suppress parallelism, since we are writing data */ + plan = pg_plan_query(query, CURSOR_OPT_NO_PARALLEL, params); /* * Use a snapshot with an updated command ID to ensure this query sees diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 71b08f0..4f1e550 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -230,8 +230,14 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause, query_string = MemoryContextStrdup(PortalGetHeapMemory(portal), entry->plansource->query_string); - /* Replan if needed, and increment plan refcount for portal */ - cplan = GetCachedPlan(entry->plansource, paramLI, false); + /* + * Replan if needed, and increment plan refcount for portal. + * + * Because parallel queries can't write data, force a non-parallel plan + * if we've got CREATE TABLE .. AS EXECUTE. + */ + cplan = GetCachedPlan(entry->plansource, paramLI, false, + intoClause == NULL); plan_list = cplan->stmt_list; /* @@ -655,7 +661,7 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, } /* Replan if needed, and acquire a transient refcount */ - cplan = GetCachedPlan(entry->plansource, paramLI, true); + cplan = GetCachedPlan(entry->plansource, paramLI, true, true); plan_list = cplan->stmt_list; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 07526e8..e50f642 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -145,6 +145,19 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) Assert(queryDesc->estate == NULL); /* + * If the query requires parallel mode, and we're not already in parallel + * mode, then enter parallel mode now, and set a flag so we remember to + * exit parallel mode. + */ + if (queryDesc->plannedstmt->parallelModeNeeded && + !(eflags & EXEC_FLAG_EXPLAIN_ONLY) && + !IsInParallelMode()) + { + queryDesc->activateParallelMode = true; + EnterParallelMode(); + } + + /* * If the transaction is read-only, we need to check if any writes are * planned to non-temporary tables. EXPLAIN is considered read-only. * @@ -408,6 +421,9 @@ standard_ExecutorFinish(QueryDesc *queryDesc) MemoryContextSwitchTo(oldcontext); + if (queryDesc->activateParallelMode) + ExitParallelMode(); + estate->es_finished = true; } diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 3a93a04..f8a2352 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -1259,7 +1259,7 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, */ /* Replan if needed, and increment plan refcount for portal */ - cplan = GetCachedPlan(plansource, paramLI, false); + cplan = GetCachedPlan(plansource, paramLI, false, true); stmt_list = cplan->stmt_list; /* Pop the error context stack */ @@ -1681,7 +1681,7 @@ SPI_plan_get_cached_plan(SPIPlanPtr plan) error_context_stack = &spierrcontext; /* Get the generic plan for the query */ - cplan = GetCachedPlan(plansource, NULL, plan->saved); + cplan = GetCachedPlan(plansource, NULL, plan->saved, true); Assert(cplan == plansource->gplan); /* Pop the error context stack */ @@ -2078,7 +2078,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, * Replan if needed, and increment plan refcount. If it's a saved * plan, the refcount must be backed by the CurrentResourceOwner. */ - cplan = GetCachedPlan(plansource, paramLI, plan->saved); + cplan = GetCachedPlan(plansource, paramLI, plan->saved, true); stmt_list = cplan->stmt_list; /* diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index f1a24f5..05462df 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -93,6 +93,7 @@ _copyPlannedStmt(const PlannedStmt *from) COPY_NODE_FIELD(relationOids); COPY_NODE_FIELD(invalItems); COPY_SCALAR_FIELD(nParamExec); + COPY_SCALAR_FIELD(parallelModeNeeded); return newnode; } @@ -2513,6 +2514,7 @@ _copyQuery(const Query *from) COPY_SCALAR_FIELD(resultRelation); COPY_SCALAR_FIELD(hasAggs); COPY_SCALAR_FIELD(hasWindowFuncs); + COPY_SCALAR_FIELD(hasParallelUnsafe); COPY_SCALAR_FIELD(hasSubLinks); COPY_SCALAR_FIELD(hasDistinctOn); COPY_SCALAR_FIELD(hasRecursive); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 6e8b308..abe22b5 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -852,6 +852,7 @@ _equalQuery(const Query *a, const Query *b) COMPARE_SCALAR_FIELD(resultRelation); COMPARE_SCALAR_FIELD(hasAggs); COMPARE_SCALAR_FIELD(hasWindowFuncs); + COMPARE_SCALAR_FIELD(hasParallelUnsafe); COMPARE_SCALAR_FIELD(hasSubLinks); COMPARE_SCALAR_FIELD(hasDistinctOn); COMPARE_SCALAR_FIELD(hasRecursive); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index dd1278b..9cf51d8 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -255,6 +255,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node) WRITE_NODE_FIELD(relationOids); WRITE_NODE_FIELD(invalItems); WRITE_INT_FIELD(nParamExec); + WRITE_BOOL_FIELD(parallelModeNeeded); } /* @@ -1716,6 +1717,8 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node) WRITE_UINT_FIELD(lastPHId); WRITE_UINT_FIELD(lastRowMarkId); WRITE_BOOL_FIELD(transientPlan); + WRITE_BOOL_FIELD(parallelModeOK); + WRITE_BOOL_FIELD(parallelModeNeeded); } static void @@ -2290,6 +2293,7 @@ _outQuery(StringInfo str, const Query *node) WRITE_INT_FIELD(resultRelation); WRITE_BOOL_FIELD(hasAggs); WRITE_BOOL_FIELD(hasWindowFuncs); + WRITE_BOOL_FIELD(hasParallelUnsafe); WRITE_BOOL_FIELD(hasSubLinks); WRITE_BOOL_FIELD(hasDistinctOn); WRITE_BOOL_FIELD(hasRecursive); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index ae24d05..8f4b30e 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -203,6 +203,7 @@ _readQuery(void) READ_INT_FIELD(resultRelation); READ_BOOL_FIELD(hasAggs); READ_BOOL_FIELD(hasWindowFuncs); + READ_BOOL_FIELD(hasParallelUnsafe); READ_BOOL_FIELD(hasSubLinks); READ_BOOL_FIELD(hasDistinctOn); READ_BOOL_FIELD(hasRecursive); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 9cbbcfb..3a8f931 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -178,6 +178,12 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) glob->lastRowMarkId = 0; glob->transientPlan = false; glob->hasRowSecurity = false; + glob->parallelModeOK = IsUnderPostmaster && !parse->hasParallelUnsafe && + !(cursorOptions & CURSOR_OPT_NO_PARALLEL); + glob->parallelModeNeeded = false; +#ifdef FORCE_PARALLEL_MODE + glob->parallelModeNeeded = glob->parallelModeOK; +#endif /* Determine what fraction of the plan is likely to be scanned */ if (cursorOptions & CURSOR_OPT_FAST_PLAN) @@ -256,6 +262,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) result->invalItems = glob->invalItems; result->nParamExec = glob->nParamExec; result->hasRowSecurity = glob->hasRowSecurity; + result->parallelModeNeeded = glob->parallelModeNeeded; return result; } diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index a68f2e8..82f8c03 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -157,6 +157,9 @@ parse_sub_analyze(Node *parseTree, ParseState *parentParseState, query = transformStmt(pstate, parseTree); + if (pstate->p_hasParallelUnsafe) + parentParseState->p_hasParallelUnsafe = true; + free_parsestate(pstate); return query; @@ -343,6 +346,8 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->commandType = CMD_DELETE; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* process the WITH clause independently of all else */ if (stmt->withClause) { @@ -391,6 +396,7 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->hasSubLinks = pstate->p_hasSubLinks; qry->hasWindowFuncs = pstate->p_hasWindowFuncs; qry->hasAggs = pstate->p_hasAggs; + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; if (pstate->p_hasAggs) parseCheckAggregates(pstate, qry); @@ -426,6 +432,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->commandType = CMD_INSERT; pstate->p_is_insert = true; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* process the WITH clause independently of all else */ if (stmt->withClause) { @@ -523,6 +531,14 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) selectQuery = transformStmt(sub_pstate, stmt->selectStmt); + /* + * Right now this is technically unnecessary because we prohibit + * INSERT in parallel mode categorically. But if that restriction + * is ever lifted then we'll need this. + */ + if (sub_pstate->p_hasParallelUnsafe) + pstate->p_hasParallelUnsafe = true; + free_parsestate(sub_pstate); /* The grammar should have produced a SELECT */ @@ -760,6 +776,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; assign_query_collations(pstate, qry); @@ -1019,6 +1036,8 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; + assign_query_collations(pstate, qry); return qry; @@ -1479,6 +1498,8 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; + assign_query_collations(pstate, qry); return qry; @@ -1907,6 +1928,8 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->commandType = CMD_UPDATE; pstate->p_is_update = true; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* process the WITH clause independently of all else */ if (stmt->withClause) { @@ -2012,6 +2035,8 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) assign_query_collations(pstate, qry); + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; + return qry; } @@ -2087,6 +2112,13 @@ transformDeclareCursorStmt(ParseState *pstate, DeclareCursorStmt *stmt) (errcode(ERRCODE_INVALID_CURSOR_DEFINITION), errmsg("cannot specify both SCROLL and NO SCROLL"))); + /* + * We don't allow parallelism for prepared queries, because we can't + * support FETCH BACKWARD and similar constructs. Possibly we could + * weaken this to allow some parallel constructs but not others. + */ + pstate->p_hasParallelUnsafe = true; + result = transformStmt(pstate, stmt->query); /* Grammar should not have allowed anything but SELECT */ @@ -2187,6 +2219,8 @@ transformCreateTableAsStmt(ParseState *pstate, CreateTableAsStmt *stmt) Query *result; Query *query; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* transform contained query */ query = transformStmt(pstate, stmt->query); stmt->query = (Node *) query; @@ -2348,6 +2382,8 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc, Index i; LockingClause *allrels; + pstate->p_hasParallelUnsafe = true; /* tuple locks are not parallel-safe */ + CheckSelectLocking(qry, lc->strength); /* make a clause we can pass down to subqueries to select all rels */ diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index a200804..00318bb 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -89,6 +89,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, List *argdefaults; Node *retval; bool retset; + bool parallelsafe; int nvargs; Oid vatype; FuncDetailCode fdresult; @@ -238,9 +239,11 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, fdresult = func_get_detail(funcname, fargs, argnames, nargs, actual_arg_types, !func_variadic, true, - &funcid, &rettype, &retset, + &funcid, &rettype, &retset, ¶llelsafe, &nvargs, &vatype, &declared_arg_types, &argdefaults); + if (!parallelsafe) + pstate->p_hasParallelUnsafe = true; if (fdresult == FUNCDETAIL_COERCION) { /* @@ -1256,6 +1259,7 @@ func_get_detail(List *funcname, Oid *funcid, /* return value */ Oid *rettype, /* return value */ bool *retset, /* return value */ + bool *parallelsafe, /* return value */ int *nvargs, /* return value */ Oid *vatype, /* return value */ Oid **true_typeids, /* return value */ @@ -1268,6 +1272,7 @@ func_get_detail(List *funcname, *funcid = InvalidOid; *rettype = InvalidOid; *retset = false; + *parallelsafe = true; *nvargs = 0; *vatype = InvalidOid; *true_typeids = NULL; @@ -1480,6 +1485,14 @@ func_get_detail(List *funcname, *rettype = pform->prorettype; *retset = pform->proretset; *vatype = pform->provariadic; + + /* + * for now, we judge that anything immutable is probably safe for + * paralellism. possibly stable functions should be included also, + * or possibly we should have a dedicated flag just for this purpose. + */ + *parallelsafe = (pform->provolatile == PROVOLATILE_IMMUTABLE); + /* fetch default args if caller wants 'em */ if (argdefaults && best_candidate->ndargs > 0) { diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 8899448..f43f8d7 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1772,7 +1772,7 @@ exec_bind_message(StringInfo input_message) * will be generated in MessageContext. The plan refcount will be * assigned to the Portal, so it will be released at portal destruction. */ - cplan = GetCachedPlan(psrc, params, false); + cplan = GetCachedPlan(psrc, params, false, true); /* * Now we can define the portal. diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 9c14e8a..8df200b 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -86,6 +86,7 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->estate = NULL; qd->planstate = NULL; qd->totaltime = NULL; + qd->activateParallelMode = false; return qd; } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index c1d860c..73a9a93 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -9128,6 +9128,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, Oid p_funcid; Oid p_rettype; bool p_retset; + bool p_parallelsafe; int p_nvargs; Oid p_vatype; Oid *p_true_typeids; @@ -9175,7 +9176,8 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, NIL, argnames, nargs, argtypes, !use_variadic, true, &p_funcid, &p_rettype, - &p_retset, &p_nvargs, &p_vatype, + &p_retset, &p_parallelsafe, + &p_nvargs, &p_vatype, &p_true_typeids, NULL); if ((p_result == FUNCDETAIL_NORMAL || p_result == FUNCDETAIL_AGGREGATE || diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 9a26a4e..40a1b4d 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -91,7 +91,7 @@ static void ReleaseGenericPlan(CachedPlanSource *plansource); static List *RevalidateCachedQuery(CachedPlanSource *plansource); static bool CheckCachedPlan(CachedPlanSource *plansource); static CachedPlan *BuildCachedPlan(CachedPlanSource *plansource, List *qlist, - ParamListInfo boundParams); + ParamListInfo boundParams, bool parallel_ok); static bool choose_custom_plan(CachedPlanSource *plansource, ParamListInfo boundParams); static double cached_plan_cost(CachedPlan *plan, bool include_planner); @@ -886,12 +886,13 @@ CheckCachedPlan(CachedPlanSource *plansource) */ static CachedPlan * BuildCachedPlan(CachedPlanSource *plansource, List *qlist, - ParamListInfo boundParams) + ParamListInfo boundParams, bool parallel_ok) { CachedPlan *plan; List *plist; bool snapshot_set; bool spi_pushed; + int cursor_options; MemoryContext plan_context; MemoryContext oldcxt = CurrentMemoryContext; @@ -945,10 +946,15 @@ BuildCachedPlan(CachedPlanSource *plansource, List *qlist, */ spi_pushed = SPI_push_conditional(); + /* Adjust cursor options if parallelism is not OK here. */ + cursor_options = plansource->cursor_options; + if (!parallel_ok) + cursor_options |= CURSOR_OPT_NO_PARALLEL; + /* * Generate the plan. */ - plist = pg_plan_queries(qlist, plansource->cursor_options, boundParams); + plist = pg_plan_queries(qlist, cursor_options, boundParams); /* Clean up SPI state */ SPI_pop_conditional(spi_pushed); @@ -1132,7 +1138,7 @@ cached_plan_cost(CachedPlan *plan, bool include_planner) */ CachedPlan * GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, - bool useResOwner) + bool useResOwner, bool parallel_ok) { CachedPlan *plan; List *qlist; @@ -1148,8 +1154,14 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, /* Make sure the querytree list is valid and we have parse-time locks */ qlist = RevalidateCachedQuery(plansource); - /* Decide whether to use a custom plan */ - customplan = choose_custom_plan(plansource, boundParams); + /* + * Decide whether to use a custom plan. + * + * We expect the !parallel_ok case to be uncommon, so to make things + * simple we just always use a custop plan in that case. Otherwise, + * we make a policy decision. + */ + customplan = !parallel_ok || choose_custom_plan(plansource, boundParams); if (!customplan) { @@ -1162,7 +1174,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, else { /* Build a new generic plan */ - plan = BuildCachedPlan(plansource, qlist, NULL); + plan = BuildCachedPlan(plansource, qlist, NULL, true); /* Just make real sure plansource->gplan is clear */ ReleaseGenericPlan(plansource); /* Link the new generic plan into the plansource */ @@ -1207,7 +1219,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, if (customplan) { /* Build a custom plan */ - plan = BuildCachedPlan(plansource, qlist, boundParams); + plan = BuildCachedPlan(plansource, qlist, boundParams, parallel_ok); /* Accumulate total costs of custom plans, but 'ware overflow */ if (plansource->num_custom_plans < INT_MAX) { diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index a2381cd..f0a0505 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -47,6 +47,7 @@ typedef struct QueryDesc TupleDesc tupDesc; /* descriptor for result tuples */ EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + bool activateParallelMode; /* should we activate parallel mode? */ /* This is always set NULL by the core system, but plugins can change it */ struct Instrumentation *totaltime; /* total time spent in ExecutorRun */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b1dfa85..ab176b5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -116,6 +116,7 @@ typedef struct Query bool hasAggs; /* has aggregates in tlist or havingQual */ bool hasWindowFuncs; /* has window functions in tlist */ + bool hasParallelUnsafe; /* has parallel-unsafe stuff */ bool hasSubLinks; /* has subquery SubLink */ bool hasDistinctOn; /* distinctClause is from DISTINCT ON */ bool hasRecursive; /* WITH RECURSIVE was specified */ @@ -2182,6 +2183,7 @@ typedef struct SecLabelStmt #define CURSOR_OPT_FAST_PLAN 0x0020 /* prefer fast-start plan */ #define CURSOR_OPT_GENERIC_PLAN 0x0040 /* force use of generic plan */ #define CURSOR_OPT_CUSTOM_PLAN 0x0080 /* force use of custom plan */ +#define CURSOR_OPT_NO_PARALLEL 0x0100 /* suppress use of parallel plan */ typedef struct DeclareCursorStmt { diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 316c9ce..1d6ebab 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -72,6 +72,8 @@ typedef struct PlannedStmt bool hasRowSecurity; /* row security applied? */ + bool parallelModeNeeded; /* run in parallel mode? */ + } PlannedStmt; /* macro for fetching the Plan associated with a SubPlan node */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 6845a40..57920cd 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -103,6 +103,10 @@ typedef struct PlannerGlobal bool hasRowSecurity; /* row security applied? */ + bool parallelModeOK; /* are we allowed to use parallel mode? */ + + bool parallelModeNeeded; /* do we actually need parallel mode? */ + } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ diff --git a/src/include/parser/parse_func.h b/src/include/parser/parse_func.h index 3264691..db8384f 100644 --- a/src/include/parser/parse_func.h +++ b/src/include/parser/parse_func.h @@ -37,7 +37,7 @@ extern FuncDetailCode func_get_detail(List *funcname, List *fargs, List *fargnames, int nargs, Oid *argtypes, bool expand_variadic, bool expand_defaults, - Oid *funcid, Oid *rettype, + Oid *funcid, Oid *rettype, bool *parallelsafe, bool *retset, int *nvargs, Oid *vatype, Oid **true_typeids, List **argdefaults); diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 3103b71..f86640f 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -149,6 +149,7 @@ struct ParseState Node *p_value_substitute; /* what to replace VALUE with, if any */ bool p_hasAggs; bool p_hasWindowFuncs; + bool p_hasParallelUnsafe; bool p_hasSubLinks; bool p_hasModifyingCTE; bool p_is_insert; diff --git a/src/include/utils/plancache.h b/src/include/utils/plancache.h index ef206c4..65c7976 100644 --- a/src/include/utils/plancache.h +++ b/src/include/utils/plancache.h @@ -173,7 +173,7 @@ extern List *CachedPlanGetTargetList(CachedPlanSource *plansource); extern CachedPlan *GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, - bool useResOwner); + bool useResOwner, bool parallel_ok); extern void ReleaseCachedPlan(CachedPlan *plan, bool useResOwner); #endif /* PLANCACHE_H */ diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c index 492c1ef..1b1d8eb 100644 --- a/src/pl/plperl/plperl.c +++ b/src/pl/plperl/plperl.c @@ -3191,7 +3191,7 @@ plperl_spi_query(char *query) pg_verifymbstr(query, strlen(query), false); /* Create a cursor for the query */ - plan = SPI_prepare(query, 0, NULL); + plan = SPI_prepare_cursor(query, 0, NULL, CURSOR_OPT_NO_PARALLEL); if (plan == NULL) elog(ERROR, "SPI_prepare() failed:%s", SPI_result_code_string(SPI_result)); @@ -3429,7 +3429,8 @@ plperl_spi_prepare(char *query, int argc, SV **argv) /************************************************************ * Prepare the plan and check for errors ************************************************************/ - plan = SPI_prepare(query, argc, qdesc->argtypes); + plan = SPI_prepare_cursor(query, argc, qdesc->argtypes, + CURSOR_OPT_NO_PARALLEL); if (plan == NULL) elog(ERROR, "SPI_prepare() failed:%s", diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index ae5421f..91287da 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -185,8 +185,8 @@ static Datum exec_eval_expr(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, bool *isNull, Oid *rettype); -static int exec_run_select(PLpgSQL_execstate *estate, - PLpgSQL_expr *expr, long maxtuples, Portal *portalP); +static int exec_run_select(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, + long maxtuples, int cursorOptions, Portal *portalP); static int exec_for_query(PLpgSQL_execstate *estate, PLpgSQL_stmt_forq *stmt, Portal portal, bool prefetch_ok); static ParamListInfo setup_param_list(PLpgSQL_execstate *estate, @@ -1530,7 +1530,7 @@ exec_stmt_perform(PLpgSQL_execstate *estate, PLpgSQL_stmt_perform *stmt) { PLpgSQL_expr *expr = stmt->expr; - (void) exec_run_select(estate, expr, 0, NULL); + (void) exec_run_select(estate, expr, 0, 0, NULL); exec_set_found(estate, (estate->eval_processed != 0)); exec_eval_cleanup(estate); @@ -2076,7 +2076,7 @@ exec_stmt_fors(PLpgSQL_execstate *estate, PLpgSQL_stmt_fors *stmt) /* * Open the implicit cursor for the statement using exec_run_select */ - exec_run_select(estate, stmt->query, 0, &portal); + exec_run_select(estate, stmt->query, 0, CURSOR_OPT_NO_PARALLEL, &portal); /* * Execute the loop @@ -2167,7 +2167,8 @@ exec_stmt_forc(PLpgSQL_execstate *estate, PLpgSQL_stmt_forc *stmt) Assert(query); if (query->plan == NULL) - exec_prepare_plan(estate, query, curvar->cursor_options); + exec_prepare_plan(estate, query, + curvar->cursor_options | CURSOR_OPT_NO_PARALLEL); /* * Set up ParamListInfo (hook function and possibly data values) @@ -2776,7 +2777,7 @@ exec_stmt_return_query(PLpgSQL_execstate *estate, if (stmt->query != NULL) { /* static query */ - exec_run_select(estate, stmt->query, 0, &portal); + exec_run_select(estate, stmt->query, 0, 0, &portal); } else { @@ -3680,7 +3681,7 @@ exec_stmt_dynfors(PLpgSQL_execstate *estate, PLpgSQL_stmt_dynfors *stmt) int rc; portal = exec_dynquery_with_params(estate, stmt->query, stmt->params, - NULL, 0); + NULL, CURSOR_OPT_NO_PARALLEL); /* * Execute the loop @@ -3739,7 +3740,8 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt) */ query = stmt->query; if (query->plan == NULL) - exec_prepare_plan(estate, query, stmt->cursor_options); + exec_prepare_plan(estate, query, + stmt->cursor_options | CURSOR_OPT_NO_PARALLEL); } else if (stmt->dynquery != NULL) { @@ -3751,7 +3753,8 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt) stmt->dynquery, stmt->params, curname, - stmt->cursor_options); + stmt->cursor_options | + CURSOR_OPT_NO_PARALLEL); /* * If cursor variable was NULL, store the generated portal name in it @@ -3807,7 +3810,8 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt) query = curvar->cursor_explicit_expr; if (query->plan == NULL) - exec_prepare_plan(estate, query, curvar->cursor_options); + exec_prepare_plan(estate, query, curvar->cursor_options + | CURSOR_OPT_NO_PARALLEL); } /* @@ -4793,7 +4797,7 @@ exec_eval_expr(PLpgSQL_execstate *estate, /* * Else do it the hard way via exec_run_select */ - rc = exec_run_select(estate, expr, 2, NULL); + rc = exec_run_select(estate, expr, 2, 0, NULL); if (rc != SPI_OK_SELECT) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -4847,8 +4851,8 @@ exec_eval_expr(PLpgSQL_execstate *estate, * ---------- */ static int -exec_run_select(PLpgSQL_execstate *estate, - PLpgSQL_expr *expr, long maxtuples, Portal *portalP) +exec_run_select(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, + long maxtuples, int cursorOptions, Portal *portalP) { ParamListInfo paramLI; int rc; @@ -4857,7 +4861,7 @@ exec_run_select(PLpgSQL_execstate *estate, * On the first call for this expression generate the plan */ if (expr->plan == NULL) - exec_prepare_plan(estate, expr, 0); + exec_prepare_plan(estate, expr, cursorOptions); /* * Set up ParamListInfo (hook function and possibly data values) diff --git a/src/pl/plpython/plpy_cursorobject.c b/src/pl/plpython/plpy_cursorobject.c index 2c458d3..6a20da0 100644 --- a/src/pl/plpython/plpy_cursorobject.c +++ b/src/pl/plpython/plpy_cursorobject.c @@ -126,7 +126,7 @@ PLy_cursor_query(const char *query) pg_verifymbstr(query, strlen(query), false); - plan = SPI_prepare(query, 0, NULL); + plan = SPI_prepare_cursor(query, 0, NULL, CURSOR_OPT_NO_PARALLEL); if (plan == NULL) elog(ERROR, "SPI_prepare failed: %s", SPI_result_code_string(SPI_result)); diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 465b316..837edd7 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -133,7 +133,8 @@ PLy_spi_prepare(PyObject *self, PyObject *args) } pg_verifymbstr(query, strlen(query), false); - plan->plan = SPI_prepare(query, plan->nargs, plan->types); + plan->plan = SPI_prepare_cursor(query, plan->nargs, plan->types, + CURSOR_OPT_NO_PARALLEL); if (plan->plan == NULL) elog(ERROR, "SPI_prepare failed: %s", SPI_result_code_string(SPI_result)); diff --git a/src/pl/tcl/pltcl.c b/src/pl/tcl/pltcl.c index d6b72f7..739878e 100644 --- a/src/pl/tcl/pltcl.c +++ b/src/pl/tcl/pltcl.c @@ -2174,7 +2174,8 @@ pltcl_SPI_prepare(ClientData cdata, Tcl_Interp *interp, * Prepare the plan and check for errors ************************************************************/ UTF_BEGIN; - qdesc->plan = SPI_prepare(UTF_U2E(argv[1]), nargs, qdesc->argtypes); + qdesc->plan = SPI_prepare_cursor(UTF_U2E(argv[1]), nargs, + qdesc->argtypes, CURSOR_OPT_NO_PARALLEL); UTF_END; if (qdesc->plan == NULL)
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers