Hi, Bharath:

+ (void) SetCurrentCommandIdUsedForWorker();
+ myState->output_cid = GetCurrentCommandId(false);

SetCurrentCommandIdUsedForWorker already has void as return type. The
'(void)' is not needed.
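That is, the two lines would simply read:

+ SetCurrentCommandIdUsedForWorker();
+ myState->output_cid = GetCurrentCommandId(false);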
+ * rd_createSubid is marked invalid, otherwise, the table is
+ * not allowed to extend by the workers.

nit: to extend by the workers -> to be extended by the workers

For IsParallelInsertInCTASAllowed, the logic is inside the 'if
(IS_CTAS(into))' block. You can return false when (!IS_CTAS(into)) - this
would save some indentation for the body.

+ if (rel && rel->relpersistence != RELPERSISTENCE_TEMP)
+ allowed = true;

Similarly, when the above condition doesn't hold, you can return false
directly - reducing the next if condition to 'if (queryDesc)'.

+ if (!(ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
+ plannedstmt->parallelModeNeeded &&
+ plannedstmt->planTree &&
+ IsA(plannedstmt->planTree, Gather) &&
+ plannedstmt->planTree->lefttree &&
+ plannedstmt->planTree->lefttree->parallel_aware &&
+ plannedstmt->planTree->lefttree->parallel_safe))

The composite condition is negated. Maybe you can write it without the
negation:

+ return (ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
+ plannedstmt->parallelModeNeeded &&
+ plannedstmt->planTree &&
+ IsA(plannedstmt->planTree, Gather) &&
+ plannedstmt->planTree->lefttree &&
+ plannedstmt->planTree->lefttree->parallel_aware &&
+ plannedstmt->planTree->lefttree->parallel_safe)
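Putting these suggestions together, the function could be shaped with
early returns, roughly like below (an untested sketch; I have passed
'rel' in as a parameter just for illustration - use whatever way the
patch already obtains the target relation):

static bool
IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc,
                              Relation rel)
{
    /* Bail out early when this is not CTAS at all. */
    if (!IS_CTAS(into))
        return false;

    /* Parallel workers cannot write into temporary tables. */
    if (rel == NULL || rel->relpersistence == RELPERSISTENCE_TEMP)
        return false;

    if (queryDesc)
    {
        PlanState   *ps = queryDesc->planstate;
        PlannedStmt *plannedstmt = queryDesc->plannedstmt;

        return (ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
                plannedstmt->parallelModeNeeded &&
                plannedstmt->planTree &&
                IsA(plannedstmt->planTree, Gather) &&
                plannedstmt->planTree->lefttree &&
                plannedstmt->planTree->lefttree->parallel_aware &&
                plannedstmt->planTree->lefttree->parallel_safe);
    }

    return false;
}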
+ * Write out the number of tuples this worker has inserted. Leader will use
+ * it to inform to the end client.

'inform to the end client' -> 'inform the end client' (without to)

Cheers

On Sun, Dec 6, 2020 at 4:37 PM Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> wrote:

> Thanks Amit for the review comments.
>
> On Sat, Dec 5, 2020 at 4:27 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > > If I'm not wrong, I think currently we have no exec nodes for DDLs.
> > > I'm not sure whether we would like to introduce one for this.
> >
> > Yeah, I am also not in favor of having an executor node for CTAS but
> > OTOH, I also don't like the way you have jammed the relevant
> > information in generic PlanState. How about keeping it in GatherState
> > and initializing it in ExecCreateTableAs() after the executor start.
> > You are already doing special treatment for the Gather node in
> > ExecCreateTableAs (via IsParallelInsertInCTASAllowed) so we can as
> > well initialize the required information in GatherState in
> > ExecCreateTableAs. I think that might help in reducing the special
> > treatment for intoclause at different places.
> >
>
> Done. Added required info to GatherState node. While this reduced the
> changes at many other places, but had to pass the into clause and
> object id to ExecInitParallelPlan() as we do not send GatherState node
> to it. Hope that's okay.
>
> >
> > Few other assorted comments:
> > =========================
> > 1.
> > This looks a bit odd. The function name
> > 'IsParallelInsertInCTASAllowed' suggests that it just checks whether
> > parallelism is allowed but it is internally changing the plan_rows. It
> > might be better to do this separately if the parallelism is allowed.
> >
>
> Changed.
>
> >
> > 2.
> > static void ExecShutdownGatherWorkers(GatherState *node);
> > -
> > +static void ExecParallelInsertInCTAS(GatherState *node);
> >
> > Spurious line removal.
> >
>
> Corrected.
>
> >
> > 3.
> > The comment and code appear a bit misleading as the function seems to
> > shutdown the workers rather than waiting for them to finish. How about
> > using something like below:
> >
> > /*
> > * Next, accumulate buffer and WAL usage. (This must wait for the workers
> > * to finish, or we might get incomplete data.)
> > */
> > if (nworkers > 0)
> > {
> > int i;
> >
> > /* Wait for all vacuum workers to finish */
> > WaitForParallelWorkersToFinish(lps->pcxt);
> >
> > for (i = 0; i < lps->pcxt->nworkers_launched; i++)
> > InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
> > }
> >
> > This is how it works for parallel vacuum.
> >
>
> Done.
>
> >
> > 4.
> > The above comment doesn't seem to convey what it intends to convey.
> > How about changing it slightly as: "We don't compute the
> > parallel_tuple_cost for CTAS because the number of tuples that are
> > transferred from workers to the gather node is zero as each worker
> > parallelly inserts the tuples that are resulted from its chunk of plan
> > execution. This change may make the parallel plan cheap among all
> > other plans, and influence the planner to consider this parallel
> > plan."
> >
>
> Changed.
>
> > Then, we can also have an Assert for path->path.rows to zero for the
> > CTAS case.
> >
>
> We can not have Assert(path->path.rows == 0), because we are not
> changing this parameter upstream in or before the planning phase. We
> are just skipping to take it into account for CTAS. We may have to do
> extra checks over different places in case we have to make planner
> path->path.rows to 0 for CTAS. IMHO, that's not necessary. We can just
> skip taking this value in cost_gather. Thoughts?
>
> >
> > 5.
> > + /* Prallel inserts in CTAS related info is specified below. */
> > + IntoClause *intoclause;
> > + Oid objectid;
> > + DestReceiver *dest;
> > } PlanState;
> >
> > Typo. /Prallel/Parallel
> >
>
> Corrected.
>
> >
> > 6.
> > Currently, it seems the plan look like:
> > Gather (actual time=970.524..972.913 rows=0 loops=1)
> > -> Create t1_test
> > Workers Planned: 2
> > Workers Launched: 2
> > -> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)
> >
> > I would prefer it to be:
> > Gather (actual time=970.524..972.913 rows=0 loops=1)
> > Workers Planned: 2
> > Workers Launched: 2
> > -> Create t1_test
> > -> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)
> >
> > This way it looks like the writing part is done below the Gather node
> > and also it will match the Parallel Insert patch of Greg.
> >
>
> Done.
>
> Attaching v7 patch. Please review it further.
>
> With Regards,
> Bharath Rupireddy.
> EnterpriseDB: http://www.enterprisedb.com