On 1/11/21 2:53 PM, Konstantin Knizhnik wrote: > > ... > > New version of libpq compression patch is attached. > It can also be found at git@github.com:postgrespro/libpq_compression.git >
It seems it has bit-rotted already, so here's a slightly fixed version. 1) Fixes the MSVC makefile. The list of files is sorted alphabetically, so I've added the file at the end. 2) Fixes the duplicate OID. It's a good practice to assign OIDs from the end of the range, to prevent collisions during development. Other than that, I wonder what's the easiest way to run all tests with compression enabled. ISTM it'd be nice to add a pg_regress option forcing a particular compression algorithm to be used, or something similar. I'd like a convenient way to run this under valgrind, for example. Or how do we get this tested on the buildfarm? I'm not convinced it's very user-friendly to not have a psql option enabling compression. It's true it can be enabled in a connection string, but I doubt many people will notice that. The sgml docs need a bit more love / formatting. The lines in libpq.sgml are far too long, and there are no tags whatsoever. Presumably zlib/zstd should be marked as <literal>, and so on. regards -- Tomas Vondra EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
>From 14690d8a877244fbc839f97274ccf7d84e971e71 Mon Sep 17 00:00:00 2001 From: Tomas Vondra <to...@2ndquadrant.com> Date: Mon, 11 Jan 2021 18:01:11 +0100 Subject: [PATCH] v29 --- configure | 81 +++ configure.ac | 21 + .../postgres_fdw/expected/postgres_fdw.out | 2 +- doc/src/sgml/config.sgml | 16 + doc/src/sgml/libpq.sgml | 25 + doc/src/sgml/protocol.sgml | 93 +++ src/Makefile.global.in | 1 + src/backend/Makefile | 8 + src/backend/catalog/system_views.sql | 9 + src/backend/libpq/pqcomm.c | 247 +++++-- src/backend/postmaster/pgstat.c | 30 + src/backend/postmaster/postmaster.c | 10 + src/backend/utils/adt/pgstatfuncs.c | 50 +- src/backend/utils/misc/guc.c | 10 + src/common/Makefile | 3 +- src/common/zpq_stream.c | 684 ++++++++++++++++++ src/include/catalog/pg_proc.dat | 18 +- src/include/common/zpq_stream.h | 94 +++ src/include/libpq/libpq-be.h | 3 + src/include/libpq/libpq.h | 1 + src/include/libpq/pqcomm.h | 1 + src/include/pg_config.h.in | 3 + src/include/pgstat.h | 7 + src/interfaces/libpq/Makefile | 14 + src/interfaces/libpq/fe-connect.c | 92 ++- src/interfaces/libpq/fe-exec.c | 4 +- src/interfaces/libpq/fe-misc.c | 55 +- src/interfaces/libpq/fe-protocol3.c | 165 ++++- src/interfaces/libpq/libpq-int.h | 21 + src/test/regress/expected/rules.out | 14 +- src/tools/msvc/Mkvcbuild.pm | 2 +- 31 files changed, 1701 insertions(+), 83 deletions(-) create mode 100644 src/common/zpq_stream.c create mode 100644 src/include/common/zpq_stream.h diff --git a/configure b/configure index b917a2a1c9..feb9420106 100755 --- a/configure +++ b/configure @@ -699,6 +699,7 @@ LD LDFLAGS_SL LDFLAGS_EX with_zlib +with_zstd with_system_tzdata with_libxslt XML2_LIBS @@ -866,6 +867,7 @@ with_libxml with_libxslt with_system_tzdata with_zlib +with_zstd with_gnu_ld enable_largefile ' @@ -8571,6 +8573,85 @@ fi +# +# ZStd +# + + + +# Check whether --with-zstd was given. 
+if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + + + +if test "$with_zstd" = yes ; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compress (); +int +main () +{ +return ZSTD_compress (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compress=yes +else + ac_cv_lib_zstd_ZSTD_compress=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5 +fi + +fi + + # # Zlib diff --git a/configure.ac b/configure.ac index 838d47dc22..c037cd88f6 100644 --- a/configure.ac +++ b/configure.ac @@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes, [do not use Zlib]) AC_SUBST(with_zlib) +# +# Zstd +# +PGAC_ARG_BOOL(with, zstd, no, + [use zstd]) +AC_SUBST(with_zstd) + # # Assignments # @@ -1186,6 +1193,13 @@ failure. It is possible the compiler isn't looking in the proper directory. 
Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_LIB(zstd, ZSTD_decompressStream, [], + [AC_MSG_ERROR([zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory.])]) +fi + if test "$enable_spinlocks" = yes; then AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.]) else @@ -1401,6 +1415,13 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support.])]) +fi + if test "$with_gssapi" = yes ; then AC_CHECK_HEADERS(gssapi/gssapi.h, [], [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])]) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index c11092f8cc..07504388b0 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8911,7 +8911,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, 
dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7c0a673a8d..a723251a2a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -996,6 +996,22 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression"> + <term><varname>libpq_compression</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>libpq_compression</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + This parameter enables compression of libpq traffic between client and server. + The default is <literal>on</literal>. + This option allows rejecting compression requests even if it is supported by server + (for example, due to security, or CPU consumption). + </para> + </listitem> + </varlistentry> + </variablelist> </sect2> diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 2bb3bf77e4..ed9e7f910e 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -1240,6 +1240,31 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname </listitem> </varlistentry> + <varlistentry id="libpq-connect-compression" xreflabel="compression"> + <term><literal>compression</literal></term> + <listitem> + <para> + Request compression of libpq traffic. The client sends a request with a list of compression algorithms. 
+ Compression can be requested by a client by including the "compression" option in its connection string. + This can either be a boolean value to enable or disable compression + ("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "any", or an explicit list of comma-separated compression algorithms + which can optionally include compression level ("zlib,zstd:5"). + If compression is enabled but an algorithm is not explicitly specified, the client library sends its full list of + supported algorithms and the server chooses a preferred algorithm. + + If the server accepts one of the algorithms, it replies with an acknowledgment and all future libpq messages between client and server + will be compressed. + If the server rejects the compression request, it is up to the client whether to continue without compression or to report an error. + Support for compression algorithms must be enabled when the server is compiled. + Currently, two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). In both cases, streaming mode is used. + By default, compression is not requested by the client. + Please note that using compression together with SSL may expose extra vulnerabilities: + <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink> + </para> + </listitem> + </varlistentry> + <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding"> <term><literal>client_encoding</literal></term> <listitem> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 24f7b21ecb..a69f9bfc26 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -92,6 +92,15 @@ such as <command>COPY</command>. </para> + <para> + It is possible to compress protocol data to reduce traffic and speed-up client-server interaction. 
+ Compression is especially useful for importing/exporting data to/from the database using the <literal>COPY</literal> command + and for replication (both physical and logical). Compression can also reduce the server's response time + for queries returning a large amount of data (for example, JSON, BLOBs, text, ...). + Currently, two libraries are supported: zlib (default) and zstd (if Postgres was + configured with --with-zstd option). + </para> + <sect2 id="protocol-message-concepts"> <title>Messaging Overview</title> @@ -262,6 +271,22 @@ </listitem> </varlistentry> + <varlistentry> + <term>CompressionAck</term> + <listitem> + <para> + The server accepts the client's compression request. + Compression is requested when a client connection includes the "compression" option, which includes + a list of requested compression algorithms. + If the server accepts one of these algorithms, it acknowledges use of compression and + all subsequent libpq messages between the client and server will be compressed. + The server chooses an algorithm from the list specified by client and responds with the index of the chosen algorithm from the client-supplied list. + If the server does not accept any of the requested algorithms, then it replies with an index of -1 + and it is up to the client whether to continue without compression or to report an error. + </para> + </listitem> + </varlistentry> + <varlistentry> <term>AuthenticationOk</term> <listitem> @@ -3416,6 +3441,59 @@ following: </listitem> </varlistentry> +<varlistentry> +<term> +CompressionAck (B) +</term> +<listitem> +<para> + +<variablelist> +<varlistentry> +<term> + Byte1('z') +</term> +<listitem> +<para> + Acknowledge use of compression for protocol data. The client sends to the server a list of requested compression algorithms. + If the server supports any of these algorithms, it acknowledges use of this algorithm and all subsequent libpq messages between client and server + will be compressed. 
+ The server selects the preferred algorithm from the list specified by client and responds with the + index of the chosen algorithm in this list. + If the server does not support any of the requested algorithms, it replies with -1 + and it is up to the client whether to continue without compression or to report an error. + After receiving this message with algorithm index other than -1, both server and client switch to compressed mode + and exchange compressed messages. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Int32 +</term> +<listitem> +<para> + Length of message contents in bytes, including self. +</para> +</listitem> +</varlistentry> +<varlistentry> +<term> + Byte1 +</term> +<listitem> +<para> + Index of algorithm in the list of supported algorithms specified by client or -1 if none of them are supported. +</para> +</listitem> +</varlistentry> +</variablelist> + +</para> +</listitem> +</varlistentry> + + <varlistentry> <term> @@ -5982,6 +6060,21 @@ StartupMessage (F) </para> </listitem> </varlistentry> +<varlistentry> +<term> + <literal>_pq_.compression</literal> +</term> +<listitem> +<para> + Request compression of libpq traffic. The value is a list of compression algorithms requested by the client with an optional + specification of compression level: <literal>"zlib,zstd:5"</literal>. + If the server does not accept compression, the backend will ignore the _pq_.compression + parameter and will not send the CompressionAck message to the frontend. + By default, compression is disabled. Please note that using compression together with SSL may expose extra vulnerabilities: + <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>. +</para> +</listitem> +</varlistentry> </variablelist> In addition to the above, other parameters may be listed. 
diff --git a/src/Makefile.global.in b/src/Makefile.global.in index 7ca1e9aac5..9e11599002 100644 --- a/src/Makefile.global.in +++ b/src/Makefile.global.in @@ -196,6 +196,7 @@ with_llvm = @with_llvm@ with_system_tzdata = @with_system_tzdata@ with_uuid = @with_uuid@ with_zlib = @with_zlib@ +with_zstd = @with_zstd@ enable_rpath = @enable_rpath@ enable_nls = @enable_nls@ enable_debug = @enable_debug@ diff --git a/src/backend/Makefile b/src/backend/Makefile index 9672e2cb43..3ff92daf75 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes) LIBS += -lsystemd endif +ifeq ($(with_zstd),yes) +LIBS += -lzstd +endif + +ifeq ($(with_zlib),yes) +LIBS += -lz +endif + ########################################################################## all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5d89e77dbe..1cd343a425 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS LEFT JOIN pg_database AS D ON (S.datid = D.oid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); +CREATE VIEW pg_stat_network_traffic AS + SELECT + S.pid, + S.rx_raw_bytes, + S.tx_raw_bytes, + S.rx_compressed_bytes, + S.tx_compressed_bytes + FROM pg_stat_get_activity(NULL) AS S; + CREATE VIEW pg_stat_replication AS SELECT S.pid, diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 1e6b6db540..84f9b183e8 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -71,6 +71,7 @@ #include <signal.h> #include <fcntl.h> #include <grp.h> +#include <pgstat.h> #include <unistd.h> #include <sys/file.h> #include <sys/socket.h> @@ -88,11 +89,14 @@ #include "common/ip.h" #include "libpq/libpq.h" +#include "libpq/pqformat.h" #include "miscadmin.h" #include "port/pg_bswap.h" #include "storage/ipc.h" #include 
"utils/guc.h" #include "utils/memutils.h" +#include "utils/builtins.h" +#include "common/zpq_stream.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -118,6 +122,7 @@ */ int Unix_socket_permissions; char *Unix_socket_group; +bool libpq_compression; /* Where the Unix socket files are (list of palloc'd strings) */ static List *sock_paths = NIL; @@ -141,6 +146,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ +static ZpqStream * PqStream; + + /* * Message status */ @@ -183,6 +191,115 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; +static ssize_t +write_compressed(void *arg, void const *data, size_t size) +{ + ssize_t rc = secure_write((Port *) arg, (void *) data, size); + + if (rc > 0) + pgstat_report_network_traffic(0, 0, 0, rc); + return rc; +} + +static ssize_t +read_compressed(void *arg, void *data, size_t size) +{ + ssize_t rc = secure_read((Port *) arg, data, size); + + if (rc > 0) + pgstat_report_network_traffic(0, 0, rc, 0); + return rc; +} + +/* + * Send to the client index of chosen compression algorithm + */ +static void +SendCompressionACK(int algorithm) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'z'); + pq_sendbyte(&buf, (uint8) algorithm); + pq_endmessage(&buf); + pq_flush(); +} + +/* -------------------------------- + * pq_configure - configure connection using port settings + * + * Right now only compression is toggled in the configure. + * Function returns 0 in case of success, non-null in case of error + * -------------------------------- + */ +int +pq_configure(Port *port) +{ + char *client_compression_algorithms = port->compression_algorithms; + + /* + * If client request compression, it sends list of supported compression + * algorithms separated by comma. 
+ */ + if (client_compression_algorithms && libpq_compression) + { + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + int impl = -1; + char **server_compression_algorithms = zpq_get_supported_algorithms(); + int index = -1; + char *protocol_extension = strchr(client_compression_algorithms, ';'); + + /* No protocol extension are currently supported */ + if (protocol_extension) + *protocol_extension = '\0'; + + for (int i = 0; *client_compression_algorithms; i++) + { + char *sep = strchr(client_compression_algorithms, ','); + char *level; + + if (sep != NULL) + *sep = '\0'; + + level = strchr(client_compression_algorithms, ':'); + if (level != NULL) + { + *level = '\0'; /* compression level is ignored now */ + if (sscanf(level + 1, "%d", &compression_level) != 1) + ereport(LOG, + (errmsg("Invalid compression level: %s", level + 1))); + } + for (impl = 0; server_compression_algorithms[impl] != NULL; impl++) + { + if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0) + { + index = i; + goto SendCompressionAck; + } + } + + if (sep != NULL) + client_compression_algorithms = sep + 1; + else + break; + } +SendCompressionAck: + free(server_compression_algorithms); + SendCompressionACK(index); + + if (index >= 0) /* Use compression */ + { + PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0); + if (!PqStream) + { + ereport(LOG, + (errmsg("Failed to initialize compressor %s", server_compression_algorithms[impl]))); + return -1; + } + } + } + return 0; +} /* -------------------------------- * pq_init - initialize libpq at backend startup @@ -283,6 +400,9 @@ socket_close(int code, Datum arg) */ secure_close(MyProcPort); + /* Release compression streams */ + zpq_free(PqStream); + /* * Formerly we did an explicit close() here, but it seems better to * leave the socket open until the process dies. 
This allows clients @@ -931,12 +1051,15 @@ socket_set_nonblocking(bool nonblocking) /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * returns 0 if OK, EOF if trouble + * nowait parameter toggles non-blocking mode. + * returns number of read bytes, EOF if trouble * -------------------------------- */ static int -pq_recvbuf(void) +pq_recvbuf(bool nowait) { + int r; + if (PqRecvPointer > 0) { if (PqRecvLength > PqRecvPointer) @@ -952,21 +1075,40 @@ pq_recvbuf(void) } /* Ensure that we're in blocking mode */ - socket_set_nonblocking(false); + socket_set_nonblocking(nowait); /* Can fill buffer from PqRecvLength and upwards */ for (;;) { - int r; - - r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, - PQ_RECV_BUFFER_SIZE - PqRecvLength); + /* + * If streaming compression is enabled then use correspondent + * compression read function. + */ + r = PqStream + ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength) + : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, + PQ_RECV_BUFFER_SIZE - PqRecvLength); if (r < 0) { + if (r == ZPQ_DECOMPRESS_ERROR) + { + char const *msg = zpq_decompress_error(PqStream); + + if (msg == NULL) + msg = "end of stream"; + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to decompress data: %s", msg))); + return EOF; + } if (errno == EINTR) continue; /* Ok if interrupted */ + if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK)) + return 0; + /* * Careful: an ereport() that tries to write to the client would * cause recursion to here, leading to stack overflow and core @@ -987,7 +1129,8 @@ pq_recvbuf(void) } /* r contains number of bytes read, so just incr length */ PqRecvLength += r; - return 0; + pgstat_report_network_traffic(r, 0, 0, 0); + return r; } } @@ -1002,7 +1145,8 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If 
nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1021,7 +1165,8 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1038,48 +1183,15 @@ pq_peekbyte(void) int pq_getbyte_if_available(unsigned char *c) { - int r; + int r = 0; Assert(PqCommReadingMsg); - if (PqRecvPointer < PqRecvLength) + if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0) { *c = PqRecvBuffer[PqRecvPointer++]; return 1; } - - /* Put the socket into non-blocking mode */ - socket_set_nonblocking(true); - - r = secure_read(MyProcPort, c, 1); - if (r < 0) - { - /* - * Ok if no data available without blocking or interrupted (though - * EINTR really shouldn't happen with a non-blocking socket). Report - * other errors. - */ - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - r = 0; - else - { - /* - * Careful: an ereport() that tries to write to the client would - * cause recursion to here, leading to stack overflow and core - * dump! This message must go *only* to the postmaster log. 
- */ - ereport(COMMERROR, - (errcode_for_socket_access(), - errmsg("could not receive data from client: %m"))); - r = EOF; - } - } - else if (r == 0) - { - /* EOF detected */ - r = EOF; - } - return r; } @@ -1100,7 +1212,8 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1134,7 +1247,8 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1175,7 +1289,8 @@ pq_getstring(StringInfo s) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf()) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } @@ -1425,13 +1540,24 @@ internal_flush(void) char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; - while (bufptr < bufend) + while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0) + + /* + * has more data to flush or unsent data in internal compression + * buffer + */ { int r; + size_t processed = 0; + size_t available = bufend - bufptr; - r = secure_write(MyProcPort, bufptr, bufend - bufptr); + r = PqStream + ? 
zpq_write(PqStream, bufptr, available, &processed) + : secure_write(MyProcPort, bufptr, available); + bufptr += processed; + PqSendStart += processed; - if (r <= 0) + if (r < 0 || (r == 0 && available)) { if (errno == EINTR) continue; /* Ok if we were interrupted */ @@ -1474,12 +1600,12 @@ internal_flush(void) InterruptPending = 1; return EOF; } + pgstat_report_network_traffic(0, r, 0, 0); last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; PqSendStart += r; } - PqSendStart = PqSendPointer = 0; return 0; } @@ -1496,7 +1622,7 @@ socket_flush_if_writable(void) int res; /* Quick exit if nothing to do */ - if (PqSendPointer == PqSendStart) + if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0)) return 0; /* No-op if reentrant call */ @@ -1519,7 +1645,7 @@ socket_flush_if_writable(void) static bool socket_is_send_pending(void) { - return (PqSendStart < PqSendPointer); + return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0)); } /* -------------------------------- @@ -2010,3 +2136,16 @@ pq_settcpusertimeout(int timeout, Port *port) return STATUS_OK; } + +PG_FUNCTION_INFO_V1(pg_compression_algorithm); + +Datum +pg_compression_algorithm(PG_FUNCTION_ARGS) +{ + char const *algorithm_name = PqStream ? 
zpq_compress_algorithm_name(PqStream) : NULL; + + if (algorithm_name) + PG_RETURN_TEXT_P(cstring_to_text(algorithm_name)); + else + PG_RETURN_NULL(); +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 3f24a33ef1..f09c81bedb 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3135,6 +3135,9 @@ pgstat_bestart(void) lbeentry.st_xact_start_timestamp = 0; lbeentry.st_databaseid = MyDatabaseId; + lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes = + lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0; + /* We have userid for client-backends, wal-sender and bgworker processes */ if (lbeentry.st_backendType == B_BACKEND || lbeentry.st_backendType == B_WAL_SENDER @@ -3517,6 +3520,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp) PGSTAT_END_WRITE_ACTIVITY(beentry); } +/* + * Report current transaction start timestamp as the specified value. + * Zero means there is no active transaction. + */ +void +pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes) +{ + volatile PgBackendStatus *beentry = MyBEEntry; + + if (!pgstat_track_activities || !beentry) + return; + + /* + * Update my status entry, following the protocol of bumping + * st_changecount before and after. We use a volatile pointer here to + * ensure the compiler doesn't try to get cute. 
+ */ + PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); + + beentry->st_rx_raw_bytes += rx_raw_bytes; + beentry->st_tx_raw_bytes += tx_raw_bytes; + beentry->st_rx_compressed_bytes += rx_compressed_bytes; + beentry->st_tx_compressed_bytes += tx_compressed_bytes; + + PGSTAT_END_WRITE_ACTIVITY(beentry); +} + /* ---------- * pgstat_read_current_status() - * diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 7de27ee4e0..96741ab16c 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -2140,6 +2140,8 @@ retry1: port->database_name = pstrdup(valptr); else if (strcmp(nameptr, "user") == 0) port->user_name = pstrdup(valptr); + else if (strcmp(nameptr, "_pq_.compression") == 0) + port->compression_algorithms = pstrdup(valptr); else if (strcmp(nameptr, "options") == 0) port->cmdline_options = pstrdup(valptr); else if (strcmp(nameptr, "replication") == 0) @@ -4439,6 +4441,14 @@ BackendInitialize(Port *port) if (status != STATUS_OK) proc_exit(0); + if (pq_configure(port)) + { + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("failed to send compression message: %m"))); + proc_exit(0); + } + /* * Now that we have the user and database name, we can set the process * title for ps. It's good to do this as early as possible in startup. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 5c12a165a1..3a80b34675 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -569,7 +569,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) Datum pg_stat_get_activity(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_ACTIVITY_COLS 30 +#define PG_STAT_GET_ACTIVITY_COLS 34 int num_backends = pgstat_fetch_stat_numbackends(); int curr_backend; int pid = PG_ARGISNULL(0) ? 
-1 : PG_GETARG_INT32(0); @@ -915,6 +915,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) values[28] = BoolGetDatum(false); /* GSS Encryption not in * use */ } + values[30] = beentry->st_rx_raw_bytes; + values[31] = beentry->st_tx_raw_bytes; + values[32] = beentry->st_rx_compressed_bytes; + values[33] = beentry->st_tx_compressed_bytes; } else { @@ -943,6 +947,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS) nulls[27] = true; nulls[28] = true; nulls[29] = true; + nulls[30] = true; + nulls[31] = true; + nulls[32] = true; + nulls[33] = true; } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -1142,6 +1150,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS) PG_RETURN_TIMESTAMPTZ(result); } +Datum +pg_stat_get_network_traffic(PG_FUNCTION_ARGS) +{ +#define PG_STAT_NETWORK_TRAFFIC_COLS 4 + TupleDesc tupdesc; + Datum values[PG_STAT_NETWORK_TRAFFIC_COLS]; + bool nulls[PG_STAT_NETWORK_TRAFFIC_COLS]; + int32 beid = PG_GETARG_INT32(0); + PgBackendStatus *beentry; + + if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL) + PG_RETURN_NULL(); + else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid)) + PG_RETURN_NULL(); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes", + INT8OID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Fill values and NULLs */ + values[0] = beentry->st_rx_raw_bytes; + values[1] = beentry->st_tx_raw_bytes; + values[2] = beentry->st_rx_compressed_bytes; + values[3] = beentry->st_tx_compressed_bytes; + + /* Returns the record as Datum */ + 
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} Datum pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 17579eeaca..cba4fdd880 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1292,6 +1292,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER, + gettext_noop("Compress client-server traffic."), + NULL + }, + &libpq_compression, + true, + NULL, NULL, NULL + }, + { {"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT, gettext_noop("Logs each checkpoint."), diff --git a/src/common/Makefile b/src/common/Makefile index f624977939..6915cb1858 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -78,7 +78,8 @@ OBJS_COMMON = \ unicode_norm.o \ username.o \ wait_error.o \ - wchar.o + wchar.o \ + zpq_stream.o ifeq ($(with_openssl),yes) OBJS_COMMON += \ diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c new file mode 100644 index 0000000000..0451d2d502 --- /dev/null +++ b/src/common/zpq_stream.c @@ -0,0 +1,684 @@ +#include "postgres_fe.h" +#include "common/zpq_stream.h" +#include "c.h" +#include "pg_config.h" + +/* + * Functions implementing streaming compression algorithm + */ +typedef struct +{ + /* + * Name of compression algorithm. + */ + char const *(*name) (void); + + /* + * Create new compression stream. + * level: compression level + */ + void *(*create_compressor) (int level); + + /* + * Create new decompression stream. + */ + void *(*create_decompressor) (); + + /* + * Decompress up to "src_size" compressed bytes from *src and write up to + * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed + * bytes written to *dst is stored in *dst_processed. Number of compressed + * bytes read from *src is stored in *src_processed. + * + * Return codes: ZPQ_OK if no errors were encountered during decompression + * attempt. 
This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZPQ_DATA_PENDING means that there might be some data left within + * decompressor internal buffers. + * + * ZPQ_STREAM_END if encountered end of compressed data stream. + * + * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression + * attempt. + */ + ssize_t (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Compress up to "src_size" raw (non-compressed) bytes from *src and + * write up to "dst_size" compressed bytes to *dst. Number of compressed + * bytes written to *dst is stored in *dst_processed. Number of + * non-compressed bytes read from *src is stored in *src_processed. + * + * Return codes: ZPQ_OK if no errors were encountered during compression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZPQ_DATA_PENDING means that there might be some data left within + * compressor internal buffers. + * + * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt. + */ + ssize_t (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + + /* + * Free compression stream created by create_compressor function. + */ + void (*free_compressor) (void *cs); + + /* + * Free decompression stream created by create_decompressor function. + */ + void (*free_decompressor) (void *ds); + + /* + * Get compressor error message. + */ + char const *(*compress_error) (void *cs); + + /* + * Get decompressor error message. 
+ */ + char const *(*decompress_error) (void *ds); +} ZpqAlgorithm; + + +#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each + * protocol command and command is mostly + * limited by record length, which in turn + * is usually less than page size (except + * TOAST) + */ + +struct ZpqStream +{ + ZpqAlgorithm const *c_algorithm; + void *c_stream; + + ZpqAlgorithm const *d_algorithm; + void *d_stream; + + char tx_buf[ZPQ_BUFFER_SIZE]; + size_t tx_pos; + size_t tx_size; + + char rx_buf[ZPQ_BUFFER_SIZE]; + size_t rx_pos; + size_t rx_size; + + zpq_tx_func tx_func; + zpq_rx_func rx_func; + void *arg; + + size_t tx_total; + size_t tx_total_raw; + size_t rx_total; + size_t rx_total_raw; + + bool rx_not_flushed; + bool tx_not_flushed; +}; + +#if HAVE_LIBZSTD + +#include <stdlib.h> +#include <zstd.h> + +/* + * Maximum allowed back-reference distance, expressed as power of 2. + * This setting controls max compressor/decompressor window size. + * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + */ +#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ + +typedef struct ZPQ_ZSTD_CStream +{ + ZSTD_CStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_CStream; + +typedef struct ZPQ_ZSTD_DStream +{ + ZSTD_DStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_DStream; + +static void * +zstd_create_compressor(int level) +{ + ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); + + c_stream->stream = ZSTD_createCStream(); + ZSTD_initCStream(c_stream->stream, level); +#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); +#endif + c_stream->error = NULL; + return c_stream; +} + +static void * +zstd_create_decompressor() +{ + ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); + + d_stream->stream = ZSTD_createDStream(); + ZSTD_initDStream(d_stream->stream); 
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3 + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); +#endif + d_stream->error = NULL; + return d_stream; +} + +static ssize_t +zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + size_t rc; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + rc = ZSTD_decompressStream(ds->stream, &out, &in); + + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) + { + ds->error = ZSTD_getErrorName(rc); + return ZPQ_DECOMPRESS_ERROR; + } + + if (out.pos == out.size) + { + /* + * if `output.pos == output.size`, there might be some data left + * within internal buffers + */ + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; +} + +static ssize_t +zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_inBuffer in; + ZSTD_outBuffer out; + + in.src = src; + in.pos = 0; + in.size = src_size; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) + { + cs->error = ZSTD_getErrorName(rc); + return ZPQ_COMPRESS_ERROR; + } + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd + * buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + + *dst_processed = out.pos; + if (tx_not_flushed > 0) + { + return ZPQ_DATA_PENDING; + } + } + + return ZPQ_OK; +} + +static void +zstd_free_compressor(void *c_stream) +{ + ZPQ_ZSTD_CStream *cs = 
(ZPQ_ZSTD_CStream *) c_stream; + + if (cs != NULL) + { + ZSTD_freeCStream(cs->stream); + free(cs); + } +} + +static void +zstd_free_decompressor(void *d_stream) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + free(ds); + } +} + +static char const * +zstd_compress_error(void *c_stream) +{ + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + return cs->error; +} + +static char const * +zstd_decompress_error(void *d_stream) +{ + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + return ds->error; +} + +static char const * +zstd_name(void) +{ + return "zstd"; +} + +#endif + +#if HAVE_LIBZ + +#include <stdlib.h> +#include <zlib.h> + + +static void * +zlib_create_compressor(int level) +{ + int rc; + z_stream *c_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(c_stream, 0, sizeof(*c_stream)); + rc = deflateInit(c_stream, level); + if (rc != Z_OK) + { + free(c_stream); + return NULL; + } + return c_stream; +} + +static void * +zlib_create_decompressor() +{ + int rc; + z_stream *d_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); + if (rc != Z_OK) + { + free(d_stream); + return NULL; + } + return d_stream; +} + +static ssize_t +zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *ds = (z_stream *) d_stream; + int rc; + + ds->next_in = (Bytef *) src; + ds->avail_in = src_size; + ds->next_out = (Bytef *) dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) + { + return ZPQ_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZPQ_DECOMPRESS_ERROR; + } + + return ZPQ_OK; +} + +static ssize_t +zlib_compress(void *c_stream, void const *src, size_t src_size, size_t 
*src_processed, void *dst, size_t dst_size, size_t *dst_processed) +{ + z_stream *cs = (z_stream *) c_stream; + int rc; + unsigned deflate_pending = 0; + + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *) src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; + + deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left + * in deflate buffer */ + if (deflate_pending > 0) + { + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; +} + +static void +zlib_free_compressor(void *c_stream) +{ + z_stream *cs = (z_stream *) c_stream; + + if (cs != NULL) + { + deflateEnd(cs); + free(cs); + } +} + +static void +zlib_free_decompressor(void *d_stream) +{ + z_stream *ds = (z_stream *) d_stream; + + if (ds != NULL) + { + inflateEnd(ds); + free(ds); + } +} + +static char const * +zlib_error(void *stream) +{ + z_stream *zs = (z_stream *) stream; + + return zs->msg; +} + +static char const * +zlib_name(void) +{ + return "zlib"; +} + +#endif + +static char const * +no_compression_name(void) +{ + return NULL; +} + +/* + * Array with all supported compression algorithms. + */ +static ZpqAlgorithm const zpq_algorithms[] = +{ +#if HAVE_LIBZSTD + {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error}, +#endif +#if HAVE_LIBZ + {zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error}, +#endif + {no_compression_name} +}; + +/* + * Index of used compression algorithm in zpq_algorithms array. 
+ */ +ZpqStream * +zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) +{ + size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + ZpqStream *zs; + + /* + * Reject indexes outside of zpq_algorithms as well as its terminating + * entry, which has no create functions. + */ + if (c_alg_impl < 0 || (size_t) c_alg_impl >= n_algorithms || + d_alg_impl < 0 || (size_t) d_alg_impl >= n_algorithms || + zpq_algorithms[c_alg_impl].create_compressor == NULL || + zpq_algorithms[d_alg_impl].create_decompressor == NULL) + return NULL; + + /* + * The already fetched data has to fit into rx_buf. This must be a + * run-time check: an Assert is compiled out of production builds and the + * memcpy below would then overflow the buffer. + */ + if (rx_data_size > ZPQ_BUFFER_SIZE) + return NULL; + + zs = (ZpqStream *) malloc(sizeof(ZpqStream)); + if (zs == NULL) + return NULL; /* out of memory */ + + zs->tx_pos = 0; + zs->tx_size = 0; + zs->rx_pos = 0; + zs->rx_size = 0; + zs->tx_func = tx_func; + zs->rx_func = rx_func; + zs->arg = arg; + zs->tx_total = 0; + zs->tx_total_raw = 0; + zs->rx_total = 0; + zs->rx_total_raw = 0; + zs->tx_not_flushed = false; + zs->rx_not_flushed = false; + + /* Preload the receive buffer with compressed data read before the switch */ + zs->rx_size = rx_data_size; + if (rx_data_size > 0) + memcpy(zs->rx_buf, rx_data, rx_data_size); + + zs->c_algorithm = &zpq_algorithms[c_alg_impl]; + zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); + if (zs->c_stream == NULL) + { + free(zs); + return NULL; + } + zs->d_algorithm = &zpq_algorithms[d_alg_impl]; + zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); + if (zs->d_stream == NULL) + { + /* do not leak the compressor created above */ + zs->c_algorithm->free_compressor(zs->c_stream); + free(zs); + return NULL; + } + + return zs; +} + +/* + * Decompress up to "size" bytes into buf, fetching more compressed data with + * rx_func when the receive buffer is empty. + * Returns the number of decompressed bytes, ZPQ_DECOMPRESS_ERROR, or the + * non-positive result of rx_func. + */ +ssize_t +zpq_read(ZpqStream * zs, void *buf, size_t size) +{ + size_t buf_pos = 0; + size_t rx_processed; + size_t buf_processed; + ssize_t rc; + + while (buf_pos == 0) + { /* Read until some data fetched */ + if (zs->rx_pos == zs->rx_size) + { + zs->rx_pos = zs->rx_size = 0; /* Reset rx buffer */ + } + + if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed) + { + ssize_t nread = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size); + + if (nread > 0) /* read fetches some data */ + { + zs->rx_size += nread; + zs->rx_total += nread; + } + else /* read failed */ + { + return nread; + } + } + + Assert(zs->rx_pos <= zs->rx_size); + rx_processed = 0; + buf_processed = 0; + rc = zs->d_algorithm->decompress(zs->d_stream, + (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed, + buf, size, &buf_processed); + + zs->rx_pos += rx_processed; + /* raw traffic is the decompressed output, as tx_total_raw is the raw input */ + zs->rx_total_raw += buf_processed; + buf_pos += buf_processed; + zs->rx_not_flushed = false; + 
if (rc == ZPQ_STREAM_END) + { + break; + } + if (rc == ZPQ_DATA_PENDING) + { + zs->rx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) + { + return ZPQ_DECOMPRESS_ERROR; + } + } + return buf_pos; +} + +/* + * Compress "size" bytes from buf and pass the compressed data to tx_func. + * Loops until the whole input is consumed and the compressor reports no + * pending data. Returns the number of raw bytes consumed. On failure returns + * ZPQ_COMPRESS_ERROR or the non-positive result of tx_func, and stores the + * number of raw bytes consumed so far in *processed. + */ +ssize_t +zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed) +{ + size_t buf_pos = 0; + + do + { + if (zs->tx_pos == zs->tx_size) /* Have nothing to send */ + { + size_t tx_processed = 0; + size_t buf_processed = 0; + ssize_t rc; + + zs->tx_pos = zs->tx_size = 0; /* Reset pointer to the beginning of buffer */ + + rc = zs->c_algorithm->compress(zs->c_stream, + (char *) buf + buf_pos, size - buf_pos, &buf_processed, + (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed); + + zs->tx_size += tx_processed; + buf_pos += buf_processed; + zs->tx_total_raw += buf_processed; + zs->tx_not_flushed = false; + + if (rc == ZPQ_DATA_PENDING) + { + /* remember it, so that the loop condition below stays true */ + zs->tx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) + { + *processed = buf_pos; + return ZPQ_COMPRESS_ERROR; + } + } + /* send everything accumulated in tx_buf */ + while (zs->tx_pos < zs->tx_size) + { + ssize_t rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos); + + if (rc > 0) + { + zs->tx_pos += rc; + zs->tx_total += rc; + } + else + { + *processed = buf_pos; + return rc; + } + } + + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (buf_pos < size || zs->tx_not_flushed); + + return buf_pos; +} + +/* + * Free the compressor, the decompressor and the stream itself. + * A NULL stream is ignored. + */ +void +zpq_free(ZpqStream * zs) +{ + if (zs) + { + if (zs->c_stream) + { + zs->c_algorithm->free_compressor(zs->c_stream); + } + if (zs->d_stream) + { + zs->d_algorithm->free_decompressor(zs->d_stream); + } + free(zs); + } +} + +/* Error message reported by the compression algorithm; zs must not be NULL. */ +char const * +zpq_compress_error(ZpqStream * zs) +{ + return zs->c_algorithm->compress_error(zs->c_stream); +} + +/* Error message reported by the decompression algorithm; zs must not be NULL. */ +char const * +zpq_decompress_error(ZpqStream * zs) +{ + return zs->d_algorithm->decompress_error(zs->d_stream); +} + +/* + * Nonzero if the decompressor reported pending data or if rx_buf holds + * compressed bytes which are not processed yet; 0 for a NULL stream. + * The result of the || expression is 0 or 1, not a number of bytes. + */ +size_t +zpq_buffered_rx(ZpqStream * zs) +{ + return zs ? 
zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0; +} + +/* + * Nonzero if the compressor reported pending data or if tx_buf holds + * compressed bytes which are not sent yet; 0 for a NULL stream. + * The result of the || expression is 0 or 1, not a number of bytes. + */ +size_t +zpq_buffered_tx(ZpqStream * zs) +{ + return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0; +} + +/* + * Get list of the supported algorithms. + * + * Returns a malloc'ed array with one name per entry of zpq_algorithms; the + * last element is the NULL returned by the name function of the terminating + * entry. Only the array itself is allocated here, the names are not copied. + * NOTE(review): the result of malloc is not checked before it is filled. + */ +char ** +zpq_get_supported_algorithms(void) +{ + size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + char **algorithm_names = malloc(n_algorithms * sizeof(char *)); + + for (size_t i = 0; i < n_algorithms; i++) + { + algorithm_names[i] = (char *) zpq_algorithms[i].name(); + } + + return algorithm_names; +} + +/* Name of the algorithm used for compression, or NULL for a NULL stream. */ +char const * +zpq_compress_algorithm_name(ZpqStream * zs) +{ + return zs ? zs->c_algorithm->name() : NULL; +} + +/* Name of the algorithm used for decompression, or NULL for a NULL stream. */ +char const * +zpq_decompress_algorithm_name(ZpqStream * zs) +{ + return zs ? zs->d_algorithm->name() : NULL; +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d7b55f57ea..845d317a0d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5236,9 +5236,9 @@ proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'int4', - proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}', + proallargtypes => 
'{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}', prosrc => 'pg_stat_get_activity' }, { oid => '3318', descr => 'statistics: information about progress of backends running maintenance command', @@ -5505,6 +5505,14 @@ proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,stats_reset}', prosrc => 'pg_stat_get_wal' }, +{ oid => '1137', descr => 'statistics: information about network traffic', + proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's', + proparallel => 'r', prorettype => 'record', proargtypes => 'int4', + proallargtypes => '{int4,int8,int8,int8,int8}', + proargmodes => '{i,o,o,o,o}', + proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}', + prosrc => 'pg_stat_get_network_traffic' }, + { oid => '2306', descr => 'statistics: information about SLRU caches', proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', @@ -5636,6 +5644,10 @@ proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text', proargtypes => 'oid', prosrc => 'pg_tablespace_location' }, +{ oid => '9257', descr => 'connection compression algorithm', + proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text', + proargtypes => '', prosrc => 
'pg_compression_algorithm' }, + { oid => '1946', descr => 'convert bytea value into some ascii-only text string', proname => 'encode', prorettype => 'text', proargtypes => 'bytea text', diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h new file mode 100644 index 0000000000..e48b5ac930 --- /dev/null +++ b/src/include/common/zpq_stream.h @@ -0,0 +1,94 @@ +/* + * zpq_stream.h + * Streaming compression for libpq + */ + +#ifndef ZPQ_STREAM_H +#define ZPQ_STREAM_H + +#include <stdlib.h> + +#define ZPQ_OK (0) +#define ZPQ_IO_ERROR (-1) +#define ZPQ_DECOMPRESS_ERROR (-2) +#define ZPQ_COMPRESS_ERROR (-3) +#define ZPQ_STREAM_END (-4) +#define ZPQ_DATA_PENDING (-5) + +#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1) + +struct ZpqStream; +typedef struct ZpqStream ZpqStream; + +typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size); +typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); + +/* + * Create compression stream with rx/tx function for reading/sending compressed data. + * c_alg_impl: index of chosen compression algorithm + * c_level: compression c_level + * d_alg_impl: index of chosen decompression algorithm + * tx_func: function for writing compressed data in underlying stream + * rx_func: function for reading compressed data from underlying stream + * arg: context passed to the function + * rx_data: received data (compressed data already fetched from input stream) + * rx_data_size: size of data fetched from input stream + */ +extern ZpqStream *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); + +/* + * Read up to "size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. + */ +extern ssize_t zpq_read(ZpqStream * zs, void *buf, size_t size); + +/* + * Write up to "size" raw (decompressed) bytes. 
+ * Returns number of written raw bytes or error code. + * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. + * In the last case number of bytes written is stored in *processed. + */ +extern ssize_t zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed); + +/* + * Get decompressor error message. + */ +extern char const *zpq_decompress_error(ZpqStream * zs); + +/* + * Get compressor error message. + */ +extern char const *zpq_compress_error(ZpqStream * zs); + +/* + * Return an estimated amount of data in internal rx decompression buffer. + */ +extern size_t zpq_buffered_rx(ZpqStream * zs); + +/* + * Return an estimated amount of data in internal tx compression buffer. + */ +extern size_t zpq_buffered_tx(ZpqStream * zs); + +/* + * Free stream created by zpq_create function. + */ +extern void zpq_free(ZpqStream * zs); + +/* + * Get the name of chosen compression algorithm. + */ +extern char const *zpq_compress_algorithm_name(ZpqStream * zs); + +/* + * Get the name of chosen decompression algorithm. + */ +extern char const *zpq_decompress_algorithm_name(ZpqStream * zs); + +/* + Returns zero terminated array with compression algorithms names +*/ +extern char **zpq_get_supported_algorithms(void); + +#endif diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 66a8673d93..439904bc27 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -170,6 +170,9 @@ typedef struct Port int keepalives_count; int tcp_user_timeout; + char *compression_algorithms; /* Compression algorithms supported by + * client */ + /* * GSSAPI structures. 
*/ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index a55898c85a..d502dbca85 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); +extern int pq_configure(Port *port); extern int pq_getbytes(char *s, size_t len); extern int pq_getstring(StringInfo s); extern void pq_startmsgread(void); diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index a86b895b26..beca9f7c35 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -159,6 +159,7 @@ typedef struct StartupPacket } StartupPacket; extern bool Db_user_namespace; +extern bool libpq_compression; /* * In protocol 3.0 and later, the startup packet length is not fixed, but diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index f4d9f3b408..049b8f31df 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -349,6 +349,9 @@ /* Define to 1 if you have the `link' function. */ #undef HAVE_LINK +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if the system has the type `locale_t'. */ #undef HAVE_LOCALE_T diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c38b689710..366b0f4507 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1197,6 +1197,12 @@ typedef struct PgBackendStatus /* application name; MUST be null-terminated */ char *st_appname; + /* client-server traffic information */ + uint64 st_rx_raw_bytes; + uint64 st_tx_raw_bytes; + uint64 st_rx_compressed_bytes; + uint64 st_tx_compressed_bytes; + /* * Current command string; MUST be null-terminated. Note that this string * possibly is truncated in the middle of a multi-byte character. 
As @@ -1410,6 +1416,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str); extern void pgstat_report_tempfile(size_t filesize); extern void pgstat_report_appname(const char *appname); extern void pgstat_report_xact_timestamp(TimestampTz tstamp); +extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes); extern const char *pgstat_get_wait_event(uint32 wait_event_info); extern const char *pgstat_get_wait_event_type(uint32 wait_event_info); extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser); diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index c4fde3f93d..21a1dededd 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -29,6 +29,20 @@ endif # The MSVC build system scrapes OBJS from this file. If you change any of # the conditional additions of files to OBJS, update Mkvcbuild.pm to match. +ifeq ($(with_zstd),yes) +LIBS += -lzstd +SHLIB_LINK += -lzstd +endif + +ifeq ($(with_zlib),yes) +LIBS += -lz +SHLIB_LINK += -lz +endif + +# We can't use Makefile variables here because the MSVC build system scrapes +# OBJS from this file. 
+ + OBJS = \ $(WIN32RES) \ fe-auth-scram.o \ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index a834ce8cf0..07495bdf13 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -24,6 +24,7 @@ #include "common/ip.h" #include "common/link-canary.h" #include "common/scram-common.h" +#include "common/zpq_stream.h" #include "common/string.h" #include "fe-auth.h" #include "libpq-fe.h" @@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Replication", "D", 5, offsetof(struct pg_conn, replication)}, + {"compression", "PGCOMPRESSION", NULL, NULL, + "Libpq-compression", "", 16, + offsetof(struct pg_conn, compression)}, + {"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL, "Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */ @@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock; void pqDropConnection(PGconn *conn, bool flushInput) { + /* Release compression streams */ + zpq_free(conn->zstream); + conn->zstream = NULL; + /* Drop any SSL state */ pqsecure_close(conn); @@ -2148,6 +2157,12 @@ connectDBComplete(PGconn *conn) return 1; /* success! 
*/ case PGRES_POLLING_READING: + /* if there is some buffered RX data in ZpqStream + * then don't proceed to pqWaitTimed */ + if (zpq_buffered_rx(conn->zstream)) { + break; + } + ret = pqWaitTimed(1, 0, conn, finish_time); if (ret == -1) { @@ -3221,11 +3236,78 @@ keep_going: /* We will come back to here until there is */ conn->inCursor = conn->inStart; - /* Read type byte */ - if (pqGetc(&beresp, conn)) + while (1) { - /* We'll come back when there is more data */ - return PGRES_POLLING_READING; + /* Read type byte */ + if (pqGetc(&beresp, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + + if (beresp == 'z') /* Switch on compression */ + { + int index; + char resp; + /* Read message length word */ + if (pqGetInt(&msgLength, 4, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + if (msgLength != 5) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "expected compression algorithm specification message length is 5 bytes, but %d is received\n"), + msgLength); + goto error_return; + } + /* Read index of the chosen algorithm; it may not have arrived yet */ + if (pqGetc(&resp, conn)) + { + /* We'll come back when there is more data */ + return PGRES_POLLING_READING; + } + index = resp; + if (index == (char)-1) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server does not support requested compression algorithms %s\n"), + conn->compression); + goto error_return; + } + if ((unsigned)index >= conn->n_compressors) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server returns incorrect compression algorithm index: %d\n"), + index); + goto error_return; + } + Assert(!conn->zstream); + /* Data already read after this message is handed over to the stream */ + conn->zstream = zpq_create(conn->compressors[index].impl, + conn->compressors[index].level, + conn->compressors[index].impl, + (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn, + &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor); + if (!conn->zstream) + { + char** supported_algorithms = zpq_get_supported_algorithms(); + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "failed to 
initialize compressor %s\n"), + supported_algorithms[conn->compressors[index].impl]); + free(supported_algorithms); + goto error_return; + } + /* reset buffer */ + conn->inStart = conn->inCursor = conn->inEnd = 0; + } + else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol version */ + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext( + "server is not supporting libpq compression\n")); + goto error_return; + } else + break; } /* @@ -4024,6 +4106,8 @@ freePGconn(PGconn *conn) free(conn->dbName); if (conn->replication) free(conn->replication); + if (conn->compression) + free(conn->compression); if (conn->pguser) free(conn->pguser); if (conn->pgpass) diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index d48f0fd587..5fd099015f 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn) * EOF indication. We expect therefore that this won't result in any * undue delay in reporting a previous write failure.) */ - if (flushResult || - pqWait(true, false, conn) || + if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 && + pqWait(true, false, conn)) || pqReadData(conn) < 0) { /* diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 6094f048f3..18ee6053b4 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -53,12 +53,24 @@ #include "pg_config_paths.h" #include "port/pg_bswap.h" +#include <common/zpq_stream.h> + static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn); static int pqSendSome(PGconn *conn, int len); static int pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time); static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time); +/* + * Use zpq_read if compression is switched on + */ +#define pq_read_conn(conn) \ + (conn->zstream \ + ? 
zpq_read(conn->zstream, conn->inBuffer + conn->inEnd, \ + conn->inBufSize - conn->inEnd) \ + : pqsecure_read(conn, conn->inBuffer + conn->inEnd, \ + conn->inBufSize - conn->inEnd)) + /* * PQlibVersion: return the libpq version number */ @@ -664,10 +676,17 @@ pqReadData(PGconn *conn) /* OK, try to read some data */ retry3: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); if (nread < 0) { + if (nread == ZPQ_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zstream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -759,10 +778,18 @@ retry3: * arrived. */ retry4: - nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd, - conn->inBufSize - conn->inEnd); + nread = pq_read_conn(conn); + if (nread < 0) { + if (nread == ZPQ_DECOMPRESS_ERROR) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zstream)); + return -1; + } + switch (SOCK_ERRNO) { case EINTR: @@ -875,12 +902,17 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0) + while (len > 0 || zpq_buffered_tx(conn->zstream)) { int sent; - + size_t processed = 0; + /* + * Use zpq_write if compression is switched on + */ + sent = conn->zstream + ? zpq_write(conn->zstream, ptr, len, &processed) #ifndef WIN32 - sent = pqsecure_write(conn, ptr, len); + : pqsecure_write(conn, ptr, len); #else /* @@ -888,8 +920,11 @@ pqSendSome(PGconn *conn, int len) * failure-point appears to be different in different versions of * Windows, but 64k should always be safe. 
*/ - sent = pqsecure_write(conn, ptr, Min(len, 65536)); + : pqsecure_write(conn, ptr, Min(len, 65536)); #endif + ptr += processed; + len -= processed; + remaining -= processed; if (sent < 0) { @@ -943,7 +978,7 @@ pqSendSome(PGconn *conn, int len) remaining -= sent; } - if (len > 0) + if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream)) { /* * We didn't send it all, wait till we can send more. @@ -1034,6 +1069,8 @@ pqFlush(PGconn *conn) int pqWait(int forRead, int forWrite, PGconn *conn) { + if (forRead && conn->inCursor < conn->inEnd) + return 0; return pqWaitTimed(forRead, forWrite, conn, (time_t) -1); } diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index a4d6ee2674..d50dd9a8fd 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) if (async) return 0; /* Need to load more data */ - if (pqWait(true, false, conn) || + if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) || pqReadData(conn) < 0) return -2; continue; @@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen) while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0) { /* need to load more data */ - if (pqWait(true, false, conn) || + if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) || pqReadData(conn) < 0) { *s = '\0'; @@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid, if (needInput) { /* Wait for some data to arrive (or for the channel to close) */ - if (pqWait(true, false, conn) || + if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) || pqReadData(conn) < 0) break; } @@ -2134,6 +2134,154 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen, return startpacket; } +/* + * Build comma-separated list of compression algorithms requested by client. 
+ * It can be either explicitly specified by user in connection string, or + * include all algorithms supported by client library. + * This function returns true if the compression string is successfully parsed and + * stores a comma-separated list of algorithms in *client_compressors. + * If compression is disabled, then NULL is assigned to *client_compressors. + * Also it creates an array of compressor descriptors, each element of which corresponds to + * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn + * and is used during handshake when a compression acknowledgment response is received from the server. + */ +static bool +build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors) +{ + char** supported_algorithms = zpq_get_supported_algorithms(); + char* value = conn->compression; + int n_supported_algorithms; + int total_len = 0; + int i; + + for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++) + { + total_len += strlen(supported_algorithms[n_supported_algorithms])+1; + } + + if (pg_strcasecmp(value, "true") == 0 || + pg_strcasecmp(value, "yes") == 0 || + pg_strcasecmp(value, "on") == 0 || + pg_strcasecmp(value, "any") == 0 || + pg_strcasecmp(value, "1") == 0) + { + /* Compression is enabled: choose algorithm automatically */ + char* p; + + if (n_supported_algorithms == 0) + { + *client_compressors = NULL; /* no compressors are available */ + conn->compressors = NULL; + conn->n_compressors = 0; + return true; + } + *client_compressors = p = malloc(total_len); + if (build_descriptors) + conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor)); + for (i = 0; i < n_supported_algorithms; i++) + { + strcpy(p, supported_algorithms[i]); + p += strlen(p); + *p++ = ','; + if (build_descriptors) + { + conn->compressors[i].impl = i; + conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + } + } + p[-1] = '\0'; + 
return true; + } + else if (*value == 0 || + pg_strcasecmp(value, "false") == 0 || + pg_strcasecmp(value, "no") == 0 || + pg_strcasecmp(value, "off") == 0 || + pg_strcasecmp(value, "0") == 0) + { + /* Compression is disabled */ + *client_compressors = NULL; + conn->compressors = NULL; + conn->n_compressors = 0; + return true; + } + else + { + /* List of compression algorithms separated by commas */ + char *src, *dst; + int n_suggested_algorithms = 0; + char* suggested_algorithms = strdup(value); + src = suggested_algorithms; + *client_compressors = dst = strdup(value); + + if (build_descriptors) + conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor)); + + while (*src != '\0') + { + char* sep = strchr(src, ','); + char* col; + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + + if (sep != NULL) + *sep = '\0'; + + strcpy(dst, src); + + col = strchr(src, ':'); + if (col != NULL) + { + *col = '\0'; + if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors) + { + fprintf(stderr, + libpq_gettext("WARNING: invalid compression level %s in compression option '%s'\n"), + col+1, value); + return false; + } + } + for (i = 0; supported_algorithms[i] != NULL; i++) + { + if (pg_strcasecmp(src, supported_algorithms[i]) == 0) + { + if (build_descriptors) + { + conn->compressors[n_suggested_algorithms].impl = i; + conn->compressors[n_suggested_algorithms].level = compression_level; + } + n_suggested_algorithms += 1; + dst += strlen(dst); + *dst++ = ','; + break; + } + } + if (sep) + src = sep+1; + else + break; + } + free(suggested_algorithms); + conn->n_compressors = n_suggested_algorithms; + if (n_suggested_algorithms == 0) + { + if (!build_descriptors) + fprintf(stderr, + libpq_gettext("WARNING: none of the specified algorithms are supported by client: %s\n"), + value); + else + { + free(conn->compressors); + conn->compressors = NULL; + conn->n_compressors = 0; + } + free(*client_compressors); + *client_compressors = NULL; + return 
false; + } + dst[-1] = '\0'; + return true; + } +} + /* * Build a startup packet given a filled-in PGconn structure. * @@ -2180,6 +2328,17 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("replication", conn->replication); if (conn->pgoptions && conn->pgoptions[0]) ADD_STARTUP_OPTION("options", conn->pgoptions); + if (conn->compression && conn->compression[0]) + { + char* client_compression_algorithms; + if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL)) + { + if (client_compression_algorithms) + { + ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms); + } + } + } if (conn->send_appname) { /* Use appname if present, otherwise use fallback */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index e1018adb9e..88fbd0c371 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -40,6 +40,7 @@ /* include stuff common to fe and be */ #include "getaddrinfo.h" #include "libpq/pqcomm.h" +#include "common/zpq_stream.h" /* include stuff found in fe only */ #include "pqexpbuffer.h" @@ -317,6 +318,16 @@ typedef struct pg_conn_host * found in password file. */ } pg_conn_host; + +/* + * Descriptors of compression algorithms chosen by client + */ +typedef struct pg_conn_compressor +{ + int impl; /* compression implementation index */ + int level; /* compression level */ +} pg_conn_compressor; + /* * PGconn stores all the state data associated with a single connection * to a backend. 
@@ -370,6 +381,13 @@ struct pg_conn char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ + char *compression; /* stream compression (boolean value, "any" or + * list of compression algorithms separated by + * comma) */ + pg_conn_compressor *compressors; /* descriptors of compression + * algorithms chosen by client */ + unsigned n_compressors; /* size of compressors array */ + /* Type of connection to make. Possible values: any, read-write. */ char *target_session_attrs; @@ -527,6 +545,9 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + + /* Compression stream */ + ZpqStream *zstream; }; /* PGcancel stores all data necessary to cancel a connection. A copy of this diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index a687e99d1e..93d6346a85 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid, s.backend_xmin, s.query, s.backend_type - FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) + FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, 
rx_compressed_bytes, tx_compressed_bytes) LEFT JOIN pg_database d ON ((s.datid = d.oid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_all_indexes| SELECT c.oid AS relid, @@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid, s.gss_auth AS gss_authenticated, s.gss_princ AS principal, s.gss_enc AS encrypted - FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) WHERE (s.client_port IS NOT NULL); +pg_stat_network_traffic| SELECT s.pid, + s.rx_raw_bytes, + s.tx_raw_bytes, + s.rx_compressed_bytes, + s.tx_compressed_bytes + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, @@ -2024,7 +2030,7 @@ pg_stat_replication| SELECT s.pid, w.sync_priority, w.sync_state, 
w.reply_time - FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) + FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, @@ -2055,7 +2061,7 @@ pg_stat_ssl| SELECT s.pid, s.ssl_client_dn AS client_dn, s.ssl_client_serial AS client_serial, s.ssl_issuer_dn AS issuer_dn - FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) + FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, 
backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes) WHERE (s.client_port IS NOT NULL); pg_stat_subscription| SELECT su.oid AS subid, su.subname, diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 7f014a12c9..fd9efc1b64 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -125,7 +125,7 @@ sub mkvcbuild keywords.c kwlookup.c link-canary.c md5_common.c pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c - wait_error.c wchar.c); + wait_error.c wchar.c zpq_stream.c); if ($solution->{options}->{openssl}) { -- 2.26.2