While working on 16fd03e95, I noticed that in each aggregate deserialization function, in order to "receive" the bytea value that is the serialized aggregate state, appendBinaryStringInfo is used to append the bytes of the bytea value onto a temporary StringInfoData. Using appendBinaryStringInfo seems a bit wasteful here. We could really just fake up a StringInfoData and point directly to the bytes of the bytea value.
The best way I could think of to do this was to invent initStringInfoFromString() which initialises a StringInfoData and has the ->data field point directly at the specified buffer. This means it is unsafe to do any appendStringInfo* operations on the resulting StringInfoData, as enlargeStringInfo would try to repalloc the data buffer, which might not even point to a palloc'd string. It also means the usual StringInfo guarantee that data[len] is '\0' no longer holds, so anything that relies on the trailing NUL (e.g. pq_getmsgstring(), which uses strlen()) must not be used on such a StringInfo. I thought it might be fine just to mention that in the comments for the function, but we could probably do a bit better and set maxlen to something like -1 and Assert() we never see -1 in the various append functions. I wasn't sure it was worth it, so didn't do that. I had a look around for other places that might be following the same pattern. I only found range_recv() and XLogWalRcvProcessMsg(). I didn't adjust the range_recv() one as I couldn't see how to do that without casting away a const. I did adjust the XLogWalRcvProcessMsg() one and got rid of a global variable in the process. I've attached the benchmark results I got after testing how the modification changed the performance of string_agg_deserialize(). I was hoping this would have a slightly more impressive performance impact, especially for string_agg() and array_agg() as the aggregate states of those can be large. However, in the test I ran, there's only a very slight performance gain. I may just not have found the best case. David
#!/bin/bash
#
# Benchmark string_agg() with partial aggregation across 0..32 parallel
# workers.  Builds a 1M-row text table in database "postgres", warms it into
# shared buffers with pg_prewarm, then runs each query under pgbench for 10
# seconds per worker count and prints the reported latency.

db=postgres

psql -c "drop table if exists t;" "$db"
psql -c "create table t (a text not null);" "$db"
psql -c "insert into t select x::text from generate_series(1,1000000)x;" "$db"
psql -c "vacuum freeze t;" "$db"
# pg_prewarm() is provided by a contrib extension; make sure it is installed
# in this database before calling it.
psql -c "create extension if not exists pg_prewarm;" "$db"
psql -c "select pg_prewarm('t');" "$db"

for q in "select length(string_agg(a,',')) from t;"
do
	for i in {0..32}
	do
		echo "Testing with $i parallel workers: $q"
		psql -c "alter table t set (parallel_workers = $i);" "$db"
		# Quote the query so it is written verbatim (no word splitting or
		# glob expansion of its contents).
		printf '%s\n' "$q" > bench.sql
		pgbench -f bench.sql -n -T 10 "$db" | grep latency
	done
done
From 75d97a066ac81f5a50b1b7618ad9e240f5497860 Mon Sep 17 00:00:00 2001 From: David Rowley <dgrow...@gmail.com> Date: Sat, 11 Feb 2023 10:05:32 +1300 Subject: [PATCH v1 1/2] Optimize various aggregate deserialization functions The serialized representation of an internal aggregate state is a bytea value. In each deserial function, in order to "receive" the bytea value we appended it onto a short-lived StringInfoData using appendBinaryStringInfo. This was a little wasteful as it meant having to palloc memory, copy a (possibly long) series of bytes then later pfree that memory. Instead of doing to this extra trouble, we can just fake up a StringInfoData and point the data directly at the bytea's payload. This should help increase the performance of internal aggregate deserialization. To make this work, we introduce a function named initStringInfoFromString. This just makes the given StringInfoData point directly to the input buffer and sets the length to the given length. --- src/backend/utils/adt/array_userfuncs.c | 20 +++++-------- src/backend/utils/adt/numeric.c | 40 ++++++++++--------------- src/backend/utils/adt/varlena.c | 10 +++---- src/common/stringinfo.c | 20 +++++++++++++ src/include/lib/stringinfo.h | 13 ++++++++ 5 files changed, 61 insertions(+), 42 deletions(-) diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c index c8a8d33ca3..78fd0d2340 100644 --- a/src/backend/utils/adt/array_userfuncs.c +++ b/src/backend/utils/adt/array_userfuncs.c @@ -723,12 +723,11 @@ array_agg_deserialize(PG_FUNCTION_ARGS) sstate = PG_GETARG_BYTEA_PP(0); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. 
*/ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); /* element_type */ element_type = pq_getmsgint(&buf, 4); @@ -825,7 +824,6 @@ array_agg_deserialize(PG_FUNCTION_ARGS) } pq_getmsgend(&buf); - pfree(buf.data); PG_RETURN_POINTER(result); } @@ -1134,12 +1132,11 @@ array_agg_array_deserialize(PG_FUNCTION_ARGS) sstate = PG_GETARG_BYTEA_PP(0); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. */ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); /* element_type */ element_type = pq_getmsgint(&buf, 4); @@ -1197,7 +1194,6 @@ array_agg_array_deserialize(PG_FUNCTION_ARGS) memcpy(result->lbs, temp, sizeof(result->lbs)); pq_getmsgend(&buf); - pfree(buf.data); PG_RETURN_POINTER(result); } diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c index a83feea396..dec29d2b9b 100644 --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -5179,12 +5179,11 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS) init_var(&tmp_var); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. 
*/ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); result = makeNumericAggStateCurrentContext(false); @@ -5211,7 +5210,6 @@ numeric_avg_deserialize(PG_FUNCTION_ARGS) result->nInfcount = pq_getmsgint64(&buf); pq_getmsgend(&buf); - pfree(buf.data); free_var(&tmp_var); @@ -5295,12 +5293,11 @@ numeric_deserialize(PG_FUNCTION_ARGS) init_var(&tmp_var); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. */ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); result = makeNumericAggStateCurrentContext(false); @@ -5331,7 +5328,6 @@ numeric_deserialize(PG_FUNCTION_ARGS) result->nInfcount = pq_getmsgint64(&buf); pq_getmsgend(&buf); - pfree(buf.data); free_var(&tmp_var); @@ -5666,12 +5662,11 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS) init_var(&tmp_var); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. 
*/ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); result = makePolyNumAggStateCurrentContext(false); @@ -5695,7 +5690,6 @@ numeric_poly_deserialize(PG_FUNCTION_ARGS) #endif pq_getmsgend(&buf); - pfree(buf.data); free_var(&tmp_var); @@ -5857,12 +5851,11 @@ int8_avg_deserialize(PG_FUNCTION_ARGS) init_var(&tmp_var); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. */ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); result = makePolyNumAggStateCurrentContext(false); @@ -5878,7 +5871,6 @@ int8_avg_deserialize(PG_FUNCTION_ARGS) #endif pq_getmsgend(&buf); - pfree(buf.data); free_var(&tmp_var); diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index 170b3a3820..be43bf314b 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -5532,12 +5532,11 @@ string_agg_deserialize(PG_FUNCTION_ARGS) sstate = PG_GETARG_BYTEA_PP(0); /* - * Copy the bytea into a StringInfo so that we can "receive" it using the - * standard recv-function infrastructure. + * Fake up a StringInfo pointing to the bytea's value so we can "receive" + * the serialized aggregate state value. 
*/ - initStringInfo(&buf); - appendBinaryStringInfo(&buf, - VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + initStringInfoFromString(&buf, VARDATA_ANY(sstate), + VARSIZE_ANY_EXHDR(sstate)); result = makeStringAggState(fcinfo); @@ -5550,7 +5549,6 @@ string_agg_deserialize(PG_FUNCTION_ARGS) appendBinaryStringInfo(result, data, datalen); pq_getmsgend(&buf); - pfree(buf.data); PG_RETURN_POINTER(result); } diff --git a/src/common/stringinfo.c b/src/common/stringinfo.c index 05b22b5c53..2e0bc28527 100644 --- a/src/common/stringinfo.c +++ b/src/common/stringinfo.c @@ -65,6 +65,26 @@ initStringInfo(StringInfo str) resetStringInfo(str); } +/* + * initStringInfoFromString + * + * Initialize a StringInfoData struct (with previously undefined contents) + * and set its data pointer to point directly at 'buf', with the + * given 'len'. + * + * Caution: this is primarily used for very short-lived StringInfos. The + * resulting StringInfo cannot be appended to unless 'buf' points directly to + * a memory address as returned by palloc. Generally, you should be using + * appendBinaryStringInfo() instead of this function. + */ +void +initStringInfoFromString(StringInfo str, char *buf, int len) +{ + str->data = buf; + str->len = str->maxlen = len; + str->cursor = 0; +} + /* * resetStringInfo * diff --git a/src/include/lib/stringinfo.h b/src/include/lib/stringinfo.h index 36a416f8e0..4e4d73658d 100644 --- a/src/include/lib/stringinfo.h +++ b/src/include/lib/stringinfo.h @@ -79,6 +79,19 @@ extern StringInfo makeStringInfo(void); */ extern void initStringInfo(StringInfo str); +/*------------------------ + * initStringInfoFromString + * Initialize a StringInfoData struct (with previously undefined contents) + * and set its data pointer to point directly at 'buf', with the + * given 'len'. + * + * Caution: this is primarily used for very short-lived StringInfos.
The + * resulting StringInfo cannot be appended to unless 'buf' points directly to + * a memory address as returned by palloc. Generally, you should be using + * appendBinaryStringInfo() instead of this function. + */ +extern void initStringInfoFromString(StringInfo str, char *buf, int len); + /*------------------------ * resetStringInfo * Clears the current content of the StringInfo, if any. The -- 2.37.2
From 0b31533674adb90644298c638cc7dfe066ab5bd6 Mon Sep 17 00:00:00 2001 From: David Rowley <dgrow...@gmail.com> Date: Sun, 12 Feb 2023 16:45:15 +1300 Subject: [PATCH v1 2/2] Add more usages of initStringInfoFromString This slightly reduces some memcpy work in walreceiver.c --- src/backend/replication/walreceiver.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f6446da2d6..366b91f923 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -132,7 +132,6 @@ typedef enum WalRcvWakeupReason static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; static StringInfoData reply_message; -static StringInfoData incoming_message; /* Prototypes for private functions */ static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); @@ -422,10 +421,12 @@ WalReceiverMain(void) LSN_FORMAT_ARGS(startpoint), startpointTLI))); first_stream = false; - /* Initialize LogstreamResult and buffers for processing messages */ + /* + * Initialize LogstreamResult and reply buffer for processing + * messages + */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); - initStringInfo(&incoming_message); /* Initialize nap wakeup times. 
*/ now = GetCurrentTimestamp(); @@ -843,19 +844,19 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) TimestampTz sendTime; bool replyRequested; - resetStringInfo(&incoming_message); - switch (type) { case 'w': /* WAL records */ { - /* copy message to StringInfo */ + StringInfoData incoming_message; + + /* set the message in the incoming_message StringInfo */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); - appendBinaryStringInfo(&incoming_message, buf, hdrlen); + initStringInfoFromString(&incoming_message, buf, hdrlen); /* read the fields */ dataStart = pq_getmsgint64(&incoming_message); @@ -870,13 +871,15 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) } case 'k': /* Keepalive */ { - /* copy message to StringInfo */ + StringInfoData incoming_message; + + /* set the message in the incoming_message StringInfo */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary"))); - appendBinaryStringInfo(&incoming_message, buf, hdrlen); + initStringInfoFromString(&incoming_message, buf, hdrlen); /* read the fields */ walEnd = pq_getmsgint64(&incoming_message); -- 2.37.2