On 02.11.2020 19:32, Daniil Zakhlystov wrote:
Hi,
Currently, zpq_stream contains a check only for the tx buffered data -
zpq_buffered().
I think that there should be the same functionality for rx buffered
data. For example, the zpq_buffered_rx().
zpq_buffered_rx() returns a value greater than zero if there is any
data that was fetched by rx_func() but hasn't been decompressed yet;
in any other case zpq_buffered_rx() returns zero.
In this case, I think that we may also rename the existing
zpq_buffered() to zpq_buffered_tx() for clarity.
--
Daniil Zakhlystov
Please try the attached patch v24 which adds zpq_buffered_rx and
zpq_buffered_tx functions.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/configure b/configure
index ace4ed5..deba608 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LD
LDFLAGS_SL
LDFLAGS_EX
with_zlib
+with_zstd
with_system_tzdata
with_libxslt
XML2_LIBS
@@ -867,6 +868,7 @@ with_libxml
with_libxslt
with_system_tzdata
with_zlib
+with_zstd
with_gnu_ld
enable_largefile
'
@@ -8571,6 +8573,85 @@ fi
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+ withval=$with_zstd;
+ case $withval in
+ yes)
+ ;;
+ no)
+ :
+ ;;
+ *)
+ as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+ ;;
+ esac
+
+else
+ with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+ { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_lib_zstd_ZSTD_compress=yes
+else
+ ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+ cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+ LIBS="-lzstd $LIBS"
+
+else
+ as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
#
# Zlib
diff --git a/configure.ac b/configure.ac
index 5b91c83..93a5285 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1000,6 +1000,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
AC_SUBST(with_zlib)
#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+ [use zstd])
+AC_SUBST(with_zstd)
+
+#
# Assignments
#
@@ -1186,6 +1193,14 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zlib to disable zlib support.])])
fi
+if test "$with_zstd" = yes; then
+ AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+ [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
if test "$enable_spinlocks" = yes; then
AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
else
@@ -1400,6 +1415,13 @@ failure. It is possible the compiler isn't looking in the proper directory.
Use --without-zlib to disable zlib support.])])
fi
+if test "$with_zstd" = yes; then
+ AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure. It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
if test "$with_gssapi" = yes ; then
AC_CHECK_HEADERS(gssapi/gssapi.h, [],
[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9ce32fb..140724d 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1225,6 +1225,22 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
</listitem>
</varlistentry>
+ <varlistentry id="libpq-connect-compression" xreflabel="compression">
+ <term><literal>compression</literal></term>
+ <listitem>
+ <para>
+ Request compression of libpq traffic. The client sends the server a list of compression algorithms supported by the client library.
+ If the server supports one of these algorithms, it acknowledges use of that algorithm, and all subsequent libpq messages sent from client to server and
+ vice versa are compressed. If the server does not support any of the suggested algorithms, it replies with 'n' (no compression)
+ and it is up to the client whether to continue without compression or report an error.
+ The supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+ configured with the --with-zstd option). In both cases streaming mode is used.
+ By default compression is disabled. Note that using compression together with SSL may introduce additional vulnerabilities:
+ <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
<term><literal>client_encoding</literal></term>
<listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 9a95d7b..6225ec7 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
such as <command>COPY</command>.
</para>
+ <para>
+ It is possible to compress protocol data to reduce traffic and speed up client-server interaction.
+ Compression is especially useful for importing/exporting data to/from the database using the COPY command
+ and for replication (both physical and logical). Compression can also reduce server response time
+ for queries returning large amounts of data (for example JSON, BLOBs, text, ...).
+ Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+ configured with the --with-zstd option).
+ </para>
+
<sect2 id="protocol-message-concepts">
<title>Messaging Overview</title>
@@ -263,6 +272,21 @@
</varlistentry>
<varlistentry>
+ <term>CompressionAck</term>
+ <listitem>
+ <para>
+ The server acknowledges use of compression for the client-server communication protocol.
+ Compression can be requested by the client by including the "compression" option in the connection string.
+ The client sends the server a list of compression algorithms supported by the client library
+ (each compression algorithm is identified by one letter: <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib,...).
+ If the server supports one of these algorithms, it acknowledges use of that algorithm, and all subsequent libpq messages sent from client to server and
+ vice versa are compressed. If the server does not support any of the suggested algorithms, it replies with the 'n' (no compression)
+ algorithm identifier and it is up to the client whether to continue without compression or report an error.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term>AuthenticationOk</term>
<listitem>
<para>
@@ -3396,6 +3420,56 @@ following:
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+ Byte1('z')
+</term>
+<listitem>
+<para>
+ Acknowledges use of compression for protocol data. The client sends the server a list of compression algorithms supported by the client library.
+ If the server supports one of these algorithms, it responds with CompressionAck containing the identifier (letter) of the first algorithm in the client's list that the server supports.
+ If the server does not support any of the suggested algorithms, it replies with the 'n' (no compression) algorithm identifier.
+ It is up to the client whether to continue without compression or report an error.
+ After this message is received with an algorithm identifier other than 'n', both server and client switch to compression mode
+ and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Int32
+</term>
+<listitem>
+<para>
+ Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+ Byte1
+</term>
+<listitem>
+<para>
+ Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
<varlistentry>
<term>
@@ -5962,6 +6036,19 @@ StartupMessage (F)
</para>
</listitem>
</varlistentry>
+<varlistentry>
+<term>
+ <literal>compression</literal>
+</term>
+<listitem>
+<para>
+ Request compression of libpq traffic. The value is the list of compression algorithms supported by the client:
+ <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib, <literal>'n'</literal> - no compression.
+ By default compression is disabled. Note that using compression together with SSL may introduce additional vulnerabilities:
+ <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
</variablelist>
In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9a..9e11599 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm = @with_llvm@
with_system_tzdata = @with_system_tzdata@
with_uuid = @with_uuid@
with_zlib = @with_zlib@
+with_zstd = @with_zstd@
enable_rpath = @enable_rpath@
enable_nls = @enable_nls@
enable_debug = @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95..f32a780 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
LIBS += -lsystemd
endif
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
##########################################################################
all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c6dd084..6b69d77 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
LEFT JOIN pg_database AS D ON (S.datid = D.oid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
+-- Per-backend libpq traffic counters: *_raw_bytes count protocol bytes as seen
+-- by libpq (before compression / after decompression), *_compressed_bytes count
+-- bytes moved through the compression stream; both come from
+-- pg_stat_get_activity().
+CREATE VIEW pg_stat_network_traffic AS
+ SELECT
+ S.pid,
+ S.rx_raw_bytes,
+ S.tx_raw_bytes,
+ S.rx_compressed_bytes,
+ S.tx_compressed_bytes
+ FROM pg_stat_get_activity(NULL) AS S;
+
CREATE VIEW pg_stat_replication AS
SELECT
S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index ac986c0..73d4230 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
#include <signal.h>
#include <fcntl.h>
#include <grp.h>
+#include <pgstat.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/socket.h>
@@ -93,6 +94,7 @@
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
+#include "common/zpq_stream.h"
/*
* Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -141,6 +143,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
+static ZpqStream* PqStream;
+
+
/*
* Message status
*/
@@ -183,6 +188,86 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
WaitEventSet *FeBeWaitSet;
+/*
+ * tx callback for the compression stream: hand already-compressed bytes to
+ * secure_write() for the connection's Port (passed as arg) and add every
+ * successful write to the tx_compressed_bytes traffic counter.
+ */
+static ssize_t
+write_compressed(void* arg, void const* data, size_t size)
+{
+	Port	   *port = (Port *) arg;
+	ssize_t		nwritten = secure_write(port, (void *) data, size);
+
+	if (nwritten > 0)
+		pgstat_report_network_traffic(0, 0, 0, nwritten);
+	return nwritten;
+}
+
+/*
+ * rx callback for the compression stream: read compressed bytes with
+ * secure_read() on the connection's Port (passed as arg) and add every
+ * successful read to the rx_compressed_bytes traffic counter.
+ */
+static ssize_t
+read_compressed(void* arg, void* data, size_t size)
+{
+	ssize_t		nread;
+
+	nread = secure_read((Port *) arg, data, size);
+	if (nread > 0)
+		pgstat_report_network_traffic(0, 0, nread, 0);
+	return nread;
+}
+
+
+/* --------------------------------
+ * pq_configure - configure connection using port settings
+ *
+ * Right now only compression is configured here.  If the client sent the
+ * "compression" startup option (port->compression_algorithms), the client's
+ * list of algorithm letters is intersected with the server's, and a
+ * CompressionAck ('z') message carrying the chosen letter is sent back.
+ * When there is no common algorithm, 'n' (no compression) is sent and the
+ * connection continues uncompressed.
+ *
+ * Returns 0 on success, non-zero on error.
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+	char	   *client_compression_algorithms = port->compression_algorithms;
+	/* NOTE(review): assumes zpq_get_supported_algorithms() NUL-terminates */
+	char		server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
+	char		compression_algorithm = ZPQ_NO_COMPRESSION;
+	/* CompressionAck: type 'z', Int32 length 5 (self + 1 byte), algorithm */
+	char		compression[6] = {'z', 0, 0, 0, 5, 0};
+	int			impl;
+	int			rc;
+
+	/* Client did not request compression: nothing to configure */
+	if (client_compression_algorithms == NULL)
+		return 0;
+
+	/*
+	 * Each compression algorithm is identified by one letter
+	 * ('f' - Facebook zstd, 'z' - zlib).
+	 */
+	zpq_get_supported_algorithms(server_compression_algorithms);
+
+	/* Choose the first algorithm in the client's list the server supports */
+	for (; *client_compression_algorithms != '\0'; client_compression_algorithms++)
+	{
+		if (strchr(server_compression_algorithms, *client_compression_algorithms))
+		{
+			compression_algorithm = *client_compression_algorithms;
+			break;
+		}
+	}
+
+	compression[5] = compression_algorithm;
+	/* Send 'z' message to the client with selected compression algorithm ('n' if match is not found) */
+	socket_set_nonblocking(false);
+	while ((rc = secure_write(port, compression, sizeof(compression))) < 0
+		   && errno == EINTR);
+	if ((size_t) rc != sizeof(compression))
+		return -1;
+
+	/*
+	 * No common algorithm: the client has been told 'n' and decides itself
+	 * whether to continue, so this is not an error on the server side.
+	 */
+	if (compression_algorithm == ZPQ_NO_COMPRESSION)
+		return 0;
+
+	/* initialize compression */
+	impl = zpq_get_algorithm_impl(compression_algorithm);
+	if (impl < 0)
+	{
+		ereport(LOG,
+				(errmsg("Requested algorithm %c is not supported", compression_algorithm)));
+		return -1;
+	}
+	PqStream = zpq_create(impl, write_compressed, read_compressed, port, NULL, 0);
+	if (!PqStream)
+	{
+		ereport(LOG,
+				(errmsg("Failed to initialize compressor %c(%d)", compression_algorithm, impl)));
+		return -1;
+	}
+	return 0;
+}
/* --------------------------------
* pq_init - initialize libpq at backend startup
@@ -280,6 +365,9 @@ socket_close(int code, Datum arg)
free(MyProcPort->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
+ /* Release compression streams */
+ zpq_free(PqStream);
+
/*
* Cleanly shut down SSL layer. Nowhere else does a postmaster child
* call this, so this is safe when interrupting BackendInitialize().
@@ -919,12 +1007,15 @@ socket_set_nonblocking(bool nonblocking)
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
*
- * returns 0 if OK, EOF if trouble
+ * nowait parameter toggles non-blocking mode.
+ * returns number of read bytes, EOF if trouble
* --------------------------------
*/
static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
{
+ int r;
+
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
@@ -940,21 +1031,36 @@ pq_recvbuf(void)
}
/* Ensure that we're in blocking mode */
- socket_set_nonblocking(false);
+ socket_set_nonblocking(nowait);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
- int r;
-
- r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ /* If streaming compression is enabled then use correspondent compression read function. */
+ r = PqStream
+ ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength)
+ : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0)
{
+ if (r == ZPQ_DECOMPRESS_ERROR)
+ {
+ char const* msg = zpq_error(PqStream);
+ if (msg == NULL)
+ msg = "end of stream";
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to decompress data: %s", msg)));
+ return EOF;
+ }
if (errno == EINTR)
continue; /* Ok if interrupted */
+ if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+ return 0;
+
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
@@ -975,7 +1081,8 @@ pq_recvbuf(void)
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
- return 0;
+ pgstat_report_network_traffic(r, 0, 0, 0);
+ return r;
}
}
@@ -990,7 +1097,7 @@ pq_getbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1009,7 +1116,7 @@ pq_peekbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1026,48 +1133,15 @@ pq_peekbyte(void)
int
pq_getbyte_if_available(unsigned char *c)
{
- int r;
+ int r = 0;
Assert(PqCommReadingMsg);
- if (PqRecvPointer < PqRecvLength)
+ if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
{
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
-
- /* Put the socket into non-blocking mode */
- socket_set_nonblocking(true);
-
- r = secure_read(MyProcPort, c, 1);
- if (r < 0)
- {
- /*
- * Ok if no data available without blocking or interrupted (though
- * EINTR really shouldn't happen with a non-blocking socket). Report
- * other errors.
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- r = 0;
- else
- {
- /*
- * Careful: an ereport() that tries to write to the client would
- * cause recursion to here, leading to stack overflow and core
- * dump! This message must go *only* to the postmaster log.
- */
- ereport(COMMERROR,
- (errcode_for_socket_access(),
- errmsg("could not receive data from client: %m")));
- r = EOF;
- }
- }
- else if (r == 0)
- {
- /* EOF detected */
- r = EOF;
- }
-
return r;
}
@@ -1088,7 +1162,7 @@ pq_getbytes(char *s, size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1122,7 +1196,7 @@ pq_discardbytes(size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1163,7 +1237,7 @@ pq_getstring(StringInfo s)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
@@ -1413,13 +1487,19 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
- while (bufptr < bufend)
+ while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+ /* has more data to flush or unsent data in internal compression buffer */
{
- int r;
-
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
- if (r <= 0)
+ int r;
+ size_t processed = 0;
+ size_t available = bufend - bufptr;
+ r = PqStream
+ ? zpq_write(PqStream, bufptr, available, &processed)
+ : secure_write(MyProcPort, bufptr, available);
+ bufptr += processed;
+ PqSendStart += processed;
+
+ if (r < 0 || (r == 0 && available))
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
@@ -1462,12 +1542,12 @@ internal_flush(void)
InterruptPending = 1;
return EOF;
}
+ pgstat_report_network_traffic(0, r, 0, 0);
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
PqSendStart += r;
}
-
PqSendStart = PqSendPointer = 0;
return 0;
}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0eb..33aee92 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3112,6 +3112,9 @@ pgstat_bestart(void)
lbeentry.st_xact_start_timestamp = 0;
lbeentry.st_databaseid = MyDatabaseId;
+ lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+ lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
/* We have userid for client-backends, wal-sender and bgworker processes */
if (lbeentry.st_backendType == B_BACKEND
|| lbeentry.st_backendType == B_WAL_SENDER
@@ -3493,6 +3496,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
+/*
+ * Add the given byte counts to this backend's network traffic counters.
+ *
+ * rx_raw_bytes, tx_raw_bytes: protocol bytes received/sent as seen by the
+ *		libpq layer (after decompression / before compression).
+ * rx_compressed_bytes, tx_compressed_bytes: bytes moved through the
+ *		compression stream's read/write callbacks.
+ *
+ * Does nothing when activity tracking is disabled or this backend has no
+ * status entry yet.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!pgstat_track_activities || !beentry)
+		return;
+
+	/*
+	 * Update my status entry, following the protocol of bumping
+	 * st_changecount before and after. We use a volatile pointer here to
+	 * ensure the compiler doesn't try to get cute.
+	 */
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+	beentry->st_rx_raw_bytes += rx_raw_bytes;
+	beentry->st_tx_raw_bytes += tx_raw_bytes;
+	beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+	beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
/* ----------
* pgstat_read_current_status() -
*
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 959e3b8..60ff1e7 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2151,6 +2151,8 @@ retry1:
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
+ else if (strcmp(nameptr, "compression") == 0)
+ port->compression_algorithms = pstrdup(valptr);
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
@@ -4458,6 +4460,14 @@ BackendInitialize(Port *port)
if (status != STATUS_OK)
proc_exit(0);
+ if (pq_configure(port))
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to send compression message: %m")));
+ proc_exit(0);
+ }
+
/*
* Now that we have the user and database name, we can set the process
* title for ps. It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 472fa59..eef4ead 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -567,7 +567,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
Datum
pg_stat_get_activity(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_ACTIVITY_COLS 30
+#define PG_STAT_GET_ACTIVITY_COLS 34
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -913,6 +913,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
values[28] = BoolGetDatum(false); /* GSS Encryption not in
* use */
}
+ values[30] = beentry->st_rx_raw_bytes;
+ values[31] = beentry->st_tx_raw_bytes;
+ values[32] = beentry->st_rx_compressed_bytes;
+ values[33] = beentry->st_tx_compressed_bytes;
}
else
{
@@ -941,6 +945,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
nulls[27] = true;
nulls[28] = true;
nulls[29] = true;
+ nulls[30] = true;
+ nulls[31] = true;
+ nulls[32] = true;
+ nulls[33] = true;
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1140,6 +1148,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
PG_RETURN_TIMESTAMPTZ(result);
}
+/*
+ * pg_stat_get_network_traffic - network traffic counters of one backend
+ *
+ * Takes a backend ID as accepted by pgstat_fetch_stat_beentry() and returns
+ * a record (rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes,
+ * tx_compressed_bytes), or NULL if there is no such backend or the caller
+ * lacks permission to see its statistics.
+ */
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS 4
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_NETWORK_TRAFFIC_COLS];
+	bool		nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+	int32		beid = PG_GETARG_INT32(0);
+	PgBackendStatus *beentry;
+
+	if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+		PG_RETURN_NULL();
+	else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/*
+	 * int8 columns must be built with Int64GetDatum(): on builds where int8
+	 * is pass-by-reference, storing the integer directly into a Datum would
+	 * be interpreted as a pointer.
+	 */
+	values[0] = Int64GetDatum((int64) beentry->st_rx_raw_bytes);
+	values[1] = Int64GetDatum((int64) beentry->st_tx_raw_bytes);
+	values[2] = Int64GetDatum((int64) beentry->st_rx_compressed_bytes);
+	values[3] = Int64GetDatum((int64) beentry->st_tx_compressed_bytes);
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
Datum
pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/common/Makefile b/src/common/Makefile
index 25c55bd..bc6cba8 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
unicode_norm.o \
username.o \
wait_error.o \
- wchar.o
+ wchar.o \
+ zpq_stream.o
ifeq ($(with_openssl),yes)
OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..411fa28
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,550 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Table of functions implementing one streaming compression algorithm.
+ * NOTE(review): members are in a fixed order; keep it in sync with any
+ * positional initializers of ZpqAlgorithm instances elsewhere in this file.
+ */
+typedef struct
+{
+ /*
+ * Returns the one-letter protocol identifier of the algorithm
+ * (e.g. 'f' for zstd).
+ */
+ char (*name)(void);
+
+ /*
+ * Create a compression stream that uses the tx/rx functions for sending/fetching compressed data.
+ * tx_func: function writing compressed data to the underlying stream
+ * rx_func: function receiving compressed data from the underlying stream
+ * arg: context passed to tx_func/rx_func
+ * rx_data: compressed data already fetched from the input stream (may be NULL when rx_data_size is 0)
+ * rx_data_size: size of rx_data
+ * Returns NULL if the stream cannot be created.
+ */
+ ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size);
+
+ /*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns the number of decompressed bytes or an error code.
+ * The error code is either ZPQ_DECOMPRESS_ERROR or the value returned by the rx function.
+ */
+ ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
+
+ /*
+ * Write up to "size" raw (uncompressed) bytes.
+ * Returns the number of consumed raw bytes, or the error code returned by the tx function.
+ * In the latter case the number of consumed raw bytes is stored in *processed.
+ */
+ ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);
+
+ /*
+ * Free a stream created by the create function.
+ */
+ void (*free)(ZpqStream *zs);
+
+ /*
+ * Get the message describing the last error (may be NULL).
+ */
+ char const* (*error)(ZpqStream *zs);
+
+ /*
+ * Returns the amount of data pending in internal tx compression buffers.
+ */
+ size_t (*buffered_tx)(ZpqStream *zs);
+
+ /*
+ * Returns the amount of received compressed data not yet decompressed.
+ */
+ size_t (*buffered_rx)(ZpqStream *zs);
+} ZpqAlgorithm;
+
+/*
+ * Common header of every compression stream.  Algorithm-specific stream
+ * structs embed it as their first member ("common"), which is what makes the
+ * casts between ZpqStream* and e.g. ZstdStream* in this file valid.
+ */
+struct ZpqStream
+{
+ ZpqAlgorithm const* algorithm; /* implementation of this stream */
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
+
+/* State of a bidirectional zstd compression stream. */
+typedef struct ZstdStream
+{
+ ZpqStream common; /* must be first: cast to/from ZpqStream* */
+ ZSTD_CStream* tx_stream;
+ ZSTD_DStream* rx_stream;
+ ZSTD_outBuffer tx; /* compressed output window over tx_buf */
+ ZSTD_inBuffer rx; /* compressed input window over rx_buf */
+ size_t tx_not_flushed; /* Amount of data still held in internal zstd buffer (last ZSTD_flushStream result) */
+ size_t tx_buffered; /* Compressed data produced by zstd_write but not yet sent */
+ zpq_tx_func tx_func;
+ zpq_rx_func rx_func;
+ void* arg;
+ char const* rx_error; /* Decompress error message */
+ /* Byte totals; not read in the visible part of this file */
+ size_t tx_total; /* compressed bytes sent via tx_func */
+ size_t tx_total_raw; /* raw bytes consumed by zstd_write */
+ size_t rx_total; /* compressed bytes received via rx_func */
+ size_t rx_total_raw; /* decompressed bytes returned by zstd_read */
+ char tx_buf[ZSTD_BUFFER_SIZE];
+ char rx_buf[ZSTD_BUFFER_SIZE];
+} ZstdStream;
+
+/*
+ * Create a zstd compression stream.
+ *
+ * rx_data/rx_data_size: compressed bytes already read from the peer; they
+ * are copied into rx_buf.  rx_data may be NULL when rx_data_size is 0.
+ * Returns NULL on any failure: out of memory, zstd context creation or
+ * initialization failure, or rx_data too large for rx_buf.
+ */
+static ZpqStream*
+zstd_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+	ZstdStream *zs;
+
+	/*
+	 * Pre-fetched data must fit into rx_buf.  Checked at run time rather
+	 * than only by Assert, since an overflow would corrupt memory in
+	 * production builds.
+	 */
+	if (rx_data_size >= ZSTD_BUFFER_SIZE)
+		return NULL;
+
+	zs = (ZstdStream *) malloc(sizeof(ZstdStream));
+	if (zs == NULL)
+		return NULL;
+
+	zs->tx_stream = ZSTD_createCStream();
+	zs->rx_stream = ZSTD_createDStream();
+	if (zs->tx_stream == NULL || zs->rx_stream == NULL ||
+		ZSTD_isError(ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL)) ||
+		ZSTD_isError(ZSTD_initDStream(zs->rx_stream)))
+	{
+		/* ZSTD_freeCStream()/ZSTD_freeDStream() accept NULL */
+		ZSTD_freeCStream(zs->tx_stream);
+		ZSTD_freeDStream(zs->rx_stream);
+		free(zs);
+		return NULL;
+	}
+
+	zs->tx.dst = zs->tx_buf;
+	zs->tx.pos = 0;
+	zs->tx.size = ZSTD_BUFFER_SIZE;
+
+	/* memcpy() from a NULL pointer is undefined even for zero length */
+	if (rx_data_size > 0)
+		memcpy(zs->rx_buf, rx_data, rx_data_size);
+	zs->rx.src = zs->rx_buf;
+	zs->rx.pos = 0;
+	zs->rx.size = rx_data_size;
+
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->tx_buffered = 0;
+	zs->tx_not_flushed = 0;
+	zs->rx_error = NULL;
+	zs->arg = arg;
+	zs->tx_total = zs->tx_total_raw = 0;
+	zs->rx_total = zs->rx_total_raw = 0;
+
+	return (ZpqStream *) zs;
+}
+
+/*
+ * Read up to "size" decompressed bytes into buf.
+ *
+ * Decompresses from rx_buf, fetching more compressed data with rx_func
+ * whenever no output could be produced.  Returns the (positive) number of
+ * decompressed bytes, ZPQ_DECOMPRESS_ERROR on a zstd error (message kept in
+ * rx_error), or the non-positive value returned by rx_func.
+ */
+static ssize_t
+zstd_read(ZpqStream *zstream, void *buf, size_t size)
+{
+	ZstdStream *zs = (ZstdStream *) zstream;
+	ZSTD_outBuffer out;
+
+	out.dst = buf;
+	out.pos = 0;
+	out.size = size;
+
+	for (;;)
+	{
+		size_t		zrc;
+		size_t		unconsumed;
+		ssize_t		rc;
+
+		zrc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+		if (ZSTD_isError(zrc))
+		{
+			zs->rx_error = ZSTD_getErrorName(zrc);
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+
+		/* Return as soon as any decompressed data is available */
+		if (out.pos != 0)
+		{
+			zs->rx_total_raw += out.pos;
+			return out.pos;
+		}
+
+		/*
+		 * More input is needed.  Slide any unconsumed bytes to the start of
+		 * rx_buf so that rx_func always gets the whole free tail, instead of
+		 * a shrinking (possibly zero-length) one.
+		 */
+		unconsumed = zs->rx.size - zs->rx.pos;
+		if (zs->rx.pos != 0)
+		{
+			if (unconsumed != 0)
+				memmove(zs->rx_buf, zs->rx_buf + zs->rx.pos, unconsumed);
+			zs->rx.pos = 0;
+			zs->rx.size = unconsumed;
+		}
+
+		rc = zs->rx_func(zs->arg, zs->rx_buf + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size);
+		if (rc <= 0)
+			return rc;			/* read failed or end of input */
+
+		zs->rx.size += rc;
+		zs->rx_total += rc;
+	}
+}
+
+/*
+ * Compress up to "size" bytes of buf and send the compressed output with tx_func.
+ *
+ * New input is compressed into tx_buf only while tx_buf is empty (tx.pos == 0);
+ * otherwise the pending compressed bytes are sent first. Once all input is
+ * consumed, ZSTD_flushStream() is called and its result (bytes still held
+ * inside zstd) is kept in tx_not_flushed. The loop stops at the first partial
+ * write, or when input is exhausted and nothing is left to flush.
+ *
+ * Returns the number of input bytes consumed. If tx_func returns <= 0, that
+ * value is returned instead and the consumed input count is stored in
+ * *processed (*processed is not written on the success path). Either way
+ * tx_buffered is set to the compressed bytes still waiting in tx_buf.
+ *
+ * NOTE(review): ZSTD_compressStream()/ZSTD_flushStream() results are not
+ * checked with ZSTD_isError(); an error code from ZSTD_flushStream() would be
+ * stored in tx_not_flushed as if it were a pending byte count.
+ */
+static ssize_t
+zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ ssize_t rc;
+ ZSTD_inBuffer in_buf;
+ in_buf.src = buf;
+ in_buf.pos = 0;
+ in_buf.size = size;
+
+ do
+ {
+ if (zs->tx.pos == 0) /* Compress buffer is empty */
+ {
+ zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+ if (in_buf.pos < size) /* Has something to compress in input buffer */
+ ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+ if (in_buf.pos == size) /* All data is compressed: flush internal zstd buffer */
+ {
+ zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+ }
+ }
+ /* Send pending compressed bytes: tx.dst points at the unsent part, tx.pos is its length */
+ rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+ if (rc > 0)
+ {
+ /* Advance the window past the sent bytes (tx.dst + tx.pos stays fixed) */
+ zs->tx.pos -= rc;
+ zs->tx.dst = (char*)zs->tx.dst + rc;
+ zs->tx_total += rc;
+ }
+ else
+ {
+ /* tx_func failed or wrote nothing: report consumed input via *processed */
+ *processed = in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ zs->tx_total_raw += in_buf.pos;
+ return rc;
+ }
+ } while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
+
+ zs->tx_total_raw += in_buf.pos;
+ zs->tx_buffered = zs->tx.pos;
+ return in_buf.pos;
+}
+
+/*
+ * Release a zstd stream together with its compression and decompression
+ * contexts.  A NULL stream is ignored.
+ */
+static void
+zstd_free(ZpqStream *zstream)
+{
+	ZstdStream *stream = (ZstdStream *) zstream;
+
+	if (stream == NULL)
+		return;
+
+	ZSTD_freeCStream(stream->tx_stream);
+	ZSTD_freeDStream(stream->rx_stream);
+	free(stream);
+}
+
+static char const*
+zstd_error(ZpqStream *zstream)
+{
+ ZstdStream* zs = (ZstdStream*)zstream;
+ return zs->rx_error;
+}
+
+/*
+ * Bytes of output not yet handed to tx_func: compressed data waiting in
+ * tx_buf plus data still held inside zstd.  0 for a NULL stream.
+ */
+static size_t
+zstd_buffered_tx(ZpqStream *zstream)
+{
+	ZstdStream *stream = (ZstdStream *) zstream;
+
+	if (stream == NULL)
+		return 0;
+	return stream->tx_buffered + stream->tx_not_flushed;
+}
+
+/*
+ * Compressed bytes already received into rx_buf but not yet consumed by the
+ * decompressor.  0 for a NULL stream.
+ */
+static size_t
+zstd_buffered_rx(ZpqStream *zstream)
+{
+	ZstdStream *stream = (ZstdStream *) zstream;
+
+	if (stream == NULL)
+		return 0;
+	return stream->rx.size - stream->rx.pos;
+}
+
+/* One-letter protocol identifier of the zstd algorithm ('f' for Facebook zstd). */
+static char
+zstd_name(void)
+{
+	const char id = 'f';
+
+	return id;
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+/*
+ * Size of each of the zlib tx and rx buffers.  The stream is flushed after
+ * each protocol command, and a command is mostly limited by the record
+ * length, which in turn is usually less than the page size (except for
+ * TOASTed values).
+ */
+#define ZLIB_BUFFER_SIZE 8192
+/*
+ * zlib level 1 (Z_BEST_SPEED) is the fastest level; it is not zlib's default
+ * level.  Experiments showed that it provides the best size/speed trade-off:
+ * it is several times faster than the more expensive levels, while the
+ * difference in compression ratio is small.
+ */
+#define ZLIB_COMPRESSION_LEVEL 1
+
+/*
+ * State of a zlib-compressed connection stream.  "common" must stay the first
+ * member: the stream is handed out as ZpqStream* and the zlib_* functions
+ * cast it back to ZlibStream*.
+ */
+typedef struct ZlibStream
+{
+ ZpqStream common;
+
+ z_stream tx; /* deflate state; next_out/avail_out track the not yet sent part of tx_buf */
+ z_stream rx; /* inflate state; next_in/avail_in track the not yet decompressed part of rx_buf */
+
+ zpq_tx_func tx_func; /* sends compressed bytes */
+ zpq_rx_func rx_func; /* receives compressed bytes */
+ void* arg; /* passed as first argument to tx_func/rx_func */
+
+ size_t tx_buffered; /* compressed bytes in tx_buf not yet sent */
+
+ Bytef tx_buf[ZLIB_BUFFER_SIZE];
+ Bytef rx_buf[ZLIB_BUFFER_SIZE];
+} ZlibStream;
+
+/*
+ * Create a zlib compression stream.
+ *
+ * tx_func/rx_func send and receive compressed bytes and are called with arg.
+ * rx_data/rx_data_size are bytes already received from the peer that belong
+ * to the compressed stream; they are copied into rx_buf and decompressed by
+ * the first zlib_read() call.
+ *
+ * Returns NULL if memory cannot be allocated, zlib initialization fails, or
+ * rx_data does not fit into rx_buf.
+ */
+static ZpqStream*
+zlib_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+	int rc;
+	ZlibStream* zs;
+
+	/*
+	 * The amount of already received data is chosen by the peer, so check it
+	 * at runtime instead of only with an Assert (compiled out in production
+	 * builds): copying more than ZLIB_BUFFER_SIZE bytes would overflow rx_buf.
+	 */
+	if (rx_data_size > ZLIB_BUFFER_SIZE)
+		return NULL;
+
+	zs = (ZlibStream*)malloc(sizeof(ZlibStream));
+	if (zs == NULL)
+		return NULL;
+
+	memset(&zs->tx, 0, sizeof(zs->tx));
+	zs->tx.next_out = zs->tx_buf;
+	zs->tx.avail_out = ZLIB_BUFFER_SIZE;
+	zs->tx_buffered = 0;
+	rc = deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE);
+
+	/*
+	 * Start with no input, so inflateInit() cannot consume bytes from the
+	 * still uninitialized rx_buf; the received data is copied in below.
+	 */
+	memset(&zs->rx, 0, sizeof(zs->rx));
+	zs->rx.next_in = zs->rx_buf;
+	zs->rx.avail_in = 0;
+	rc = inflateInit(&zs->rx);
+	if (rc != Z_OK)
+	{
+		deflateEnd(&zs->tx); /* don't leak the already initialized deflate state */
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == 0);
+
+	memcpy(zs->rx_buf, rx_data, rx_data_size);
+	zs->rx.avail_in = rx_data_size;
+
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->arg = arg;
+
+	return (ZpqStream*)zs;
+}
+
+/*
+ * Decompress up to "size" bytes into "buf".
+ *
+ * Input already held in rx_buf is decompressed first.  If that produces no
+ * output, more compressed bytes are fetched with rx_func and the process
+ * repeats.  Returns the number of decompressed bytes stored in buf,
+ * ZPQ_DECOMPRESS_ERROR if inflate() fails, or the non-positive result of
+ * rx_func when it delivers no data.
+ */
+static ssize_t
+zlib_read(ZpqStream *zstream, void *buf, size_t size)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	int rc;
+
+	zs->rx.next_out = (Bytef *)buf;
+	zs->rx.avail_out = size;
+
+	while (1)
+	{
+		if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+		{
+			rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+			if (rc != Z_OK && rc != Z_BUF_ERROR)
+				return ZPQ_DECOMPRESS_ERROR;
+			if (zs->rx.avail_out != size)
+				return size - zs->rx.avail_out;
+		}
+
+		/*
+		 * Move any unconsumed input to the start of rx_buf, so the whole free
+		 * tail of the buffer is offered to rx_func.  Without this, leftover
+		 * input that ends at the end of rx_buf would make us request zero
+		 * bytes, and callers may take a zero-length read for EOF.
+		 */
+		if (zs->rx.next_in != zs->rx_buf)
+		{
+			memmove(zs->rx_buf, zs->rx.next_in, zs->rx.avail_in);
+			zs->rx.next_in = zs->rx_buf;
+		}
+
+		rc = zs->rx_func(zs->arg, zs->rx_buf + zs->rx.avail_in, ZLIB_BUFFER_SIZE - zs->rx.avail_in);
+		if (rc <= 0)
+			return rc;
+		zs->rx.avail_in += rc;
+	}
+}
+
+/*
+ * Compress "size" bytes from "buf" with zlib and send them using tx_func.
+ *
+ * Sending stops at the first partial write; compressed bytes not yet sent
+ * stay in tx_buf and are reported by zlib_buffered_tx().  Returns the number
+ * of input bytes consumed by the compressor.  If tx_func fails, returns its
+ * non-positive result and stores the number of input bytes consumed so far
+ * in *processed (*processed is not touched on success).
+ */
+static ssize_t
+zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	int rc;
+
+	zs->tx.next_in = (Bytef *)buf;
+	zs->tx.avail_in = size;
+	do
+	{
+		if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */
+		{
+			/*
+			 * Call deflate() even when all input has been consumed: if the
+			 * previous call filled the whole output buffer, part of the
+			 * Z_SYNC_FLUSH output may still be pending inside zlib, and
+			 * deflate() must be called again to emit it.  When nothing is
+			 * pending it returns Z_BUF_ERROR without producing output.
+			 */
+			zs->tx.next_out = zs->tx_buf;
+			rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+			Assert(rc == Z_OK || rc == Z_BUF_ERROR);
+			zs->tx.next_out = zs->tx_buf; /* Rewind to the beginning of the produced output */
+			if (zs->tx.avail_out == ZLIB_BUFFER_SIZE)
+				break; /* nothing left to send */
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out);
+		if (rc > 0)
+		{
+			zs->tx.next_out += rc;
+			zs->tx.avail_out += rc;
+		}
+		else
+		{
+			*processed = size - zs->tx.avail_in;
+			zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+			return rc;
+		}
+	} while (zs->tx.avail_out == ZLIB_BUFFER_SIZE); /* repeat sending data until first partial write */
+
+	zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+
+	return size - zs->tx.avail_in;
+}
+
+/*
+ * Tear down the inflate and deflate states and release the stream.
+ * A NULL stream is silently ignored.
+ */
+static void
+zlib_free(ZpqStream *zstream)
+{
+	ZlibStream* stream = (ZlibStream*)zstream;
+
+	if (stream == NULL)
+		return;
+
+	inflateEnd(&stream->rx);
+	deflateEnd(&stream->tx);
+	free(stream);
+}
+
+/* Return zlib's message for the inflate (receive) side of the stream. */
+static char const*
+zlib_error(ZpqStream *zstream)
+{
+	return ((ZlibStream*)zstream)->rx.msg;
+}
+
+/* Compressed bytes waiting in tx_buf to be sent; 0 for a NULL stream. */
+static size_t
+zlib_buffered_tx(ZpqStream *zstream)
+{
+	ZlibStream* stream = (ZlibStream*)zstream;
+
+	if (stream == NULL)
+		return 0;
+	return stream->tx_buffered;
+}
+
+/* Received compressed bytes not yet fed through inflate(); 0 for a NULL stream. */
+static size_t
+zlib_buffered_rx(ZpqStream *zstream)
+{
+	ZlibStream* stream = (ZlibStream*)zstream;
+
+	if (stream == NULL)
+		return 0;
+	return stream->rx.avail_in;
+}
+
+/* One-letter protocol identifier of the zlib algorithm. */
+static char
+zlib_name(void)
+{
+	const char id = 'z';
+
+	return id;
+}
+
+#endif
+
+/*
+ * Table of the compiled-in compression algorithms, terminated by an entry
+ * whose name callback is NULL.  The position of an entry is the
+ * implementation index used by zpq_create() and zpq_get_algorithm_impl().
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+	{
+		.name = zstd_name,
+		.create = zstd_create,
+		.read = zstd_read,
+		.write = zstd_write,
+		.free = zstd_free,
+		.error = zstd_error,
+		.buffered_tx = zstd_buffered_tx,
+		.buffered_rx = zstd_buffered_rx,
+	},
+#endif
+#if HAVE_LIBZ
+	{
+		.name = zlib_name,
+		.create = zlib_create,
+		.read = zlib_read,
+		.write = zlib_write,
+		.free = zlib_free,
+		.error = zlib_error,
+		.buffered_tx = zlib_buffered_tx,
+		.buffered_rx = zlib_buffered_rx,
+	},
+#endif
+	{.name = NULL}
+};
+
+/*
+ * Create a compression stream.
+ *
+ * algorithm_impl is an index into zpq_algorithms, as returned by
+ * zpq_get_algorithm_impl().  The remaining arguments are passed unchanged to
+ * the algorithm's create callback: tx_func/rx_func move compressed bytes,
+ * arg is passed to them, and rx_data/rx_data_size are bytes already received
+ * that belong to the compressed stream.
+ *
+ * Returns NULL if algorithm_impl is out of range or the algorithm could not
+ * be initialized.
+ */
+ZpqStream*
+zpq_create(int algorithm_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+	ZpqStream* stream;
+
+	/* The last array element is the NULL terminator, not a real algorithm. */
+	if (algorithm_impl < 0 ||
+		(size_t) algorithm_impl >= sizeof(zpq_algorithms) / sizeof(zpq_algorithms[0]) - 1)
+		return NULL;
+
+	stream = zpq_algorithms[algorithm_impl].create(tx_func, rx_func, arg, rx_data, rx_data_size);
+	if (stream)
+		stream->algorithm = &zpq_algorithms[algorithm_impl];
+	return stream;
+}
+
+/* Read up to "size" decompressed bytes into "buf" via the stream's algorithm. */
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size)
+{
+	ZpqAlgorithm const *alg = zs->algorithm;
+
+	return alg->read(zs, buf, size);
+}
+
+/*
+ * Compress and send "size" bytes from "buf" via the stream's algorithm;
+ * *processed is filled in by the algorithm when sending fails.
+ */
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed)
+{
+	ZpqAlgorithm const *alg = zs->algorithm;
+
+	return alg->write(zs, buf, size, processed);
+}
+
+/* Release a compression stream; passing NULL is allowed and does nothing. */
+void
+zpq_free(ZpqStream *zs)
+{
+	if (zs == NULL)
+		return;
+	zs->algorithm->free(zs);
+}
+
+/* Return the algorithm-specific text describing the last decompression error. */
+char const*
+zpq_error(ZpqStream *zs)
+{
+	ZpqAlgorithm const *alg = zs->algorithm;
+
+	return alg->error(zs);
+}
+
+
+/* Received bytes not yet decompressed; 0 when there is no stream. */
+size_t
+zpq_buffered_rx(ZpqStream *zs)
+{
+	if (zs == NULL)
+		return 0;
+	return zs->algorithm->buffered_rx(zs);
+}
+
+/* Compressed bytes not yet sent; 0 when there is no stream. */
+size_t
+zpq_buffered_tx(ZpqStream *zs)
+{
+	if (zs == NULL)
+		return 0;
+	return zs->algorithm->buffered_tx(zs);
+}
+
+/*
+ * Store the one-letter identifiers of all compiled-in compression algorithms
+ * ('f' - Facebook zstd, 'z' - zlib) into "algorithms", in zpq_algorithms
+ * order, followed by a terminating '\0'.
+ */
+void
+zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
+{
+	int n = 0;
+
+	while (zpq_algorithms[n].name != NULL)
+	{
+		Assert(n < ZPQ_MAX_ALGORITHMS);
+		algorithms[n] = zpq_algorithms[n].name();
+		n++;
+	}
+	Assert(n < ZPQ_MAX_ALGORITHMS);
+	algorithms[n] = '\0';
+}
+
+
+
+/*
+ * Look up a compression algorithm by its one-letter identifier.
+ * Returns its index in zpq_algorithms, or -1 when name is ZPQ_NO_COMPRESSION
+ * or no compiled-in algorithm uses that letter.
+ */
+int
+zpq_get_algorithm_impl(char name)
+{
+	int impl;
+
+	if (name == ZPQ_NO_COMPRESSION)
+		return -1;
+
+	for (impl = 0; zpq_algorithms[impl].name != NULL; impl++)
+	{
+		if (zpq_algorithms[impl].name() == name)
+			return impl;
+	}
+	return -1;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a66870b..2f767c6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5228,9 +5228,9 @@
proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'int4',
- proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+ proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
prosrc => 'pg_stat_get_activity' },
{ oid => '3318',
descr => 'statistics: information about progress of backends running maintenance command',
@@ -5497,6 +5497,14 @@
proargnames => '{wal_buffers_full,stats_reset}',
prosrc => 'pg_stat_get_wal' },
+{ oid => '1137', descr => 'statistics: information about network traffic',
+ proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+ proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+ proallargtypes => '{int4,int8,int8,int8,int8}',
+ proargmodes => '{i,o,o,o,o}',
+ proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+ prosrc => 'pg_stat_get_network_traffic' },
+
{ oid => '2306', descr => 'statistics: information about SLRU caches',
proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..54b2b8e
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,33 @@
+/*
+ * zpq_stream.h
+ * Streaming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+/*
+ * NOTE(review): ssize_t is not declared by <stdlib.h> in ISO C; this header
+ * presumably relies on an earlier include providing it — confirm.
+ */
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+/* Returned by zpq_read() when the received data cannot be decompressed. */
+#define ZPQ_DECOMPRESS_ERROR (-2)
+/* Size of the buffer filled by zpq_get_supported_algorithms(), including the terminating '\0'. */
+#define ZPQ_MAX_ALGORITHMS (8)
+/* Identifier meaning "no compression"; zpq_get_algorithm_impl() returns -1 for it. */
+#define ZPQ_NO_COMPRESSION 'n'
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+/*
+ * Callbacks that move compressed bytes; arg is the value given to
+ * zpq_create().  A positive result is the number of bytes transferred; any
+ * other result is treated as failure and passed back to the caller.
+ */
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+/*
+ * Create a stream for the algorithm index returned by zpq_get_algorithm_impl().
+ * rx_data/rx_data_size are bytes already received that belong to the
+ * compressed stream.  Returns NULL on failure.
+ */
+ZpqStream* zpq_create(int impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size);
+/*
+ * Decompress up to size bytes into buf.  Returns the number of bytes stored,
+ * ZPQ_DECOMPRESS_ERROR, or the non-positive result of the rx callback.
+ */
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size);
+/*
+ * Compress and send size bytes from buf.  Returns the number of input bytes
+ * consumed; on failure returns a non-positive value and stores the number of
+ * input bytes consumed in *processed.
+ */
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+/* Text describing the last decompression error. */
+char const* zpq_error(ZpqStream* zs);
+/* Received bytes not yet decompressed (0 if zs is NULL). */
+size_t zpq_buffered_rx(ZpqStream* zs);
+/* Compressed bytes not yet sent (0 if zs is NULL); call zpq_write() again while nonzero. */
+size_t zpq_buffered_tx(ZpqStream* zs);
+/* Release a stream; NULL is accepted. */
+void zpq_free(ZpqStream* zs);
+
+/* Fill algorithms with the one-letter identifiers of supported algorithms, '\0'-terminated. */
+void zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]);
+/* Map a one-letter identifier to an algorithm index, or -1 if unsupported. */
+int zpq_get_algorithm_impl(char name);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281..5289f9f 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,8 @@ typedef struct Port
int keepalives_count;
int tcp_user_timeout;
+ char* compression_algorithms; /* Compression algorithms supported by client */
+
/*
* GSSAPI structures.
*/
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b115247..224ff3d 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void RemoveSocketFiles(void);
extern void pq_init(void);
+extern int pq_configure(Port* port);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index fb270df..b829fed 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
/* Define to 1 if you have the `link' function. */
#undef HAVE_LINK
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
/* Define to 1 if the system has the type `locale_t'. */
#undef HAVE_LOCALE_T
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4..5732796 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1184,6 +1184,12 @@ typedef struct PgBackendStatus
/* application name; MUST be null-terminated */
char *st_appname;
+ /* client-server traffic information */
+ uint64 st_rx_raw_bytes;
+ uint64 st_tx_raw_bytes;
+ uint64 st_rx_compressed_bytes;
+ uint64 st_tx_compressed_bytes;
+
/*
* Current command string; MUST be null-terminated. Note that this string
* possibly is truncated in the middle of a multi-byte character. As
@@ -1397,6 +1403,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
extern void pgstat_report_tempfile(size_t filesize);
extern void pgstat_report_appname(const char *appname);
extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
extern const char *pgstat_get_wait_event(uint32 wait_event_info);
extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b..be8cb34 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
# The MSVC build system scrapes OBJS from this file. If you change any of
# the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
OBJS = \
$(WIN32RES) \
fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index b0ca37c..5b04767 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
#include "common/ip.h"
#include "common/link-canary.h"
#include "common/scram-common.h"
+#include "common/zpq_stream.h"
#include "common/string.h"
#include "fe-auth.h"
#include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"compression", "COMPRESSION", NULL, NULL,
+ "Libpq-compression", "", 1,
+ offsetof(struct pg_conn, compression)},
+
{"target_session_attrs", "PGTARGETSESSIONATTRS",
DefaultTargetSessionAttrs, NULL,
"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
void
pqDropConnection(PGconn *conn, bool flushInput)
{
+ /* Release compression streams */
+ zpq_free(conn->zstream);
+ conn->zstream = NULL;
+
/* Drop any SSL state */
pqsecure_close(conn);
@@ -3216,11 +3225,55 @@ keep_going: /* We will come back to here until there is
*/
conn->inCursor = conn->inStart;
- /* Read type byte */
- if (pqGetc(&beresp, conn))
+ while (1)
{
- /* We'll come back when there is more data */
- return PGRES_POLLING_READING;
+ /* Read type byte */
+ if (pqGetc(&beresp, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+
+ if (beresp == 'z') /* Switch on compression */
+ {
+ char algorithm;
+ int impl;
+ /* Read message length word */
+ if (pqGetInt(&msgLength, 4, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+ if (msgLength != 5)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "expected compression algorithm specification message length is 5 bytes, but %d is received\n"),
+ msgLength);
+ goto error_return;
+ }
+ pqGetc(&algorithm, conn);
+ impl = zpq_get_algorithm_impl(algorithm);
+ if (impl < 0) {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "server is not supported requested compression algorithm %c\n"), algorithm);
+ goto error_return;
+ }
+ Assert(!conn->zstream);
+ conn->zstream = zpq_create(impl, (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+ &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+ if (!conn->zstream)
+ {
+ appendPQExpBuffer(&conn->errorMessage,
+ libpq_gettext(
+ "failed to initialize compressor %c(%d)\n"), algorithm, impl);
+ goto error_return;
+ }
+ /* reset buffer */
+ conn->inStart = conn->inCursor = conn->inEnd = 0;
+ } else
+ break;
}
/*
@@ -4020,6 +4073,8 @@ freePGconn(PGconn *conn)
free(conn->dbName);
if (conn->replication)
free(conn->replication);
+ if (conn->compression)
+ free(conn->compression);
if (conn->pguser)
free(conn->pguser);
if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f3..4f39f66 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,12 +53,21 @@
#include "pg_config_paths.h"
#include "port/pg_bswap.h"
+#include <common/zpq_stream.h>
+
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
+/*
+ * Read more data into the free tail of conn->inBuffer, decompressing it
+ * when compression has been negotiated (conn->zstream is set).  Returns the
+ * result of zpq_read() or pqsecure_read(): the number of bytes stored, 0, or
+ * a negative value (ZPQ_DECOMPRESS_ERROR only on the compressed path).
+ *
+ * A static inline function rather than a macro, so the argument is
+ * type-checked and evaluated exactly once.
+ */
+static inline ssize_t
+pq_read_conn(PGconn *conn)
+{
+	char	   *dst = conn->inBuffer + conn->inEnd;
+	size_t		avail = conn->inBufSize - conn->inEnd;
+
+	if (conn->zstream)
+		return zpq_read(conn->zstream, dst, avail);
+	return pqsecure_read(conn, dst, avail);
+}
+
/*
* PQlibVersion: return the libpq version number
*/
@@ -664,10 +673,17 @@ pqReadData(PGconn *conn)
/* OK, try to read some data */
retry3:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ nread = pq_read_conn(conn);
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
switch (SOCK_ERRNO)
{
case EINTR:
@@ -759,10 +775,18 @@ retry3:
* arrived.
*/
retry4:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ nread = pq_read_conn(conn);
+
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
switch (SOCK_ERRNO)
{
case EINTR:
@@ -875,12 +899,14 @@ pqSendSome(PGconn *conn, int len)
}
/* while there's still data to send */
- while (len > 0)
+ while (len > 0 || zpq_buffered_tx(conn->zstream))
{
int sent;
-
+ size_t processed = 0;
+ sent = conn->zstream
+ ? zpq_write(conn->zstream, ptr, len, &processed)
#ifndef WIN32
- sent = pqsecure_write(conn, ptr, len);
+ : pqsecure_write(conn, ptr, len);
#else
/*
@@ -888,8 +914,11 @@ pqSendSome(PGconn *conn, int len)
* failure-point appears to be different in different versions of
* Windows, but 64k should always be safe.
*/
- sent = pqsecure_write(conn, ptr, Min(len, 65536));
+ : pqsecure_write(conn, ptr, Min(len, 65536));
#endif
+ ptr += processed;
+ len -= processed;
+ remaining -= processed;
if (sent < 0)
{
@@ -943,7 +972,7 @@ pqSendSome(PGconn *conn, int len)
remaining -= sent;
}
- if (len > 0)
+ if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
{
/*
* We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525..4a38eed 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2135,6 +2135,83 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
}
/*
+ * Parse a boolean connection-option value.
+ *
+ * Mirrors the backend's parse_bool() (src/backend/utils/adt/bool.c), which is
+ * not available in the frontend.  Accepts "true"/"false", "yes"/"no",
+ * "on"/"off" and "1"/"0", case-insensitively, as well as any unique prefix
+ * of those words (e.g. "t", "fa", "y").  "on" and "off" need at least two
+ * characters, because "o" alone is ambiguous.
+ *
+ * On success stores the value in *result and returns true; otherwise sets
+ * *result to false and returns false.
+ */
+static bool
+parse_bool(const char *value, bool *result)
+{
+	size_t		len = strlen(value);
+
+	switch (*value)
+	{
+		case 't':
+		case 'T':
+			if (pg_strncasecmp(value, "true", len) == 0)
+			{
+				*result = true;
+				return true;
+			}
+			break;
+		case 'f':
+		case 'F':
+			if (pg_strncasecmp(value, "false", len) == 0)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		case 'y':
+		case 'Y':
+			if (pg_strncasecmp(value, "yes", len) == 0)
+			{
+				*result = true;
+				return true;
+			}
+			break;
+		case 'n':
+		case 'N':
+			if (pg_strncasecmp(value, "no", len) == 0)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		case 'o':
+		case 'O':
+			/* 'o' is not unique enough: require at least "on" or "of" */
+			if (pg_strncasecmp(value, "on", (len > 2 ? len : 2)) == 0)
+			{
+				*result = true;
+				return true;
+			}
+			else if (pg_strncasecmp(value, "off", (len > 2 ? len : 2)) == 0)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		case '1':
+			if (len == 1)
+			{
+				*result = true;
+				return true;
+			}
+			break;
+		case '0':
+			if (len == 1)
+			{
+				*result = false;
+				return true;
+			}
+			break;
+		default:
+			break;
+	}
+
+	*result = false; /* suppress compiler warning */
+	return false;
+}
+
+/*
* Build a startup packet given a filled-in PGconn structure.
*
* We need to figure out how much space is needed, then fill it in.
@@ -2180,6 +2257,26 @@ build_startup_packet(const PGconn *conn, char *packet,
ADD_STARTUP_OPTION("replication", conn->replication);
if (conn->pgoptions && conn->pgoptions[0])
ADD_STARTUP_OPTION("options", conn->pgoptions);
+ if (conn->compression && conn->compression[0])
+ {
+ bool enabled;
+ /*
+ * If compression is enabled, then send to the server list of compression algorithms
+ * supported by client
+ */
+ if (parse_bool(conn->compression, &enabled))
+ {
+ char compression_algorithms[ZPQ_MAX_ALGORITHMS];
+ zpq_get_supported_algorithms(compression_algorithms);
+ ADD_STARTUP_OPTION("compression", compression_algorithms);
+ }
+ else if (packet == NULL)
+ {
+ fprintf(stderr,
+ libpq_gettext("WARNING: invlaid value for compression option: '%s'\n"),
+ conn->compression);
+ }
+ }
if (conn->send_appname)
{
/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae..69ad1e9 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
/* include stuff common to fe and be */
#include "getaddrinfo.h"
#include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
/* include stuff found in fe only */
#include "pqexpbuffer.h"
@@ -369,6 +370,7 @@ struct pg_conn
* "sspi") */
char *ssl_min_protocol_version; /* minimum TLS protocol version */
char *ssl_max_protocol_version; /* maximum TLS protocol version */
+ char *compression; /* stream compression (0 or 1) */
/* Type of connection to make. Possible values: any, read-write. */
char *target_session_attrs;
@@ -527,6 +529,9 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Compression stream */
+ ZpqStream* zstream;
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd..daac58c 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -123,7 +123,7 @@ sub mkvcbuild
config_info.c controldata_utils.c d2s.c encnames.c exec.c
f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c
keywords.c kwlookup.c link-canary.c md5.c
- pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+ pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
wait_error.c wchar.c);