On Fri, Nov 29, 2024 at 7:54 AM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>
> It is possible to reach here if user creates a subscription with
> (connect=false,detect_update_deleted=true), in which case we could not check 
> it
> during creation. But I agree that it would be better to report an ERROR here,
> so changed as suggested. After this change, there is no need to check the
> invalid remote lsn in apply worker and thus the error can also be removed.
>

1.
if (XLogRecPtrIsInvalid(data.remote_lsn))
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot get the latest WAL position from the publisher"),
+ errdetail("The connected publisher is also a standby server."));
+

Instead of removing this message from the patch, we should change it
to elog(ERROR, category of ERROR.

2.
+ Timestamp xid_advance_attempt_time; /* when the candidate_xid is
+ * decided */

How about naming this variable as candidate_xid_time /* time when the
next candidate_xid is computed */?

3.
+ /* Return if the new transaction ID is unchanged */
+ if 
(FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ full_xid))
+ return;

This comment is unclear. Can we change it to: "Return if the
oldest_nonremovable_xid can't be advanced" or something like that?

4.
+request_publisher_status(RetainConflictInfoData *data)
+{
+ if (!reply_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ reply_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(reply_message);
+
+ pq_sendbyte(reply_message, 'S');
+ pq_sendint64(reply_message, GetCurrentTimestamp());
+
+ elog(DEBUG2, "sending publisher status request message");

The name 'reply_message' sounds confusing as this is a request
message. Can we change it to request_message? Also, let's avoid
reusing the same variable among different messages as it makes the
code unclear.

Apart from the above, I have modified a few comments after applying
0001 and 0002 in the attached.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index 07e9916a1b..75fd3a5f7f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -280,8 +280,8 @@ typedef enum
 /*
  * The phases involved in advancing the non-removable transaction ID.
  *
- * Refer to maybe_advance_nonremovable_xid() for details on how the function
- * transitions between these phases.
+ * See maybe_advance_nonremovable_xid() for details of the transition
+ * between these phases.
  */
 typedef enum
 {
@@ -385,7 +385,7 @@ static BufFile *stream_fd = NULL;
  */
 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
-/* Buffers for constructing outgoing messages. */
+/* Buffer for constructing outgoing messages. */
 static StringInfo reply_message = NULL;
 
 typedef struct SubXactInfo
@@ -4082,8 +4082,12 @@ get_candidate_xid(RetainConflictInfoData *data)
        now = GetCurrentTimestamp();
 
        /*
-        * Compute the candidate_xid and send a message at most once per
-        * wal_receiver_status_interval.
+        * Compute the candidate_xid and request the publisher status at most 
once
+        * per wal_receiver_status_interval. This is to avoid using CPU and 
network
+        * resources without making much progress.
+        *
+        * XXX The use of wal_receiver_status_interval is a bit arbitrary so we 
can
+        * consider the other interval or a separate GUC if the need arises.
         */
        if (!TimestampDifferenceExceeds(data->xid_advance_attempt_time, now,
                                                                        
wal_receiver_status_interval * 1000))

Reply via email to