New version of the patch with fixed is attached.
On 28.10.2020 22:27, Andres Freund wrote:
Hi,
On 2020-10-26 19:20:46 +0300, Konstantin Knizhnik wrote:
diff --git a/configure b/configure
index ace4ed5..deba608 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LD
LDFLAGS_SL
LDFLAGS_EX
with_zlib
+with_zstd
with_system_tzdata
with_libxslt
XML2_LIBS
I don't see a corresponding configure.ac change?
+ <varlistentry id="libpq-connect-compression" xreflabel="compression">
+ <term><literal>compression</literal></term>
+ <listitem>
+ <para>
+ Request compression of libpq traffic. Client sends to the server list
of compression algorithms, supported by client library.
+ If server supports one of this algorithms, then it acknowledges use of
this algorithm and then all libpq messages send both from client to server and
+ visa versa will be compressed. If server is not supporting any of the
suggested algorithms, then it replies with 'n' (no compression)
+ message and it is up to the client whether to continue work without
compression or report error.
+ Supported compression algorithms are chosen at configure time. Right
now two libraries are supported: zlib (default) and zstd (if Postgres was
+ configured with --with-zstd option). In both cases streaming mode is
used.
+ </para>
+ </listitem>
+ </varlistentry>
+
- there should be a reference to potential security impacts when used in
combination with encrypted connections
- What does " and it is up to the client whether to continue work
without compression or report error" actually mean for a libpq parameter?
- What is the point of the "streaming mode" reference?
@@ -263,6 +272,21 @@
</varlistentry>
<varlistentry>
+ <term>CompressionAck</term>
+ <listitem>
+ <para>
+ Server acknowledges using compression for client-server communication
protocol.
+ Compression can be requested by client by including "compression"
option in connection string.
+ Client sends to the server list of compression algorithms, supported
by client library
+ (compression algorithm is identified by one letter: <literal>'f'</literal> -
Facebook zstd, <literal>'z'</literal> - zlib,...).
+ If server supports one of this algorithms, then it acknowledges use
of this algorithm and all subsequent libpq messages send both from client to
server and
+ visa versa will be compressed. If server is not supporting any of the
suggested algorithms, then it replies with 'n' (no compression)
+ algorithm identifier and it is up to the client whether to continue
work without compression or report error.
+ </para>
+ </listitem>
+ </varlistentry>
Why are compression methods identified by one byte identifiers? That
seems unnecessarily small, given this is commonly a once-per-connection
action?
The protocol sounds to me like there's no way to enable/disable
compression in an existing connection. To me it seems better to have an
explicit, client initiated, request to use a specific method of
compression (including none). That allows to enable compression for bulk
work, and disable it in other cases (e.g. for security sensitive
content, or for unlikely to compress well content).
I think that would also make cross-version handling easier, because a
newer client driver can send the compression request and handle the
error, without needing to reconnect or such.
Most importantly, I think such a design is basically a necessity to make
connection poolers to work in a sensible way.
And lastly, wouldn't it be reasonable to allow to specify things like
compression levels? All that doesn't have to be supported now, but I
think the protocol should take that into account.
+<para>
+ Used compression algorithm. Right now the following streaming
compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no
compression.
+</para>
I would prefer this just be referenced as zstd or zstandard, not
facebook zstd. There's an RFC (albeit only "informational"), and it
doesn't name facebook, except as an employer:
https://tools.ietf.org/html/rfc8478
+int
+pq_configure(Port* port)
+{
+ char* client_compression_algorithms = port->compression_algorithms;
+ /*
+ * If client request compression, it sends list of supported
compression algorithms.
+ * Each compression algorirthm is idetified by one letter ('f' -
Facebook zsts, 'z' - xlib)
+ */
s/algorirthm/algorithm/
s/idetified/identified/
s/zsts/zstd/
s/xlib/zlib/
That's, uh, quite the typo density.
+ if (client_compression_algorithms)
+ {
+ char server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
+ char compression_algorithm = ZPQ_NO_COMPRESSION;
+ char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
+ int rc;
Why is this hand-rolling protocol messages?
+ /* Intersect lists */
+ while (*client_compression_algorithms != '\0')
+ {
+ if (strchr(server_compression_algorithms,
*client_compression_algorithms))
+ {
+ compression_algorithm =
*client_compression_algorithms;
+ break;
+ }
+ client_compression_algorithms += 1;
+ }
Why isn't this is handled within zpq?
+ /* Send 'z' message to the client with selectde comression
algorithm ('n' if match is ont found) */
s/selectde/selected/
s/comression/compression/
s/ont/not/
+ socket_set_nonblocking(false);
+ while ((rc = secure_write(MyProcPort, compression,
sizeof(compression))) < 0
+ && errno == EINTR);
+ if ((size_t)rc != sizeof(compression))
+ return -1;
Huh? This all seems like an abstraction violation.
+ /* initialize compression */
+ if (zpq_set_algorithm(compression_algorithm))
+ PqStream = zpq_create((zpq_tx_func)secure_write,
(zpq_rx_func)secure_read, MyProcPort);
+ }
+ return 0;
+}
Why is zpq a wrapper around secure_write/read? I'm a bit worried this
will reduce the other places we could use zpq.
static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
{
+ /* If srteaming compression is enabled then use correpondent
comression read function. */
s/srteaming/streaming/
s/correpondent/correponding/
s/comression/compression/
Could you please try to proof-read the patch a bit? The typo density
is quite high.
+ r = PqStream
+ ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength,
&processed)
+ : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE -
PqRecvLength);
+ PqRecvLength += processed;
? : doesn't make sense to me in this case. This should be an if/else.
if (r < 0)
{
+ if (r == ZPQ_DECOMPRESS_ERROR)
+ {
+ char const* msg = zpq_error(PqStream);
+ if (msg == NULL)
+ msg = "end of stream";
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to decompress data:
%s", msg)));
+ return EOF;
+ }
I don't think we should error out with "failed to decompress data:"
e.g. when the client closed the connection.
@@ -1413,13 +1457,18 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
- while (bufptr < bufend)
+ while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data
to flush or unsent data in internal compression buffer */
{
Overly long line.
- int r;
-
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
- if (r <= 0)
+ int r;
+ size_t processed = 0;
+ size_t available = bufend - bufptr;
+ r = PqStream
+ ? zpq_write(PqStream, bufptr, available, &processed)
+ : secure_write(MyProcPort, bufptr, available);
Same comment as above, re ternary expression.
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+ /*
+ * Returns letter identifying compression algorithm.
+ */
+ char (*name)(void);
+
+ /*
+ * Create compression stream with using rx/tx function for
fetching/sending compressed data
+ */
+ ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void
*arg);
+
+ /*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned
by the rx function.
+ */
+ ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
+
+ /*
+ * Write up to "size" raw (decompressed) bytes.
+ * Returns number of written raw bytes or error code returned by tx
function.
+ * In the last case amount of written raw bytes is stored in *processed.
+ */
+ ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t
*processed);
This should at least specify how these functions are supposed to handle
blocking/nonblocking sockets.
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
Add some arguments for choosing these parameters.
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+ {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error,
zstd_buffered},
+#endif
+#if HAVE_LIBZ
+ {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error,
zlib_buffered},
+#endif
+ {NULL}
+};
I think it's preferrable to use designated initializers.
Do we really need zero terminated lists? Works fine, but brrr.
+/*
+ * Index of used compression algorithm in zpq_algorithms array.
+ */
+static int zpq_algorithm_impl;
This is just odd API design imo. Why doesn't the dispatch work based on
an argument for zpq_create() and the ZpqStream * for the rest?
What if there's two libpq connections in one process? To servers
supporting different compression algorithms? This isn't going to fly.
+/*
+ * Get list of the supported algorithms.
+ * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib.
+ * Algorithm identifies are appended to the provided buffer and terminated by
'\0'.
+ */
+void
+zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
+{
+ int i;
+ for (i = 0; zpq_algorithms[i].name != NULL; i++)
+ {
+ Assert(i < ZPQ_MAX_ALGORITHMS);
+ algorithms[i] = zpq_algorithms[i].name();
+ }
+ Assert(i < ZPQ_MAX_ALGORITHMS);
+ algorithms[i] = '\0';
+}
Uh, doesn't this bake ZPQ_MAX_ALGORITHMS into the ABI? That seems
entirely unnecessary?
@@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet,
ADD_STARTUP_OPTION("replication", conn->replication);
if (conn->pgoptions && conn->pgoptions[0])
ADD_STARTUP_OPTION("options", conn->pgoptions);
+ if (conn->compression && conn->compression[0])
+ {
+ bool enabled;
+ /*
+ * If compressoin is enabled, then send to the server list of
compression algorithms
+ * supported by client
+ */
s/compressoin/compression/
+ if (parse_bool(conn->compression, &enabled))
+ {
+ char compression_algorithms[ZPQ_MAX_ALGORITHMS];
+ zpq_get_supported_algorithms(compression_algorithms);
+ ADD_STARTUP_OPTION("compression",
compression_algorithms);
+ }
+ }
I think this needs to work in a graceful manner across server
versions. You can make that work with an argument, using the _pq_
parameter stuff, but as I said earlier, I think it's a mistake to deal
with this in the startup packet anyway.
Greetings,
Andres Freund
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/configure b/configure
index ace4ed5..676b90e 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ with_gnu_ld
LD
LDFLAGS_SL
LDFLAGS_EX
+with_zstd
with_zlib
with_system_tzdata
with_libxslt
@@ -799,6 +800,7 @@ infodir
docdir
oldincludedir
includedir
+runstatedir
localstatedir
sharedstatedir
sysconfdir
@@ -867,6 +869,7 @@ with_libxml
with_libxslt
with_system_tzdata
with_zlib
+with_zstd
with_gnu_ld
enable_largefile
'
@@ -936,6 +939,7 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var'
+runstatedir='${localstatedir}/run'
includedir='${prefix}/include'
oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@@ -1188,6 +1192,15 @@ do
| -silent | --silent | --silen | --sile | --sil)
silent=yes ;;
+ -runstatedir | --runstatedir | --runstatedi | --runstated \
+ | --runstate | --runstat | --runsta | --runst | --runs \
+ | --run | --ru | --r)
+ ac_prev=runstatedir ;;
+ -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
+ | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
+ | --run=* | --ru=* | --r=*)
+ runstatedir=$ac_optarg ;;
+
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@@ -1325,7 +1338,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
- libdir localedir mandir
+ libdir localedir mandir runstatedir
do
eval ac_val=\$$ac_var
# Remove trailing slashes.
@@ -1478,6 +1491,7 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
+ --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include]
@@ -1571,6 +1585,7 @@ Optional Packages:
--with-system-tzdata=DIR
use system time zone data in DIR
--without-zlib do not use Zlib
+ --with-zstd do not use zstd
--with-gnu-ld assume the C compiler uses GNU ld [default=no]
Some influential environment variables:
@@ -8602,6 +8617,35 @@ fi
#
+# Zstd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+ withval=$with_zstd;
+ case $withval in
+ yes)
+ :
+ ;;
+ no)
+ :
+ ;;
+ *)
+ as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+ ;;
+ esac
+
+else
+ with_zstd=no
+
+fi
+
+
+
+
+#
# Assignments
#
@@ -12092,6 +12136,59 @@ fi
fi
+if test "$with_zstd" = yes; then
+ { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_decompressStream in -lzstd" >&5
+$as_echo_n "checking for ZSTD_decompressStream in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_decompressStream+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_decompressStream ();
+int
+main ()
+{
+return ZSTD_decompressStream ();
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_lib_zstd_ZSTD_decompressStream=yes
+else
+ ac_cv_lib_zstd_ZSTD_decompressStream=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_decompressStream" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_decompressStream" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_decompressStream" = xyes; then :
+ cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+ LIBS="-lzstd $LIBS"
+
+else
+ as_fn_error $? "zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+fi
+
if test "$enable_spinlocks" = yes; then
$as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
@@ -13297,6 +13394,20 @@ fi
fi
+if test "$with_zstd" = yes; then
+ ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default"
+if test "x$ac_cv_header_zstd_h" = xyes; then :
+
+else
+ as_fn_error $? "zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+
+fi
+
if test "$with_gssapi" = yes ; then
for ac_header in gssapi/gssapi.h
do :
@@ -14689,7 +14800,7 @@ else
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -14735,7 +14846,7 @@ else
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -14759,7 +14870,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -14804,7 +14915,7 @@ else
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
@@ -14828,7 +14939,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
We can't simply define LARGE_OFF_T to be 9223372036854775807,
since some C++ compilers masquerading as C compilers
incorrectly reject 9223372036854775807. */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
&& LARGE_OFF_T % 2147483647 == 1)
? 1 : -1];
diff --git a/configure.ac b/configure.ac
index 5b91c83..bd760e5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1000,6 +1000,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
AC_SUBST(with_zlib)
#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+ [do not use zstd])
+AC_SUBST(with_zstd)
+
+#
# Assignments
#
@@ -1186,6 +1193,14 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zlib to disable zlib support.])])
fi
+if test "$with_zstd" = yes; then
+ AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+ [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
if test "$enable_spinlocks" = yes; then
AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
else
@@ -1400,6 +1415,13 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zlib to disable zlib support.])])
fi
+if test "$with_zstd" = yes; then
+ AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
if test "$with_gssapi" = yes ; then
AC_CHECK_HEADERS(gssapi/gssapi.h, [],
[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9ce32fb..140724d 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1225,6 +1225,22 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
</listitem>
</varlistentry>
+ <varlistentry id="libpq-connect-compression" xreflabel="compression">
+ <term><literal>compression</literal></term>
+ <listitem>
+ <para>
+ Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported by client library.
+ If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
+ visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
+ message and it is up to the client whether to continue work without compression or report error.
+ Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+ configured with --with-zstd option). In both cases streaming mode is used.
+ By default compression is disabled. Please notice that using compression together with SSL may add extra vulnerabilities:
+ <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
<term><literal>client_encoding</literal></term>
<listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 9a95d7b..03d5df4 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
such as <command>COPY</command>.
</para>
+ <para>
+ It is possible to compress protocol data to reduce traffic and speed-up client-server interaction.
+ Compression is especial useful for importing/exporting data to/from database using COPY command
+ and for replication (both physical and logical). Also compression can reduce server response time
+ in case of queries returning large amount of data (for example returning JSON, BLOBs, text,...)
+ Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+ configured with --with-zstd option). In both cases streaming mode is used.
+ </para>
+
<sect2 id="protocol-message-concepts">
<title>Messaging Overview</title>
@@ -263,6 +272,21 @@
</varlistentry>
<varlistentry>
+ <term>CompressionAck</term>
+ <listitem>
+ <para>
+ Server acknowledges using compression for client-server communication protocol.
+ Compression can be requested by client by including "compression" option in connection string.
+ Client sends to the server list of compression algorithms, supported by client library
+ (compression algorithm is identified by one letter: <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib,...).
+ If server supports one of this algorithms, then it acknowledges use of this algorithm and all subsequent libpq messages send both from client to server and
+ visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
+ algorithm identifier and it is up to the client whether to continue work without compression or report error.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term>AuthenticationOk</term>
<listitem>
<para>
@@ -3396,6 +3420,56 @@ following:
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('z')
+</term>
+<listitem>
+<para>
+ Acknowledge use of compression for protocol data. Client sends to the server list of compression algorithms, supported by client library.
+ If server supports one of this algorithms, then it responds with CompressionAck with identifier (letter) of first such algorithm.
+ If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression) algorithm.
+ It is up to the client whether to continue work without compression or report error.
+ After receiving this message with algorithm identifier other than 'n', both server and client are switched to compression mode
+ and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Byte1
+</term>
+<listitem>
+<para>
+ Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
<varlistentry>
<term>
@@ -5962,6 +6036,19 @@ StartupMessage (F)
</para>
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+ <literal>compression</literal>
+</term>
+<listitem>
+<para>
+ Request compression of libpq traffic. Value is list of compression algorithms supported by client:
+ <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib, <literal>'n'</literal> - no compression.
+ By default compression is disabled. Please notice that using compression together with SSL may add extra vulnerabilities:
+ <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
</variablelist>
In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9a..9e11599 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm = @with_llvm@
with_system_tzdata = @with_system_tzdata@
with_uuid = @with_uuid@
with_zlib = @with_zlib@
+with_zstd = @with_zstd@
enable_rpath = @enable_rpath@
enable_nls = @enable_nls@
enable_debug = @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95..f32a780 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
LIBS += -lsystemd
endif
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
##########################################################################
all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index ac986c0..9df8aa0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -93,6 +93,7 @@
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
+#include "common/zpq_stream.h"
/*
* Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -141,6 +142,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
+static ZpqStream* PqStream;
+
+
/*
* Message status
*/
@@ -183,6 +187,69 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
WaitEventSet *FeBeWaitSet;
+/* --------------------------------
+ * pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+ char* client_compression_algorithms = port->compression_algorithms;
+ /*
+ * If client request compression, it sends list of supported compression algorithms.
+ * Each compression algorithm is identified by one letter ('f' - Facebook zsts, 'z' - zlib)
+ */
+ if (client_compression_algorithms)
+ {
+ char server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
+ char compression_algorithm = ZPQ_NO_COMPRESSION;
+ char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
+ int impl;
+ int rc;
+
+ /* Get list of compression algorithms, supported by server */
+ zpq_get_supported_algorithms(server_compression_algorithms);
+
+ /* Intersect lists */
+ while (*client_compression_algorithms != '\0')
+ {
+ if (strchr(server_compression_algorithms, *client_compression_algorithms))
+ {
+ compression_algorithm = *client_compression_algorithms;
+ break;
+ }
+ client_compression_algorithms += 1;
+ }
+
+ compression[5] = compression_algorithm;
+ /* Send 'z' message to the client with selected compression algorithm ('n' if match is not found) */
+ socket_set_nonblocking(false);
+ while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0
+ && errno == EINTR);
+ if ((size_t)rc != sizeof(compression))
+ return -1;
+
+ /* initialize compression */
+ impl = zpq_get_algorithm_impl(compression_algorithm);
+ if (impl < 0)
+ {
+ ereport(LOG,
+ (errmsg("Requested algorithm %c is not supported", compression_algorithm)));
+ return -1;
+ }
+ PqStream = zpq_create(impl, (zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort, NULL, 0);
+ if (!PqStream)
+ {
+ ereport(LOG,
+ (errmsg("Failed to initialize compressor %c(%d)", compression_algorithm, impl)));
+ return -1;
+ }
+ }
+ return 0;
+}
/* --------------------------------
* pq_init - initialize libpq at backend startup
@@ -280,6 +347,9 @@ socket_close(int code, Datum arg)
free(MyProcPort->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
+ /* Release compression streams */
+ zpq_free(PqStream);
+
/*
* Cleanly shut down SSL layer. Nowhere else does a postmaster child
* call this, so this is safe when interrupting BackendInitialize().
@@ -919,12 +989,15 @@ socket_set_nonblocking(bool nonblocking)
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
*
- * returns 0 if OK, EOF if trouble
+ * nowait parameter toggles non-blocking mode.
+ * returns number of read bytes, EOF if trouble
* --------------------------------
*/
static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
{
+ int r;
+
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
@@ -940,21 +1013,36 @@ pq_recvbuf(void)
}
/* Ensure that we're in blocking mode */
- socket_set_nonblocking(false);
+ socket_set_nonblocking(nowait);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
- int r;
-
- r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ /* If streaming compression is enabled then use correspondent compression read function. */
+ r = PqStream
+ ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength)
+ : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0)
{
+ if (r == ZPQ_DECOMPRESS_ERROR)
+ {
+ char const* msg = zpq_error(PqStream);
+ if (msg == NULL)
+ msg = "end of stream";
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to decompress data: %s", msg)));
+ return EOF;
+ }
if (errno == EINTR)
continue; /* Ok if interrupted */
+ if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+ return 0;
+
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
@@ -975,7 +1063,7 @@ pq_recvbuf(void)
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
- return 0;
+ return r;
}
}
@@ -990,7 +1078,7 @@ pq_getbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1009,7 +1097,7 @@ pq_peekbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1030,44 +1118,11 @@ pq_getbyte_if_available(unsigned char *c)
Assert(PqCommReadingMsg);
- if (PqRecvPointer < PqRecvLength)
+ if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
{
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
-
- /* Put the socket into non-blocking mode */
- socket_set_nonblocking(true);
-
- r = secure_read(MyProcPort, c, 1);
- if (r < 0)
- {
- /*
- * Ok if no data available without blocking or interrupted (though
- * EINTR really shouldn't happen with a non-blocking socket). Report
- * other errors.
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- r = 0;
- else
- {
- /*
- * Careful: an ereport() that tries to write to the client would
- * cause recursion to here, leading to stack overflow and core
- * dump! This message must go *only* to the postmaster log.
- */
- ereport(COMMERROR,
- (errcode_for_socket_access(),
- errmsg("could not receive data from client: %m")));
- r = EOF;
- }
- }
- else if (r == 0)
- {
- /* EOF detected */
- r = EOF;
- }
-
return r;
}
@@ -1088,7 +1143,7 @@ pq_getbytes(char *s, size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1122,7 +1177,7 @@ pq_discardbytes(size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1163,7 +1218,7 @@ pq_getstring(StringInfo s)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
@@ -1413,13 +1468,19 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
- while (bufptr < bufend)
+ while (bufptr < bufend || zpq_buffered(PqStream) != 0)
+ /* has more data to flush or unsent data in internal compression buffer */
{
- int r;
-
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
- if (r <= 0)
+ int r;
+ size_t processed = 0;
+ size_t available = bufend - bufptr;
+ r = PqStream
+ ? zpq_write(PqStream, bufptr, available, &processed)
+ : secure_write(MyProcPort, bufptr, available);
+ bufptr += processed;
+ PqSendStart += processed;
+
+ if (r < 0 || (r == 0 && available))
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
@@ -1467,7 +1528,6 @@ internal_flush(void)
bufptr += r;
PqSendStart += r;
}
-
PqSendStart = PqSendPointer = 0;
return 0;
}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 959e3b8..60ff1e7 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2151,6 +2151,8 @@ retry1:
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
+ else if (strcmp(nameptr, "compression") == 0)
+ port->compression_algorithms = pstrdup(valptr);
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
@@ -4458,6 +4460,14 @@ BackendInitialize(Port *port)
if (status != STATUS_OK)
proc_exit(0);
+ if (pq_configure(port))
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to send compression message: %m")));
+ proc_exit(0);
+ }
+
/*
* Now that we have the user and database name, we can set the process
* title for ps. It's good to do this as early as possible in startup.
diff --git a/src/common/Makefile b/src/common/Makefile
index 25c55bd..bc6cba8 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
unicode_norm.o \
username.o \
wait_error.o \
- wchar.o
+ wchar.o \
+ zpq_stream.o
ifeq ($(with_openssl),yes)
OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..c86cad0
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,517 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+ /*
+ * Returns letter identifying compression algorithm.
+ */
+ char (*name)(void);
+
+ /*
+ * Create compression stream with using rx/tx function for fetching/sending compressed data.
+ * tx_func: function for writing compressed data in underlying stream
+ * rx_func: function for receiving compressed data from underlying stream
+ * arg: context passed to the function
+ * rx_data: received data (compressed data already fetched from input stream)
+ * rx_data_size: size of data fetched from input stream
+ */
+ ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size);
+
+ /*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function.
+ */
+ ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
+
+ /*
+ * Write up to "size" raw (decompressed) bytes.
+ * Returns number of written raw bytes or error code returned by tx function.
+ * In the last case amount of written raw bytes is stored in *processed.
+ */
+ ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);
+
+ /*
+ * Free stream created by create function.
+ */
+ void (*free)(ZpqStream *zs);
+
+ /*
+ * Get error message.
+ */
+ char const* (*error)(ZpqStream *zs);
+
+ /*
+ * Returns amount of data in internal compression buffer.
+ */
+ size_t (*buffered)(ZpqStream *zs);
+} ZpqAlgorithm;
+
+struct ZpqStream
+{
+ ZpqAlgorithm const* algorithm;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
+
+typedef struct ZstdStream
+{
+ ZpqStream common;
+ ZSTD_CStream* tx_stream;
+ ZSTD_DStream* rx_stream;
+ ZSTD_outBuffer tx;
+ ZSTD_inBuffer rx;
+ size_t tx_not_flushed; /* Amount of data in internal zstd buffer */
+ size_t tx_buffered; /* Data which is consumed by ztd_read but not yet sent */
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+ char const* rx_error; /* Decompress error message */
+ size_t tx_total;
+ size_t tx_total_raw;
+ size_t rx_total;
+ size_t rx_total_raw;
+ char tx_buf[ZSTD_BUFFER_SIZE];
+ char rx_buf[ZSTD_BUFFER_SIZE];
+} ZstdStream;
+
+static ZpqStream*
+zstd_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+ ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream));
+
+ zs->tx_stream = ZSTD_createCStream();
+ ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL);
+ zs->rx_stream = ZSTD_createDStream();
+ ZSTD_initDStream(zs->rx_stream);
+ zs->tx.dst = zs->tx_buf;
+ zs->tx.pos = 0;
+ zs->tx.size = ZSTD_BUFFER_SIZE;
+ zs->rx.src = zs->rx_buf;
+ zs->rx.pos = 0;
+ zs->rx.size = 0;
+ zs->rx_func = rx_func;
+ zs->tx_func = tx_func;
+ zs->tx_buffered = 0;
+ zs->tx_not_flushed = 0;
+ zs->rx_error = NULL;
+ zs->arg = arg;
+ zs->tx_total = zs->tx_total_raw = 0;
+ zs->rx_total = zs->rx_total_raw = 0;
+
+ zs->rx.size = rx_data_size;
+ Assert(rx_data_size < ZSTD_BUFFER_SIZE);
+ memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+ return (ZpqStream*)zs;
+}
+
+static ssize_t
+zstd_read(ZpqStream *zstream, void *buf, size_t size)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ ssize_t rc;
+ ZSTD_outBuffer out;
+ out.dst = buf;
+ out.pos = 0;
+ out.size = size;
+
+ while (1)
+ {
+ rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+ if (ZSTD_isError(rc))
+ {
+ zs->rx_error = ZSTD_getErrorName(rc);
+ return ZPQ_DECOMPRESS_ERROR;
+ }
+ /* Return result if we fill requested amount of bytes or read operation was performed */
+ if (out.pos != 0)
+ {
+ zs->rx_total_raw += out.pos;
+ return out.pos;
+ }
+ if (zs->rx.pos == zs->rx.size)
+ {
+ zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+ }
+ rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size);
+ if (rc > 0) /* read fetches some data */
+ {
+ zs->rx.size += rc;
+ zs->rx_total += rc;
+ }
+ else /* read failed */
+ {
+ zs->rx_total_raw += out.pos;
+ return rc;
+ }
+ }
+}
+
+static ssize_t
+zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ ssize_t rc;
+ ZSTD_inBuffer in_buf;
+ in_buf.src = buf;
+ in_buf.pos = 0;
+ in_buf.size = size;
+
+ do
+ {
+ if (zs->tx.pos == 0) /* Compress buffer is empty */
+ {
+ zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (in_buf.pos < size) /* Has something to compress in input buffer */
+ ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+ if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+ {
+ zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+ }
+ }
+ rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+ if (rc > 0)
+ {
+ zs->tx.pos -= rc;
+ zs->tx.dst = (char*)zs->tx.dst + rc;
+ zs->tx_total += rc;
+ }
+ else
+ {
+ *processed = in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ zs->tx_total_raw += in_buf.pos;
+ return rc;
+ }
+ } while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
+
+ zs->tx_total_raw += in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ return in_buf.pos;
+}
+
+static void
+zstd_free(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ if (zs != NULL)
+ {
+ ZSTD_freeCStream(zs->tx_stream);
+ ZSTD_freeDStream(zs->rx_stream);
+ free(zs);
+ }
+}
+
+static char const*
+zstd_error(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ return zs->rx_error;
+}
+
+static size_t
+zstd_buffered(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+static char
+zstd_name(void)
+{
+ return 'f';
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+#define ZLIB_BUFFER_SIZE 8192
+#define ZLIB_COMPRESSION_LEVEL 1
+
+typedef struct ZlibStream
+{
+ ZpqStream common;
+
+ z_stream tx;
+ z_stream rx;
+
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+
+ size_t tx_buffered;
+
+ Bytef tx_buf[ZLIB_BUFFER_SIZE];
+ Bytef rx_buf[ZLIB_BUFFER_SIZE];
+} ZlibStream;
+
+static ZpqStream*
+zlib_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+ int rc;
+ ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream));
+ memset(&zs->tx, 0, sizeof(zs->tx));
+ zs->tx.next_out = zs->tx_buf;
+ zs->tx.avail_out = ZLIB_BUFFER_SIZE;
+ zs->tx_buffered = 0;
+ rc = deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL);
+ if (rc != Z_OK)
+ {
+ free(zs);
+ return NULL;
+ }
+ Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE);
+
+ memset(&zs->rx, 0, sizeof(zs->tx));
+ zs->rx.next_in = zs->rx_buf;
+ zs->rx.avail_in = ZLIB_BUFFER_SIZE;
+ rc = inflateInit(&zs->rx);
+ if (rc != Z_OK)
+ {
+ free(zs);
+ return NULL;
+ }
+ Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE);
+
+ zs->rx.avail_in = rx_data_size;
+ Assert(rx_data_size < ZLIB_BUFFER_SIZE);
+ memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+ zs->rx_func = rx_func;
+ zs->tx_func = tx_func;
+ zs->arg = arg;
+
+ return (ZpqStream*)zs;
+}
+
+static ssize_t
+zlib_read(ZpqStream *zstream, void *buf, size_t size)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ int rc;
+ zs->rx.next_out = (Bytef *)buf;
+ zs->rx.avail_out = size;
+
+ while (1)
+ {
+ if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+ {
+ rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+ if (rc != Z_OK && rc != Z_BUF_ERROR)
+ {
+ return ZPQ_DECOMPRESS_ERROR;
+ }
+ if (zs->rx.avail_out != size)
+ {
+ return size - zs->rx.avail_out;
+ }
+ if (zs->rx.avail_in == 0)
+ {
+ zs->rx.next_in = zs->rx_buf;
+ }
+ }
+ else
+ {
+ zs->rx.next_in = zs->rx_buf;
+ }
+ rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+ if (rc > 0)
+ {
+ zs->rx.avail_in += rc;
+ }
+ else
+ {
+ return rc;
+ }
+ }
+}
+
+static ssize_t
+zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ int rc;
+ zs->tx.next_in = (Bytef *)buf;
+ zs->tx.avail_in = size;
+ do
+ {
+ if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */
+ {
+ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (zs->tx.avail_in != 0) /* Has something in input buffer */
+ {
+ rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+ Assert(rc == Z_OK);
+ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+ }
+ }
+ rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out);
+ if (rc > 0)
+ {
+ zs->tx.next_out += rc;
+ zs->tx.avail_out += rc;
+ }
+ else
+ {
+ *processed = size - zs->tx.avail_in;
+ zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+ return rc;
+ }
+ } while (zs->tx.avail_out == ZLIB_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */
+
+ zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+
+ return size - zs->tx.avail_in;
+}
+
+static void
+zlib_free(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ if (zs != NULL)
+ {
+ inflateEnd(&zs->rx);
+ deflateEnd(&zs->tx);
+ free(zs);
+ }
+}
+
+static char const*
+zlib_error(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ return zs->rx.msg;
+}
+
+static size_t
+zlib_buffered(ZpqStream *zstream)
+{
+ ZlibStream* zs = (ZlibStream*)zstream;
+ return zs != NULL ? zs->tx_buffered : 0;
+}
+
+static char
+zlib_name(void)
+{
+ return 'z';
+}
+
+#endif
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+ {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered},
+#endif
+#if HAVE_LIBZ
+ {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered},
+#endif
+ {NULL}
+};
+
+/*
+ * Index of used compression algorithm in zpq_algorithms array.
+ */
+ZpqStream*
+zpq_create(int algorithm_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+ ZpqStream* stream = zpq_algorithms[algorithm_impl].create(tx_func, rx_func, arg, rx_data, rx_data_size);
+ stream->algorithm = &zpq_algorithms[algorithm_impl];
+ return stream;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size)
+{
+ return zs->algorithm->read(zs, buf, size);
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed)
+{
+ return zs->algorithm->write(zs, buf, size, processed);
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+ if (zs)
+ zs->algorithm->free(zs);
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+ return zs->algorithm->error(zs);
+}
+
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+ return zs ? zs->algorithm->buffered(zs) : 0;
+}
+
+/*
+ * Get list of the supported algorithms.
+ * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib.
+ * Algorithm identifies are appended to the provided buffer and terminated by '\0'.
+ */
+void
+zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
+{
+ int i;
+ for (i = 0; zpq_algorithms[i].name != NULL; i++)
+ {
+ Assert(i < ZPQ_MAX_ALGORITHMS);
+ algorithms[i] = zpq_algorithms[i].name();
+ }
+ Assert(i < ZPQ_MAX_ALGORITHMS);
+ algorithms[i] = '\0';
+}
+
+
+
+/*
+ * Choose current algorithm implementation.
+ * Returns implementation number or -1 if algorithm with such name is not found
+ */
+int
+zpq_get_algorithm_impl(char name)
+{
+ int i;
+ if (name != ZPQ_NO_COMPRESSION)
+ {
+ for (i = 0; zpq_algorithms[i].name != NULL; i++)
+ {
+ if (zpq_algorithms[i].name() == name)
+ {
+ return i;
+ }
+ }
+ }
+ return -1;
+}
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..c7f0793
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,32 @@
+/*
+ * zpq_stream.h
+ * Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+#define ZPQ_MAX_ALGORITHMS (8)
+#define ZPQ_NO_COMPRESSION 'n'
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+ZpqStream* zpq_create(int impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered(ZpqStream* zs);
+void zpq_free(ZpqStream* zs);
+
+void zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]);
+int zpq_get_algorithm_impl(char name);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281..5289f9f 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,8 @@ typedef struct Port
int keepalives_count;
int tcp_user_timeout;
+ char* compression_algorithms; /* Compression algorithms supported by client */
+
/*
* GSSAPI structures.
*/
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b115247..224ff3d 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void RemoveSocketFiles(void);
extern void pq_init(void);
+extern int pq_configure(Port* port);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index fb270df..b829fed 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
/* Define to 1 if you have the `link' function. */
#undef HAVE_LINK
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
/* Define to 1 if the system has the type `locale_t'. */
#undef HAVE_LOCALE_T
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b..be8cb34 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
# The MSVC build system scrapes OBJS from this file. If you change any of
# the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
OBJS = \
$(WIN32RES) \
fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index b0ca37c..edda46c 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
#include "common/ip.h"
#include "common/link-canary.h"
#include "common/scram-common.h"
+#include "common/zpq_stream.h"
#include "common/string.h"
#include "fe-auth.h"
#include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"compression", "COMPRESSION", NULL, NULL,
+ "Libpq-compression", "", 1,
+ offsetof(struct pg_conn, compression)},
+
{"target_session_attrs", "PGTARGETSESSIONATTRS",
DefaultTargetSessionAttrs, NULL,
"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
void
pqDropConnection(PGconn *conn, bool flushInput)
{
+ /* Release compression streams */
+ zpq_free(conn->zstream);
+ conn->zstream = NULL;
+
/* Drop any SSL state */
pqsecure_close(conn);
@@ -3216,11 +3225,55 @@ keep_going: /* We will come back to here until there is
*/
conn->inCursor = conn->inStart;
- /* Read type byte */
- if (pqGetc(&beresp, conn))
+ while (1)
{
- /* We'll come back when there is more data */
- return PGRES_POLLING_READING;
+ /* Read type byte */
+ if (pqGetc(&beresp, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+
+ if (beresp == 'z') /* Switch on compression */
+ {
+ char algorithm;
+ int impl;
+ /* Read message length word */
+ if (pqGetInt(&msgLength, 4, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+ if (msgLength != 5)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "expected compression algorithm specification message length is 5 bytes, but %d is received\n"),
+ msgLength);
+ goto error_return;
+ }
+ pqGetc(&algorithm, conn);
+ impl = zpq_get_algorithm_impl(algorithm);
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "server is not supported requested compression algorithm %c\n"), algorithm);
+ goto error_return;
+ }
+ Assert(!conn->zstream);
+ conn->zstream = zpq_create(impl, (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+ &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+ if (!conn->zstream)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "failed to initialize compressor %c(%d)\n"), algorithm, impl);
+ goto error_return;
+ }
+ /* reset buffer */
+ conn->inStart = conn->inCursor = conn->inEnd = 0;
+ } else
+ break;
}
/*
@@ -4020,6 +4073,8 @@ freePGconn(PGconn *conn)
free(conn->dbName);
if (conn->replication)
free(conn->replication);
+ if (conn->compression)
+ free(conn->compression);
if (conn->pguser)
free(conn->pguser);
if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f3..56e8468 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,12 +53,21 @@
#include "pg_config_paths.h"
#include "port/pg_bswap.h"
+#include <common/zpq_stream.h>
+
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
+#define pq_read_conn(conn) \
+ (conn->zstream \
+ ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd, \
+ conn->inBufSize - conn->inEnd) \
+ : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \
+ conn->inBufSize - conn->inEnd))
+
/*
* PQlibVersion: return the libpq version number
*/
@@ -664,10 +673,17 @@ pqReadData(PGconn *conn)
/* OK, try to read some data */
retry3:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ nread = pq_read_conn(conn);
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
switch (SOCK_ERRNO)
{
case EINTR:
@@ -759,10 +775,18 @@ retry3:
* arrived.
*/
retry4:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ nread = pq_read_conn(conn);
+
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
switch (SOCK_ERRNO)
{
case EINTR:
@@ -875,12 +899,14 @@ pqSendSome(PGconn *conn, int len)
}
/* while there's still data to send */
- while (len > 0)
+ while (len > 0 || zpq_buffered(conn->zstream))
{
int sent;
-
+ size_t processed = 0;
+ sent = conn->zstream
+ ? zpq_write(conn->zstream, ptr, len, &processed)
#ifndef WIN32
- sent = pqsecure_write(conn, ptr, len);
+ : pqsecure_write(conn, ptr, len);
#else
/*
@@ -888,8 +914,11 @@ pqSendSome(PGconn *conn, int len)
* failure-point appears to be different in different versions of
* Windows, but 64k should always be safe.
*/
- sent = pqsecure_write(conn, ptr, Min(len, 65536));
+ : pqsecure_write(conn, ptr, Min(len, 65536));
#endif
+ ptr += processed;
+ len -= processed;
+ remaining -= processed;
if (sent < 0)
{
@@ -943,7 +972,7 @@ pqSendSome(PGconn *conn, int len)
remaining -= sent;
}
- if (len > 0)
+ if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
{
/*
* We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525..8ad52c9 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2135,6 +2135,83 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
}
/*
+ * Parse boolean value. This code is copied from backend/utils/atd/bool.c
+ * because it is not available at frontend.
+ */
+static bool
+parse_bool(const char *value, bool *result)
+{
+ switch (*value)
+ {
+ case 't':
+ case 'T':
+ if (pg_strcasecmp(value, "true") == 0)
+ {
+ *result = true;
+ return true;
+ }
+ break;
+ case 'f':
+ case 'F':
+ if (pg_strcasecmp(value, "false") == 0)
+ {
+ *result = false;
+ return true;
+ }
+ break;
+ case 'y':
+ case 'Y':
+ if (pg_strcasecmp(value, "yes") == 0)
+ {
+ *result = true;
+ return true;
+ }
+ break;
+ case 'n':
+ case 'N':
+ if (pg_strcasecmp(value, "no") == 0)
+ {
+ *result = false;
+ return true;
+ }
+ break;
+ case 'o':
+ case 'O':
+ /* 'o' is not unique enough */
+ if (pg_strcasecmp(value, "on") == 0)
+ {
+ *result = true;
+ return true;
+ }
+ else if (pg_strcasecmp(value, "off") == 0)
+ {
+ *result = false;
+ return true;
+ }
+ break;
+ case '1':
+ if (value[1] == '\0')
+ {
+ *result = true;
+ return true;
+ }
+ break;
+ case '0':
+ if (value[1] == '\0')
+ {
+ *result = false;
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+
+ *result = false; /* suppress compiler warning */
+ return false;
+}
+
+/*
* Build a startup packet given a filled-in PGconn structure.
*
* We need to figure out how much space is needed, then fill it in.
@@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet,
ADD_STARTUP_OPTION("replication", conn->replication);
if (conn->pgoptions && conn->pgoptions[0])
ADD_STARTUP_OPTION("options", conn->pgoptions);
+ if (conn->compression && conn->compression[0])
+ {
+ bool enabled;
+ /*
+ * If compression is enabled, then send to the server list of compression algorithms
+ * supported by client
+ */
+ if (parse_bool(conn->compression, &enabled))
+ {
+ char compression_algorithms[ZPQ_MAX_ALGORITHMS];
+ zpq_get_supported_algorithms(compression_algorithms);
+ ADD_STARTUP_OPTION("compression", compression_algorithms);
+ }
+ }
if (conn->send_appname)
{
/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae..69ad1e9 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
/* include stuff common to fe and be */
#include "getaddrinfo.h"
#include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
/* include stuff found in fe only */
#include "pqexpbuffer.h"
@@ -369,6 +370,7 @@ struct pg_conn
* "sspi") */
char *ssl_min_protocol_version; /* minimum TLS protocol version */
char *ssl_max_protocol_version; /* maximum TLS protocol version */
+ char *compression; /* stream compression (0 or 1) */
/* Type of connection to make. Possible values: any, read-write. */
char *target_session_attrs;
@@ -527,6 +529,9 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Compression stream */
+ ZpqStream* zstream;
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd..daac58c 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -123,7 +123,7 @@ sub mkvcbuild
config_info.c controldata_utils.c d2s.c encnames.c exec.c
f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c
keywords.c kwlookup.c link-canary.c md5.c
- pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+ pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
wait_error.c wchar.c);