On Wed, Dec 4, 2024 at 4:29 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> 1.
> + if (can_advance_nonremovable_xid(&data, last_recv_timestamp))
> +     maybe_advance_nonremovable_xid(&data);
>
> In can_advance_nonremovable_xid(), we determine whether to advance the
> oldest xid based on 'last_recv_timestamp' and then again in
> maybe_advance_nonremovable_xid()->get_candidate_xid(), we compare it
> with the current time. How does that make sense? Shall we use
> 'last_recv_timestamp' directly in get_candidate_xid() as that will
> avoid the additional time check in can_advance_nonremovable_xid()?
>
> 2.
> + TimestampTz next_attempt_time; /* when to attemp to advance the xid during
> +                                 * change application */
> +} RetainConflictInfoData;
>
> This new variable introduced in this version is not used in the patch.
> Any reason or just a leftover?
>
> Apart from the above, I have made a few updates in the comments in the
> attached. Please include those after review.
>

A few more comments:

1.
+static void
+wait_for_local_flush(RetainConflictInfoData *data)
{
...
+
+ data->phase = RCI_GET_CANDIDATE_XID;
+
+ maybe_advance_nonremovable_xid(data);
+}

Isn't it better to reset all the fields of data before the next round
of GET_CANDIDATE_XID phase? If we do that then we don't need to reset
data->remote_lsn = InvalidXLogRecPtr; and data->last_phase_at =
InvalidFullTransactionId; individually in request_publisher_status()
and get_candidate_xid() respectively. Also, it looks clean and logical
to me unless I am missing something.

2.
+ /*
+  * Issue a warning if there is a detected clock skew between the publisher
+  * and subscriber.
+  *
+  * XXX Consider waiting for the publisher's clock to catch up with the
+  * subscriber's before proceeding to the next phase.
+  */
+ if (TimestampDifferenceExceeds(data->reply_time,
+                                data->candidate_xid_time, 0))
+     ereport(WARNING,
+             errmsg("non-removable transaction ID may be advanced prematurely"),
+             errdetail("The clock on the publisher is behind that of the subscriber."));

Shouldn't this be an ERROR as this will lead to the removal of rows
required to detect update_delete conflict?

Apart from the above, I have made a few more updates in the comments
in the attached. Please include those after review.

-- 
With Regards,
Amit Kapila.
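
To illustrate comment 1, here is a minimal standalone sketch of what resetting every field in one place could look like. It is not taken from the patch: the helper name reset_retain_conflict_info_data() is hypothetical, the typedefs and Invalid* macros are simplified stand-ins for the PostgreSQL definitions, and the field types are assumed from how the fields are used in the quoted hunks. Only the two phases named in this mail are listed in the enum.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-ins for PostgreSQL types (assumptions for this sketch). */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;
typedef struct
{
    uint64_t    value;
} FullTransactionId;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define InvalidFullTransactionId ((FullTransactionId) {0})

/* Only the phases named in this thread; the real enum has more. */
typedef enum
{
    RCI_GET_CANDIDATE_XID,
    RCI_REQUEST_PUBLISHER_STATUS
} RetainConflictInfoPhase;

/* Field names come from the quoted patch; the types are assumed. */
typedef struct
{
    RetainConflictInfoPhase phase;
    XLogRecPtr  remote_lsn;
    FullTransactionId last_phase_at;
    FullTransactionId candidate_xid;
    TimestampTz reply_time;
    TimestampTz candidate_xid_time;
} RetainConflictInfoData;

/*
 * Hypothetical helper: put every field back to its initial state so that
 * the next GET_CANDIDATE_XID round starts clean, instead of resetting
 * remote_lsn and last_phase_at individually in different phase functions.
 */
static void
reset_retain_conflict_info_data(RetainConflictInfoData *data)
{
    data->phase = RCI_GET_CANDIDATE_XID;
    data->remote_lsn = InvalidXLogRecPtr;
    data->last_phase_at = InvalidFullTransactionId;
    data->candidate_xid = InvalidFullTransactionId;
    data->reply_time = 0;
    data->candidate_xid_time = 0;
}
```

With something like this, wait_for_local_flush() would call the helper once at the end of a round and then invoke maybe_advance_nonremovable_xid(), and the per-field resets elsewhere could go away.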

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 06ba6d3a64..e89e811c51 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4135,6 +4135,7 @@ get_candidate_xid(RetainConflictInfoData *data)
 	data->last_phase_at = InvalidFullTransactionId;
 
 	data->phase = RCI_REQUEST_PUBLISHER_STATUS;
 
+	/* process the next phase */
 	maybe_advance_nonremovable_xid(data);
 }
@@ -4156,6 +4157,10 @@ request_publisher_status(RetainConflictInfoData *data)
 	else
 		resetStringInfo(request_message);
 
+	/*
+	 * We send the current time to update the remote walsender's latest reply
+	 * message received time.
+	 */
 	pq_sendbyte(request_message, 'S');
 	pq_sendint64(request_message, GetCurrentTimestamp());
 
@@ -4213,6 +4218,7 @@ wait_for_publisher_status(RetainConflictInfoData *data)
 	else
 		data->phase = RCI_REQUEST_PUBLISHER_STATUS;
 
+	/* process the next phase */
 	maybe_advance_nonremovable_xid(data);
 }
 
@@ -4226,8 +4232,10 @@ wait_for_local_flush(RetainConflictInfoData *data)
 		   FullTransactionIdIsValid(data->candidate_xid));
 
 	/*
-	 * Issue a warning if there is a detected clock skew between the publisher
-	 * and subscriber.
+	 * We expect the publisher and subscriber clocks to be in sync using
+	 * time sync service like NTP. Otherwise, we will advance this worker's
+	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
+	 * required to detect update_delete conflict.
 	 *
 	 * XXX Consider waiting for the publisher's clock to catch up with the
 	 * subscriber's before proceeding to the next phase.
@@ -4235,7 +4243,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	if (TimestampDifferenceExceeds(data->reply_time,
 								   data->candidate_xid_time, 0))
 		ereport(WARNING,
-				errmsg("non-removable transaction ID may be advanced prematurely"),
+				errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"),
 				errdetail("The clock on the publisher is behind that of the subscriber."));
 
 	/*
@@ -4276,6 +4284,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	data->phase = RCI_GET_CANDIDATE_XID;
 
+	/* process the next phase */
 	maybe_advance_nonremovable_xid(data);
 }
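
For readers following the clock-skew check in wait_for_local_flush(), a small standalone model of the comparison may help. This is a sketch, not backend code: timestamp_difference_exceeds() is a stand-in that mirrors how I understand TimestampDifferenceExceeds() to behave (true when stop_time minus start_time is at least msec milliseconds, with timestamps in microseconds), and publisher_clock_is_behind() is a hypothetical wrapper named only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's TimestampTz (microseconds); an assumption here. */
typedef int64_t TimestampTz;

/*
 * Stand-in for TimestampDifferenceExceeds(): returns true when
 * stop_time - start_time is at least msec milliseconds.
 */
static bool
timestamp_difference_exceeds(TimestampTz start_time, TimestampTz stop_time,
                             int msec)
{
    TimestampTz diff = stop_time - start_time;

    return diff >= (TimestampTz) msec * 1000;
}

/*
 * Hypothetical wrapper for the check in the patch: the publisher's reply
 * time should be later than the time at which the subscriber picked the
 * candidate xid. If it is not, the publisher's clock looks behind.
 */
static bool
publisher_clock_is_behind(TimestampTz reply_time,
                          TimestampTz candidate_xid_time)
{
    return timestamp_difference_exceeds(reply_time, candidate_xid_time, 0);
}
```

Note that under this model a threshold of 0 also reports skew when the two timestamps are equal, since the comparison is "greater than or equal".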