Some places in walsender.c and basebackup_copy.c open-code the sending of RowDescription and DataRow protocol messages. But there are already more compact and robust solutions available for this, using DestRemoteSimple and associated machinery, already in use in walsender.c.

The attached patches 0001 and 0002 are tiny bug fixes I found during this.

Patches 0003 and 0004 are the main refactorings. They should probably be combined into one patch eventually, but this way the treatment of RowDescription and DataRow is presented separately.
From 069b9896832412555470e30b481df8cc1e6bebec Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 19 May 2022 12:26:45 +0200
Subject: [PATCH 1/4] Fix attlen in RowDescription of BASE_BACKUP response

Should be 8 for int8, not -1.
---
 src/backend/replication/basebackup_copy.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/replication/basebackup_copy.c 
b/src/backend/replication/basebackup_copy.c
index cabb077240..1eed9d8c3f 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -361,7 +361,7 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
         * would not be wide enough for this, as TimeLineID is unsigned.
         */
        pq_sendint32(&buf, INT8OID);    /* type oid */
-       pq_sendint16(&buf, -1);
+       pq_sendint16(&buf, 8);
        pq_sendint32(&buf, 0);
        pq_sendint16(&buf, 0);
        pq_endmessage(&buf);

base-commit: ac0e2d387a044faed310cbfe2fae78ecb0f6a4b6
-- 
2.36.1

From 045730e92bb67806f31fee033e1f87c69c8ae08e Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 23 Jun 2022 10:56:55 +0200
Subject: [PATCH 2/4] Change timeline field of IDENTIFY_SYSTEM to int8

It was int4, but in the other replication commands, timelines are
returned as int8.
---
 doc/src/sgml/protocol.sgml          | 2 +-
 src/backend/replication/walsender.c | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a94743b587..c0b89a3c01 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1838,7 +1838,7 @@ <title>Streaming Replication Protocol</title>
        </varlistentry>
 
        <varlistentry>
-        <term><literal>timeline</literal> (<type>int4</type>)</term>
+        <term><literal>timeline</literal> (<type>int8</type>)</term>
         <listitem>
          <para>
           Current timeline ID. Also useful to check that the standby is
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index e42671722a..fa60c92e13 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -444,7 +444,7 @@ IdentifySystem(void)
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
                                                          TEXTOID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
-                                                         INT4OID, -1, 0);
+                                                         INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
                                                          TEXTOID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
-- 
2.36.1

From 9f30600e54e79a6d1e0ca4feda1511d90092148a Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 23 Jun 2022 10:55:09 +0200
Subject: [PATCH 3/4] Refactor sending of RowDescription messages in
 replication protocol

Some routines open-coded the construction of RowDescription messages.
Instead, we have support for doing this using tuple descriptors and
DestRemoteSimple, so use that instead.
---
 src/backend/access/common/tupdesc.c       |  9 +++
 src/backend/replication/basebackup_copy.c | 74 +++++++----------------
 src/backend/replication/walsender.c       | 29 +++------
 3 files changed, 40 insertions(+), 72 deletions(-)

diff --git a/src/backend/access/common/tupdesc.c 
b/src/backend/access/common/tupdesc.c
index 9f41b1e854..d6fb261e20 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -739,6 +739,15 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
                        att->attcollation = InvalidOid;
                        break;
 
+               case OIDOID:
+                       att->attlen = 4;
+                       att->attbyval = true;
+                       att->attalign = TYPALIGN_INT;
+                       att->attstorage = TYPSTORAGE_PLAIN;
+                       att->attcompression = InvalidCompressionMethod;
+                       att->attcollation = InvalidOid;
+                       break;
+
                default:
                        elog(ERROR, "unsupported type %u", oidtypeid);
        }
diff --git a/src/backend/replication/basebackup_copy.c 
b/src/backend/replication/basebackup_copy.c
index 1eed9d8c3f..df0471a7a4 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -25,11 +25,13 @@
  */
 #include "postgres.h"
 
+#include "access/tupdesc.h"
 #include "catalog/pg_type_d.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
+#include "tcop/dest.h"
 #include "utils/timestamp.h"
 
 typedef struct bbsink_copystream
@@ -336,35 +338,24 @@ SendCopyDone(void)
 static void
 SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
+       DestReceiver *dest;
+       TupleDesc       tupdesc;
        StringInfoData buf;
        char            str[MAXFNAMELEN];
        Size            len;
 
-       pq_beginmessage(&buf, 'T'); /* RowDescription */
-       pq_sendint16(&buf, 2);          /* 2 fields */
-
-       /* Field headers */
-       pq_sendstring(&buf, "recptr");
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, TEXTOID);    /* type oid */
-       pq_sendint16(&buf, -1);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-
-       pq_sendstring(&buf, "tli");
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
+       dest = CreateDestReceiver(DestRemoteSimple);
 
+       tupdesc = CreateTemplateTupleDesc(2);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, 
-1, 0);
        /*
         * int8 may seem like a surprising data type for this, but in theory 
int4
         * would not be wide enough for this, as TimeLineID is unsigned.
         */
-       pq_sendint32(&buf, INT8OID);    /* type oid */
-       pq_sendint16(&buf, 8);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_endmessage(&buf);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 
0);
+
+       /* send RowDescription */
+       dest->rStartup(dest, CMD_SELECT, tupdesc);
 
        /* Data row */
        pq_beginmessage(&buf, 'D');
@@ -391,41 +382,22 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 static void
 SendTablespaceList(List *tablespaces)
 {
+       DestReceiver *dest;
+       TupleDesc       tupdesc;
        StringInfoData buf;
        ListCell   *lc;
 
-       /* Construct and send the directory information */
-       pq_beginmessage(&buf, 'T'); /* RowDescription */
-       pq_sendint16(&buf, 3);          /* 3 fields */
-
-       /* First field - spcoid */
-       pq_sendstring(&buf, "spcoid");
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, OIDOID); /* type oid */
-       pq_sendint16(&buf, 4);          /* typlen */
-       pq_sendint32(&buf, 0);          /* typmod */
-       pq_sendint16(&buf, 0);          /* format code */
-
-       /* Second field - spclocation */
-       pq_sendstring(&buf, "spclocation");
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_sendint32(&buf, TEXTOID);
-       pq_sendint16(&buf, -1);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-
-       /* Third field - size */
-       pq_sendstring(&buf, "size");
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_sendint32(&buf, INT8OID);
-       pq_sendint16(&buf, 8);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_endmessage(&buf);
+       dest = CreateDestReceiver(DestRemoteSimple);
+
+       tupdesc = CreateTemplateTupleDesc(3);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, 
-1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", 
TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 
0);
 
+       /* send RowDescription */
+       dest->rStartup(dest, CMD_SELECT, tupdesc);
+
+       /* Construct and send the directory information */
        foreach(lc, tablespaces)
        {
                tablespaceinfo *ti = lfirst(lc);
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index fa60c92e13..2c9e190685 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -579,6 +579,8 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
 static void
 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
 {
+       DestReceiver *dest;
+       TupleDesc       tupdesc;
        StringInfoData buf;
        char            histfname[MAXFNAMELEN];
        char            path[MAXPGPATH];
@@ -587,36 +589,21 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
        off_t           bytesleft;
        Size            len;
 
+       dest = CreateDestReceiver(DestRemoteSimple);
+
        /*
         * Reply with a result set with one row, and two columns. The first col 
is
         * the name of the history file, 2nd is the contents.
         */
+       tupdesc = CreateTemplateTupleDesc(2);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, 
-1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, 
-1, 0);
 
        TLHistoryFileName(histfname, cmd->timeline);
        TLHistoryFilePath(path, cmd->timeline);
 
        /* Send a RowDescription message */
-       pq_beginmessage(&buf, 'T');
-       pq_sendint16(&buf, 2);          /* 2 fields */
-
-       /* first field */
-       pq_sendstring(&buf, "filename");        /* col name */
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, TEXTOID);    /* type oid */
-       pq_sendint16(&buf, -1);         /* typlen */
-       pq_sendint32(&buf, 0);          /* typmod */
-       pq_sendint16(&buf, 0);          /* format code */
-
-       /* second field */
-       pq_sendstring(&buf, "content"); /* col name */
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, TEXTOID);    /* type oid */
-       pq_sendint16(&buf, -1);         /* typlen */
-       pq_sendint32(&buf, 0);          /* typmod */
-       pq_sendint16(&buf, 0);          /* format code */
-       pq_endmessage(&buf);
+       dest->rStartup(dest, CMD_SELECT, tupdesc);
 
        /* Send a DataRow message */
        pq_beginmessage(&buf, 'D');
-- 
2.36.1

From 0614958dfdc53c42a5fdc2db234ccf137323d12e Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 23 Jun 2022 15:25:32 +0200
Subject: [PATCH 4/4] Refactor sending of DataRow messages in replication
 protocol

Some routines open-coded the construction of DataRow messages.  Use
TupOutputState struct and associated functions instead, which was
already done in some places.

SendTimeLineHistory() is a bit more complicated and isn't converted by
this.
---
 src/backend/access/common/printsimple.c   | 11 ++++
 src/backend/replication/basebackup_copy.c | 66 ++++++++---------------
 2 files changed, 33 insertions(+), 44 deletions(-)

diff --git a/src/backend/access/common/printsimple.c 
b/src/backend/access/common/printsimple.c
index e99aa279f6..5874026e58 100644
--- a/src/backend/access/common/printsimple.c
+++ b/src/backend/access/common/printsimple.c
@@ -121,6 +121,17 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
                                }
                                break;
 
+                       case OIDOID:
+                               {
+                                       Oid                     num = 
ObjectIdGetDatum(value);
+                                       char            str[11];        /* 10 
digits and '\0' */
+                                       int                     len;
+
+                                       len = pg_lltoa(num, str); // XXX this 
is enough but should we make a pg_utoa?
+                                       pq_sendcountedtext(&buf, str, len, 
false);
+                               }
+                               break;
+
                        default:
                                elog(ERROR, "unsupported type OID: %u", 
attr->atttypid);
                }
diff --git a/src/backend/replication/basebackup_copy.c 
b/src/backend/replication/basebackup_copy.c
index df0471a7a4..b6e399a5c4 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -27,11 +27,13 @@
 
 #include "access/tupdesc.h"
 #include "catalog/pg_type_d.h"
+#include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
 #include "tcop/dest.h"
+#include "utils/builtins.h"
 #include "utils/timestamp.h"
 
 typedef struct bbsink_copystream
@@ -86,7 +88,6 @@ static void SendCopyOutResponse(void);
 static void SendCopyDone(void);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static void SendTablespaceList(List *tablespaces);
-static void send_int8_string(StringInfoData *buf, int64 intval);
 
 static const bbsink_ops bbsink_copystream_ops = {
        .begin_backup = bbsink_copystream_begin_backup,
@@ -339,10 +340,10 @@ static void
 SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
        DestReceiver *dest;
+       TupOutputState *tstate;
        TupleDesc       tupdesc;
-       StringInfoData buf;
-       char            str[MAXFNAMELEN];
-       Size            len;
+       Datum           values[2];
+       bool            nulls[2] = {0};
 
        dest = CreateDestReceiver(DestRemoteSimple);
 
@@ -355,22 +356,14 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 
0);
 
        /* send RowDescription */
-       dest->rStartup(dest, CMD_SELECT, tupdesc);
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 
        /* Data row */
-       pq_beginmessage(&buf, 'D');
-       pq_sendint16(&buf, 2);          /* number of columns */
-
-       len = snprintf(str, sizeof(str),
-                                  "%X/%X", LSN_FORMAT_ARGS(ptr));
-       pq_sendint32(&buf, len);
-       pq_sendbytes(&buf, str, len);
-
-       len = snprintf(str, sizeof(str), "%u", tli);
-       pq_sendint32(&buf, len);
-       pq_sendbytes(&buf, str, len);
+       values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
+       values[1] = Int64GetDatum(tli);
+       do_tup_output(tstate, values, nulls);
 
-       pq_endmessage(&buf);
+       end_tup_output(tstate);
 
        /* Send a CommandComplete message */
        pq_puttextmessage('C', "SELECT");
@@ -383,6 +376,7 @@ static void
 SendTablespaceList(List *tablespaces)
 {
        DestReceiver *dest;
+       TupOutputState *tstate;
        TupleDesc       tupdesc;
        StringInfoData buf;
        ListCell   *lc;
@@ -395,51 +389,35 @@ SendTablespaceList(List *tablespaces)
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 
0);
 
        /* send RowDescription */
-       dest->rStartup(dest, CMD_SELECT, tupdesc);
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 
        /* Construct and send the directory information */
        foreach(lc, tablespaces)
        {
                tablespaceinfo *ti = lfirst(lc);
+               Datum           values[3];
+               bool            nulls[3] = {0};
 
                /* Send one datarow message */
                pq_beginmessage(&buf, 'D');
                pq_sendint16(&buf, 3);  /* number of columns */
                if (ti->path == NULL)
                {
-                       pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
-                       pq_sendint32(&buf, -1);
+                       nulls[0] = true;
+                       nulls[1] = true;
                }
                else
                {
-                       Size            len;
-
-                       len = strlen(ti->oid);
-                       pq_sendint32(&buf, len);
-                       pq_sendbytes(&buf, ti->oid, len);
-
-                       len = strlen(ti->path);
-                       pq_sendint32(&buf, len);
-                       pq_sendbytes(&buf, ti->path, len);
+                       values[0] = ObjectIdGetDatum(ti->oid);
+                       values[1] = CStringGetTextDatum(ti->path);
                }
                if (ti->size >= 0)
-                       send_int8_string(&buf, ti->size / 1024);
+                       values[2] = Int64GetDatum(ti->size / 1024);
                else
-                       pq_sendint32(&buf, -1); /* NULL */
+                       nulls[2] = true;
 
-               pq_endmessage(&buf);
+               do_tup_output(tstate, values, nulls);
        }
-}
-
-/*
- * Send a 64-bit integer as a string via the wire protocol.
- */
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
-       char            is[32];
 
-       sprintf(is, INT64_FORMAT, intval);
-       pq_sendint32(buf, strlen(is));
-       pq_sendbytes(buf, is, strlen(is));
+       end_tup_output(tstate);
 }
-- 
2.36.1

Reply via email to