On Thu, Aug 4, 2022 at 12:10 PM wangw.f...@fujitsu.com
<wangw.f...@fujitsu.com> wrote:
>
> On Mon, Jul 25, 2022 at 21:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > Few comments on 0001:
> > ======================
>
> Thanks for your comments.
>
Review comments on v20-0001-Perform-streaming-logical-transactions-by-backgr
===============================================================
1.
+   <para>
+    If set to <literal>on</literal>, the incoming changes are written to
+    temporary files and then applied only after the transaction is
+    committed on the publisher.

It is not very clear that the transaction is applied when the commit is
received by the subscriber. Can we slightly change it to: "If set to
<literal>on</literal>, the incoming changes are written to temporary files
and then applied only after the transaction is committed on the publisher
and received by the subscriber."

2.
 /* First time through, initialize apply workers hashtable */
+ if (ApplyBgworkersHash == NULL)
+ {
+  HASHCTL ctl;
+
+  MemSet(&ctl, 0, sizeof(ctl));
+  ctl.keysize = sizeof(TransactionId);
+  ctl.entrysize = sizeof(ApplyBgworkerEntry);
+  ctl.hcxt = ApplyContext;
+
+  ApplyBgworkersHash = hash_create("logical apply workers hash", 8, &ctl,
+       HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

I think it would be better if we start with a 16-element hash table; 8 seems
to be on the lower side.

3.
+/*
+ * Try to look up worker assigned before (see function apply_bgworker_get_free)
+ * inside ApplyBgworkersHash for requested xid.
+ */
+ApplyBgworkerState *
+apply_bgworker_find(TransactionId xid)

The above comment is not very clear. There doesn't seem to be any function
named apply_bgworker_get_free in the patch. Can we write this comment as:
"Find the previously assigned worker for the given transaction, if any."

4.
 /*
+ * Push apply error context callback. Fields will be filled applying a
+ * change.
+ */

/Fields will be filled applying a change./Fields will be filled while
applying a change.

5.
+void
+ApplyBgworkerMain(Datum main_arg)
+{
...
...
+ StartTransactionCommand();
+ oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ if (!MySubscription)
+ {
+  ereport(LOG,
+    (errmsg("logical replication apply worker for subscription %u will not "
+      "start because the subscription was removed during startup",
+      MyLogicalRepWorker->subid)));
+  proc_exit(0);
+ }
+
+ MySubscriptionValid = true;
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+     PGC_BACKEND, PGC_S_OVERRIDE);
+
+ /* Keep us informed about subscription changes. */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
+        subscription_change_cb,
+        (Datum) 0);
+
+ CommitTransactionCommand();
...

This part of the code appears to be the same as what we have in
ApplyWorkerMain(), except that the patch doesn't check whether the
subscription is enabled. Is there a reason to not have that check here as
well? Also, in ApplyWorkerMain(), we LOG the type of worker, which is
missing here. Unless there is a specific reason to have different code here,
we should move this part to a common function and call it from both
ApplyWorkerMain() and ApplyBgworkerMain().

6. I think the code in ApplyBgworkerMain() to set session_replication_role,
search_path, and connect to the database also appears to be the same as in
ApplyWorkerMain(). If so, that can also be moved to the common function
mentioned in the previous point.

7. I think we need to register for subscription rel map invalidation
(invalidate_syncing_table_states) in ApplyBgworkerMain(), similar to
ApplyWorkerMain(). The reason is that we check the table state after
processing a commit or similar change record via a call to
process_syncing_tables.
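For example, something along these lines in ApplyBgworkerMain() (or in the
common function suggested in points 5 and 6); this is just an untested
sketch mirroring the existing registration in ApplyWorkerMain():

+ /* Watch for changes in the subscription relation (sync) states. */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+        invalidate_syncing_table_states,
+        (Datum) 0);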
8. In apply_bgworker_setup_dsm(), we should have handling for dsm_create()
failure due to max_segments being reached, as we have in
InitializeParallelDSM(). In case we are not able to create the DSM, we can
follow the regular path of streaming transactions instead of parallelizing.

9.
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(ApplyBgworkerShared));
+ shm_toc_estimate_chunk(&e, (Size) queue_size);
+
+ shm_toc_estimate_keys(&e, 1 + 1);

Here, you can directly write 2 instead of the (1 + 1) stuff. It is quite
clear that we need two keys here.

10.
apply_bgworker_wait_for()
{
...
+ /* Wait to be signalled. */
+ WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+     WAIT_EVENT_LOGICAL_APPLY_BGWORKER_STATE_CHANGE);
...
}

Cast this call with (void) if we don't care about the return value.

11.
+static void
+apply_bgworker_shutdown(int code, Datum arg)
+{
+ SpinLockAcquire(&MyParallelShared->mutex);
+ MyParallelShared->status = APPLY_BGWORKER_EXIT;
+ SpinLockRelease(&MyParallelShared->mutex);

Is there a reason to not use apply_bgworker_set_status() directly?

12.
+ * Special case is if the first change comes from subtransaction, then
+ * we check that current_xid differs from stream_xid.
+ */
+void
+apply_bgworker_subxact_info_add(TransactionId current_xid)
+{
+ if (current_xid != stream_xid &&
+  !list_member_int(subxactlist, (int) current_xid))
...
...

I don't understand the above comment. Does that mean we don't need to define
a savepoint if the first change is from a subtransaction? Also, keep an
empty line before the above comment.

13.
+void
+apply_bgworker_subxact_info_add(TransactionId current_xid)
+{
+ if (current_xid != stream_xid &&
+  !list_member_int(subxactlist, (int) current_xid))
+ {
+  MemoryContext oldctx;
+  char spname[MAXPGPATH];
+
+  snprintf(spname, MAXPGPATH, "savepoint_for_xid_%u", current_xid);

To generate the savepoint name uniquely, would it be better to append the
subscription id as well? Something like pg_sp_<subid>_<xid>.

14. The CommitTransactionCommand() call in apply_bgworker_subxact_info_add()
looks a bit odd, as that function neither seems to start the transaction
command nor has any comments explaining it. Shall we do it in the caller,
where it would be more apparent?

15.
  else
   snprintf(bgw.bgw_name, BGW_MAXLEN,
     "logical replication worker for subscription %u", subid);
+
  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

Spurious new line.

16.
@@ -1153,7 +1162,14 @@ replorigin_session_setup(RepOriginId node)

  Assert(session_replication_state->roident != InvalidRepOriginId);

- session_replication_state->acquired_by = MyProcPid;
+ if (must_acquire)
+  session_replication_state->acquired_by = MyProcPid;
+ else if (session_replication_state->acquired_by == 0)
+  ereport(ERROR,
+    (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+     errmsg("apply background worker could not find replication state slot for replication origin with OID %u",
+      node),
+     errdetail("There is no replication state slot set by its main apply worker.")));

It is not a good idea to give apply-worker-specific messages from this API
because I don't think we can assume it is used only by apply workers. It
seems to me that if 'must_acquire' is false, then we should either give
elog(ERROR, ..) or there should be an Assert for the same. I am not
completely sure, but maybe we can request the caller to supply the PID
(which has already acquired this origin) in case must_acquire is false, and
then use it in the Assert/elog to ensure correct usage of the API. What do
you think?
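To be a bit more concrete about that second idea, a rough sketch (the
parameter name and error text are purely illustrative, and I haven't tried
to compile this). The main apply worker would pass 0 to acquire the slot,
and the apply background worker would pass the main apply worker's PID:

void
replorigin_session_setup(RepOriginId node, int acquired_by)
{
 ...
 Assert(session_replication_state->roident != InvalidRepOriginId);

 /* Acquire the slot ourselves, or verify the expected owner. */
 if (acquired_by == 0)
  session_replication_state->acquired_by = MyProcPid;
 else if (session_replication_state->acquired_by != acquired_by)
  elog(ERROR,
    "replication origin with OID %u is not acquired by the expected process %d",
    node, acquired_by);
 ...
}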
17. The commit message can explain the new abort-related information this
patch sends to the subscribers.

18.
+ * In streaming case (receiving a block of streamed transaction), for
+ * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
+ * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to apply
+ * background workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
+ * will also be applied in main apply worker).

In this, the part of the comment "(LOGICAL_REP_MSG_RELATION or
LOGICAL_REP_MSG_TYPE changes will also be applied in main apply worker)" is
not very clear. Do you mean to say that these messages are applied by both
the main and background apply workers? If so, please state that explicitly.

19.
- /* not in streaming mode */
- if (!in_streamed_transaction)
+ /* Not in streaming mode */
+ if (!(in_streamed_transaction || am_apply_bgworker()))
...
...
- /* write the change to the current file */
+ /* Write the change to the current file */
  stream_write_change(action, s);

I don't see the need to change the above comments.

20.
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
...
...
+ if (am_apply_bgworker())
+ {
+  /* Define a savepoint for a subxact if needed. */
+  apply_bgworker_subxact_info_add(current_xid);
+
+  return false;
+ }
+
+ if (apply_bgworker_active())

Isn't it better to use else if in the above code, and probably else for the
remaining part of the code in this function?

--
With Regards,
Amit Kapila.