On Fri, Sep 9, 2022 at 15:02 PM Peter Smith <smithpb2...@gmail.com> wrote:
> Here are my review comments for the v28-0001 patch:
>
> (There may be some overlap with other people's review comments and/or
> some fixes already made).
Thanks for your comments.

> 5. src/backend/libpq/pqmq.c
>
> + {
> + if (IsParallelWorker())
> + SendProcSignal(pq_mq_parallel_leader_pid,
> + PROCSIG_PARALLEL_MESSAGE,
> + pq_mq_parallel_leader_backend_id);
> + else
> + {
> + Assert(IsLogicalParallelApplyWorker());
> + SendProcSignal(pq_mq_parallel_leader_pid,
> + PROCSIG_PARALLEL_APPLY_MESSAGE,
> + pq_mq_parallel_leader_backend_id);
> + }
> + }
>
> This code can be simplified if you want to. For example,
>
> {
> ProcSignalReason reason;
> Assert(IsParallelWorker() || IsLogicalParallelApplyWorker());
> reason = IsParallelWorker() ? PROCSIG_PARALLEL_MESSAGE :
> PROCSIG_PARALLEL_APPLY_MESSAGE;
> SendProcSignal(pq_mq_parallel_leader_pid, reason,
> pq_mq_parallel_leader_backend_id);
> }

I am not sure this would be better.

> 14.
>
> + /* Failed to start a new parallel apply worker. */
> + if (winfo == NULL)
> + return;
>
> There seem to be quite a lot of places (like this example) where
> something may go wrong and the behaviour apparently will just silently
> fall-back to using the non-parallel streaming. Maybe that is OK, but I
> am just wondering how can the user ever know this has happened? Maybe
> the docs can mention that this could happen and give some description
> of what processes users can look for (or some other strategy) so they
> can just confirm that the parallel streaming is really working like
> they assume it to be?

I think the user could refer to the view pg_stat_subscription to check
whether the parallel apply worker has started. BTW, we have documented
the case where no parallel workers are available.

> 17. src/backend/replication/logical/applyparallelworker.c -
> parallel_apply_free_worker
>
> +/*
> + * Remove the parallel apply worker entry from the hash table. And stop the
> + * worker if there are enough workers in the pool.
> + */
> +void
> +parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId
> xid)
>
> I think the reason for doing the "enough workers in the pool" logic
> needs some more explanation.

Because the worker process keeps running after its transaction is
finished, we stop it when there are already enough workers in the pool,
to reduce the waste of resources.

> 19. src/backend/replication/logical/applyparallelworker.c -
> LogicalParallelApplyLoop
>
> + ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> + "ApplyMessageContext",
> + ALLOCSET_DEFAULT_SIZES);
>
> Should the name of this context be "ParallelApplyMessageContext"?

I think it is okay to use "ApplyMessageContext" here, just like
"ApplyContext". I will change this if more people have the same idea as
you.

> 20. src/backend/replication/logical/applyparallelworker.c -
> HandleParallelApplyMessage
>
> + default:
> + {
> + elog(ERROR, "unrecognized message type received from parallel apply
> worker: %c (message length %d bytes)",
> + msgtype, msg->len);
> + }
>
> "received from" -> "received by"
>
> ~~~
>
> 21. src/backend/replication/logical/applyparallelworker.c -
> HandleParallelApplyMessages
>
> +/*
> + * Handle any queued protocol messages received from parallel apply workers.
> + */
> +void
> +HandleParallelApplyMessages(void)
>
> 21a.
> "received from" -> "received by"
>
> ~
>
> 21b.
> I wonder if this comment should give some credit to the function in
> parallel.c - because this seems almost a copy of all that code.

Since the message is sent from the parallel apply worker to the leader
apply worker, I think "from" reads a little better here.

> 27. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
>
> + /*
> + * This is the leader apply worker; stop all the parallel apply workers
> + * previously started from here.
> + */
> + if (!isParallelApplyWorker(MyLogicalRepWorker))
>
> 27a.
> The comment does not match the code. If this *is* the leader apply
> worker then why do we have the condition to check that?
>
> Maybe only needs a comment update like
>
> SUGGESTION
> If this is the leader apply worker then stop all the parallel...
>
> ~
>
> 27b.
> Code seems also assuming it cannot be a tablesync worker but it is not
> checking that. I am wondering if it will be better to have yet another
> macro/inline to do isLeaderApplyWorker() that will make sure this
> really is the leader apply worker. (This review comment suggestion is
> repeated later below).

=> 27a.
Improved as suggested.

=> 27b.
Changed the if-statement to `if (!am_parallel_apply_worker() &&
!am_tablesync_worker())`.

> 42. src/backend/replication/logical/worker.c - InitializeApplyWorker
>
> +/*
> + * Initialize the database connection, in-memory subscription and necessary
> + * config options.
> + */
>
> I still think this should mention that this is common initialization
> code for "both leader apply workers, and parallel apply workers"

I am not sure about this. I will change it if more people have the same
idea as you.

> 44. src/backend/replication/logical/worker.c - IsLogicalParallelApplyWorker
>
> +/*
> + * Is current process a logical replication parallel apply worker?
> + */
> +bool
> +IsLogicalParallelApplyWorker(void)
> +{
> + return am_parallel_apply_worker();
> +}
> +
>
> It seems a bit strange to have this function
> IsLogicalParallelApplyWorker, and also am_parallel_apply_worker()
> which are basically identical except one of them is static and one is
> not.
>
> I wonder if there should be just one function. And if you really do
> need 2 names for consistency then you can just define a synonym like
>
> #define am_parallel_apply_worker IsLogicalParallelApplyWorker

I am not sure whether this would be better, but I can change it if more
people prefer.

> 49.
> src/include/replication/worker_internal.h
>
> @@ -60,6 +64,12 @@ typedef struct LogicalRepWorker
> */
> FileSet *stream_fileset;
>
> + /*
> + * PID of leader apply worker if this slot is used for a parallel apply
> + * worker, InvalidPid otherwise.
> + */
> + pid_t apply_leader_pid;
> +
> /* Stats. */
> XLogRecPtr last_lsn;
> TimestampTz last_send_time;
>
> Whitespace indent of the new member ok?

I will run pgindent later.

The rest of the comments are changed as suggested. The new patches are
attached in [1].

[1] - https://www.postgresql.org/message-id/OS3PR01MB6275F145878B4A44586C46CE9E499%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei