On Monday, November 7, 2022 7:43 PM Kuroda, Hayato/黒田 隼人 <kuroda.hay...@fujitsu.com> wrote:
>
> Dear Hou,
>
> The following are my comments. I want to consider the patch more, but I am
> sending these first.

Thanks for the comments.

> ===
> worker.c
>
> 01. typedef enum TransApplyAction
>
> ```
> /*
>  * What action to take for the transaction.
>  *
>  * TRANS_LEADER_APPLY means that we are in the leader apply worker and changes
>  * of the transaction are applied directly in the worker.
>  *
>  * TRANS_LEADER_SERIALIZE means that we are in the leader apply worker or table
>  * sync worker. Changes are written to temporary files and then applied when
>  * the final commit arrives.
>  *
>  * TRANS_LEADER_SEND_TO_PARALLEL means that we are in the leader apply worker
>  * and need to send the changes to the parallel apply worker.
>  *
>  * TRANS_PARALLEL_APPLY means that we are in the parallel apply worker and
>  * changes of the transaction are applied directly in the worker.
>  */
> ```
>
> TRANS_LEADER_PARTIAL_SERIALIZE should be listed as well.

Added.

> 02. handle_streamed_transaction()
>
> ```
> +	StringInfoData	origin_msg;
> ...
> +	origin_msg = *s;
> ...
> +	/* Write the change to the current file */
> +	stream_write_change(action,
> +						apply_action == TRANS_LEADER_SERIALIZE ?
> +						s : &origin_msg);
> ```
>
> I'm not sure why origin_msg is needed. Can we remove the conditional
> operator?

Currently, the parallel apply worker needs the transaction xid of this change
to define a savepoint, so the leader needs to write the original (unparsed)
message to the file.
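To illustrate the mechanics for other reviewers (a self-contained toy sketch
of the StringInfo cursor idea only, using stand-ins for StringInfoData and
pq_getmsgint() rather than the actual patch code):

```
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for PostgreSQL's StringInfoData. */
typedef struct
{
	char   *data;
	int		len;
	int		cursor;
} Buf;

/* Stand-in for pq_getmsgint(s, 4): reads 4 bytes and advances the cursor. */
static unsigned int
read_xid(Buf *b)
{
	unsigned int xid;

	memcpy(&xid, b->data + b->cursor, sizeof(xid));
	b->cursor += sizeof(xid);
	return xid;
}

int
main(void)
{
	char	raw[8] = {0};
	Buf		s = {raw, (int) sizeof(raw), 0};
	Buf		origin_msg;

	origin_msg = s;			/* struct copy taken before parsing */
	(void) read_xid(&s);	/* parsing moves s.cursor past the xid */

	/*
	 * origin_msg.cursor still points at the xid, so serializing origin_msg
	 * keeps the xid in the file for the parallel apply worker to read;
	 * serializing s would lose it.  Hence the conditional in the patch.
	 */
	printf("s.cursor=%d, origin_msg.cursor=%d\n", s.cursor, origin_msg.cursor);
	return 0;
}
```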
> 03. apply_handle_stream_start()
>
> ```
> + * XXX We can avoid sending pairs of the START/STOP messages to the parallel
> + * worker because unlike apply worker it will process only one transaction at a
> + * time. However, it is not clear whether any optimization is worthwhile
> + * because these messages are sent only when the logical_decoding_work_mem
> + * threshold is exceeded.
> ```
>
> This comment should be modified because PA must acquire and release locks at
> that time.
>
> 04. apply_handle_stream_prepare()
>
> ```
> +			/*
> +			 * After sending the data to the parallel apply worker, wait for
> +			 * that worker to finish. This is necessary to maintain commit
> +			 * order which avoids failures due to transaction dependencies and
> +			 * deadlocks.
> +			 */
> +			parallel_apply_wait_for_xact_finish(winfo->shared);
> ```
>
> This seems not to be correct. LA may not send data but spill changes to file.

Changed.

> 05. apply_handle_stream_commit()
>
> ```
> +				if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
> +					stream_cleanup_files(MyLogicalRepWorker->subid, xid);
> ```
>
> I'm not sure whether the stream files should be removed by LA or PAs. Could
> you tell me the reason why you chose LA?

I think the logic is more natural if only the LA can create/write/delete the
file and the PA only needs to read from it.

> ===
> applyparallelworker.c
>
> 05. parallel_apply_can_start()
>
> ```
> +	if (switching_to_serialize)
> +		return false;
> ```
>
> Could you add a comment like:
> Don't start a new parallel apply worker if the leader apply worker has been
> spilling changes to the disk temporarily.

This code has been removed.

> 06. parallel_apply_start_worker()
>
> ```
> +	/*
> +	 * Set the xact_state flag in the leader instead of the parallel apply
> +	 * worker to avoid the race condition where the leader has already started
> +	 * waiting for the parallel apply worker to finish processing the
> +	 * transaction while the child process has not yet processed the first
> +	 * STREAM_START and has not set the xact_state to true.
> +	 */
> ```
>
> I think the word "flag" should be used for booleans, so the comment should be
> modified.
> (There are many such code comments; all of them should be modified.)

Changed.

> 07. parallel_apply_get_unique_id()
>
> ```
> +/*
> + * Returns the unique id among all parallel apply workers in the subscriber.
> + */
> +static uint16
> +parallel_apply_get_unique_id()
> ```
>
> I think this function is inefficient: its computational complexity increases
> linearly with the number of PAs. Perhaps the Bitmapset data structure could be
> used.

This function has been removed.

> 08. parallel_apply_send_data()
>
> ```
> #define CHANGES_THRESHOLD 1000
> #define SHM_SEND_TIMEOUT_MS 10000
> ```
>
> I think the timeout may be too long. Could you tell me the background about
> it?

Serializing data to a file would affect performance, so I tried to make the
fallback hard to trigger unless the PA is really blocked by another PA or BA.
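To make the intended behavior easier to discuss, here is a simplified sketch
of the send-then-fall-back shape (not the patch itself: the function name,
retry interval, wait-event constant, and switch_to_partial_serialize() helper
are illustrative, and it assumes the post-v15 five-argument shm_mq_send()):

```
#include "postgres.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/shm_mq.h"
#include "utils/wait_event.h"

#define SHM_SEND_TIMEOUT_MS			10000
#define SHM_SEND_RETRY_INTERVAL_MS	1000

/* hypothetical; stands in for the patch's serialization path */
static void switch_to_partial_serialize(const void *data, Size nbytes);

/*
 * Retry a non-blocking send until SHM_SEND_TIMEOUT_MS has elapsed, then
 * spill this and all remaining changes of the transaction to a file instead
 * of blocking the leader indefinitely.
 */
static void
send_data_with_fallback(shm_mq_handle *mqh, Size nbytes, const void *data)
{
	int			elapsed_ms = 0;

	for (;;)
	{
		shm_mq_result result;

		/* nowait = true, so a full queue returns instead of blocking */
		result = shm_mq_send(mqh, nbytes, data, true, true);

		if (result == SHM_MQ_SUCCESS)
			return;
		else if (result == SHM_MQ_DETACHED)
			ereport(ERROR,
					(errmsg("lost connection to the parallel apply worker")));

		/* The queue is full; the PA is probably blocked or just slow. */
		if (elapsed_ms >= SHM_SEND_TIMEOUT_MS)
		{
			switch_to_partial_serialize(data, nbytes);
			return;
		}

		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 SHM_SEND_RETRY_INTERVAL_MS,
						 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
		ResetLatch(MyLatch);
		elapsed_ms += SHM_SEND_RETRY_INTERVAL_MS;
	}
}
```

With a one-second retry interval, 10000 ms amounts to roughly ten failed
attempts before spilling, so the fallback should only be reached when the PA
is genuinely stuck behind a lock rather than momentarily busy.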
> 09. parallel_apply_send_data()
>
> ```
> 			/*
> 			 * Close the stream file if not in a streaming block, the file will
> 			 * be reopened later.
> 			 */
> 			if (!stream_apply_worker)
> 				serialize_stream_stop(winfo->shared->xid);
> ```
>
> a.
> IIUC the timings when LA tries to send data but stream_apply_worker is NULL
> are:
> * apply_handle_stream_prepare,
> * apply_handle_stream_start,
> * apply_handle_stream_abort, and
> * apply_handle_stream_commit.
> And at that time the state of TransApplyAction may be
> TRANS_LEADER_SEND_TO_PARALLEL. When should the file be closed?

Changed to use another condition for this check.

> b.
> Even if this is needed, I think the name of the called function should be
> modified. Here LA may not be handling a STREAM_STOP message.
> close_stream_file() or something?
>
> 10. parallel_apply_send_data()
>
> ```
> 		/* Initialize the stream fileset. */
> 		serialize_stream_start(winfo->shared->xid, true);
> ```
>
> I think the name of the called function should be modified. Here LA may not
> be handling a STREAM_START message. open_stream_file() or something?
>
> 11. parallel_apply_send_data()
>
> ```
> 		if (++retry >= CHANGES_THRESHOLD)
> 		{
> 			MemoryContext oldcontext;
> 			StringInfoData msg;
> ...
> 			initStringInfo(&msg);
> 			appendBinaryStringInfo(&msg, data, nbytes);
> ...
> 			switching_to_serialize = true;
> 			apply_dispatch(&msg);
> 			switching_to_serialize = false;
>
> 			break;
> 		}
> ```
>
> pfree(msg.data) may be needed.
>
> ===
> 12. worker_internal.h
>
> ```
> +	pg_atomic_uint32	left_message;
> ```
>
> ParallelApplyWorkerShared is already protected by a mutex. Why did you add an
> atomic variable to the data structure?

I personally feel this value is modified more frequently than the others, so
an atomic variable is used here.
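To spell out the trade-off (a sketch only; the struct name, pending_cnt, and
the function names are illustrative, while left_message follows the quoted
hunk): the mutex-protected variant pays for a spinlock acquire/release on
every change, whereas the atomic variant is a single lock-free
read-modify-write.

```
#include "postgres.h"
#include "port/atomics.h"
#include "storage/spin.h"

typedef struct SharedCounterSketch
{
	slock_t				mutex;			/* protects pending_cnt */
	int					pending_cnt;	/* (a) mutex-protected variant */
	pg_atomic_uint32	left_message;	/* (b) atomic variant */
} SharedCounterSketch;

/* (a) every increment takes and releases the spinlock */
static void
count_message_locked(SharedCounterSketch *shared)
{
	SpinLockAcquire(&shared->mutex);
	shared->pending_cnt++;
	SpinLockRelease(&shared->mutex);
}

/* (b) one lock-free read-modify-write per increment */
static void
count_message_atomic(SharedCounterSketch *shared)
{
	pg_atomic_add_fetch_u32(&shared->left_message, 1);
}
```

Since the leader updates this counter once per streamed change, avoiding the
spinlock on that hot path seemed worthwhile.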
> ===
> 13. typedefs.list
>
> ParallelTransState should be added.

Added.

> ===
> 14. General
>
> I have already mentioned this to you directly, but I point it out here to
> notify the other members.
> I have caused a deadlock with two PAs. Indeed it could be resolved by the
> lmgr, but the output seemed unhelpful. The following is copied from the log;
> note that the commands executed by the apply workers are not shown. Can we
> extend this, or is it out of scope?
>
> ```
> 2022-11-07 11:11:27.449 UTC [11262] ERROR:  deadlock detected
> 2022-11-07 11:11:27.449 UTC [11262] DETAIL:  Process 11262 waits for
> AccessExclusiveLock on object 16393 of class 6100 of database 0; blocked by
> process 11320.
> 	Process 11320 waits for ShareLock on transaction 742; blocked by process
> 11266.
> 	Process 11266 waits for AccessShareLock on object 16393 of class 6100 of
> database 0; blocked by process 11262.
> 	Process 11262: <command string not enabled>
> 	Process 11320: <command string not enabled>
> 	Process 11266: <command string not enabled>
> ```

On HEAD, an apply worker could also cause a deadlock with a user backend.
For example:

Tx1 (backend):
begin;
insert into tbl1 values (100);

Tx2 (replaying streaming transaction):
begin;
insert into tbl1 values (1);
delete from tbl2;

Tx1:
insert into tbl1 values (1);	-- waits for Tx2

Tx2:
insert into tbl1 values (100);	-- waits for Tx1, deadlock

logical replication worker ERROR:  deadlock detected
logical replication worker DETAIL:  Process 2158391 waits for ShareLock on
transaction 749; blocked by process 2158410.
	Process 2158410 waits for ShareLock on transaction 750; blocked by process
2158391.
	Process 2158391: <command string not enabled>
	Process 2158410: insert into tbl1 values (1);

So it looks like the existing behavior. I agree that it would be better to
show something, but maybe we can do that in a separate patch.

Best regards,
Hou zj