Here is an updated patch that 1) uses the existing PqMsg_* macros in more
places, 2) adds new PqReplMsg_* macros, and 3) adds new PqBackupMsg_* macros.
Thoughts?

-- 
nathan
From 6a1d03725009837c5ce99dcfc283fa565d587d13 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Tue, 5 Aug 2025 14:53:42 -0500
Subject: [PATCH v4 1/1] Expand usage of protocol characters.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Author: Dave Cramer <davecra...@gmail.com>
Co-authored-by: Fabrízio de Royes Mello <fabriziome...@gmail.com>
Co-authored-by: Nathan Bossart <nathandboss...@gmail.com>
Reviewed-by: Jacob Champion <jacob.champ...@enterprisedb.com>
Reviewed-by: Álvaro Herrera <alvhe...@kurilemu.de>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
---
 src/backend/backup/basebackup_copy.c     | 14 +++++++-------
 src/backend/replication/logical/worker.c | 10 +++++-----
 src/backend/replication/walreceiver.c    |  4 ++--
 src/backend/replication/walsender.c      | 20 +++++++++++---------
 src/include/libpq/protocol.h             | 21 +++++++++++++++++++++
 5 files changed, 46 insertions(+), 23 deletions(-)

diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
index 18b0b5a52d3..eb45d3bcb66 100644
--- a/src/backend/backup/basebackup_copy.c
+++ b/src/backend/backup/basebackup_copy.c
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
        buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
        mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
        mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
-       mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+       mysink->msgbuffer[0] = PqMsg_CopyData;  /* archive or manifest data */
 
        /* Tell client the backup start location. */
        SendXlogRecPtrResult(state->startptr, state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
 
        ti = list_nth(state->tablespaces, state->tablespace_num);
        pq_beginmessage(&buf, PqMsg_CopyData);
-       pq_sendbyte(&buf, 'n');         /* New archive */
+       pq_sendbyte(&buf, PqBackupMsg_NewArchive);
        pq_sendstring(&buf, archive_name);
        pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
        pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
        if (mysink->send_to_client)
        {
                /* Add one because we're also sending a leading type byte. */
-               pq_putmessage('d', mysink->msgbuffer, len + 1);
+               pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
        }
 
        /* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
                        mysink->last_progress_report_time = now;
 
                        pq_beginmessage(&buf, PqMsg_CopyData);
-                       pq_sendbyte(&buf, 'p'); /* Progress report */
+                       pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
                        pq_sendint64(&buf, state->bytes_done);
                        pq_endmessage(&buf);
                        pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
        mysink->bytes_done_at_last_time_check = state->bytes_done;
        mysink->last_progress_report_time = GetCurrentTimestamp();
        pq_beginmessage(&buf, PqMsg_CopyData);
-       pq_sendbyte(&buf, 'p');         /* Progress report */
+       pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
        pq_sendint64(&buf, state->bytes_done);
        pq_endmessage(&buf);
        pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
        StringInfoData buf;
 
        pq_beginmessage(&buf, PqMsg_CopyData);
-       pq_sendbyte(&buf, 'm');         /* Manifest */
+       pq_sendbyte(&buf, PqBackupMsg_Manifest);
        pq_endmessage(&buf);
 }
 
@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
        if (mysink->send_to_client)
        {
                /* Add one because we're also sending a leading type byte. */
-               pq_putmessage('d', mysink->msgbuffer, len + 1);
+               pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
        }
 }
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 89e241c8392..2035b54a065 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                                        c = pq_getmsgbyte(&s);
 
-                                       if (c == 'w')
+                                       if (c == PqReplMsg_WALData)
                                        {
                                                XLogRecPtr      start_lsn;
                                                XLogRecPtr      end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                                                
maybe_advance_nonremovable_xid(&rdt_data, false);
                                        }
-                                       else if (c == 'k')
+                                       else if (c == 
PqReplMsg_PrimaryKeepAlive)
                                        {
                                                XLogRecPtr      end_lsn;
                                                TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                                                
UpdateWorkerStats(last_received, timestamp, true);
                                        }
-                                       else if (c == 's')      /* Primary 
status update */
+                                       else if (c == 
PqReplMsg_PrimaryStatusUpdate)
                                        {
                                                rdt_data.remote_lsn = 
pq_getmsgint64(&s);
                                                rdt_data.remote_oldestxid = 
FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
        else
                resetStringInfo(reply_message);
 
-       pq_sendbyte(reply_message, 'r');
+       pq_sendbyte(reply_message, PqReplMsg_StandbyStatus);
        pq_sendint64(reply_message, recvpos);   /* write */
        pq_sendint64(reply_message, flushpos);  /* flush */
        pq_sendint64(reply_message, writepos);  /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
         * Send the current time to update the remote walsender's latest reply
         * message received time.
         */
-       pq_sendbyte(request_message, 'p');
+       pq_sendbyte(request_message, PqReplMsg_RequestPrimaryStatus);
        pq_sendint64(request_message, GetCurrentTimestamp());
 
        elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..82798f2aedb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
        applyPtr = GetXLogReplayRecPtr(NULL);
 
        resetStringInfo(&reply_message);
-       pq_sendbyte(&reply_message, 'r');
+       pq_sendbyte(&reply_message, PqReplMsg_StandbyStatus);
        pq_sendint64(&reply_message, writePtr);
        pq_sendint64(&reply_message, flushPtr);
        pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 
        /* Construct the message and send it. */
        resetStringInfo(&reply_message);
-       pq_sendbyte(&reply_message, 'h');
+       pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
        pq_sendint64(&reply_message, GetCurrentTimestamp());
        pq_sendint32(&reply_message, xmin);
        pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..bf40bfbd3af 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
        resetStringInfo(ctx->out);
 
-       pq_sendbyte(ctx->out, 'w');
+       pq_sendbyte(ctx->out, PqReplMsg_WALData);
        pq_sendint64(ctx->out, lsn);    /* dataStart */
        pq_sendint64(ctx->out, lsn);    /* walEnd */
 
@@ -2292,7 +2292,8 @@ ProcessRepliesIfAny(void)
                switch (firstchar)
                {
                                /*
-                                * 'd' means a standby reply wrapped in a CopyData packet.
+                                * CopyData means a standby reply wrapped in a CopyData
+                                * packet.
                                 */
                        case PqMsg_CopyData:
                                ProcessStandbyMessage();
@@ -2315,7 +2316,8 @@ ProcessRepliesIfAny(void)
                                break;
 
                                /*
-                                * 'X' means that the standby is closing down the socket.
+                                * PqMsg_Terminate means that the standby is closing down the
+                                * socket.
                                 */
                        case PqMsg_Terminate:
                                proc_exit(0);
@@ -2350,15 +2352,15 @@ ProcessStandbyMessage(void)
 
        switch (msgtype)
        {
-               case 'r':
+               case PqReplMsg_StandbyStatus:
                        ProcessStandbyReplyMessage();
                        break;
 
-               case 'h':
+               case PqReplMsg_HotStandbyFeedback:
                        ProcessStandbyHSFeedbackMessage();
                        break;
 
-               case 'p':
+               case PqReplMsg_RequestPrimaryStatus:
                        ProcessStandbyPSRequestMessage();
                        break;
 
@@ -2752,7 +2754,7 @@ ProcessStandbyPSRequestMessage(void)
 
        /* construct the message... */
        resetStringInfo(&output_message);
-       pq_sendbyte(&output_message, 's');
+       pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
        pq_sendint64(&output_message, lsn);
        pq_sendint64(&output_message, (int64) 
U64FromFullTransactionId(fullOldestXidInCommit));
        pq_sendint64(&output_message, (int64) 
U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3366,7 @@ XLogSendPhysical(void)
         * OK to read and send the slice.
         */
        resetStringInfo(&output_message);
-       pq_sendbyte(&output_message, 'w');
+       pq_sendbyte(&output_message, PqReplMsg_WALData);
 
        pq_sendint64(&output_message, startptr);        /* dataStart */
        pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4137,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
 
        /* construct the message... */
        resetStringInfo(&output_message);
-       pq_sendbyte(&output_message, 'k');
+       pq_sendbyte(&output_message, PqReplMsg_PrimaryKeepAlive);
        pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : 
writePtr);
        pq_sendint64(&output_message, GetCurrentTimestamp());
        pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index b0bcb3cdc26..077a1af46a5 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -86,4 +86,25 @@
 #define AUTH_REQ_SASL_FIN  12  /* Final SASL message */
 #define AUTH_REQ_MAX      AUTH_REQ_SASL_FIN    /* maximum AUTH_REQ_* value */
 
+
+/* Replication codes sent by the primary (wrapped in CopyData messages). */
+
+#define PqReplMsg_PrimaryKeepAlive             'k'
+#define PqReplMsg_PrimaryStatusUpdate  's'
+#define PqReplMsg_WALData                              'w'
+
+
+/* Replication codes sent by the standby (wrapped in CopyData messages). */
+
+#define PqReplMsg_HotStandbyFeedback   'h'
+#define PqReplMsg_RequestPrimaryStatus 'p'
+#define PqReplMsg_StandbyStatus                        'r'
+
+
+/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */
+
+#define PqBackupMsg_Manifest                   'm'
+#define PqBackupMsg_NewArchive                 'n'
+#define PqBackupMsg_ProgressReport             'p'
+
 #endif                                                 /* PROTOCOL_H */
-- 
2.39.5 (Apple Git-154)

Reply via email to