On Wed, Dec 11, 2024 at 2:32 PM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>
> Attach the V16 patch set which addressed above comments.
>

Review Comments
================
1.
+/*
+ * Workhorse for the RCI_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainConflictInfoData *data)
+{
...
+ /*
+ * Do not attempt to advance the non-removable transaction ID when table
+ * sync is in progress. During this time, changes from a single
+ * transaction may be applied by multiple table sync workers corresponding
+ * to the target tables. In this case, confirming the apply and flush
+ * progress across all table sync workers is complex and not worth the
+ * effort.
+ */
+ if (!AllTablesyncsReady())
+ return;

How do we ensure that a refresh does not add new tables, which are not
yet in the ready state, to the subscription immediately after this
check? If that happens, the problem described by the comment can occur
and we may end up advancing the non-removable xid incorrectly. If this
is safe, then please update the comment to explain why.

2.
+ /*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement.
+ */
+ memset(data, 0, offsetof(RetainConflictInfoData, candidate_xid_time));

Wouldn't it be better to initialize each field separately? With the
current code, every field that must be reset has to be placed before
candidate_xid_time, which makes it awkward to add new fields at the end
of the RetainConflictInfoData structure.
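
To illustrate what I mean in point 2, here is a minimal sketch of a
per-field reset. The struct below is a simplified, hypothetical
stand-in and not the real RetainConflictInfoData: the names
DemoRetainInfo and demo_reset_round, and every field other than
candidate_xid_time, are assumptions made only for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for RetainConflictInfoData. */
typedef struct DemoRetainInfo
{
    int      phase;                /* assumed per-round field */
    uint64_t remote_lsn;           /* assumed per-round field */
    uint32_t candidate_xid;        /* assumed per-round field */
    int64_t  candidate_xid_time;   /* kept across rounds */
    int      xid_advance_interval; /* assumed field, kept across rounds */
} DemoRetainInfo;

/*
 * Reset only the per-round fields, one assignment per field. The fields
 * that drive the timing of the next round are deliberately left alone,
 * so the result does not depend on the order of the struct members.
 */
static void
demo_reset_round(DemoRetainInfo *data)
{
    assert(data != NULL);

    data->phase = 0;
    data->remote_lsn = 0;
    data->candidate_xid = 0;
    /* candidate_xid_time and xid_advance_interval are intentionally kept */
}
```

With this shape, a new field can be added anywhere in the structure,
and whether it is reset is decided explicitly in one place.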

3.
+ * TODO: The remote flush location (last_flushpos) is currently not updated
+ * during change application, making it impossible to satisfy the condition of
+ * the final phase (RCI_WAIT_FOR_LOCAL_FLUSH) for advancing the transaction ID.
+ * Consider updating the remote flush position in the final phase to enable
+ * advancement during change application.
+ */
+static inline bool
+can_advance_nonremovable_xid(RetainConflictInfoData *data)

I think we don't need this TODO here, as there is an XXX comment in
wait_for_local_flush() that carries the same information.

4.
@@ -2314,6 +2316,10 @@ ProcessStandbyMessage(void)
  ProcessStandbyHSFeedbackMessage();
  break;

+ case 'S':
+ ProcessStandbyPSRequestMessage();
+ break;

Why do we use an uppercase message type 'S' when all the other messages
in ProcessStandbyMessage() are lowercase? I see that walsender already
handles an 'S' message in HandleUploadManifestPacket(), though it won't
conflict with this case. Still, shouldn't we use a different message
type here?

5. The apply worker needs to receive the publisher status message at
least twice to advance oldest_nonremovable_xid once. It then uses the
remote_lsn of the last such message to ensure that it has been applied
locally. Such a remote_lsn could be much later than required, which
delays advancing oldest_nonremovable_xid. How about this instead: when
the walsender first processes the publisher_status message, it gets
the latest transaction in commit via a new function
GetLatestTransactionIdInCommit() instead of
GetOldestTransactionIdInCommit(), and then simply waits until that proc
has written its commit WAL (i.e., until it clears
DELAY_CHKPT_IN_COMMIT). It then gets the latest written LSN and sends
that to the apply worker waiting for the publisher_status message. If
this is feasible, then we should be able to advance
oldest_nonremovable_xid with just one publisher_status message. Won't
that be an improvement over the current approach? If so, we can try to
improve it further by using just the commit LSN of the transaction
returned by GetLatestTransactionIdInCommit(). One idea is to use
MyProc->waitLSN, which synchronous replication already uses, for our
purpose. See SyncRepWaitForLSN().
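
To make the selection step in point 5 concrete, here is a rough sketch
of how the latest transaction in commit could be picked. It is only an
illustration under stated assumptions: DemoProc, demo_xid_follows and
demo_latest_xid_in_commit are hypothetical names, the in-commit state
(DELAY_CHKPT_IN_COMMIT) is modeled as a plain boolean, and the locking
a real scan of the proc array would need is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t DemoXid;
#define DEMO_INVALID_XID ((DemoXid) 0)

/* Hypothetical stand-in for one backend's entry in the proc array. */
typedef struct DemoProc
{
    DemoXid xid;       /* transaction the backend is running, or invalid */
    bool    in_commit; /* models the DELAY_CHKPT_IN_COMMIT flag */
} DemoProc;

/*
 * Wraparound-aware "a is newer than b", using modulo-2^32 arithmetic.
 * Relies on the usual two's-complement conversion to int32_t.
 */
static bool
demo_xid_follows(DemoXid a, DemoXid b)
{
    return (int32_t) (a - b) > 0;
}

/*
 * Return the newest xid among backends that are currently in commit,
 * or DEMO_INVALID_XID if no backend is in commit.
 */
static DemoXid
demo_latest_xid_in_commit(const DemoProc *procs, size_t nprocs)
{
    DemoXid latest = DEMO_INVALID_XID;

    assert(procs != NULL || nprocs == 0);

    for (size_t i = 0; i < nprocs; i++)
    {
        if (!procs[i].in_commit || procs[i].xid == DEMO_INVALID_XID)
            continue;
        if (latest == DEMO_INVALID_XID ||
            demo_xid_follows(procs[i].xid, latest))
            latest = procs[i].xid;
    }
    return latest;
}
```

The walsender would then wait only for that one transaction to finish
writing its commit record, rather than for a second status message.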

6. Attached are a few minor comment updates.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6b79aa441b..afe007bd50 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4312,12 +4312,6 @@ wait_for_local_flush(RetainConflictInfoData *data)
 
 /*
  * Determine if we can attempt to advance transaction ID.
- *
- * TODO: The remote flush location (last_flushpos) is currently not updated
- * during change application, making it impossible to satisfy the condition of
- * the final phase (RCI_WAIT_FOR_LOCAL_FLUSH) for advancing the transaction ID.
- * Consider updating the remote flush position in the final phase to enable
- * advancement during change application.
  */
 static inline bool
 can_advance_nonremovable_xid(RetainConflictInfoData *data)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 00b6411c7e..94acca15e3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2678,6 +2678,10 @@ ProcessStandbyPSRequestMessage(void)
        WalSnd     *walsnd = MyWalSnd;
        TimestampTz replyTime;
 
+       /*
+        * This shouldn't happen because we don't support getting publisher_status
+        * message from standby.
+        */
        if (RecoveryInProgress())
                elog(ERROR, "the primary status is unavailable during recovery");
 
