On Wed, Dec 11, 2024 at 2:32 PM Zhijie Hou (Fujitsu) <houzj.f...@fujitsu.com> wrote:
>
> Attach the V16 patch set which addressed above comments.
>
Review Comments
================
1.
+/*
+ * Workhorse for the RCI_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainConflictInfoData *data)
+{
...
+	/*
+	 * Do not attempt to advance the non-removable transaction ID when table
+	 * sync is in progress. During this time, changes from a single
+	 * transaction may be applied by multiple table sync workers corresponding
+	 * to the target tables. In this case, confirming the apply and flush
+	 * progress across all table sync workers is complex and not worth the
+	 * effort.
+	 */
+	if (!AllTablesyncsReady())
+		return;

How is it ensured that new tables, which are not yet in the ready state, are not added to the subscription via refresh immediately after this check? If that happens, the problem described by the comment can occur, and we may end up advancing the non-removable xid incorrectly. If this is safe, then please update the comments to explain why.

2.
+	/*
+	 * Reset all data fields except those used to determine the timing for the
+	 * next round of transaction ID advancement.
+	 */
+	memset(data, 0, offsetof(RetainConflictInfoData, candidate_xid_time));

Wouldn't it be better to initialize each field separately? The memset() only clears the fields laid out before candidate_xid_time, so with the current code, adding new fields at the end of RetainConflictInfoData will be difficult: any field appended after candidate_xid_time would silently not be reset.

3.
+ * TODO: The remote flush location (last_flushpos) is currently not updated
+ * during change application, making it impossible to satisfy the condition of
+ * the final phase (RCI_WAIT_FOR_LOCAL_FLUSH) for advancing the transaction ID.
+ * Consider updating the remote flush position in the final phase to enable
+ * advancement during change application.
+ */
+static inline bool
+can_advance_nonremovable_xid(RetainConflictInfoData *data)

I think we don't need this TODO here, as there is an XXX comment in wait_for_local_flush() that has the same information.

4.
@@ -2314,6 +2316,10 @@ ProcessStandbyMessage(void)
 			ProcessStandbyHSFeedbackMessage();
 			break;
+		case 'S':
+			ProcessStandbyPSRequestMessage();
+			break;

Why do we use an uppercase message type 'S' when the other messages in ProcessStandbyMessage() are all lowercase? I see that walsender already handles an 'S' message in HandleUploadManifestPacket(); although that won't conflict with this case, shouldn't we still use a different message type here?

5. The apply worker needs to receive the publisher status message at least twice to advance oldest_nonremovable_xid once. It then uses the remote_lsn of the last such message to ensure that it has been applied locally. Such a remote_lsn could be a much later value than required, delaying the advancement of oldest_nonremovable_xid.

How about this: while processing the first publisher_status message in the walsender, we get the latest_transaction_in_commit by having a function GetLatestTransactionIdInCommit() instead of GetOldestTransactionIdInCommit(), and then simply wait until that proc has written its commit WAL (i.e., wait until it clears DELAY_CHKPT_IN_COMMIT). Then get the latest written LSN and send that to the apply worker waiting for the publisher_status message. If this is feasible, we should be able to advance oldest_nonremovable_xid with just one publisher_status message. Won't that be an improvement over the current approach? If so, we can try to improve it even further by just using the commit_LSN of the transaction returned by GetLatestTransactionIdInCommit(). One idea is to use MyProc->waitLSN, which we use in synchronous replication, for this purpose. See SyncRepWaitForLSN.

6. Attached are a few minor comment updates.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6b79aa441b..afe007bd50 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4312,12 +4312,6 @@ wait_for_local_flush(RetainConflictInfoData *data)

 /*
  * Determine if we can attempt to advance transaction ID.
- *
- * TODO: The remote flush location (last_flushpos) is currently not updated
- * during change application, making it impossible to satisfy the condition of
- * the final phase (RCI_WAIT_FOR_LOCAL_FLUSH) for advancing the transaction ID.
- * Consider updating the remote flush position in the final phase to enable
- * advancement during change application.
  */
 static inline bool
 can_advance_nonremovable_xid(RetainConflictInfoData *data)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 00b6411c7e..94acca15e3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2678,6 +2678,10 @@ ProcessStandbyPSRequestMessage(void)
 	WalSnd	   *walsnd = MyWalSnd;
 	TimestampTz replyTime;

+	/*
+	 * This shouldn't happen because we don't support getting publisher_status
+	 * message from standby.
+	 */
 	if (RecoveryInProgress())
 		elog(ERROR, "the primary status is unavailable during recovery");