On Wed, Jul 27, 2022 at 16:03 PM Peter Smith <smithpb2...@gmail.com> wrote:
> Here are some review comments for the patch v19-0004:
Thanks for your kind review and comments. To avoid making this thread too long, I will reply to all of your comments (0001-patch to 0004-patch) in this email. In addition, so that the replies are not confused, I have added the following serial numbers above your comments on the 0004-patch:
```
4.2 && 4.3
4.4
4.5
```

> 1.6 src/backend/replication/logical/applybgworker.c - LogicalApplyBgwLoop
>
> +/* Apply Background Worker main loop */
> +static void
> +LogicalApplyBgwLoop(shm_mq_handle *mqh, volatile ApplyBgworkerShared *shared)
>
> 'shared' seems a very vague param name. Maybe can be 'bgw_shared' or
> 'parallel_shared' or something better?
>
> ~~~
>
> 1.7 src/backend/replication/logical/applybgworker.c - ApplyBgworkerMain
>
> +/*
> + * Apply Background Worker entry point
> + */
> +void
> +ApplyBgworkerMain(Datum main_arg)
> +{
> + volatile ApplyBgworkerShared *shared;
>
> 'shared' seems a very vague var name. Maybe can be 'bgw_shared' or
> 'parallel_shared' or something better?
>
> ~~~
>
> 1.8 src/backend/replication/logical/applybgworker.c - apply_bgworker_setup_dsm
>
> +static void
> +apply_bgworker_setup_dsm(ApplyBgworkerState *wstate)
> +{
> + shm_toc_estimator e;
> + Size segsize;
> + dsm_segment *seg;
> + shm_toc *toc;
> + ApplyBgworkerShared *shared;
> + shm_mq *mq;
>
> 'shared' seems a very vague var name. Maybe can be 'bgw_shared' or
> 'parallel_shared' or something better?
>
> ~~~

I'm not sure about this.

> 3.3 .../replication/logical/applybgworker.c
>
> @@ -800,3 +800,47 @@ apply_bgworker_subxact_info_add(TransactionId current_xid)
> MemoryContextSwitchTo(oldctx);
> }
> }
> +
> +/*
> + * Check if changes on this relation can be applied by an apply background
> + * worker.
> + *
> + * Although the commit order is maintained only allowing one process to commit
> + * at a time, the access order to the relation has changed.
> + * This could cause unexpected problems if the unique column on the
> + * replicated table is inconsistent with the publisher-side or contains
> + * non-immutable functions when applying transactions in the apply
> + * background worker.
> + */
> +void
> +apply_bgworker_relation_check(LogicalRepRelMapEntry *rel)
>
> "only allowing" -> "by only allowing" (I think you mean this, right?)

As I'm not a native English speaker, I'm not sure which of the two wordings you have suggested is better (see #3.4 in [1]). For now, I have replaced your previous suggestion with the one from this review.

> 3.4
>
> + /*
> + * Return if changes on this relation can be applied by an apply background
> + * worker.
> + */
> + if (rel->parallel_apply == PARALLEL_APPLY_SAFE)
> + return;
> +
> + /* We are in error mode and should give user correct error. */
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot replicate target relation \"%s.%s\" using "
> + "subscription parameter streaming=parallel",
> + rel->remoterel.nspname, rel->remoterel.relname),
> + errdetail("The unique column on subscriber is not the unique "
> + "column on publisher or there is at least one "
> + "non-immutable function."),
> + errhint("Please change to use subscription parameter "
> + "streaming=on.")));
>
> 3.4a.
> Of course, the code should give the user the "correct error" if there
> is an error (!), but having a comment explicitly saying so does not
> serve any purpose.
>
> 3.4b.
> The logic might be simplified if it was written differently like:
>
> + if (rel->parallel_apply != PARALLEL_APPLY_SAFE)
> + ereport(ERROR, ...

I left this as is to keep the style consistent with the function apply_bgworker_relation_check.

> 3.8
>
> + /* Initialize the flag. */
> + entry->parallel_apply = PARALLEL_APPLY_SAFE;
>
> I previously suggested [1] (#3.6b) to move this. Consider, that you
> cannot logically flag the entry as "safe" until you are certain that
> it is safe.
> And you cannot be sure of that until you've passed all the
> checks this function is doing. Therefore IMO the assignment to
> PARALLEL_APPLY_SAFE should be the last line of the function.

I'm not sure about this.

> 3.11 src/backend/utils/cache/typcache.c - GetDomainConstraints
>
> @@ -2540,6 +2540,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2)
> return 0;
> }
>
> +/*
> + * GetDomainConstraints --- get DomainConstraintState list of specified domain type
> + */
> +List *
> +GetDomainConstraints(Oid type_id)
> +{
> + TypeCacheEntry *typentry;
> + List *constraints = NIL;
> +
> + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
> +
> + if(typentry->domainData != NULL)
> + constraints = typentry->domainData->constraints;
> +
> + return constraints;
> +}
>
> This function can be simplified (if you want). e.g.
>
> List *
> GetDomainConstraints(Oid type_id)
> {
> TypeCacheEntry *typentry;
>
> typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
>
> return typentry->domainData ? typentry->domainData->constraints : NIL;
> }

I just think the former version looks clearer.

4.2 && 4.3
> 2. src/backend/replication/logical/worker.c - start_table_sync
>
> @@ -3902,20 +3925,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
> }
> PG_CATCH();
> {
> + /*
> + * Emit the error message, and recover from the error state to an idle
> + * state
> + */
> + HOLD_INTERRUPTS();
> +
> + EmitErrorReport();
> + AbortOutOfAnyTransaction();
> + FlushErrorState();
> +
> + RESUME_INTERRUPTS();
> +
> + /* Report the worker failed during table synchronization */
> + pgstat_report_subscription_error(MySubscription->oid, false);
> +
> + /* Set the retry flag. */
> + set_subscription_retry(true);
> +
> if (MySubscription->disableonerr)
> DisableSubscriptionAndExit();
> - else
> - {
> - /*
> - * Report the worker failed during table synchronization.
> - * Abort the current transaction so that the stats message is sent in an
> - * idle state.
> - */
> - AbortOutOfAnyTransaction();
> - pgstat_report_subscription_error(MySubscription->oid, false);
>
> - PG_RE_THROW();
> - }
> + proc_exit(0);
> }
>
> But is it correct to set the 'retry' flag even if the
> MySubscription->disableonerr is true? Won’t that mean even after the
> user corrects the problem and then re-enabled the subscription it
> still won't let the streaming=parallel work, because that retry flag
> is set?
>
> Also, Something seems wrong to me here - IIUC the patch changed this
> code because of the potential risk of an error within the
> set_subscription_retry function, but now if such an error happens the
> current code will bypass even getting to DisableSubscriptionAndExit,
> so the subscription won't have a chance to get disabled as the user
> might have wanted.
>
> 3. src/backend/replication/logical/worker.c - start_apply
>
> @@ -3940,20 +3971,27 @@ start_apply(XLogRecPtr origin_startpos)
> }
> PG_CATCH();
> {
> + /*
> + * Emit the error message, and recover from the error state to an idle
> + * state
> + */
> + HOLD_INTERRUPTS();
> +
> + EmitErrorReport();
> + AbortOutOfAnyTransaction();
> + FlushErrorState();
> +
> + RESUME_INTERRUPTS();
> +
> + /* Report the worker failed while applying changes */
> + pgstat_report_subscription_error(MySubscription->oid,
> + !am_tablesync_worker());
> +
> + /* Set the retry flag. */
> + set_subscription_retry(true);
> +
> if (MySubscription->disableonerr)
> DisableSubscriptionAndExit();
> - else
> - {
> - /*
> - * Report the worker failed while applying changes. Abort the
> - * current transaction so that the stats message is sent in an
> - * idle state.
> - */
> - AbortOutOfAnyTransaction();
> - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
> -
> - PG_RE_THROW();
> - }
> }
>
> (Same as previous review comment #2)
>
> But is it correct to set the 'retry' flag even if the
> MySubscription->disableonerr is true? Won’t that mean even after the
> user corrects the problem and then re-enabled the subscription it
> still won't let the streaming=parallel work, because that retry flag
> is set?
>
> Also, Something seems wrong to me here - IIUC the patch changed this
> code because of the potential risk of an error within the
> set_subscription_retry function, but now if such an error happens the
> current code will bypass even getting to DisableSubscriptionAndExit,
> so the subscription won't have a chance to get disabled as the user
> might have wanted.

=>4.2.a
=>4.3.a
I think this is the expected behavior.

=>4.2.b
=>4.3.b
Fixed: set_subscription_retry is now invoked after the "disableonerr" parameter has been handled.

4.4
> 4. src/backend/replication/logical/worker.c - DisableSubscriptionAndExit
>
> /*
> - * After error recovery, disable the subscription in a new transaction
> - * and exit cleanly.
> + * Disable the subscription in a new transaction.
> */
> static void
> DisableSubscriptionAndExit(void)
> {
> - /*
> - * Emit the error message, and recover from the error state to an idle
> - * state
> - */
> - HOLD_INTERRUPTS();
> -
> - EmitErrorReport();
> - AbortOutOfAnyTransaction();
> - FlushErrorState();
> -
> - RESUME_INTERRUPTS();
> -
> - /* Report the worker failed during either table synchronization or apply */
> - pgstat_report_subscription_error(MyLogicalRepWorker->subid,
> - !am_tablesync_worker());
> -
> /* Disable the subscription */
> StartTransactionCommand();
> DisableSubscription(MySubscription->oid);
> @@ -4231,8 +4252,6 @@ DisableSubscriptionAndExit(void)
> ereport(LOG,
> errmsg("logical replication subscription \"%s\" has been disabled due to an error",
> MySubscription->name));
> -
> - proc_exit(0);
> }
>
> 4a.
> Hmm, I think it is a bad idea to remove the "exiting" code from the
> function but still leave the function name the same as before saying
> "AndExit".
>
> 4b.
> Also, now the patch is unconditionally doing proc_exit(0) in the
> calling code where previously it would do PG_RE_THROW. So it's a
> subtle difference from the path the code used to take for worker
> errors..

=>4.a
Fixed as suggested.

=>4.b
As I understand it, PG_RE_THROW ends up reporting the error and then exiting (see the function StartBackgroundWorker). Since the error has already been reported at the start of the PG_CATCH block, I think it is fine to call proc_exit at the end to exit instead.

4.5
> 5. src/backend/replication/logical/worker.c - set_subscription_retry
>
> @@ -4467,3 +4486,63 @@ reset_apply_error_context_info(void)
> apply_error_callback_arg.remote_attnum = -1;
> set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
> }
> +
> +/*
> + * Set subretry of pg_subscription catalog.
> + *
> + * If retry is true, subscriber is about to exit with an error. Otherwise, it
> + * means that the transaction was applied successfully.
> + */
> +static void
> +set_subscription_retry(bool retry)
> +{
> + Relation rel;
> + HeapTuple tup;
> + bool started_tx = false;
> + bool nulls[Natts_pg_subscription];
> + bool replaces[Natts_pg_subscription];
> + Datum values[Natts_pg_subscription];
> +
> + if (MySubscription->retry == retry ||
> + am_apply_bgworker())
> + return;
>
> Currently, I think this new 'subretry' field is only used to decide
> whether a retry can use an apply background worker or not. I think all
> this logic is *only* used when streaming=parallel. But AFAICT the
> logic for setting/clearing the retry flag is executed *always*
> regardless of the streaming mode.
>
> So for all the times when the user did not ask for streaming=parallel
> doesn't this just cause unnecessary overhead for every transaction?

I think it is fine, because the early return above skips the update whenever the flag already has the requested value. For a given transaction, the pg_subscription catalog is actually modified only the first time applying it fails and the first time a retry of it succeeds.

The rest of the comments have been addressed as suggested. The new patches were attached in [2].

[1] - https://www.postgresql.org/message-id/CAHut%2BPtRNAOwFtBp_TnDWdC7UpcTxPJzQnrm%3DNytN7cVBt5zRQ%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/OS3PR01MB6275D64BE7726B0221B15F389E9F9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei
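A minimal standalone sketch of the ordering suggested in review comment #3.8, where the relation entry is flagged as safe only after every check has passed. This is not the patch code: `RelSafety`, `RelEntryModel`, `model_mark_parallel_apply_safety()` and the two boolean inputs (standing in for the unique-column and non-immutable-function checks) are all hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical safety states; not the enum used by the patch. */
typedef enum
{
    REL_SAFETY_UNKNOWN,         /* checks not yet completed */
    REL_SAFETY_SAFE,
    REL_SAFETY_UNSAFE
} RelSafety;

/* Hypothetical relation-map entry reduced to the inputs of two checks. */
typedef struct
{
    bool        unique_cols_match;      /* unique columns match the publisher */
    bool        all_funcs_immutable;    /* no non-immutable functions involved */
    RelSafety   parallel_apply;
} RelEntryModel;

static void
model_mark_parallel_apply_safety(RelEntryModel *entry)
{
    assert(entry != NULL);

    /* Nothing is known yet, so do not claim "safe" up front. */
    entry->parallel_apply = REL_SAFETY_UNKNOWN;

    if (!entry->unique_cols_match)
    {
        entry->parallel_apply = REL_SAFETY_UNSAFE;
        return;
    }

    if (!entry->all_funcs_immutable)
    {
        entry->parallel_apply = REL_SAFETY_UNSAFE;
        return;
    }

    /* Reached only after every check has passed. */
    entry->parallel_apply = REL_SAFETY_SAFE;
}
```

With this ordering the entry is never observed as safe while a check is still pending (for example, if one of the checks raised an error partway through); the cost is that each failing check has to record the unsafe state itself.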
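A minimal standalone model of the reasoning in the reply to #4.5. This is not the patch code: `SubscriptionModel`, `model_set_subscription_retry()` and `simulate_retry_scenario()` are hypothetical, the catalog update is reduced to a counter, and the am_apply_bgworker() check is omitted. It assumes that the flag is cleared after each successfully applied transaction, that it is set before exiting on an error, and that a restarted worker starts from the value last written to the catalog.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the cached subscription state (not the patch's struct). */
typedef struct
{
    bool        retry;          /* cached copy of pg_subscription.subretry */
    int         catalog_writes; /* number of times the catalog row was updated */
} SubscriptionModel;

/*
 * Mirrors the early-return guard quoted in #4.5: the catalog is only touched
 * when the requested value differs from the cached one.
 */
static void
model_set_subscription_retry(SubscriptionModel *sub, bool retry)
{
    assert(sub != NULL);

    if (sub->retry == retry)
        return;                 /* already up to date: no catalog update */

    sub->retry = retry;         /* stands in for updating the catalog tuple */
    sub->catalog_writes++;
}

/*
 * Hypothetical sequence of apply attempts: three clean transactions, one that
 * fails, a retry that fails again, a retry that succeeds, then three more
 * clean transactions. Returns the number of catalog updates performed.
 */
static int
simulate_retry_scenario(void)
{
    const bool  applied_ok[] = {true, true, true, false, false,
                                true, true, true, true};
    SubscriptionModel sub = {false, 0};

    for (size_t i = 0; i < sizeof(applied_ok) / sizeof(applied_ok[0]); i++)
    {
        /* On failure the flag is set before exiting; on success it is cleared. */
        model_set_subscription_retry(&sub, !applied_ok[i]);
    }

    return sub.catalog_writes;
}
```

For the sequence in simulate_retry_scenario() the model performs two catalog updates, one when the first failure sets the flag and one when the successful retry clears it; every other call returns early.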