While working on 16fd03e95, I noticed that in each aggregate
deserialization function, in order to "receive" the bytea value that
is the serialized aggregate state, appendBinaryStringInfo is used to
append the bytes of the bytea value onto a temporary StringInfoData.
Using appendBinaryStringInfo seems a bit wasteful here. We could
really just fake up a StringInfoData and point directly to the bytes
of the bytea value.

The best way I could think of to do this was to invent
initStringInfoFromString() which initialises a StringInfoData and has
the ->data field point directly at the specified buffer.  This will
mean that it would be unsafe to do any appendStringInfo* operations on
the resulting StringInfoData as enlargeStringInfo would try to
repalloc the data buffer, which might not even point to a palloc'd
string.  I thought it might be fine just to mention that in the
comments for the function, but we could probably do a bit better and
set maxlen to something like -1 and Assert() we never see -1 in the
various append functions.  I wasn't sure it was worth it, so didn't do
that.

I had a look around for other places that might be following the same
pattern. I only found range_recv() and XLogWalRcvProcessMsg().  I
didn't adjust the range_recv() one as I couldn't see how to do that
without casting away a const.  I did adjust the XLogWalRcvProcessMsg()
one and got rid of a global variable in the process.

I've attached the benchmark results I got after testing how the
modification changed the performance of string_agg_deserialize().

I was hoping this would have a slightly more impressive performance
impact, especially for string_agg() and array_agg() as the aggregate
states of those can be large.  However, in the test I ran, there's
only a very slight performance gain. I may just not have found the
best case, however.

David
#!/bin/bash
#
# Benchmark the given aggregate queries at varying parallel worker counts.
#
# Setup: (re)creates table "t" in the "postgres" database with 1,000,000
# text rows, then runs VACUUM FREEZE and pg_prewarm() on it.
#
# Run: for each query and each parallel_workers setting from 0 to 32, sets
# the table's parallel_workers reloption, writes the query to bench.sql and
# runs pgbench on it for 10 seconds, printing only the latency line(s).
#
# NOTE: the database name must stay on the same line as each psql command;
# on a line of its own it would be executed as a separate "postgres" command.

psql -c "drop table if exists t;" postgres
psql -c "create table t (a text not null);" postgres
psql -c "insert into t select x::text from generate_series(1,1000000)x;" postgres
psql -c "vacuum freeze t;" postgres
psql -c "select pg_prewarm('t');" postgres

for q in "select length(string_agg(a,',')) from t;"
do
        for i in {0..32}
        do
                echo "Testing with $i parallel workers: $q"
                psql -c "alter table t set (parallel_workers = $i);" postgres
                # Quote the query so it is written to the file verbatim.
                echo "$q" > bench.sql
                pgbench -f bench.sql -n -T 10 postgres | grep latency
        done
done


From 75d97a066ac81f5a50b1b7618ad9e240f5497860 Mon Sep 17 00:00:00 2001
From: David Rowley <dgrow...@gmail.com>
Date: Sat, 11 Feb 2023 10:05:32 +1300
Subject: [PATCH v1 1/2] Optimize various aggregate deserialization functions

The serialized representation of an internal aggregate state is a bytea
value.  In each deserial function, in order to "receive" the bytea value
we appended it onto a short-lived StringInfoData using
appendBinaryStringInfo.  This was a little wasteful as it meant having to
palloc memory, copy a (possibly long) series of bytes then later pfree
that memory.  Instead of going to this extra trouble, we can just fake up
a StringInfoData and point the data directly at the bytea's payload.  This
should help increase the performance of internal aggregate
deserialization.

To make this work, we introduce a function named initStringInfoFromString.
This just makes the given StringInfoData point directly to the input
buffer and sets the length to the given length.
---
 src/backend/utils/adt/array_userfuncs.c | 20 +++++--------
 src/backend/utils/adt/numeric.c         | 40 ++++++++++---------------
 src/backend/utils/adt/varlena.c         | 10 +++----
 src/common/stringinfo.c                 | 20 +++++++++++++
 src/include/lib/stringinfo.h            | 13 ++++++++
 5 files changed, 61 insertions(+), 42 deletions(-)

diff --git a/src/backend/utils/adt/array_userfuncs.c 
b/src/backend/utils/adt/array_userfuncs.c
index c8a8d33ca3..78fd0d2340 100644
--- a/src/backend/utils/adt/array_userfuncs.c
+++ b/src/backend/utils/adt/array_userfuncs.c
@@ -723,12 +723,11 @@ array_agg_deserialize(PG_FUNCTION_ARGS)
        sstate = PG_GETARG_BYTEA_PP(0);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        /* element_type */
        element_type = pq_getmsgint(&buf, 4);
@@ -825,7 +824,6 @@ array_agg_deserialize(PG_FUNCTION_ARGS)
        }
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        PG_RETURN_POINTER(result);
 }
@@ -1134,12 +1132,11 @@ array_agg_array_deserialize(PG_FUNCTION_ARGS)
        sstate = PG_GETARG_BYTEA_PP(0);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        /* element_type */
        element_type = pq_getmsgint(&buf, 4);
@@ -1197,7 +1194,6 @@ array_agg_array_deserialize(PG_FUNCTION_ARGS)
        memcpy(result->lbs, temp, sizeof(result->lbs));
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        PG_RETURN_POINTER(result);
 }
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index a83feea396..dec29d2b9b 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -5179,12 +5179,11 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
        init_var(&tmp_var);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        result = makeNumericAggStateCurrentContext(false);
 
@@ -5211,7 +5210,6 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS)
        result->nInfcount = pq_getmsgint64(&buf);
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        free_var(&tmp_var);
 
@@ -5295,12 +5293,11 @@ numeric_deserialize(PG_FUNCTION_ARGS)
        init_var(&tmp_var);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        result = makeNumericAggStateCurrentContext(false);
 
@@ -5331,7 +5328,6 @@ numeric_deserialize(PG_FUNCTION_ARGS)
        result->nInfcount = pq_getmsgint64(&buf);
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        free_var(&tmp_var);
 
@@ -5666,12 +5662,11 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
        init_var(&tmp_var);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        result = makePolyNumAggStateCurrentContext(false);
 
@@ -5695,7 +5690,6 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS)
 #endif
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        free_var(&tmp_var);
 
@@ -5857,12 +5851,11 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
        init_var(&tmp_var);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        result = makePolyNumAggStateCurrentContext(false);
 
@@ -5878,7 +5871,6 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
 #endif
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        free_var(&tmp_var);
 
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 170b3a3820..be43bf314b 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -5532,12 +5532,11 @@ string_agg_deserialize(PG_FUNCTION_ARGS)
        sstate = PG_GETARG_BYTEA_PP(0);
 
        /*
-        * Copy the bytea into a StringInfo so that we can "receive" it using 
the
-        * standard recv-function infrastructure.
+        * Fake up a StringInfo pointing to the bytea's value so we can 
"receive"
+        * the serialized aggregate state value.
         */
-       initStringInfo(&buf);
-       appendBinaryStringInfo(&buf,
-                                                  VARDATA_ANY(sstate), 
VARSIZE_ANY_EXHDR(sstate));
+       initStringInfoFromString(&buf, VARDATA_ANY(sstate),
+                                                        
VARSIZE_ANY_EXHDR(sstate));
 
        result = makeStringAggState(fcinfo);
 
@@ -5550,7 +5549,6 @@ string_agg_deserialize(PG_FUNCTION_ARGS)
        appendBinaryStringInfo(result, data, datalen);
 
        pq_getmsgend(&buf);
-       pfree(buf.data);
 
        PG_RETURN_POINTER(result);
 }
diff --git a/src/common/stringinfo.c b/src/common/stringinfo.c
index 05b22b5c53..2e0bc28527 100644
--- a/src/common/stringinfo.c
+++ b/src/common/stringinfo.c
@@ -65,6 +65,26 @@ initStringInfo(StringInfo str)
        resetStringInfo(str);
 }
 
+/*
+ * initStringInfoFromString
+ *
+ * Initialize a StringInfoData struct (with previously undefined contents)
+ * and set the string to point directly to the contents of 'buf' with the
+ * given 'len'.
+ *
+ * Caution: this is primarily used for very short-lived StringInfos.  The
+ * resulting StringInfo cannot be appended to unless 'buf' points directly to
+ * a memory address as returned by palloc.  Generally, you should be using
+ * appendBinaryStringInfo() instead of this function.
+ */
+void
+initStringInfoFromString(StringInfo str, char *buf, int len)
+{
+       str->data = buf;
+       str->len = str->maxlen = len;
+       str->cursor = 0;
+}
+
 /*
  * resetStringInfo
  *
diff --git a/src/include/lib/stringinfo.h b/src/include/lib/stringinfo.h
index 36a416f8e0..4e4d73658d 100644
--- a/src/include/lib/stringinfo.h
+++ b/src/include/lib/stringinfo.h
@@ -79,6 +79,19 @@ extern StringInfo makeStringInfo(void);
  */
 extern void initStringInfo(StringInfo str);
 
+/*------------------------
+ * initStringInfoFromString
+ * Initialize a StringInfoData struct (with previously undefined contents)
+ * and set the string to point directly to the contents of 'buf' with the
+ * given 'len'.
+ *
+ * Caution: this is primarily used for very short-lived StringInfos.  The
+ * resulting StringInfo cannot be appended to unless 'buf' points directly to
+ * a memory address as returned by palloc.  Generally, you should be using
+ * appendBinaryStringInfo() instead of this function.
+ */
+extern void initStringInfoFromString(StringInfo str, char *buf, int len);
+
 /*------------------------
  * resetStringInfo
  * Clears the current content of the StringInfo, if any. The
-- 
2.37.2

From 0b31533674adb90644298c638cc7dfe066ab5bd6 Mon Sep 17 00:00:00 2001
From: David Rowley <dgrow...@gmail.com>
Date: Sun, 12 Feb 2023 16:45:15 +1300
Subject: [PATCH v1 2/2] Add more usages of initStringInfoFromString

This slightly reduces some memcpy work in walreceiver.c
---
 src/backend/replication/walreceiver.c | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index f6446da2d6..366b91f923 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -132,7 +132,6 @@ typedef enum WalRcvWakeupReason
 static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
 
 static StringInfoData reply_message;
-static StringInfoData incoming_message;
 
 /* Prototypes for private functions */
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
@@ -422,10 +421,12 @@ WalReceiverMain(void)
                                                                
LSN_FORMAT_ARGS(startpoint), startpointTLI)));
                        first_stream = false;
 
-                       /* Initialize LogstreamResult and buffers for 
processing messages */
+                       /*
+                        * Initialize LogstreamResult and reply buffer for 
processing
+                        * messages
+                        */
                        LogstreamResult.Write = LogstreamResult.Flush = 
GetXLogReplayRecPtr(NULL);
                        initStringInfo(&reply_message);
-                       initStringInfo(&incoming_message);
 
                        /* Initialize nap wakeup times. */
                        now = GetCurrentTimestamp();
@@ -843,19 +844,19 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
        TimestampTz sendTime;
        bool            replyRequested;
 
-       resetStringInfo(&incoming_message);
-
        switch (type)
        {
                case 'w':                               /* WAL records */
                        {
-                               /* copy message to StringInfo */
+                               StringInfoData incoming_message;
+
+                               /* set the message in the incoming_message 
StringInfo */
                                hdrlen = sizeof(int64) + sizeof(int64) + 
sizeof(int64);
                                if (len < hdrlen)
                                        ereport(ERROR,
                                                        
(errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                         
errmsg_internal("invalid WAL message received from primary")));
-                               appendBinaryStringInfo(&incoming_message, buf, 
hdrlen);
+                               initStringInfoFromString(&incoming_message, 
buf, hdrlen);
 
                                /* read the fields */
                                dataStart = pq_getmsgint64(&incoming_message);
@@ -870,13 +871,15 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
                        }
                case 'k':                               /* Keepalive */
                        {
-                               /* copy message to StringInfo */
+                               StringInfoData incoming_message;
+
+                               /* set the message in the incoming_message 
StringInfo */
                                hdrlen = sizeof(int64) + sizeof(int64) + 
sizeof(char);
                                if (len != hdrlen)
                                        ereport(ERROR,
                                                        
(errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                         
errmsg_internal("invalid keepalive message received from primary")));
-                               appendBinaryStringInfo(&incoming_message, buf, 
hdrlen);
+                               initStringInfoFromString(&incoming_message, 
buf, hdrlen);
 
                                /* read the fields */
                                walEnd = pq_getmsgint64(&incoming_message);
-- 
2.37.2

Reply via email to