On Wed, Dec 4, 2024 at 4:29 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> 1.
> + if (can_advance_nonremovable_xid(&data, last_recv_timestamp))
> + maybe_advance_nonremovable_xid(&data);
>
> In can_advance_nonremovable_xid(), we determine whether to advance the
> oldest xid based on 'last_recv_timestamp' and then again in
> maybe_advance_nonremovable_xid()->get_candidate_xid(), we compare it
> with the current time. How does that make sense? Shall we use
> 'last_recv_timestamp' directly in get_candidate_xid() as that will
> avoid the additional time check in can_advance_nonremovable_xid()?
>
> 2.
> + TimestampTz next_attempt_time; /* when to attempt to advance the xid during
> + * change application */
> +} RetainConflictInfoData;
>
> This new variable introduced in this version is not used in the patch.
> Any reason or just a leftover?
>
> Apart from the above, I have made a few updates in the comments in the
> attached. Please include those after review.
>

A few more comments:
1.
+static void
+wait_for_local_flush(RetainConflictInfoData *data)
{
...
+
+ data->phase = RCI_GET_CANDIDATE_XID;
+
+ maybe_advance_nonremovable_xid(data);
+}

Isn't it better to reset all the fields of data before the next round
of the RCI_GET_CANDIDATE_XID phase? If we do that, we no longer need to
set data->remote_lsn = InvalidXLogRecPtr and data->last_phase_at =
InvalidFullTransactionId individually in request_publisher_status()
and get_candidate_xid(), respectively. It also looks cleaner and more
logical to me, unless I am missing something.

2.
+ /*
+ * Issue a warning if there is a detected clock skew between the publisher
+ * and subscriber.
+ *
+ * XXX Consider waiting for the publisher's clock to catch up with the
+ * subscriber's before proceeding to the next phase.
+ */
+ if (TimestampDifferenceExceeds(data->reply_time,
+    data->candidate_xid_time, 0))
+ ereport(WARNING,
+ errmsg("non-removable transaction ID may be advanced prematurely"),
+ errdetail("The clock on the publisher is behind that of the subscriber."));

Shouldn't this be an ERROR, given that it will lead to the removal of
rows required to detect update_delete conflicts?
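Below is a stand-alone sketch of treating the skew as fatal rather than advisory. The helper refuses to let the caller advance oldest_nonremovable_xid, which is what ereport(ERROR) would achieve in the worker. publisher_clock_ok() is a hypothetical name. TimestampDifferenceExceeds() is a local stand-in with PostgreSQL's semantics: it returns true when stop_time - start_time >= msec milliseconds. With msec = 0, the check therefore also fires when the two timestamps are exactly equal.

```c
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampTz;    /* microseconds, as in PostgreSQL */

/* Local stand-in with the same semantics as TimestampDifferenceExceeds(). */
static bool
TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time,
                           int msec)
{
    return (stop_time - start_time) >= (int64_t) msec * 1000;
}

/*
 * Hypothetical helper: returns true only when it is safe to advance
 * oldest_nonremovable_xid. Where the patch emits a WARNING and carries on,
 * this refuses; in the worker the refusal would be ereport(ERROR, ...).
 */
static bool
publisher_clock_ok(TimestampTz reply_time, TimestampTz candidate_xid_time)
{
    /*
     * Fires when candidate_xid_time >= reply_time, i.e. the publisher's
     * reply is not later than the moment the subscriber took the candidate.
     */
    if (TimestampDifferenceExceeds(reply_time, candidate_xid_time, 0))
        return false;

    return true;
}
```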

Apart from the above, I have made a few more updates in the comments
in the attached. Please include those after review.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 06ba6d3a64..e89e811c51 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4135,6 +4135,7 @@ get_candidate_xid(RetainConflictInfoData *data)
        data->last_phase_at = InvalidFullTransactionId;
        data->phase = RCI_REQUEST_PUBLISHER_STATUS;
 
+       /* process the next phase */
        maybe_advance_nonremovable_xid(data);
 }
 
@@ -4156,6 +4157,10 @@ request_publisher_status(RetainConflictInfoData *data)
        else
                resetStringInfo(request_message);
 
+       /*
+        * We send the current time to update the remote walsender's latest reply
+        * message received time.
+        */
        pq_sendbyte(request_message, 'S');
        pq_sendint64(request_message, GetCurrentTimestamp());
 
@@ -4213,6 +4218,7 @@ wait_for_publisher_status(RetainConflictInfoData *data)
        else
                data->phase = RCI_REQUEST_PUBLISHER_STATUS;
 
+       /* process the next phase */
        maybe_advance_nonremovable_xid(data);
 }
 
@@ -4226,8 +4232,10 @@ wait_for_local_flush(RetainConflictInfoData *data)
                   FullTransactionIdIsValid(data->candidate_xid));
 
        /*
-        * Issue a warning if there is a detected clock skew between the publisher
-        * and subscriber.
+        * We expect the publisher and subscriber clocks to be in sync using
+        * a time sync service such as NTP. Otherwise, we will advance this
+        * worker's oldest_nonremovable_xid prematurely, leading to the removal
+        * of rows required to detect update_delete conflicts.
         *
         * XXX Consider waiting for the publisher's clock to catch up with the
         * subscriber's before proceeding to the next phase.
@@ -4235,7 +4243,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
        if (TimestampDifferenceExceeds(data->reply_time,
                                                                   data->candidate_xid_time, 0))
                ereport(WARNING,
-                               errmsg("non-removable transaction ID may be advanced prematurely"),
+                               errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"),
                                errdetail("The clock on the publisher is behind that of the subscriber."));
 
        /*
@@ -4276,6 +4284,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
 
        data->phase = RCI_GET_CANDIDATE_XID;
 
+       /* process the next phase */
        maybe_advance_nonremovable_xid(data);
 }
 

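The request built in request_publisher_status() is a single 'S' byte followed by the subscriber's current time as an int64. The following is a minimal stand-alone encoder. It assumes the big-endian (network) byte order that pq_sendint64() uses. encode_status_request() is a hypothetical name, and the fixed 9-byte buffer stands in for the StringInfo used in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef int64_t TimestampTz;    /* microseconds, as in PostgreSQL */

/*
 * Hypothetical encoder: writes 'S' followed by `now` as a big-endian
 * int64 into buf, which must hold at least 9 bytes. Returns bytes written.
 */
static size_t
encode_status_request(unsigned char *buf, TimestampTz now)
{
    uint64_t    v = (uint64_t) now;

    assert(buf != NULL);
    buf[0] = 'S';
    for (int i = 0; i < 8; i++)
        buf[1 + i] = (unsigned char) (v >> (56 - 8 * i)); /* MSB first */
    return 9;
}
```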