On Tue, Dec 27, 2022 at 11:28 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> On Mon, Dec 26, 2022 at 10:29 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Mon, Dec 26, 2022 at 6:33 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > >
> > > ---
> > > +    if (!pa_can_start(xid))
> > > +        return;
> > > +
> > > +    /* First time through, initialize parallel apply worker state hashtable. */
> > > +    if (!ParallelApplyTxnHash)
> > > +    {
> > > +        HASHCTL ctl;
> > > +
> > > +        MemSet(&ctl, 0, sizeof(ctl));
> > > +        ctl.keysize = sizeof(TransactionId);
> > > +        ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
> > > +        ctl.hcxt = ApplyContext;
> > > +
> > > +        ParallelApplyTxnHash = hash_create("logical replication parallel apply workershash",
> > > +                                           16, &ctl,
> > > +                                           HASH_ELEM |HASH_BLOBS | HASH_CONTEXT);
> > > +    }
> > > +
> > > +    /*
> > > +     * It's necessary to reread the subscription information before assigning
> > > +     * the transaction to a parallel apply worker. Otherwise, the leader may
> > > +     * not be able to reread the subscription information if streaming
> > > +     * transactions keep coming and are handled by parallel apply workers.
> > > +     */
> > > +    maybe_reread_subscription();
> > >
> > > pa_can_start() checks if the skiplsn is an invalid xid or not, and
> > > then maybe_reread_subscription() could update the skiplsn to a valid
> > > value. As the comments in pa_can_start() says, it won't work. I think
> > > we should call maybe_reread_subscription() in
> > > apply_handle_stream_start() before calling pa_allocate_worker().
> > >
> >
> > But I think a similar thing can happen when we start the worker and
> > then before the transaction ends, we do maybe_reread_subscription().
>
> Where do we do maybe_reread_subscription() in this case? IIUC if the
> leader sends all changes to the worker, there is no chance for the
> leader to do maybe_reread_subscription except for when waiting for the
> input.

Yes, this is the point where it can happen. It can happen when there is
some delay between different streaming chunks.

> On reflection, adding maybe_reread_subscription() to
> apply_handle_stream_start() adds one extra call of it so it's not
> good. Alternatively, we can do that in pa_can_start() before checking
> the skiplsn. I think we do a similar thing in AllTablesyncsRead() --
> update the information before the check if necessary.
>
> > I think we should try to call maybe_reread_subscription() when we are
> > reasonably sure that we are going to enter parallel mode, otherwise,
> > anyway, it will be later called by the leader worker.
>
> It isn't a big problem even if we update the skiplsn after launching a
> worker since we will skip the transaction the next time. But it would
> be more consistent with the current behavior. As I mentioned above,
> doing it in pa_can_start() seems to be reasonable to me. What do you
> think?
>

Okay, we can do it in pa_can_start(), but then let's do it before we
check the parallel_apply flag, as that can also change if the
streaming mode is changed. Please see the changes in the attached
patch, which is atop the 0001 and 0002 patches. I have made a few
comment improvements as well.

--
With Regards,
Amit Kapila.
v68-0001-changes_amit_1.patch
Description: Binary data
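
To make the ordering agreed on above concrete, here is a minimal standalone sketch. It is not the code from the attached patch: MockSubscription, mock_reread_subscription(), mock_alter_subscription() and mock_pa_can_start() are hypothetical stand-ins for the real subscription cache, maybe_reread_subscription() and pa_can_start(), and the real function performs further checks that are omitted here. The sketch only shows why the refresh has to come before both the parallel_apply check and the skiplsn check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the PostgreSQL definitions, for illustration only. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef struct MockSubscription
{
    bool        parallel_apply; /* is streaming mode "parallel"? */
    XLogRecPtr  skiplsn;        /* valid if a transaction is to be skipped */
} MockSubscription;

/* What the leader apply worker currently believes (its cached copy). */
static MockSubscription cached_sub = {true, InvalidXLogRecPtr};

/* What the catalog says now; may have been changed concurrently. */
static MockSubscription catalog_sub = {true, InvalidXLogRecPtr};

/* Simulates a concurrent ALTER SUBSCRIPTION changing the catalog state. */
void
mock_alter_subscription(bool parallel_apply, XLogRecPtr skiplsn)
{
    catalog_sub.parallel_apply = parallel_apply;
    catalog_sub.skiplsn = skiplsn;
}

/* Stand-in for maybe_reread_subscription(): refresh the cached copy. */
void
mock_reread_subscription(void)
{
    cached_sub = catalog_sub;
}

/*
 * Simplified stand-in for pa_can_start(), using the ordering proposed in
 * this thread: refresh first, then test the flags that the refresh may
 * have changed.
 */
bool
mock_pa_can_start(void)
{
    /* Refresh before any check that depends on subscription options. */
    mock_reread_subscription();

    /* Streaming mode may have been switched away from parallel. */
    if (!cached_sub.parallel_apply)
        return false;

    /* A valid skiplsn means the leader must handle (skip) the transaction. */
    if (cached_sub.skiplsn != InvalidXLogRecPtr)
        return false;

    return true;
}
```

With this ordering, calling mock_alter_subscription() to set a skiplsn or to turn parallel_apply off takes effect on the very next mock_pa_can_start() call. If the refresh came after the checks, the decision would be made against the stale cached copy.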