On 2025-Jul-28, Dave Cramer wrote:

> I chose PqReplMsg patch attached
I think you sent a patch that applies on top of your previous patch instead of one that applies on top of master. Here it is as a standalone patch.

-- 
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
>From b21aa102999bf26ee20bf9eb916ea6aeee3e1edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvhe...@kurilemu.de> Date: Mon, 4 Aug 2025 14:27:53 +0200 Subject: [PATCH v3] Replace protocol replication constants with named constants from protocol.h MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Author: Dave Cramer <davecra...@gmail.com> Reviewed-by: Nathan Bossart <nathandboss...@gmail.com> Reviewed-by: Jacob Champion <jacob.champ...@enterprisedb.com> Reviewed-by: Álvaro Herrera <alvhe...@kurilemu.de> Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan --- src/backend/replication/logical/worker.c | 10 +++++----- src/backend/replication/walreceiver.c | 4 ++-- src/backend/replication/walsender.c | 18 +++++++++--------- src/include/libpq/protocol.h | 13 +++++++++++++ 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 89e241c8392..4594db65fec 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) c = pq_getmsgbyte(&s); - if (c == 'w') + if (c == PqReplMsg_WALData) { XLogRecPtr start_lsn; XLogRecPtr end_lsn; @@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) maybe_advance_nonremovable_xid(&rdt_data, false); } - else if (c == 'k') + else if (c == PqReplMsg_PrimaryKeepAlive) { XLogRecPtr end_lsn; TimestampTz timestamp; @@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, timestamp, true); } - else if (c == 's') /* Primary status update */ + else if (c == PqReplMsg_PrimaryStatusUpdate) /* Primary status update */ { rdt_data.remote_lsn = pq_getmsgint64(&s); rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); @@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) else 
resetStringInfo(reply_message); - pq_sendbyte(reply_message, 'r'); + pq_sendbyte(reply_message, PqReplMsg_StandbyStatus); pq_sendint64(reply_message, recvpos); /* write */ pq_sendint64(reply_message, flushpos); /* flush */ pq_sendint64(reply_message, writepos); /* apply */ @@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data) * Send the current time to update the remote walsender's latest reply * message received time. */ - pq_sendbyte(request_message, 'p'); + pq_sendbyte(request_message, PqReplMsg_RequestPrimaryStatus); pq_sendint64(request_message, GetCurrentTimestamp()); elog(DEBUG2, "sending publisher status request message"); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b6281101711..82798f2aedb 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) applyPtr = GetXLogReplayRecPtr(NULL); resetStringInfo(&reply_message); - pq_sendbyte(&reply_message, 'r'); + pq_sendbyte(&reply_message, PqReplMsg_StandbyStatus); pq_sendint64(&reply_message, writePtr); pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, applyPtr); @@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed) /* Construct the message and send it. 
*/ resetStringInfo(&reply_message); - pq_sendbyte(&reply_message, 'h'); + pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback); pq_sendint64(&reply_message, GetCurrentTimestamp()); pq_sendint32(&reply_message, xmin); pq_sendint32(&reply_message, xmin_epoch); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ee911394a23..b19a7551b9f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi resetStringInfo(ctx->out); - pq_sendbyte(ctx->out, 'w'); + pq_sendbyte(ctx->out, PqReplMsg_WALData); pq_sendint64(ctx->out, lsn); /* dataStart */ pq_sendint64(ctx->out, lsn); /* walEnd */ @@ -2292,7 +2292,7 @@ ProcessRepliesIfAny(void) switch (firstchar) { /* - * 'd' means a standby reply wrapped in a CopyData packet. + * standby reply wrapped in a CopyData packet. */ case PqMsg_CopyData: ProcessStandbyMessage(); @@ -2315,7 +2315,7 @@ ProcessRepliesIfAny(void) break; /* - * 'X' means that the standby is closing down the socket. + * the standby is closing down the socket. */ case PqMsg_Terminate: proc_exit(0); @@ -2350,15 +2350,15 @@ ProcessStandbyMessage(void) switch (msgtype) { - case 'r': + case PqReplMsg_StandbyStatus: ProcessStandbyReplyMessage(); break; - case 'h': + case PqReplMsg_HotStandbyFeedback: ProcessStandbyHSFeedbackMessage(); break; - case 'p': + case PqReplMsg_RequestPrimaryStatus: ProcessStandbyPSRequestMessage(); break; @@ -2752,7 +2752,7 @@ ProcessStandbyPSRequestMessage(void) /* construct the message... 
*/ resetStringInfo(&output_message); - pq_sendbyte(&output_message, 's'); + pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate); pq_sendint64(&output_message, lsn); pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit)); pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid)); @@ -3364,7 +3364,7 @@ XLogSendPhysical(void) * OK to read and send the slice. */ resetStringInfo(&output_message); - pq_sendbyte(&output_message, 'w'); + pq_sendbyte(&output_message, PqReplMsg_WALData); pq_sendint64(&output_message, startptr); /* dataStart */ pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ @@ -4135,7 +4135,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) /* construct the message... */ resetStringInfo(&output_message); - pq_sendbyte(&output_message, 'k'); + pq_sendbyte(&output_message, PqReplMsg_PrimaryKeepAlive); pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr); pq_sendint64(&output_message, GetCurrentTimestamp()); pq_sendbyte(&output_message, requestReply ? 1 : 0); diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h index b0bcb3cdc26..ce37e11c3a3 100644 --- a/src/include/libpq/protocol.h +++ b/src/include/libpq/protocol.h @@ -86,4 +86,17 @@ #define AUTH_REQ_SASL_FIN 12 /* Final SASL message */ #define AUTH_REQ_MAX AUTH_REQ_SASL_FIN /* maximum AUTH_REQ_* value */ +/* Replication Protocol, sent by the primary */ + +#define PqReplMsg_WALData 'w' +#define PqReplMsg_PrimaryKeepAlive 'k' +#define PqReplMsg_PrimaryStatusUpdate 's' + +/* Replication Protocol, sent by the standby */ + +#define PqReplMsg_StandbyStatus 'r' +#define PqReplMsg_HotStandbyFeedback 'h' +#define PqReplMsg_RequestPrimaryStatus 'p' + + #endif /* PROTOCOL_H */ -- 2.39.5