On 13.08.2018 23:06, Andrew Dunstan wrote:
On 08/13/2018 02:47 PM, Robert Haas wrote:
On Fri, Aug 10, 2018 at 5:55 PM, Andrew Dunstan
<andrew.duns...@2ndquadrant.com> wrote:
This thread appears to have gone quiet. What concerns me is that there
appears to be substantial disagreement between the author and the
reviewers.
Since the last thing was this new patch it should really have been
put back
into "needs review" (my fault to some extent - I missed that). So
rather
than return the patch with feedfack I'm going to set it to "needs
review"
and move it to the next CF. However, if we can't arrive at a
consensus about
the direction during the next CF it should be returned with feedback.
I agree with the critiques from Robbie Harwood and Michael Paquier
that the way in that compression is being hooked into the existing
architecture looks like a kludge. I'm not sure I know exactly how it
should be done, but the current approach doesn't look natural; it
looks like it was bolted on. I agree with the critique from Peter
Eisentraut and others that we should not go around and add -Z as a
command-line option to all of our tools; this feature hasn't yet
proved itself to be useful enough to justify that. Better to let
people specify it via a connection string option if they want it. I
think Thomas Munro was right to ask about what will happen when
different compression libraries are in use, and I think failing
uncleanly is quite unacceptable. I think there needs to be some
method for negotiating the compression type; the client can, for
example, provide a comma-separated list of methods it supports in
preference order, and the server can pick the first one it likes. In
short, I think that a number of people have provided really good
feedback on this patch, and I suggest to Konstantin that he should
consider accepting all of those suggestions.
Commit ae65f6066dc3d19a55f4fdcd3b30003c5ad8dbed tried to introduce
some facilities that can be used for protocol version negotiation as
new features are added, but this patch doesn't use them. It looks to
me like it instead just breaks backward compatibility. The new
'compression' option won't be recognized by older servers. If it were
spelled '_pq_.compression' then the facilities in that commit would
cause a NegotiateProtocolVersion message to be sent by servers which
have that commit but don't support the compression option. I'm not
exactly sure what will happen on even-older servers that don't have
that commit either, but I hope they'll treat it as a GUC name; note
that setting an unknown GUC name with a namespace prefix is not an
error, but setting one without such a prefix IS an ERROR. Servers
which do support compression would respond with a message indicating
that compression had been enabled or, maybe, just start sending back
compressed-packet messages, if we go with including some framing in
the libpq protocol.
Excellent summary, and well argued recommendations, thanks. I've
changed the status to waiting on author.
New version of the patch is attached: I removed -Z options form pgbench
and psql and add checking that server and client are implementing the
same compression algorithm.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/configure b/configure
index 0aafd9c..fc5685c 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ ELF_SYS
EGREP
GREP
with_zlib
+with_zstd
with_system_tzdata
with_libxslt
with_libxml
@@ -863,6 +864,7 @@ with_libxml
with_libxslt
with_system_tzdata
with_zlib
+with_zstd
with_gnu_ld
enable_largefile
enable_float4_byval
@@ -8017,6 +8019,86 @@ fi
#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+ withval=$with_zstd;
+ case $withval in
+ yes)
+ ;;
+ no)
+ :
+ ;;
+ *)
+ as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+ ;;
+ esac
+
+else
+ with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+ { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_lib_zstd_ZSTD_compress=yes
+else
+ ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+ cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+ LIBS="-lzstd $LIBS"
+
+else
+ as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
+#
# Elf
#
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 498b8df..40c3187 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1115,6 +1115,17 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
</listitem>
</varlistentry>
+ <varlistentry id="libpq-connect-compression" xreflabel="compression">
+ <term><literal>compression</literal></term>
+ <listitem>
+ <para>
+ Request compression of libpq traffic. If server is supporting compression, then all libpq messages send both from client to server and
+ visa versa will be compressed. Right now compression algorithm is hardcoded: is it is either zlib (default), either zstd (if Postgres was
+ configured with --with-zstd option). In both cases streaming mode is used.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
<term><literal>client_encoding</literal></term>
<listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cfc805f..c328dd7 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
such as <command>COPY</command>.
</para>
+ <para>
+ Is is possible to compress protocol data to reduce traffic and speed-up client-server interaction.
+ Compression is especialy useful for importing/exprorting data to/from database using COPY command
+ and for replication (oth physical and logical). Also compression can reduce server response time
+ in case of queries, requestion larger amount of data (for example returning JSON, BLOBs, text,...)
+ Right now compression algorithm is hardcoded: is it is either zlib (default), either zstd (if Postgres was
+ configured with --with-zstd option). In both cases streaming mode is used.
+ </para>
+
<sect2 id="protocol-message-concepts">
<title>Messaging Overview</title>
@@ -263,6 +272,18 @@
</varlistentry>
<varlistentry>
+ <term>CompressionOk</term>
+ <listitem>
+ <para>
+ Server acknowledge using compression for client-server communication protocol.
+ Compression can be requested by client by including "compression" option in connection string.
+ Right now compression algorithm is hardcoded, but in future client and server may negotiate to
+ choose proper compression algorithm.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term>AuthenticationOk</term>
<listitem>
<para>
@@ -3410,6 +3431,43 @@ AuthenticationSASLFinal (B)
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+CompressionOk (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('z')
+</term>
+<listitem>
+<para>
+ Acknowledge use of compression for protocol data. After receiving this message bother server and client are switched to compression mode
+ and exchange compressed messages.
+</para>
+</listitem>
+
+</varlistentry>
+<varlistentry>
+<term>
+ Byte1
+</term>
+<listitem>
+<para>
+ Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
<varlistentry>
<term>
@@ -5826,6 +5884,19 @@ StartupMessage (F)
</para>
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+ <literal>compression</literal>
+</term>
+<listitem>
+<para>
+ Request compression of libpq traffic. Value can be
+ <literal>0</literal>, <literal>1</literal>, <literal>true</literal>,
+ <literal>false</literal>, <literal>on</literal>, <literal>off.</literal>.
+ By default compression is disabled.
+</para>
+</listitem>
+</varlistentry>
</variablelist>
In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 95d090e..b59629c 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm = @with_llvm@
with_system_tzdata = @with_system_tzdata@
with_uuid = @with_uuid@
with_zlib = @with_zlib@
+with_zstd = @with_zstd@
enable_rpath = @enable_rpath@
enable_nls = @enable_nls@
enable_debug = @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 25af514..a8e461a 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -51,6 +51,14 @@ ifeq ($(with_systemd),yes)
LIBS += -lsystemd
endif
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
##########################################################################
all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..d48fdd0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -95,6 +95,7 @@
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
+#include "common/zpq_stream.h"
/*
* Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -143,6 +144,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
+static ZpqStream* PqStream;
+
+
/*
* Message status
*/
@@ -185,6 +189,34 @@ PQcommMethods *PqCommMethods = &PqCommSocketMethods;
WaitEventSet *FeBeWaitSet;
+/* --------------------------------
+ * pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+ if (port->use_compression)
+ {
+ char compression[2];
+ int rc;
+ compression[0] = 'z'; /* Request compression message */
+ compression[1] = zpq_algorithm();
+ /* Switch on compression at client side */
+ socket_set_nonblocking(false);
+ while ((rc = secure_write(MyProcPort, &compression, sizeof compression)) < 0
+ && errno == EINTR);
+ if (rc != 2)
+ return -1;
+
+ /* initialize compression */
+ PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
+ }
+ return 0;
+}
/* --------------------------------
* pq_init - initialize libpq at backend startup
@@ -225,6 +257,7 @@ pq_init(void)
NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+
}
/* --------------------------------
@@ -282,6 +315,9 @@ socket_close(int code, Datum arg)
free(MyProcPort->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
+ /* Release compression streams */
+ zpq_free(PqStream);
+
/*
* Cleanly shut down SSL layer. Nowhere else does a postmaster child
* call this, so this is safe when interrupting BackendInitialize().
@@ -932,12 +968,14 @@ socket_set_nonblocking(bool nonblocking)
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
*
- * returns 0 if OK, EOF if trouble
+ * returns number of read bytes, EOF if trouble
* --------------------------------
*/
static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
{
+ int r;
+
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
@@ -953,21 +991,37 @@ pq_recvbuf(void)
}
/* Ensure that we're in blocking mode */
- socket_set_nonblocking(false);
+ socket_set_nonblocking(nowait);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
- int r;
-
- r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ size_t processed = 0;
+ r = PqStream
+ ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
+ : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ PqRecvLength += processed;
if (r < 0)
{
+ if (r == ZPQ_DECOMPRESS_ERROR)
+ {
+ char const* msg = zpq_error(PqStream);
+ if (msg == NULL)
+ msg = "end of stream";
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to decompress data: %s", msg)));
+ return EOF;
+ }
if (errno == EINTR)
continue; /* Ok if interrupted */
+ if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+ return 0;
+
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
@@ -988,7 +1042,7 @@ pq_recvbuf(void)
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
- return 0;
+ return r;
}
}
@@ -1003,7 +1057,7 @@ pq_getbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1022,7 +1076,7 @@ pq_peekbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1043,44 +1097,11 @@ pq_getbyte_if_available(unsigned char *c)
Assert(PqCommReadingMsg);
- if (PqRecvPointer < PqRecvLength)
+ if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
{
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
-
- /* Put the socket into non-blocking mode */
- socket_set_nonblocking(true);
-
- r = secure_read(MyProcPort, c, 1);
- if (r < 0)
- {
- /*
- * Ok if no data available without blocking or interrupted (though
- * EINTR really shouldn't happen with a non-blocking socket). Report
- * other errors.
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- r = 0;
- else
- {
- /*
- * Careful: an ereport() that tries to write to the client would
- * cause recursion to here, leading to stack overflow and core
- * dump! This message must go *only* to the postmaster log.
- */
- ereport(COMMERROR,
- (errcode_for_socket_access(),
- errmsg("could not receive data from client: %m")));
- r = EOF;
- }
- }
- else if (r == 0)
- {
- /* EOF detected */
- r = EOF;
- }
-
return r;
}
@@ -1101,7 +1122,7 @@ pq_getbytes(char *s, size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1135,7 +1156,7 @@ pq_discardbytes(size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1176,7 +1197,7 @@ pq_getstring(StringInfo s)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
@@ -1426,13 +1447,18 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
- while (bufptr < bufend)
+ while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
{
- int r;
-
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
- if (r <= 0)
+ int r;
+ size_t processed = 0;
+ size_t available = bufend - bufptr;
+ r = PqStream
+ ? zpq_write(PqStream, bufptr, available, &processed)
+ : secure_write(MyProcPort, bufptr, available);
+ bufptr += processed;
+ PqSendStart += processed;
+
+ if (r < 0 || (r == 0 && available))
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
@@ -1480,7 +1506,6 @@ internal_flush(void)
bufptr += r;
PqSendStart += r;
}
-
PqSendStart = PqSendPointer = 0;
return 0;
}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b3..3928e89 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2053,6 +2053,16 @@ retry1:
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
+ else if (strcmp(nameptr, "compression") == 0)
+ {
+ if (!parse_bool(valptr, &port->use_compression))
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid boolean value for parameter \"%s\": \"%s\"",
+ "compression",
+ valptr),
+ errhint("Valid values are: \"false\", \"off\", 0, \"true\", \"on\", 1.")));
+ }
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
@@ -4257,6 +4267,14 @@ BackendInitialize(Port *port)
if (status != STATUS_OK)
proc_exit(0);
+ if (pq_configure(port))
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to send compression message: %m")));
+ proc_exit(0);
+ }
+
/*
* Now that we have the user and database name, we can set the process
* title for ps. It's good to do this as early as possible in startup.
diff --git a/src/common/Makefile b/src/common/Makefile
index 1fc2c66..f804fe1 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -43,7 +43,7 @@ LIBS += $(PTHREAD_LIBS)
OBJS_COMMON = base64.o config_info.o controldata_utils.o exec.o file_perm.o \
ip.o keywords.o md5.o pg_lzcompress.o pgfnames.o psprintf.o relpath.o \
rmtree.o saslprep.o scram-common.o string.o unicode_norm.o \
- username.o wait_error.o
+ username.o wait_error.o zpq_stream.o
ifeq ($(with_openssl),yes)
OBJS_COMMON += sha2_openssl.o
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..afd42e9
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,386 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+#if HAVE_LIBZSTD
+
+#include <malloc.h>
+#include <zstd.h>
+
+#define ZPQ_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
+
+struct ZpqStream
+{
+ ZSTD_CStream* tx_stream;
+ ZSTD_DStream* rx_stream;
+ ZSTD_outBuffer tx;
+ ZSTD_inBuffer rx;
+ size_t tx_not_flushed; /* Amount of datas in internal zstd buffer */
+ size_t tx_buffered; /* Data which is consumed by zpq_read but not yet sent */
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+ char const* rx_error; /* Decompress error message */
+ size_t tx_total;
+ size_t tx_total_raw;
+ size_t rx_total;
+ size_t rx_total_raw;
+ char tx_buf[ZPQ_BUFFER_SIZE];
+ char rx_buf[ZPQ_BUFFER_SIZE];
+};
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+ ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream));
+ zs->tx_stream = ZSTD_createCStream();
+ ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL);
+ zs->rx_stream = ZSTD_createDStream();
+ ZSTD_initDStream(zs->rx_stream);
+ zs->tx.dst = zs->tx_buf;
+ zs->tx.pos = 0;
+ zs->tx.size = ZPQ_BUFFER_SIZE;
+ zs->rx.src = zs->rx_buf;
+ zs->rx.pos = 0;
+ zs->rx.size = 0;
+ zs->rx_func = rx_func;
+ zs->tx_func = tx_func;
+ zs->tx_buffered = 0;
+ zs->tx_not_flushed = 0;
+ zs->rx_error = NULL;
+ zs->arg = arg;
+ zs->tx_total = zs->tx_total_raw = 0;
+ zs->rx_total = zs->rx_total_raw = 0;
+ return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size, size_t *processed)
+{
+ ssize_t rc;
+ ZSTD_outBuffer out;
+ out.dst = buf;
+ out.pos = 0;
+ out.size = size;
+
+ while (1)
+ {
+ rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+ if (ZSTD_isError(rc))
+ {
+ zs->rx_error = ZSTD_getErrorName(rc);
+ return ZPQ_DECOMPRESS_ERROR;
+ }
+ /* Return result if we fill requested amount of bytes or read operation was performed */
+ if (out.pos != 0)
+ {
+ zs->rx_total_raw += out.pos;
+ return out.pos;
+ }
+ if (zs->rx.pos == zs->rx.size)
+ {
+ zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+ }
+ rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZPQ_BUFFER_SIZE - zs->rx.size);
+ if (rc > 0) /* read fetches some data */
+ {
+ zs->rx.size += rc;
+ zs->rx_total += rc;
+ }
+ else /* read failed */
+ {
+ *processed = out.pos;
+ zs->rx_total_raw += out.pos;
+ return rc;
+ }
+ }
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t *processed)
+{
+ ssize_t rc;
+ ZSTD_inBuffer in_buf;
+ in_buf.src = buf;
+ in_buf.pos = 0;
+ in_buf.size = size;
+
+ do
+ {
+ if (zs->tx.pos == 0) /* Compress buffer is empty */
+ {
+ zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (in_buf.pos < size) /* Has something to compress in input buffer */
+ ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+ if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+ {
+ zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+ }
+ }
+ rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+ if (rc > 0)
+ {
+ zs->tx.pos -= rc;
+ zs->tx.dst = (char*)zs->tx.dst + rc;
+ zs->tx_total += rc;
+ }
+ else
+ {
+ *processed = in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ zs->tx_total_raw += in_buf.pos;
+ return rc;
+ }
+ } while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
+
+ zs->tx_total_raw += in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ return in_buf.pos;
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+ if (zs != NULL)
+ {
+ ZSTD_freeCStream(zs->tx_stream);
+ ZSTD_freeDStream(zs->rx_stream);
+ free(zs);
+ }
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+ return zs->rx_error;
+}
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+ return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+char
+zpq_algorithm(void)
+{
+ return 'f';
+}
+
+#elif HAVE_LIBZ
+
+#include <malloc.h>
+#include <zlib.h>
+
+#define ZPQ_BUFFER_SIZE 8192
+#define ZLIB_COMPRESSION_LEVEL 1
+
+struct ZpqStream
+{
+ z_stream tx;
+ z_stream rx;
+
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+
+ size_t tx_buffered;
+
+ Bytef tx_buf[ZPQ_BUFFER_SIZE];
+ Bytef rx_buf[ZPQ_BUFFER_SIZE];
+};
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+ int rc;
+ ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream));
+ memset(&zs->tx, 0, sizeof(zs->tx));
+ zs->tx.next_out = zs->tx_buf;
+ zs->tx.avail_out = ZPQ_BUFFER_SIZE;
+ zs->tx_buffered = 0;
+ rc = deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL);
+ if (rc != Z_OK)
+ {
+ free(zs);
+ return NULL;
+ }
+ Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZPQ_BUFFER_SIZE);
+
+ memset(&zs->rx, 0, sizeof(zs->tx));
+ zs->rx.next_in = zs->rx_buf;
+ zs->rx.avail_in = ZPQ_BUFFER_SIZE;
+ rc = inflateInit(&zs->rx);
+ if (rc != Z_OK)
+ {
+ free(zs);
+ return NULL;
+ }
+ Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZPQ_BUFFER_SIZE);
+ zs->rx.avail_in = 0;
+
+ zs->rx_func = rx_func;
+ zs->tx_func = tx_func;
+ zs->arg = arg;
+
+ return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size, size_t *processed)
+{
+ int rc;
+ zs->rx.next_out = (Bytef *)buf;
+ zs->rx.avail_out = size;
+
+ while (1)
+ {
+ if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+ {
+ rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+ if (rc != Z_OK)
+ {
+ return ZPQ_DECOMPRESS_ERROR;
+ }
+ if (zs->rx.avail_out != size)
+ {
+ return size - zs->rx.avail_out;
+ }
+ if (zs->rx.avail_in == 0)
+ {
+ zs->rx.next_in = zs->rx_buf;
+ }
+ }
+ else
+ {
+ zs->rx.next_in = zs->rx_buf;
+ }
+ rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZPQ_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+ if (rc > 0)
+ {
+ zs->rx.avail_in += rc;
+ }
+ else
+ {
+ *processed = size - zs->rx.avail_out;
+ return rc;
+ }
+ }
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t *processed)
+{
+ int rc;
+ zs->tx.next_in = (Bytef *)buf;
+ zs->tx.avail_in = size;
+ do
+ {
+ if (zs->tx.avail_out == ZPQ_BUFFER_SIZE) /* Compress buffer is empty */
+ {
+ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (zs->tx.avail_in != 0) /* Has something in input buffer */
+ {
+ rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+ Assert(rc == Z_OK);
+ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+ }
+ }
+ rc = zs->tx_func(zs->arg, zs->tx.next_out, ZPQ_BUFFER_SIZE - zs->tx.avail_out);
+ if (rc > 0)
+ {
+ zs->tx.next_out += rc;
+ zs->tx.avail_out += rc;
+ }
+ else
+ {
+ *processed = size - zs->tx.avail_in;
+ zs->tx_buffered = ZPQ_BUFFER_SIZE - zs->tx.avail_out;
+ return rc;
+ }
+ } while (zs->tx.avail_out == ZPQ_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */
+
+ zs->tx_buffered = ZPQ_BUFFER_SIZE - zs->tx.avail_out;
+
+ return size - zs->tx.avail_in;
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+ if (zs != NULL)
+ {
+ inflateEnd(&zs->rx);
+ deflateEnd(&zs->tx);
+ free(zs);
+ }
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+ return zs->rx.msg;
+}
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+ return zs != NULL ? zs->tx_buffered : 0;
+}
+
+char
+zpq_algorithm(void)
+{
+ return 'z';
+}
+
+#else
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+ return NULL;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size)
+{
+ return -1;
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size)
+{
+ return -1;
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+ return NULL;
+}
+
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+ return 0;
+}
+
+char
+zpq_algorithm(void)
+{
+ return '0';
+}
+
+#endif
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..30dc98d
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,29 @@
+/*
+ * zpq_stream.h
+ * Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size, size_t* processed);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered(ZpqStream* zs);
+void zpq_free(ZpqStream* zs);
+char zpq_algorithm(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 7698cd1..5203f2d 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -182,6 +182,8 @@ typedef struct Port
char *peer_cn;
bool peer_cert_valid;
+ bool use_compression;
+
/*
* OpenSSL structures. (Keep these last so that the locations of other
* fields are the same whether or not you build with OpenSSL.)
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 7bf06c6..cb0b69e 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -61,6 +61,7 @@ extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void RemoveSocketFiles(void);
extern void pq_init(void);
+extern int pq_configure(Port* port);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 9411f48..c0ea383 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -365,6 +365,9 @@
/* Define to 1 if you have the `z' library (-lz). */
#undef HAVE_LIBZ
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
/* Define to 1 if the system has the type `locale_t'. */
#undef HAVE_LOCALE_T
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index abe0a50..2dad4fb 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,16 @@ endif
# platforms require special flags.
LIBS := $(LIBS:-lpgport=)
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
# We can't use Makefile variables here because the MSVC build system scrapes
# OBJS from this file.
OBJS= fe-auth.o fe-auth-scram.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \
@@ -49,7 +59,7 @@ endif
# src/backend/utils/mb
OBJS += encnames.o wchar.o
# src/common
-OBJS += base64.o ip.o md5.o scram-common.o saslprep.o unicode_norm.o
+OBJS += base64.o ip.o md5.o scram-common.o saslprep.o unicode_norm.o zpq_stream.o
ifeq ($(with_openssl),yes)
OBJS += fe-secure-openssl.o fe-secure-common.o sha2_openssl.o
@@ -106,7 +116,7 @@ backend_src = $(top_srcdir)/src/backend
chklocale.c crypt.c erand48.c getaddrinfo.c getpeereid.c inet_aton.c inet_net_ntop.c noblock.c open.c system.c pgsleep.c pg_strong_random.c pgstrcasecmp.c pqsignal.c snprintf.c strerror.c strlcpy.c strnlen.c thread.c win32error.c win32setlocale.c: % : $(top_srcdir)/src/port/%
rm -f $@ && $(LN_S) $< .
-ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c: % : $(top_srcdir)/src/common/%
+ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c zpq_stream.c: % : $(top_srcdir)/src/common/%
rm -f $@ && $(LN_S) $< .
encnames.c wchar.c: % : $(backend_src)/utils/mb/%
@@ -156,7 +166,7 @@ clean distclean: clean-lib
rm -f pg_config_paths.h
# Remove files we (may have) symlinked in from src/port and other places
rm -f chklocale.c crypt.c erand48.c getaddrinfo.c getpeereid.c inet_aton.c inet_net_ntop.c noblock.c open.c system.c pgsleep.c pg_strong_random.c pgstrcasecmp.c pqsignal.c snprintf.c strerror.c strlcpy.c strnlen.c thread.c win32error.c win32setlocale.c
- rm -f ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c
+ rm -f ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c zpq_stream.c
rm -f encnames.c wchar.c
maintainer-clean: distclean maintainer-clean-lib
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a7e969d..029a982 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -72,6 +72,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
#include "common/ip.h"
#include "common/scram-common.h"
+#include "common/zpq_stream.h"
#include "mb/pg_wchar.h"
#include "port/pg_bswap.h"
@@ -325,6 +326,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"compression", "COMPRESSION", NULL, NULL,
+ "Libpq-compression", "Z", 1,
+ offsetof(struct pg_conn, compression)},
+
{"target_session_attrs", "PGTARGETSESSIONATTRS",
DefaultTargetSessionAttrs, NULL,
"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -430,6 +435,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
void
pqDropConnection(PGconn *conn, bool flushInput)
{
+ /* Release compression streams */
+ zpq_free(conn->zstream);
+ conn->zstream = NULL;
+
/* Drop any SSL state */
pqsecure_close(conn);
@@ -2648,11 +2657,33 @@ keep_going: /* We will come back to here until there is
*/
conn->inCursor = conn->inStart;
- /* Read type byte */
- if (pqGetc(&beresp, conn))
+ while (1)
{
- /* We'll come back when there is more data */
- return PGRES_POLLING_READING;
+ /* Read type byte */
+ if (pqGetc(&beresp, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+
+ if (beresp == 'z') /* Switch on compression */
+ {
+ char algorithm;
+ pqGetc(&algorithm, conn);
+ if (zpq_algorithm() != algorithm)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "server and client were configured with different libpq compression algorithms: %c vs. %c\n"),
+ algorithm, zpq_algorithm());
+ goto error_return;
+ }
+ /* mark byte consumed */
+ conn->inStart = conn->inCursor;
+ Assert(!conn->zstream);
+ conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn);
+ } else
+ break;
}
/*
@@ -3462,6 +3493,8 @@ freePGconn(PGconn *conn)
free(conn->dbName);
if (conn->replication)
free(conn->replication);
+ if (conn->compression)
+ free(conn->compression);
if (conn->pguser)
free(conn->pguser);
if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 2a6637f..488f8d2 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,11 +53,12 @@
#include "port/pg_bswap.h"
#include "pg_config_paths.h"
+#include <common/zpq_stream.h>
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
-static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
- time_t end_time);
+static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
+ time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
/*
@@ -630,6 +631,7 @@ pqReadData(PGconn *conn)
{
int someread = 0;
int nread;
+ size_t processed;
if (conn->sock == PGINVALID_SOCKET)
{
@@ -678,10 +680,23 @@ pqReadData(PGconn *conn)
/* OK, try to read some data */
retry3:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ processed = 0;
+ nread = conn->zstream
+ ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd, &processed)
+ : pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd);
+ conn->inEnd += processed;
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
if (SOCK_ERRNO == EINTR)
goto retry3;
/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -768,10 +783,24 @@ retry3:
* arrived.
*/
retry4:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ processed = 0;
+ nread = conn->zstream
+ ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd, &processed)
+ : pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd);
+ conn->inEnd += processed;
+
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
if (SOCK_ERRNO == EINTR)
goto retry4;
/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -842,12 +871,14 @@ pqSendSome(PGconn *conn, int len)
}
/* while there's still data to send */
- while (len > 0)
+ while (len > 0 || zpq_buffered(conn->zstream))
{
int sent;
-
+ size_t processed = 0;
+ sent = conn->zstream
+ ? zpq_write(conn->zstream, ptr, len, &processed)
#ifndef WIN32
- sent = pqsecure_write(conn, ptr, len);
+ : pqsecure_write(conn, ptr, len);
#else
/*
@@ -855,8 +886,11 @@ pqSendSome(PGconn *conn, int len)
* failure-point appears to be different in different versions of
* Windows, but 64k should always be safe.
*/
- sent = pqsecure_write(conn, ptr, Min(len, 65536));
+ : pqsecure_write(conn, ptr, Min(len, 65536));
#endif
+ ptr += processed;
+ len -= processed;
+ remaining -= processed;
if (sent < 0)
{
@@ -896,7 +930,7 @@ pqSendSome(PGconn *conn, int len)
remaining -= sent;
}
- if (len > 0)
+ if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
{
/*
* We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8345faf..3942be1 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2179,6 +2179,8 @@ build_startup_packet(const PGconn *conn, char *packet,
ADD_STARTUP_OPTION("database", conn->dbName);
if (conn->replication && conn->replication[0])
ADD_STARTUP_OPTION("replication", conn->replication);
+ if (conn->compression && conn->compression[0])
+ ADD_STARTUP_OPTION("compression", conn->compression);
if (conn->pgoptions && conn->pgoptions[0])
ADD_STARTUP_OPTION("options", conn->pgoptions);
if (conn->send_appname)
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 9a586ff..6344eb6 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
/* include stuff common to fe and be */
#include "getaddrinfo.h"
#include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
/* include stuff found in fe only */
#include "pqexpbuffer.h"
@@ -357,6 +358,7 @@ struct pg_conn
char *sslrootcert; /* root certificate filename */
char *sslcrl; /* certificate revocation list filename */
char *requirepeer; /* required peer credentials for local sockets */
+ char *compression; /* stream compression (0 or 1) */
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
char *krbsrvname; /* Kerberos service name */
@@ -495,6 +497,9 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Compression stream */
+ ZpqStream* zstream;
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 593732f..ab2bad5 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -116,7 +116,7 @@ sub mkvcbuild
our @pgcommonallfiles = qw(
base64.c config_info.c controldata_utils.c exec.c file_perm.c ip.c
- keywords.c md5.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+ keywords.c md5.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
saslprep.c scram-common.c string.c unicode_norm.c username.c
wait_error.c);