On Mon, Aug 29, 2022 at 5:01 PM houzj.f...@fujitsu.com <houzj.f...@fujitsu.com> wrote: > > On Thursday, August 25, 2022 7:33 PM Amit Kapila <amit.kapil...@gmail.com> > wrote: > > > > > 11. > > + /* > > + * Attach to the message queue. > > + */ > > + mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false); > > + shm_mq_set_sender(mq, MyProc); > > + error_mqh = shm_mq_attach(mq, seg, NULL); > > + pq_redirect_to_shm_mq(seg, error_mqh); > > + > > + /* > > + * Now, we have initialized DSM. Attach to slot. > > + */ > > + logicalrep_worker_attach(worker_slot); > > + MyParallelShared->logicalrep_worker_generation = > > MyLogicalRepWorker->generation; > > + MyParallelShared->logicalrep_worker_slot_no = worker_slot; > > + > > + pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid, > > + InvalidBackendId); > > > > Is there a reason to set parallel_leader immediately after > > pq_redirect_to_shm_mq() as we are doing parallel.c? > > Moved the code. >
Sorry, if I was not clear but what I wanted was something like the below: diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 832e99cd48..6646e00658 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -480,6 +480,9 @@ ApplyParallelWorkerMain(Datum main_arg) mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false); shm_mq_set_sender(mq, MyProc); error_mqh = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(seg, error_mqh); + pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid, + InvalidBackendId); /* * Primary initialization is complete. Now, we can attach to our slot. This @@ -490,10 +493,6 @@ ApplyParallelWorkerMain(Datum main_arg) MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; - pq_redirect_to_shm_mq(seg, error_mqh); - pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid, - InvalidBackendId); - MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = 0; Few other comments on v25-0001* ============================ 1. + { + {"max_apply_parallel_workers_per_subscription", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Maximum number of apply parallel workers per subscription."), + NULL, + }, + &max_apply_parallel_workers_per_subscription, Let's model this to max_parallel_workers_per_gather and name this max_parallel_apply_workers_per_subscription. +typedef struct ApplyParallelWorkerEntry +{ + TransactionId xid; /* Hash key -- must be first */ + ApplyParallelWorkerInfo *winfo; +} ApplyParallelWorkerEntry; + +/* Apply parallel workers hash table (initialized on first use). */ +static HTAB *ApplyParallelWorkersHash = NULL; +static List *ApplyParallelWorkersFreeList = NIL; +static List *ApplyParallelWorkersList = NIL; Similarly, for above let's name them as ParallelApply*. I think in comments/doc changes it is better to refer as parallel apply worker. we can keep filename as it is. 2. + * If there are enough apply parallel workers(reache half of the + * max_apply_parallel_workers_per_subscription) /reache/reached. There should be a space before (. 3. + * The dynamic shared memory segment will contain (1) a shm_mq that can be used + * to transport errors (and other messages reported via elog/ereport) from the + * apply parallel worker to leader apply worker (2) another shm_mq that can + * be used to transport changes in the transaction from leader apply worker to + * apply parallel worker (3) necessary information to be shared among apply + * parallel workers to leader apply worker I think it is better to use send instead of transport in above paragraph. In (3), /apply parallel workers to leader apply worker/apply parallel workers and leader apply worker 4. handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) { ... ... + else if (apply_action == TA_SEND_TO_PARALLEL_WORKER) + { + parallel_apply_send_data(winfo, s->len, s->data); It is better to have an Assert for winfo being non-null here and other similar usages. -- With Regards, Amit Kapila.