On Wed, Dec 25, 2024 at 8:13 AM Zhijie Hou (Fujitsu) <houzj.f...@fujitsu.com> wrote:
>
> Attach the new version patch set which addressed all other comments.
>

Review comments on 0001 and 0002
=============================
1.
/*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement.
+ */
+ data->phase = RCI_GET_CANDIDATE_XID;
+ data->remote_lsn = InvalidXLogRecPtr;

There is no comment in the data structure RetainConflictInfoData that indicates which fields are used to determine the timing for the next round of transaction ID advancement. Can we add a comment in RetainConflictInfoData to indicate the same?

2.
+ int xid_advancement_interval; /* how much time (ms) to wait
+ * before attempting to advance
+ * the non-removable transaction
+ * ID */
 } RetainConflictInfoData;

Shall we rename it to the slightly simpler xid_advance_interval? If you agree with this change, we can probably rename adjust_xid_advancement_interval() to adjust_xid_advance_interval() as well.

3.
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs.
+ */
+#define MIN_XID_ADVANCEMENT_INTERVAL 100
+#define MAX_XID_ADVANCEMENT_INTERVAL 180000

Is there any reason to keep the maximum value as 3 minutes? If not, then mention in the comment that it is arbitrary and sufficient to not cause any undue network traffic.

4.
@@ -4129,7 +4149,7 @@ get_candidate_xid(RetainConflictInfoData *data)
  * can consider the other interval or a separate GUC if the need arises.
  */
 if (!TimestampDifferenceExceeds(data->candidate_xid_time, now,
- wal_receiver_status_interval * 1000))
+ data->xid_advancement_interval))

The comment atop the above change in the second patch needs to be updated.

5.
+static void
+adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)

Let's move this function to after can_advance_nonremovable_xid(). This is to keep the functions that transition the retain_data_phases together.

Apart from the above, I have made changes to a few comments in the attached patch. Please include those after review and combine 0001 and 0002 as one patch.

--
With Regards,
Amit Kapila.
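
To make comments 2 and 3 concrete, here is a rough standalone sketch of the interval adjustment as described in the patch's comments: double the interval while no new transaction ID is seen, cap it by wal_receiver_status_interval (seconds) when that is non-zero and otherwise by 3 minutes, and reset to 100ms on activity. The function name next_xid_advance_interval and its parameters are hypothetical, and treating a zero interval as "start at the minimum" is my assumption; this is not the code from the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Bounds taken from the patch under review, in milliseconds. */
#define MIN_XID_ADVANCEMENT_INTERVAL 100
#define MAX_XID_ADVANCEMENT_INTERVAL 180000

/*
 * Hypothetical helper (not from the patch): returns the next interval in
 * milliseconds.
 *
 * cur_ms             current interval in ms (0 means "not set yet")
 * new_xid_found      whether a new transaction ID was seen since last check
 * status_interval_s  stand-in for wal_receiver_status_interval, in seconds
 */
static int
next_xid_advance_interval(int cur_ms, bool new_xid_found,
						  int status_interval_s)
{
	int			max_ms;
	int			doubled;

	assert(cur_ms >= 0 && status_interval_s >= 0);

	/* Activity on the node, or no interval yet: use the minimum. */
	if (new_xid_found || cur_ms == 0)
		return MIN_XID_ADVANCEMENT_INTERVAL;

	/* Cap by the status interval if set, else by the 3 minute maximum. */
	max_ms = status_interval_s ? status_interval_s * 1000
		: MAX_XID_ADVANCEMENT_INTERVAL;

	/* No activity: double the interval, but not beyond the cap. */
	doubled = cur_ms * 2;
	return (doubled < max_ms) ? doubled : max_ms;
}
```

With status_interval_s = 10 and no new transaction IDs, repeated calls starting from 100 would give 200, 400, 800, 1600, 3200, 6400 and then stay at 10000.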
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d5772a6b22..090aae126b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -391,8 +391,9 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 static BufFile *stream_fd = NULL;
 
 /*
- * The remote WAL position that has been applied and flushed locally. Refer to
- * send_feedback() for details on its usage.
+ * The remote WAL position that has been applied and flushed locally. We
+ * record this information while sending feedback to the server and use this
+ * both while sending feedback and advancing oldest_nonremovable_xid.
  */
 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
@@ -3849,7 +3850,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			wait_time = NAPTIME_PER_CYCLE;
 
 		/*
-		 * Ensure to wake up when it's possible to attempt advancing the
+		 * Ensure to wake up when it's possible to attempt to advance the
 		 * non-removable transaction ID.
 		 */
 		if (data.phase == RCI_GET_CANDIDATE_XID && data.xid_advancement_interval)
@@ -4066,7 +4067,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
  *
  * The overall state progression is: GET_CANDIDATE_XID ->
  * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
- * REQUEST_PUBLISHER_STATUS if concurrent remote transactions persist) ->
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
  * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
  *
  * Retaining the dead tuples for this period is sufficient for ensuring
@@ -4184,11 +4185,14 @@ get_candidate_xid(RetainConflictInfoData *data)
 /*
  * Adjust the interval for advancing non-removable transaction IDs.
  *
- * If no new transaction ID has been assigned since the last advancement, the
- * interval is doubled. This increase is limited by the
- * wal_receiver_status_interval if it is not zero, or otherwise restricted to a
- * maximum of 3 minutes. If a new transaction ID is detected, the interval is
- * reset to a minimum of 100ms.
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise to a
+ * 3 minutes which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
  */
 static void
 adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)
@@ -4200,8 +4204,8 @@ adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found)
 			: MAX_XID_ADVANCEMENT_INTERVAL;
 
 		/*
-		 * No new transaction ID assigned since the last check, so double the
-		 * interval, but not beyond the maximum allowable value.
+		 * No new transaction ID has been assigned since the last check, so
+		 * double the interval, but not beyond the maximum allowable value.
 		 */
 		data->xid_advancement_interval = Min(data->xid_advancement_interval * 2,
 											 max_interval);
@@ -4331,11 +4335,8 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	 * effort.
 	 *
 	 * It is safe to add new tables with initial states to the subscription
-	 * after this check because WAL positions of changes from these new
-	 * tables, which will be applied, should be greater than remote_lsn and
-	 * are included in transactions with later commit timestamps. So, there is
-	 * no need to wait for these changes to be applied in this round of
-	 * advancement.
+	 * after this check because any changes applied to these tables should have
+	 * a WAL position greater than the data->remote_lsn.
 	 */
 	if (!AllTablesyncsReady())
 		return;
@@ -4354,8 +4355,7 @@ wait_for_local_flush(RetainConflictInfoData *data)
 	/*
 	 * Reaching here means the remote WAL position has been received, and all
 	 * transactions up to that position on the publisher have been applied and
-	 * flushed locally. So, now we can advance the non-removable transaction
-	 * ID.
+	 * flushed locally. So, we can advance the non-removable transaction ID.
 	 */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 1eab8a5e46..22cdd0a591 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -87,20 +87,18 @@ typedef struct LogicalRepWorker
 	bool		parallel_apply;
 
 	/*
-	 * The changes made by this and later transactions are still non-removable
-	 * to allow for the detection of update_deleted conflicts when applying
+	 * The changes made by this and later transactions shouldn't be removed.
+	 * This allows the detection of update_deleted conflicts when applying
 	 * changes in this logical replication worker.
 	 *
 	 * Note that this info cannot directly protect dead tuples from being
 	 * prematurely frozen or removed. The logical replication launcher
 	 * asynchronously collects this info to determine whether to advance the
-	 * xmin value of the replication slot.
+	 * xmin value of its replication slot.
 	 *
-	 * Therefore, FullTransactionId that includes both the transaction ID and
-	 * its epoch is used here instead of a single Transaction ID. This is
-	 * critical because without considering the epoch, the transaction ID
-	 * alone may appear as if it is in the future due to transaction ID
-	 * wraparound.
+	 * We need to use FullTransactionId here because without considering the
+	 * epoch, the transaction ID alone may appear as if it is in the future due
+	 * to the transaction ID wraparound.
 	 */
 	FullTransactionId oldest_nonremovable_xid;
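
As an illustration of the last comment in worker_internal.h, the following standalone sketch shows why a bare 32-bit transaction ID can appear to be in the future after wraparound, while a 64-bit value carrying the epoch still orders correctly. The type FullXid and the three helpers are hypothetical stand-ins written for this example; they are not the PostgreSQL definitions, and the circular 32-bit comparison is only assumed to resemble how such IDs are usually compared.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a 64-bit "epoch + xid" value. */
typedef struct
{
	uint64_t	value;
} FullXid;

/* Combine a wraparound epoch and a 32-bit xid into one 64-bit value. */
static FullXid
full_xid_from_epoch_and_xid(uint32_t epoch, uint32_t xid)
{
	FullXid		result;

	result.value = ((uint64_t) epoch << 32) | xid;
	return result;
}

/* Circular (modulo 2^32) comparison: is a logically before b? */
static bool
xid32_precedes(uint32_t a, uint32_t b)
{
	return (int32_t) (a - b) < 0;
}

/* With the epoch included, a plain integer comparison is enough. */
static bool
full_xid_precedes(FullXid a, FullXid b)
{
	return a.value < b.value;
}
```

For example, take an old ID with epoch 0 and xid 100, and a newer one with epoch 1 and xid 50. The 64-bit comparison reports the old ID as preceding the new one, but xid32_precedes(100, 50) is false, so the old 32-bit ID looks as if it were in the future.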