Some places in walsender.c and basebackup_copy.c open-code the sending
of RowDescription and DataRow protocol messages. But there are already
more compact and robust solutions available for this, using
DestRemoteSimple and associated machinery, already in use in walsender.c.
The attached patches 0001 and 0002 are tiny bug fixes I found during this.
Patches 0003 and 0004 are the main refactorings. They should probably
be combined into one patch eventually, but this way the treatment of
RowDescription and DataRow is presented separately.From 069b9896832412555470e30b481df8cc1e6bebec Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 19 May 2022 12:26:45 +0200
Subject: [PATCH 1/4] Fix attlen in RowDescription of BASE_BACKUP response
Should be 8 for int8, not -1.
---
src/backend/replication/basebackup_copy.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/backend/replication/basebackup_copy.c
b/src/backend/replication/basebackup_copy.c
index cabb077240..1eed9d8c3f 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -361,7 +361,7 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
* would not be wide enough for this, as TimeLineID is unsigned.
*/
pq_sendint32(&buf, INT8OID); /* type oid */
- pq_sendint16(&buf, -1);
+ pq_sendint16(&buf, 8);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage(&buf);
base-commit: ac0e2d387a044faed310cbfe2fae78ecb0f6a4b6
--
2.36.1
From 045730e92bb67806f31fee033e1f87c69c8ae08e Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 23 Jun 2022 10:56:55 +0200
Subject: [PATCH 2/4] Change timeline field of IDENTIFY_SYSTEM to int8
It was int4, but in the other replication commands, timelines are
returned as int8.
---
doc/src/sgml/protocol.sgml | 2 +-
src/backend/replication/walsender.c | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a94743b587..c0b89a3c01 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1838,7 +1838,7 @@ <title>Streaming Replication Protocol</title>
</varlistentry>
<varlistentry>
- <term><literal>timeline</literal> (<type>int4</type>)</term>
+ <term><literal>timeline</literal> (<type>int8</type>)</term>
<listitem>
<para>
Current timeline ID. Also useful to check that the standby is
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index e42671722a..fa60c92e13 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -444,7 +444,7 @@ IdentifySystem(void)
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
TEXTOID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
- INT4OID, -1, 0);
+ INT8OID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
TEXTOID, -1, 0);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
--
2.36.1
From 9f30600e54e79a6d1e0ca4feda1511d90092148a Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 23 Jun 2022 10:55:09 +0200
Subject: [PATCH 3/4] Refactor sending of RowDescription messages in
replication protocol
Some routines open-coded the construction of RowDescription messages.
Instead, we have support for doing this using tuple descriptors and
DestRemoteSimple, so use that instead.
---
src/backend/access/common/tupdesc.c | 9 +++
src/backend/replication/basebackup_copy.c | 74 +++++++----------------
src/backend/replication/walsender.c | 29 +++------
3 files changed, 40 insertions(+), 72 deletions(-)
diff --git a/src/backend/access/common/tupdesc.c
b/src/backend/access/common/tupdesc.c
index 9f41b1e854..d6fb261e20 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -739,6 +739,15 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
att->attcollation = InvalidOid;
break;
+ case OIDOID:
+ att->attlen = 4;
+ att->attbyval = true;
+ att->attalign = TYPALIGN_INT;
+ att->attstorage = TYPSTORAGE_PLAIN;
+ att->attcompression = InvalidCompressionMethod;
+ att->attcollation = InvalidOid;
+ break;
+
default:
elog(ERROR, "unsupported type %u", oidtypeid);
}
diff --git a/src/backend/replication/basebackup_copy.c
b/src/backend/replication/basebackup_copy.c
index 1eed9d8c3f..df0471a7a4 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -25,11 +25,13 @@
*/
#include "postgres.h"
+#include "access/tupdesc.h"
#include "catalog/pg_type_d.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
+#include "tcop/dest.h"
#include "utils/timestamp.h"
typedef struct bbsink_copystream
@@ -336,35 +338,24 @@ SendCopyDone(void)
static void
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
{
+ DestReceiver *dest;
+ TupleDesc tupdesc;
StringInfoData buf;
char str[MAXFNAMELEN];
Size len;
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 2); /* 2 fields */
-
- /* Field headers */
- pq_sendstring(&buf, "recptr");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, TEXTOID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- pq_sendstring(&buf, "tli");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
+ dest = CreateDestReceiver(DestRemoteSimple);
+ tupdesc = CreateTemplateTupleDesc(2);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID,
-1, 0);
/*
* int8 may seem like a surprising data type for this, but in theory
int4
* would not be wide enough for this, as TimeLineID is unsigned.
*/
- pq_sendint32(&buf, INT8OID); /* type oid */
- pq_sendint16(&buf, 8);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1,
0);
+
+ /* send RowDescription */
+ dest->rStartup(dest, CMD_SELECT, tupdesc);
/* Data row */
pq_beginmessage(&buf, 'D');
@@ -391,41 +382,22 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
static void
SendTablespaceList(List *tablespaces)
{
+ DestReceiver *dest;
+ TupleDesc tupdesc;
StringInfoData buf;
ListCell *lc;
- /* Construct and send the directory information */
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 3); /* 3 fields */
-
- /* First field - spcoid */
- pq_sendstring(&buf, "spcoid");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, OIDOID); /* type oid */
- pq_sendint16(&buf, 4); /* typlen */
- pq_sendint32(&buf, 0); /* typmod */
- pq_sendint16(&buf, 0); /* format code */
-
- /* Second field - spclocation */
- pq_sendstring(&buf, "spclocation");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, TEXTOID);
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- /* Third field - size */
- pq_sendstring(&buf, "size");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, INT8OID);
- pq_sendint16(&buf, 8);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
+ dest = CreateDestReceiver(DestRemoteSimple);
+
+ tupdesc = CreateTemplateTupleDesc(3);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID,
-1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation",
TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1,
0);
+ /* send RowDescription */
+ dest->rStartup(dest, CMD_SELECT, tupdesc);
+
+ /* Construct and send the directory information */
foreach(lc, tablespaces)
{
tablespaceinfo *ti = lfirst(lc);
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index fa60c92e13..2c9e190685 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -579,6 +579,8 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
static void
SendTimeLineHistory(TimeLineHistoryCmd *cmd)
{
+ DestReceiver *dest;
+ TupleDesc tupdesc;
StringInfoData buf;
char histfname[MAXFNAMELEN];
char path[MAXPGPATH];
@@ -587,36 +589,21 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
off_t bytesleft;
Size len;
+ dest = CreateDestReceiver(DestRemoteSimple);
+
/*
* Reply with a result set with one row, and two columns. The first col
is
* the name of the history file, 2nd is the contents.
*/
+ tupdesc = CreateTemplateTupleDesc(2);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID,
-1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID,
-1, 0);
TLHistoryFileName(histfname, cmd->timeline);
TLHistoryFilePath(path, cmd->timeline);
/* Send a RowDescription message */
- pq_beginmessage(&buf, 'T');
- pq_sendint16(&buf, 2); /* 2 fields */
-
- /* first field */
- pq_sendstring(&buf, "filename"); /* col name */
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, TEXTOID); /* type oid */
- pq_sendint16(&buf, -1); /* typlen */
- pq_sendint32(&buf, 0); /* typmod */
- pq_sendint16(&buf, 0); /* format code */
-
- /* second field */
- pq_sendstring(&buf, "content"); /* col name */
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, TEXTOID); /* type oid */
- pq_sendint16(&buf, -1); /* typlen */
- pq_sendint32(&buf, 0); /* typmod */
- pq_sendint16(&buf, 0); /* format code */
- pq_endmessage(&buf);
+ dest->rStartup(dest, CMD_SELECT, tupdesc);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
--
2.36.1
From 0614958dfdc53c42a5fdc2db234ccf137323d12e Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Thu, 23 Jun 2022 15:25:32 +0200
Subject: [PATCH 4/4] Refactor sending of DataRow messages in replication
protocol
Some routines open-coded the construction of DataRow messages. Use
TupOutputState struct and associated functions instead, which was
already done in some places.
SendTimeLineHistory() is a bit more complicated and isn't converted by
this.
---
src/backend/access/common/printsimple.c | 11 ++++
src/backend/replication/basebackup_copy.c | 66 ++++++++---------------
2 files changed, 33 insertions(+), 44 deletions(-)
diff --git a/src/backend/access/common/printsimple.c
b/src/backend/access/common/printsimple.c
index e99aa279f6..5874026e58 100644
--- a/src/backend/access/common/printsimple.c
+++ b/src/backend/access/common/printsimple.c
@@ -121,6 +121,17 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
}
break;
+ case OIDOID:
+ {
+ Oid num =
ObjectIdGetDatum(value);
+ char str[11]; /* 10
digits and '\0' */
+ int len;
+
+ len = pg_lltoa(num, str); // XXX this
is enough but should we make a pg_utoa?
+ pq_sendcountedtext(&buf, str, len,
false);
+ }
+ break;
+
default:
elog(ERROR, "unsupported type OID: %u",
attr->atttypid);
}
diff --git a/src/backend/replication/basebackup_copy.c
b/src/backend/replication/basebackup_copy.c
index df0471a7a4..b6e399a5c4 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -27,11 +27,13 @@
#include "access/tupdesc.h"
#include "catalog/pg_type_d.h"
+#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
#include "tcop/dest.h"
+#include "utils/builtins.h"
#include "utils/timestamp.h"
typedef struct bbsink_copystream
@@ -86,7 +88,6 @@ static void SendCopyOutResponse(void);
static void SendCopyDone(void);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static void SendTablespaceList(List *tablespaces);
-static void send_int8_string(StringInfoData *buf, int64 intval);
static const bbsink_ops bbsink_copystream_ops = {
.begin_backup = bbsink_copystream_begin_backup,
@@ -339,10 +340,10 @@ static void
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
{
DestReceiver *dest;
+ TupOutputState *tstate;
TupleDesc tupdesc;
- StringInfoData buf;
- char str[MAXFNAMELEN];
- Size len;
+ Datum values[2];
+ bool nulls[2] = {0};
dest = CreateDestReceiver(DestRemoteSimple);
@@ -355,22 +356,14 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1,
0);
/* send RowDescription */
- dest->rStartup(dest, CMD_SELECT, tupdesc);
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
/* Data row */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 2); /* number of columns */
-
- len = snprintf(str, sizeof(str),
- "%X/%X", LSN_FORMAT_ARGS(ptr));
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- len = snprintf(str, sizeof(str), "%u", tli);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
+ values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
+ values[1] = Int64GetDatum(tli);
+ do_tup_output(tstate, values, nulls);
- pq_endmessage(&buf);
+ end_tup_output(tstate);
/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
@@ -383,6 +376,7 @@ static void
SendTablespaceList(List *tablespaces)
{
DestReceiver *dest;
+ TupOutputState *tstate;
TupleDesc tupdesc;
StringInfoData buf;
ListCell *lc;
@@ -395,51 +389,35 @@ SendTablespaceList(List *tablespaces)
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1,
0);
/* send RowDescription */
- dest->rStartup(dest, CMD_SELECT, tupdesc);
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
/* Construct and send the directory information */
foreach(lc, tablespaces)
{
tablespaceinfo *ti = lfirst(lc);
+ Datum values[3];
+ bool nulls[3] = {0};
/* Send one datarow message */
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 3); /* number of columns */
if (ti->path == NULL)
{
- pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
- pq_sendint32(&buf, -1);
+ nulls[0] = true;
+ nulls[1] = true;
}
else
{
- Size len;
-
- len = strlen(ti->oid);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->oid, len);
-
- len = strlen(ti->path);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->path, len);
+ values[0] = ObjectIdGetDatum(ti->oid);
+ values[1] = CStringGetTextDatum(ti->path);
}
if (ti->size >= 0)
- send_int8_string(&buf, ti->size / 1024);
+ values[2] = Int64GetDatum(ti->size / 1024);
else
- pq_sendint32(&buf, -1); /* NULL */
+ nulls[2] = true;
- pq_endmessage(&buf);
+ do_tup_output(tstate, values, nulls);
}
-}
-
-/*
- * Send a 64-bit integer as a string via the wire protocol.
- */
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
- char is[32];
- sprintf(is, INT64_FORMAT, intval);
- pq_sendint32(buf, strlen(is));
- pq_sendbytes(buf, is, strlen(is));
+ end_tup_output(tstate);
}
--
2.36.1