On Wed, Dec 25, 2024 at 8:13 AM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>
> Attach the new version patch set which addressed all other comments.
>

Review comments on 0001 and 0002
================================
1.
/*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement.
+ */
+ data->phase = RCI_GET_CANDIDATE_XID;
+ data->remote_lsn = InvalidXLogRecPtr;

The RetainConflictInfoData struct has no comment indicating which
fields are used to determine the timing for the next round of
transaction ID advancement. Can we add a comment in
RetainConflictInfoData to indicate the same?

2.
+ int xid_advancement_interval; /* how much time (ms) to wait
+ * before attempting to advance
+ * the non-removable transaction
+ * ID */
 } RetainConflictInfoData;

Shall we rename it to the slightly simpler xid_advance_interval? If
you agree with this change, we can probably rename
adjust_xid_advancement_interval() to adjust_xid_advance_interval() as
well.

3.
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs.
+ */
+#define MIN_XID_ADVANCEMENT_INTERVAL 100
+#define MAX_XID_ADVANCEMENT_INTERVAL 180000

Is there any reason to keep the maximum value as 3 minutes? If not,
then mention in the comment that the value is arbitrary but sufficient
to avoid undue network traffic.

4.
@@ -4129,7 +4149,7 @@ get_candidate_xid(RetainConflictInfoData *data)
  * can consider the other interval or a separate GUC if the need arises.
  */
  if (!TimestampDifferenceExceeds(data->candidate_xid_time, now,
- wal_receiver_status_interval * 1000))
+ data->xid_advancement_interval))

The comment atop the above change in the second patch needs to be
updated, since the check no longer uses wal_receiver_status_interval
directly.

5.
+static void
+adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)

Let's move this function to after can_advance_nonremovable_xid(), so
that the functions that transition between the retain_data_phases stay
together.
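
For reference, the phase progression that these functions implement
(per the state-machine comment in worker.c) can be sketched roughly as
below. This is only an illustration: RCI_GET_CANDIDATE_XID is the name
used in the patch, whereas the other enum values, the enum type name,
and next_phase() are names assumed here, and the real per-phase wait
conditions are collapsed into two boolean flags.

```c
#include <assert.h>
#include <stdbool.h>

/* Names other than RCI_GET_CANDIDATE_XID are assumed for illustration. */
typedef enum
{
    RCI_GET_CANDIDATE_XID,
    RCI_REQUEST_PUBLISHER_STATUS,
    RCI_WAIT_FOR_PUBLISHER_STATUS,
    RCI_WAIT_FOR_LOCAL_FLUSH
} RetainConflictInfoPhase;

/*
 * Hypothetical helper: return the phase that follows 'phase'.
 *
 * remote_xacts_running: the publisher still reports concurrent transactions.
 * flushed_locally: changes up to the remote position are applied and flushed.
 */
static RetainConflictInfoPhase
next_phase(RetainConflictInfoPhase phase, bool remote_xacts_running,
           bool flushed_locally)
{
    switch (phase)
    {
        case RCI_GET_CANDIDATE_XID:
            return RCI_REQUEST_PUBLISHER_STATUS;
        case RCI_REQUEST_PUBLISHER_STATUS:
            return RCI_WAIT_FOR_PUBLISHER_STATUS;
        case RCI_WAIT_FOR_PUBLISHER_STATUS:
            /* Ask again until concurrent remote transactions end. */
            return remote_xacts_running ? RCI_REQUEST_PUBLISHER_STATUS
                                        : RCI_WAIT_FOR_LOCAL_FLUSH;
        case RCI_WAIT_FOR_LOCAL_FLUSH:
            /* Only a completed flush starts the next round. */
            return flushed_locally ? RCI_GET_CANDIDATE_XID
                                   : RCI_WAIT_FOR_LOCAL_FLUSH;
    }

    assert(!"unknown phase");
    return phase;
}
```

Keeping the per-phase functions adjacent in the file, in the same order
as this progression, is what the suggestion above is after.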

Apart from the above, I have changed a few comments in the attached
patch. Please include those after review, and combine 0001 and 0002
into one patch.
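
One of the reworded comments describes how the interval backs off. As
a rough, standalone sketch of that behavior (not the patch code
itself): next_xid_advancement_interval() and idle_rounds_to_cap() are
names made up here, status_interval_ms stands in for
wal_receiver_status_interval converted to milliseconds, and the
positive-interval assertion is a simplification of this sketch.

```c
#include <assert.h>
#include <stdbool.h>

#define MIN_XID_ADVANCEMENT_INTERVAL 100     /* ms */
#define MAX_XID_ADVANCEMENT_INTERVAL 180000  /* ms, i.e. 3 minutes */

/*
 * Simplified stand-in for adjust_xid_advancement_interval(): return the
 * next interval in ms. A status_interval_ms of 0 means "not set", in which
 * case the 3-minute maximum is used as the cap.
 */
static int
next_xid_advancement_interval(int cur_ms, bool new_xid_found,
                              int status_interval_ms)
{
    int max_ms = status_interval_ms ? status_interval_ms
                                    : MAX_XID_ADVANCEMENT_INTERVAL;
    int doubled;

    assert(cur_ms > 0);     /* sketch-only precondition */

    /* Activity on the node: go back to the minimum interval. */
    if (new_xid_found)
        return MIN_XID_ADVANCEMENT_INTERVAL;

    /* No activity: double the interval, but not beyond the cap. */
    doubled = cur_ms * 2;
    return doubled < max_ms ? doubled : max_ms;
}

/* Count the idle rounds needed to climb from the minimum to the cap. */
static int
idle_rounds_to_cap(int status_interval_ms)
{
    int cap = status_interval_ms ? status_interval_ms
                                 : MAX_XID_ADVANCEMENT_INTERVAL;
    int interval = MIN_XID_ADVANCEMENT_INTERVAL;
    int rounds = 0;

    while (interval < cap)
    {
        interval = next_xid_advancement_interval(interval, false,
                                                 status_interval_ms);
        rounds++;
    }
    return rounds;
}
```

With these constants, 11 consecutive idle rounds take the interval
from 100ms to the 3-minute cap, and 7 rounds reach a 10s cap.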

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d5772a6b22..090aae126b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -391,8 +391,9 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 static BufFile *stream_fd = NULL;
 
 /*
- * The remote WAL position that has been applied and flushed locally. Refer to
- * send_feedback() for details on its usage.
+ * The remote WAL position that has been applied and flushed locally. We
+ * record this information while sending feedback to the server and use this
+ * both while sending feedback and advancing oldest_nonremovable_xid.
  */
 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
@@ -3849,7 +3850,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                        wait_time = NAPTIME_PER_CYCLE;
 
                /*
-                * Ensure to wake up when it's possible to attempt advancing the
+                * Ensure to wake up when it's possible to attempt to advance the
                 * non-removable transaction ID.
                 */
                if (data.phase == RCI_GET_CANDIDATE_XID && data.xid_advancement_interval)
@@ -4066,7 +4067,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
  *
  * The overall state progression is: GET_CANDIDATE_XID ->
  * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
- * REQUEST_PUBLISHER_STATUS if concurrent remote transactions persist) ->
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
  * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
  *
  * Retaining the dead tuples for this period is sufficient for ensuring
@@ -4184,11 +4185,14 @@ get_candidate_xid(RetainConflictInfoData *data)
 /*
  * Adjust the interval for advancing non-removable transaction IDs.
  *
- * If no new transaction ID has been assigned since the last advancement, the
- * interval is doubled. This increase is limited by the
- * wal_receiver_status_interval if it is not zero, or otherwise restricted to a
- * maximum of 3 minutes. If a new transaction ID is detected, the interval is
- * reset to a minimum of 100ms.
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise at
+ * 3 minutes, which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
  */
 static void
 adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)
@@ -4200,8 +4204,8 @@ adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found
                        : MAX_XID_ADVANCEMENT_INTERVAL;
 
                /*
-                * No new transaction ID assigned since the last check, so double the
-                * interval, but not beyond the maximum allowable value.
+                * No new transaction ID has been assigned since the last check, so
+                * double the interval, but not beyond the maximum allowable value.
                 */
                data->xid_advancement_interval = Min(data->xid_advancement_interval * 2,
                                                     max_interval);
@@ -4331,11 +4335,8 @@ wait_for_local_flush(RetainConflictInfoData *data)
         * effort.
         *
         * It is safe to add new tables with initial states to the subscription
-        * after this check because WAL positions of changes from these new
-        * tables, which will be applied, should be greater than remote_lsn and
-        * are included in transactions with later commit timestamps. So, there is
-        * no need to wait for these changes to be applied in this round of
-        * advancement.
+        * after this check because any changes applied to these tables should have
+        * a WAL position greater than the data->remote_lsn.
         */
        if (!AllTablesyncsReady())
                return;
@@ -4354,8 +4355,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
        /*
         * Reaching here means the remote WAL position has been received, and all
         * transactions up to that position on the publisher have been applied and
-        * flushed locally. So, now we can advance the non-removable transaction
-        * ID.
+        * flushed locally. So, we can advance the non-removable transaction ID.
         */
        SpinLockAcquire(&MyLogicalRepWorker->relmutex);
        MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1eab8a5e46..22cdd0a591 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -87,20 +87,18 @@ typedef struct LogicalRepWorker
        bool            parallel_apply;
 
        /*
-        * The changes made by this and later transactions are still non-removable
-        * to allow for the detection of update_deleted conflicts when applying
+        * The changes made by this and later transactions shouldn't be removed.
+        * This allows the detection of update_deleted conflicts when applying
         * changes in this logical replication worker.
         *
         * Note that this info cannot directly protect dead tuples from being
         * prematurely frozen or removed. The logical replication launcher
         * asynchronously collects this info to determine whether to advance the
-        * xmin value of the replication slot.
+        * xmin value of its replication slot.
         *
-        * Therefore, FullTransactionId that includes both the transaction ID and
-        * its epoch is used here instead of a single Transaction ID. This is
-        * critical because without considering the epoch, the transaction ID
-        * alone may appear as if it is in the future due to transaction ID
-        * wraparound.
+        * We need to use FullTransactionId here because without considering the
+        * epoch, the transaction ID alone may appear as if it is in the future due
+        * to the transaction ID wraparound.
         */
        FullTransactionId oldest_nonremovable_xid;
 
