On Mon, Sep 12, 2022 at 18:58 PM Kuroda, Hayato/黒田 隼人 <kuroda.hay...@fujitsu.com> wrote:
> Dear Hou-san,
>
> Thank you for updating the patch! The following are comments for v28-0001.
> I will dig into your patch more, but I am sending these partially to keep the
> thread active.
Thanks for your comments.

> ===
> For applyparallelworker.c
>
> 01. filename
> The word-ordering of the filename seems not good
> because you defined the new worker as "parallel apply worker".

As Amit said, this keeps it consistent with the format of the other file names.

> 02. global variable
>
> ```
> +/* Parallel apply workers hash table (initialized on first use). */
> +static HTAB *ParallelApplyWorkersHash = NULL;
> +
> +/*
> + * List that stores the information of parallel apply workers that were
> + * started. Newly added worker information will be removed from the list at the
> + * end of the transaction when there are enough workers in the pool. Besides,
> + * exited workers will be removed from the list after being detected.
> + */
> +static List *ParallelApplyWorkersList = NIL;
> ```
>
> Could you add descriptions about the difference between the list and the hash
> table? IIUC the hash stores the parallel workers that are assigned to
> transactions, and the list stores all alive ones.

I modified the comments above ParallelApplyWorkersList. I think the difference
between these two variables can also be seen by referring to the functions
parallel_apply_start_worker and parallel_apply_free_worker.

> 03. parallel_apply_find_worker
>
> ```
> +    /* Return the cached parallel apply worker if valid. */
> +    if (stream_apply_worker != NULL)
> +        return stream_apply_worker;
> ```
>
> This is just a question -
> Why are the given xid and the xid assigned to the worker not checked here?
> Is there a chance to find the wrong worker?

I think it is okay not to check the worker's xid here. Please refer to the
comments above `stream_apply_worker`. "stream_apply_worker" is only returned
during a stream block, which means the xid is the same as the xid in the
STREAM_START message.

> 04. parallel_apply_start_worker
>
> ```
> +/*
> + * Start a parallel apply worker that will be used for the specified xid.
> + *
> + * If a parallel apply worker is not in use then re-use it, otherwise start a
> + * fresh one. Cache the worker information in ParallelApplyWorkersHash keyed by
> + * the specified xid.
> + */
> +void
> +parallel_apply_start_worker(TransactionId xid)
> ```
>
> "parallel_apply_start_worker" should be "start_parallel_apply_worker", I think.

For code readability, similar functions are all named in the format
`parallel_apply_.*_worker`.

> 05. parallel_apply_stream_abort
>
> ```
> for (i = list_length(subxactlist) - 1; i >= 0; i--)
> {
>     xid = list_nth_xid(subxactlist, i);
>     if (xid == subxid)
>     {
>         found = true;
>         break;
>     }
> }
> ```
>
> Please do not reuse the xid; declare and use another variable in the else
> block or something.

Added a temporary variable "xid_tmp" inside the for-statement.

> 06. parallel_apply_free_worker
>
> ```
> +    if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> +    {
> ```
>
> Please add a comment like: "Do we have enough workers in the pool?" or
> something.

Added the following comment according to your suggestion:
`Are there enough workers in the pool?`

> For worker.c
>
> 07. general
>
> In many places an if-else statement is used for apply_action, but I think it
> should be rewritten as a switch-case statement.

Changed.

> 08. global variable
>
> ```
> -static bool in_streamed_transaction = false;
> +bool in_streamed_transaction = false;
> ```
>
> a.
>
> It seems that in_streamed_transaction is used only in worker.c, so we can
> change it to a static variable.
>
> b.
>
> That flag is set only when an apply worker spills the transaction to disk.
> How about "in_streamed_transaction" -> "in_spilled_transaction"?

=>8a.
Improved.

=>8b.
I am not sure whether we can rename this existing variable for this, so I kept
the name.

> 09. apply_handle_stream_prepare
>
> ```
> - elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
> ```
>
> I think this debug message is still useful.
Since I think it is not appropriate to log the xid here, I added back the
following message instead: `finished processing the transaction finish command`.

> 10. apply_handle_stream_stop
>
> ```
> +    if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
> +    {
> +        pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
> +    }
> +    else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
> +    {
> ```
>
> The ordering of the STREAM {STOP, START} messages is checked only when an
> apply worker spills the transaction to disk (this is done via
> in_streamed_transaction). I think checks should be added here, like
> if (!stream_apply_worker) or something.
>
> 11. apply_handle_stream_abort
>
> ```
> +    if (in_streamed_transaction)
> +        ereport(ERROR,
> +                (errcode(ERRCODE_PROTOCOL_VIOLATION),
> +                 errmsg_internal("STREAM ABORT message without STREAM STOP")));
> ```
>
> I think the check by stream_apply_worker should be added.

Because "in_streamed_transaction" is only used for non-parallel apply, I used
stream_apply_worker to confirm the ordering of the STREAM {STOP, START}
messages. BTW, I moved the reset of in_streamed_transaction into the block of
`else if (apply_action == TA_SERIALIZE_TO_FILE)`.

> 12. apply_handle_stream_commit
>
> a.
>
> ```
> if (in_streamed_transaction)
>     ereport(ERROR,
>             (errcode(ERRCODE_PROTOCOL_VIOLATION),
>              errmsg_internal("STREAM COMMIT message without STREAM STOP")));
> ```
>
> I think the check by stream_apply_worker should be added.
>
> b.
>
> ```
> - elog(DEBUG1, "received commit for streamed transaction %u", xid);
> ```
>
> I think this debug message is still useful.

=>12a.
See the reply to #10 and #11.

=>12b.
See the reply to #09.

> ===
> For launcher.c
>
> 13. logicalrep_worker_stop_by_slot
>
> ```
> +    LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no];
> +
> +    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +
> +    /* Return if the generation doesn't match or the worker is not alive. */
> +    if (worker->generation != generation ||
> +        worker->proc == NULL)
> +        return;
> +
> ```
>
> a.
>
> LWLockAcquire(LogicalRepWorkerLock) is needed before reading slots.
>
> b.
>
> LWLockRelease(LogicalRepWorkerLock) is needed even if the worker is not found.

Fixed.

The new patches are attached in [1].

[1] - https://www.postgresql.org/message-id/OS3PR01MB6275F145878B4A44586C46CE9E499%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei