On Thu, May 15, 2025 at 6:02 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> Few minor comments on 0001:
> 1.
> + if (TimestampDifferenceExceeds(data->reply_time,
> +                                data->candidate_xid_time, 0))
> +     ereport(ERROR,
> +             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +             errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"),
>
> Shouldn't this be elog as this is an internal message? And, instead of
> "... may be ..", shall we use ".. could be .." in the error message as
> the oldest_nonremovable_xid is not yet advanced by this time.
>
> 2.
> + * It's necessary to use FullTransactionId here to mitigate potential race
> + * conditions. Such scenarios might occur if the replication slot is not
> + * yet created by the launcher while the apply worker has already
> + * initialized this field.
>
> IIRC, we discussed why it isn't easy to close this race condition. Can
> we capture that in comments separately?
>

A few more comments:
=================
3.
maybe_advance_nonremovable_xid(RetainConflictInfoData *data,
                               bool status_received)
{
    /* Exit early if retaining conflict information is not required */
    if (!MySubscription->retainconflictinfo)
        return;

    /*
     * It is sufficient to manage non-removable transaction ID for a
     * subscription by the main apply worker to detect update_deleted conflict
     * even for table sync or parallel apply workers.
     */
    if (!am_leader_apply_worker())
        return;

    /* Exit early if we have already stopped retaining */
    if (MyLogicalRepWorker->stop_conflict_info_retention)
        return;
...

get_candidate_xid()
{
...
    if (!TimestampDifferenceExceeds(data->candidate_xid_time, now,
                                    data->xid_advance_interval))
        return;

Would it be better to encapsulate all these preliminary checks, which
decide whether we can move on to computing oldest_nonremovable_xid, in a
separate function? The check in get_candidate_xid would require some
additional conditions because it is not required in every phase.
Additionally, we can move the core phase-processing logic that computes
oldest_nonremovable_xid into a separate function. We can try this to see
if the code looks better with such a refactoring.

4.
+ /*
+  * Check if all remote concurrent transactions that were active at the
+  * first status request have now completed. If completed, proceed to the
+  * next phase; otherwise, continue checking the publisher status until
+  * these transactions finish.
+  */
+ if (FullTransactionIdPrecedesOrEquals(data->last_phase_at,
+                                       remote_full_xid))
+     data->phase = RCI_WAIT_FOR_LOCAL_FLUSH;

I think there is a possibility of optimization here for cases where
there are no new transactions on the publisher side across the next
cycle of advancement of oldest_nonremovable_xid. We can simply set
candidate_xid as oldest_nonremovable_xid instead of going into the
RCI_WAIT_FOR_LOCAL_FLUSH phase.
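
A rough, self-contained C sketch of the shortcut in comment 4 might look
like the following. It is not the patch's code. FullXid stands in for
FullTransactionId, the phase names other than RCI_WAIT_FOR_LOCAL_FLUSH are
hypothetical, and the way "no new transactions on the publisher" is
detected (comparing a hypothetical remote_next_xid with the value
remembered from the previous cycle) is purely an assumption. Whether that
condition is actually sufficient would still need to be verified.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for FullTransactionId: a plain 64-bit counter (assumption). */
typedef uint64_t FullXid;

/*
 * Only RCI_WAIT_FOR_LOCAL_FLUSH is named in the quoted hunk; the other
 * phase names are hypothetical placeholders.
 */
typedef enum
{
	RCI_GET_CANDIDATE_XID,
	RCI_WAIT_FOR_PUBLISHER_STATUS,
	RCI_WAIT_FOR_LOCAL_FLUSH
} RciPhase;

typedef struct
{
	RciPhase	phase;
	FullXid		last_phase_at;	/* as in the quoted hunk */
	FullXid		candidate_xid;
	FullXid		oldest_nonremovable_xid;
	/* Hypothetical: publisher's next XID as of the previous cycle. */
	FullXid		prev_remote_next_xid;
} RciSketch;

/*
 * Handle a publisher status reply.  remote_full_xid mirrors the variable
 * in the quoted hunk; remote_next_xid is a hypothetical extra field.
 */
static void
rci_on_publisher_status(RciSketch *data, FullXid remote_full_xid,
						FullXid remote_next_xid)
{
	assert(data != NULL);

	/* Remote transactions from the first status request still running. */
	if (data->last_phase_at > remote_full_xid)
		return;

	if (remote_next_xid == data->prev_remote_next_xid)
	{
		/*
		 * Assumed fast path: the publisher assigned no new XIDs since the
		 * previous cycle, so adopt the candidate directly and skip the
		 * local-flush wait.
		 */
		data->oldest_nonremovable_xid = data->candidate_xid;
		data->phase = RCI_GET_CANDIDATE_XID;
	}
	else
		data->phase = RCI_WAIT_FOR_LOCAL_FLUSH;

	/* Remember the publisher's next XID for the following cycle. */
	data->prev_remote_next_xid = remote_next_xid;
}
```

In this sketch the ordinary path is unchanged: a reply that shows new
publisher XIDs still moves to RCI_WAIT_FOR_LOCAL_FLUSH, and only a reply
with an unchanged next XID takes the shortcut.
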
If you want to keep the code simple for the first version, then at least
note that down in comments, but OTOH, if it is simple to achieve, then
let's do it now.

-- 
With Regards,
Amit Kapila.