On Monday, November 21, 2022 2:26 PM Peter Smith <smithpb2...@gmail.com> wrote:
> On Fri, Nov 18, 2022 at 6:03 PM Peter Smith <smithpb2...@gmail.com>
> wrote:
> >
> > Here are some review comments for v47-0001
> >
> > (This review is a WIP - I will post more comments for this patch next
> > week)
> >
>
> Here are the rest of my comments for v47-0001

Thanks for the comments!

> ======
>
> doc/src/sgml/monitoring.
>
> 1.
>
> @@ -1851,6 +1851,11 @@ postgres 27093 0.0 0.0 30096 2752 ?
> Ss 11:34 0:00 postgres: ser
> <entry>Waiting to acquire an advisory user lock.</entry>
> </row>
> <row>
> + <entry><literal>applytransaction</literal></entry>
> + <entry>Waiting to acquire acquire a lock on a remote transaction being
> + applied on the subscriber side.</entry>
> + </row>
> + <row>
>
> 1a.
> Typo "acquire acquire"

Fixed.

> ~
>
> 1b.
> Maybe "on the subscriber side" does not mean much without any context.
> Maybe better to word it as below.
>
> SUGGESTION
> Waiting to acquire a lock on a remote transaction being applied by a logical
> replication subscriber.

Changed.

> ======
>
> doc/src/sgml/system-views.sgml
>
> 2.
>
> @@ -1361,8 +1361,9 @@
> <literal>virtualxid</literal>,
> <literal>spectoken</literal>,
> <literal>object</literal>,
> - <literal>userlock</literal>, or
> - <literal>advisory</literal>.
> + <literal>userlock</literal>,
> + <literal>advisory</literal> or
> + <literal>applytransaction</literal>.
>
> This change removed the Oxford comma that was there before. I assume it was
> unintended.

Changed.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 3. globals
>
> The parallel_apply_XXX functions were all shortened to pa_XXX.
>
> I wondered if the same simplification should be done also to the global
> statics...
>
> e.g.
> ParallelApplyWorkersHash -> PAWorkerHash ParallelApplyWorkersList ->
> PAWorkerList ParallelApplyMessagePending -> PAMessagePending etc...

Personally, I think these names look fine as they are.

> ~~~
>
> 4. pa_get_free_worker
>
> +    foreach(lc, active_workers)
> +    {
> +        ParallelApplyWorkerInfo *winfo = NULL;
> +
> +        winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
>
> No need to assign NULL because the next line just overwrites that anyhow.

Changed.

> ~
>
> 5.
>
> +    /*
> +     * Try to free the worker first, because we don't wait for the rollback
> +     * command to finish so the worker may not be freed at the end of the
> +     * transaction.
> +     */
> +    if (pa_free_worker(winfo, winfo->shared->xid))
> +        continue;
> +
> +    if (!winfo->in_use)
> +        return winfo;
>
> Shouldn't the (!winfo->in_use) check be done first as well -- e.g. why are we
> trying to free a worker which is maybe not even in_use?
>
> SUGGESTION (this will need some comment to explain what it is doing)
> if (!winfo->in_use || !pa_free_worker(winfo, winfo->shared->xid) &&
> !winfo->in_use)
>     return winfo;

pa_free_worker checks the in_use flag as well, and the current style looks
clean to me, so I didn't change this.

But it seems we need to first call pa_free_worker for every worker and then
choose a free one; otherwise a stopped worker's info (shared memory or ...)
might be left around for a long time. I will think about this and try to fix
it in the next version.

> ~~~
>
> 6. pa_free_worker
>
> +/*
> + * Remove the parallel apply worker entry from the hash table. Stop the work if
> + * there are enough workers in the pool.
> + *
>
> Typo? "work" -> "worker"
>

Fixed.

>
> 7.
>
> +    /* Are there enough workers in the pool? */
> +    if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> +    {
>
> IMO that comment should be something more like "Don't detach/stop the
> worker unless..."
>

Improved.

>
> 8. pa_send_data
>
> +    /*
> +     * Retry after 1s to reduce the cost of getting the system time and
> +     * calculating the time difference.
> +     */
> +    (void) WaitLatch(MyLatch,
> +                     WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000L,
> +                     WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
>
> 8a.
> I am not sure you need to explain the reason in the comment. Just saying "Wait
> before retrying." seems sufficient to me.

Changed.

> ~
>
> 8b.
> Instead of the hardwired "1s" in the comment, and 1000L in the code, maybe
> better to just have another constant.
>
> SUGGESTION
> #define SHM_SEND_RETRY_INTERVAL_MS 1000
> #define SHM_SEND_TIMEOUT_MS 10000

Changed.

> ~
>
> 9.
>
> +    if (startTime == 0)
> +        startTime = GetCurrentTimestamp();
> +    else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
>
> IMO the initial startTime should be at top of the function otherwise the
> timeout
> calculation seems wrong.

Setting startTime at the beginning would add unnecessary cost when we don't
need to retry, and starting the count from the first failure looks fine to me.

> ======
>
> src/backend/replication/logical/worker.c
>
> 10. handle_streamed_transaction
>
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
> + * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to parallel
> + * apply workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
> + * will be applied by both leader apply worker and parallel apply workers).
>
> I'm not sure this function comment should be referring to SUBSTREAM_ON
> and SUBSTREAM_PARALLEL because the function body does not use those
> anywhere in the logic.

Improved.

> ~~~
>
> 11. apply_handle_stream_start
>
> +    /*
> +     * Increment the number of messages waiting to be processed by
> +     * parallel apply worker.
> +     */
> +    pg_atomic_add_fetch_u32(&(winfo->shared->pending_message_count), 1);
> +
>
> The &() parens are not needed. Just write
> &winfo->shared->pending_message_count.
>
> Also, search/replace others like this -- there are a few of them.

Changed.

> ~~~
>
> 12. apply_handle_stream_stop
>
> +    if (!abort_toplevel_transaction &&
> +        pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_message_count), 1) == 0)
> +    {
> +        pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> +        pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
> +    }
>
> That lock/unlock seems like it is done just as a way of testing/waiting for an
> exclusive lock held on the xid to be released.
> But the code is too tricky -- IMO it needs a big comment saying how this trick
> works, or maybe better to have a wrapper function for this for clarity. e.g.
> pa_wait_nolock_stream(xid); (or some better name)

I think the comments atop applyparallelworker.c already explain how the
stream/transaction lock is used.

```
...
 * In order for lmgr to detect this, we have LA acquire a session lock on the
 * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
 * trying to receive messages. In other words, LA acquires the lock before
 * sending STREAM_STOP and releases it if already acquired before sending
 * STREAM_START, STREAM_ABORT(for toplevel transaction), STREAM_PREPARE and
 * STREAM_COMMIT. For PA, it always needs to acquire the lock after processing
 * STREAM_STOP and then release immediately after acquiring it. That way, when
 * PA is waiting for LA, we can have a wait-edge from PA to LA in lmgr, which
 * will make a deadlock in lmgr like:
...
```

> ~~~
>
> 13. apply_handle_stream_abort
>
> +    if (abort_toplevel_transaction)
> +    {
> +        (void) pa_free_worker(winfo, xid);
> +    }
>
> Unnecessary { }

Removed.

> ~~~
>
> 14. maybe_reread_subscription
>
> @@ -3083,8 +3563,9 @@ maybe_reread_subscription(void)
>      if (!newsub)
>      {
>          ereport(LOG,
> -                (errmsg("logical replication apply worker for subscription \"%s\" will "
> -                        "stop because the subscription was removed",
> +        /* translator: first %s is the name of logical replication worker */
> +                (errmsg("%s for subscription \"%s\" will stop because the "
> +                        "subscription was removed", get_worker_name(),
>                          MySubscription->name)));
>
>          proc_exit(0);
> @@ -3094,8 +3575,9 @@ maybe_reread_subscription(void)
>      if (!newsub->enabled)
>      {
>          ereport(LOG,
> -                (errmsg("logical replication apply worker for subscription \"%s\" will "
> -                        "stop because the subscription was disabled",
> +        /* translator: first %s is the name of logical replication worker */
> +                (errmsg("%s for subscription \"%s\" will stop because the "
> +                        "subscription was disabled", get_worker_name(),
>                          MySubscription->name)));
>
> IMO better to avoid splitting the string literals over multiple line like
> this.
>
> Please check the rest of the patch too -- there may be many more just like
> this.

Changed.

> ~~~
>
> 15. ApplyWorkerMain
>
> @@ -3726,7 +4236,7 @@ ApplyWorkerMain(Datum main_arg)
>      }
>      else
>      {
> -        /* This is main apply worker */
> +        /* This is leader apply worker */
>          RepOriginId originid;
> "This is leader" -> "This is the leader"

Changed.

> ======
>
> src/bin/psql/describe.c
>
> 16. describeSubscriptions
>
> +    if (pset.sversion >= 160000)
> +        appendPQExpBuffer(&buf,
> +                          ", (CASE substream\n"
> +                          " WHEN 'f' THEN 'off'\n"
> +                          " WHEN 't' THEN 'on'\n"
> +                          " WHEN 'p' THEN 'parallel'\n"
> +                          " END) AS \"%s\"\n",
> +                          gettext_noop("Streaming"));
> +    else
> +        appendPQExpBuffer(&buf,
> +                          ", substream AS \"%s\"\n",
> +                          gettext_noop("Streaming"));
>
> I'm not sure it is an improvement to change the output "t/f/p" to
> "on/off/parallel"
>
> IMO "t/f/parallel" would be better.
> Then the t/f is consistent with
> - how it used to display, and
> - all the other boolean fields

I think the current style is consistent with the "Synchronous commit"
parameter, which also shows "on/off/remote_apply/...", so I didn't change
this.

 Name | ... | Synchronous commit
------+-----+-------------------
 sub  | ... | on

> ======
>
> src/include/replication/worker_internal.h
>
> 17. ParallelTransState
>
> +/*
> + * State of the transaction in parallel apply worker.
> + *
> + * These enum values are ordered by the order of transaction state changes in
> + * parallel apply worker.
> + */
> +typedef enum ParallelTransState
>
> "ordered by the order" ??
>
> SUGGESTION
> The enum values must have the same order as the transaction state transitions.

Changed.

> ======
>
> src/include/storage/lock.h
>
> 18.
>
> @@ -149,10 +149,12 @@ typedef enum LockTagType
>      LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */
>      LOCKTAG_OBJECT, /* non-relation database object */
>      LOCKTAG_USERLOCK, /* reserved for old contrib/userlock code */
> -    LOCKTAG_ADVISORY /* advisory user locks */
> +    LOCKTAG_ADVISORY, /* advisory user locks */
> +    LOCKTAG_APPLY_TRANSACTION /* transaction being applied on the subscriber
> +                               * side */
>  } LockTagType;
>
> -#define LOCKTAG_LAST_TYPE LOCKTAG_ADVISORY
> +#define LOCKTAG_LAST_TYPE LOCKTAG_APPLY_TRANSACTION
>
>  extern PGDLLIMPORT const char *const LockTagTypeNames[];
>
> @@ -278,6 +280,17 @@ typedef struct LOCKTAG
>      (locktag).locktag_type = LOCKTAG_ADVISORY, \
>      (locktag).locktag_lockmethodid = USER_LOCKMETHOD)
>
> +/*
> + * ID info for a remote transaction on the subscriber side is:
> + * DB OID + SUBSCRIPTION OID + TRANSACTION ID + OBJID
> + */
> +#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
> +    ((locktag).locktag_field1 = (dboid), \
> +     (locktag).locktag_field2 = (suboid), \
> +     (locktag).locktag_field3 = (xid), \
> +     (locktag).locktag_field4 = (objid), \
> +     (locktag).locktag_type = LOCKTAG_APPLY_TRANSACTION, \
> +     (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
>
> Maybe "on the subscriber side" (2 places above) has no meaning here because
> there is no context this is talking about logical replication.
> Maybe those comments need to say something more like "on a logical
> replication subscriber"
>

Changed.

I also addressed all the comments from [1].

[1] https://www.postgresql.org/message-id/CAHut%2BPs7TzqqDnuH8r_ct1W_zSBCnuo3wodMt4Y8_Gw7rSRAaw%40mail.gmail.com

Best regards,
Hou zj