On Mon, Dec 26, 2022 at 6:33 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> ---
> +       if (!pa_can_start(xid))
> +               return;
> +
> +       /* First time through, initialize parallel apply worker state hashtable. */
> +       if (!ParallelApplyTxnHash)
> +       {
> +               HASHCTL         ctl;
> +
> +               MemSet(&ctl, 0, sizeof(ctl));
> +               ctl.keysize = sizeof(TransactionId);
> +               ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
> +               ctl.hcxt = ApplyContext;
> +
> +               ParallelApplyTxnHash = hash_create("logical replication parallel apply workershash",
> +                                                  16, &ctl,
> +                                                  HASH_ELEM |HASH_BLOBS | HASH_CONTEXT);
> +       }
> +
> +       /*
> +        * It's necessary to reread the subscription information before assigning
> +        * the transaction to a parallel apply worker. Otherwise, the leader may
> +        * not be able to reread the subscription information if streaming
> +        * transactions keep coming and are handled by parallel apply workers.
> +        */
> +       maybe_reread_subscription();
>
> pa_can_start() checks if the skiplsn is an invalid xid or not, and
> then maybe_reread_subscription() could update the skiplsn to a valid
> value. As the comments in pa_can_start() says, it won't work. I think
> we should call maybe_reread_subscription() in
> apply_handle_stream_start() before calling pa_allocate_worker().
>

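To make the ordering concern above concrete, here is a minimal standalone sketch, not the patch code. It assumes the decision in pa_can_start() depends only on the cached skiplsn; the types and the model_* helpers are hypothetical stand-ins for illustration, not the real PostgreSQL definitions.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins; these are NOT the real PostgreSQL definitions. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef struct Subscription
{
    XLogRecPtr  skiplsn;
} Subscription;

static Subscription cached_sub;     /* the leader's cached subscription */
static Subscription catalog_sub;    /* what a reread would load */

/* Hypothetical model of the skiplsn test: parallel apply only if unset. */
static bool
model_pa_can_start(void)
{
    return cached_sub.skiplsn == InvalidXLogRecPtr;
}

/* Hypothetical model of a reread: refresh the cached copy. */
static void
model_reread_subscription(void)
{
    cached_sub = catalog_sub;
}

/* Assumed scenario: skiplsn was set after the leader last read it. */
static void
model_setup(void)
{
    cached_sub.skiplsn = InvalidXLogRecPtr;
    catalog_sub.skiplsn = (XLogRecPtr) 0x16B3748;
}

/* Order in the quoted hunk: check first, reread afterwards. */
bool
check_then_reread_is_stale(void)
{
    bool        decided_parallel;

    model_setup();
    decided_parallel = model_pa_can_start();
    model_reread_subscription();

    /* Stale if we chose parallel apply although skiplsn is now valid. */
    return decided_parallel && cached_sub.skiplsn != InvalidXLogRecPtr;
}

/* Suggested order: reread first, then check. */
bool
reread_then_check_is_stale(void)
{
    bool        decided_parallel;

    model_setup();
    model_reread_subscription();
    decided_parallel = model_pa_can_start();

    return decided_parallel && cached_sub.skiplsn != InvalidXLogRecPtr;
}
```

In this model, only the check-then-reread order leaves the leader with a parallel-apply decision that no longer matches the subscription it just reloaded.
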
But I think a similar thing can happen when we start the worker and
then, before the transaction ends, we do maybe_reread_subscription().
I think we should try to call maybe_reread_subscription() when we are
reasonably sure that we are going to enter parallel mode; otherwise,
it will be called later by the leader worker anyway.

--
With Regards,
Amit Kapila.