On Monday, September 26, 2022 6:58 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Sep 26, 2022 at 8:41 AM wangw.f...@fujitsu.com
> <wangw.f...@fujitsu.com> wrote:
> >
> > On Thur, Sep 22, 2022 at 18:12 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > 3.
> > > ApplyWorkerMain()
> > > {
> > > ...
> > > ...
> > > +
> > > + if (server_version >= 160000 &&
> > > + MySubscription->stream == SUBSTREAM_PARALLEL)
> > > + options.proto.logical.streaming = pstrdup("parallel");
> > >
> > > After deciding here whether the parallel streaming mode is enabled
> > > or not, we recheck the same thing in apply_handle_stream_abort() and
> > > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > > via two different checks. How about storing this information say in
> > > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > > other places?
> >
> > Improved as suggested.
> > Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
> >
>
> Can we name the variable in_parallel_apply as parallel_apply and set it in
> logicalrep_worker_launch() instead of in ParallelApplyWorkerMain()?
Changed.

> Few other comments:
> ==================
> 1.
> + if (is_subworker &&
> + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
> + {
> + LWLockRelease(LogicalRepWorkerLock);
> +
> + ereport(DEBUG1,
> + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
> + errmsg("out of parallel apply workers"),
> + errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));
>
> I think it is better to keep the level of this as LOG. Similar messages at other
> places use WARNING or LOG. Here, I prefer LOG because the system can still
> proceed without blocking anything.

Changed.

> 2.
> +/* Reset replication origin tracking. */
> +void
> +parallel_apply_replorigin_reset(void)
> +{
> + bool started_tx = false;
> +
> + /* This function might be called inside or outside of transaction. */
> + if (!IsTransactionState())
> + {
> + StartTransactionCommand();
> + started_tx = true;
> + }
>
> Why do we need a transaction in this function?

I think we don't need it, so I removed it in the new version of the patch.

> 3. Few suggestions to improve in the patch:
> diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
> index 1623c9e2fa..d9c519dfab 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
> case TRANS_LEADER_SEND_TO_PARALLEL:
> Assert(winfo);
>
> + /*
> + * The origin can be active only in one process. See
> + * apply_handle_stream_commit.
> + */
> parallel_apply_replorigin_reset();
>
> /* Send STREAM PREPARE message to the parallel apply worker. */
> @@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
> (errcode(ERRCODE_PROTOCOL_VIOLATION),
> errmsg_internal("STREAM ABORT message without STREAM STOP")));
>
> - /*
> - * Check whether the publisher sends abort_lsn and abort_time.
> - *
> - * Note that the parallel apply worker is only started when the publisher
> - * sends abort_lsn and abort_time.
> - */
> + /* We receive abort information only when we can apply in parallel. */
> if (MyLogicalRepWorker->in_parallel_apply)
> read_abort_info = true;
>
> @@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
> Assert(winfo);
>
> if (subxid == xid)
> + {
> + /*
> + * The origin can be active only in one process. See
> + * apply_handle_stream_commit.
> + */
> parallel_apply_replorigin_reset();
> + }
>
> /* Send STREAM ABORT message to the parallel apply worker. */
> parallel_apply_send_data(winfo, s->len, s->data);
> @@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
> case TRANS_LEADER_SEND_TO_PARALLEL:
> Assert(winfo);
>
> + /*
> + * We need to reset the replication origin before sending the commit
> + * message and set it up again after confirming that parallel worker
> + * has processed the message. This is required because origin can be
> + * active only in one process at-a-time.
> + */
> parallel_apply_replorigin_reset();
>
> /* Send STREAM COMMIT message to the parallel apply worker. */
> diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
> index 4cbfb43492..2bd9664f86 100644
> --- a/src/include/replication/worker_internal.h
> +++ b/src/include/replication/worker_internal.h
> @@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
> */
> pid_t apply_leader_pid;
>
> - /*
> - * Indicates whether to use parallel apply workers.
> - *
> - * Determined based on streaming parameter and publisher version.
> - */
> + /* Indicates whether apply can be performed parallelly. */
> bool in_parallel_apply;
>

Merged, thanks.

Best regards,
Hou zj
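The comment added to apply_handle_stream_commit states the rule the patch relies on: the leader resets the replication origin before sending STREAM COMMIT and sets it up again only after the parallel apply worker has processed the message, because an origin can be active in only one process at a time. A toy, single-threaded C model of that hand-off follows. ToyOrigin, toy_origin_setup(), toy_origin_reset() and toy_stream_commit() are hypothetical names for illustration only. They are not PostgreSQL's replication origin API, and the "confirmation" from the worker is modeled simply as the worker's step finishing before the leader re-acquires the origin.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical toy model (not PostgreSQL code): a replication origin that
 * at most one process may have active at a time.
 */
typedef struct ToyOrigin
{
    int active_pid;             /* 0 means no process has the origin active */
} ToyOrigin;

/* Make the origin active in 'pid'; fails if another process holds it. */
static bool
toy_origin_setup(ToyOrigin *origin, int pid)
{
    if (origin->active_pid != 0 && origin->active_pid != pid)
        return false;
    origin->active_pid = pid;
    return true;
}

/* Deactivate the origin; only the process holding it may do so. */
static void
toy_origin_reset(ToyOrigin *origin, int pid)
{
    assert(origin->active_pid == pid);
    origin->active_pid = 0;
}

/*
 * Leader-side STREAM COMMIT hand-off in the toy model: the leader drops the
 * origin before handing the commit to the worker, the worker activates it
 * while applying, and the leader re-activates it only after the worker is
 * done. Returns true if every activation succeeded.
 */
static bool
toy_stream_commit(ToyOrigin *origin, int leader_pid, int worker_pid)
{
    /* Leader resets the origin before "sending" the commit message. */
    toy_origin_reset(origin, leader_pid);

    /* Worker applies the commit with the origin active in its process. */
    bool ok = toy_origin_setup(origin, worker_pid);
    if (ok)
        toy_origin_reset(origin, worker_pid);

    /* Leader sets the origin up again after the worker has finished. */
    return ok && toy_origin_setup(origin, leader_pid);
}
```

For example, with the origin active in leader PID 100, toy_origin_setup(&o, 200) fails because the worker cannot take over an origin the leader still holds, while toy_stream_commit(&o, 100, 200) succeeds and leaves the origin active in the leader again.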