On Fri, Nov 29, 2024 at 7:54 AM Zhijie Hou (Fujitsu) <houzj.f...@fujitsu.com> wrote:
>
> It is possible to reach here if user creates a subscription with
> (connect=false,detect_update_deleted=true), in which case we could not check it
> during creation. But I agree that it would be better to report an ERROR here,
> so changed as suggested. After this change, there is no need to check the
> invalid remote lsn in apply worker and thus the error can also be removed.
>
1.
if (XLogRecPtrIsInvalid(data.remote_lsn))
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot get the latest WAL position from the publisher"),
+				errdetail("The connected publisher is also a standby server."));
+

Instead of removing this message from the patch, we should change it to elog(ERROR, ...), since after this change it can only be reached as an internal (should-not-happen) error.

2.
+	Timestamp	xid_advance_attempt_time;	/* when the candidate_xid is
+											 * decided */

How about renaming this variable to candidate_xid_time, with the comment /* time when the next candidate_xid is computed */?

3.
+	/* Return if the new transaction ID is unchanged */
+	if (FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+										  full_xid))
+		return;

This comment is unclear. Can we change it to "Return if the oldest_nonremovable_xid can't be advanced" or something like that?

4.
+request_publisher_status(RetainConflictInfoData *data)
+{
+	if (!reply_message)
+	{
+		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+		reply_message = makeStringInfo();
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+		resetStringInfo(reply_message);
+
+	pq_sendbyte(reply_message, 'S');
+	pq_sendint64(reply_message, GetCurrentTimestamp());
+
+	elog(DEBUG2, "sending publisher status request message");

The name 'reply_message' is confusing, as this is a request message. Can we change it to request_message? Also, let's avoid reusing the same variable for different messages, as that makes the code unclear.

Apart from the above, I have modified a few comments on top of 0001 and 0002 in the attached patch.

--
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 07e9916a1b..75fd3a5f7f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -280,8 +280,8 @@ typedef enum
 /*
  * The phases involved in advancing the non-removable transaction ID.
  *
- * Refer to maybe_advance_nonremovable_xid() for details on how the function
- * transitions between these phases.
+ * See maybe_advance_nonremovable_xid() for details of the transition
+ * between these phases.
  */
 typedef enum
 {
@@ -385,7 +385,7 @@ static BufFile *stream_fd = NULL;
  */
 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
-/* Buffers for constructing outgoing messages. */
+/* Buffer for constructing outgoing messages. */
 static StringInfo reply_message = NULL;
 
 typedef struct SubXactInfo
@@ -4082,8 +4082,12 @@ get_candidate_xid(RetainConflictInfoData *data)
 	now = GetCurrentTimestamp();
 
 	/*
-	 * Compute the candidate_xid and send a message at most once per
-	 * wal_receiver_status_interval.
+	 * Compute the candidate_xid and request the publisher status at most once
+	 * per wal_receiver_status_interval. This is to avoid using CPU and network
+	 * resources without making much progress.
+	 *
+	 * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+	 * consider another interval or a separate GUC if the need arises.
 	 */
 	if (!TimestampDifferenceExceeds(data->xid_advance_attempt_time, now,
 									wal_receiver_status_interval * 1000))