On 1/11/21 2:53 PM, Konstantin Knizhnik wrote:
> 
> ...
> 
> New version of libpq compression patch is attached.
> It can be also be found at g...@github.com:postgrespro/libpq_compression.git
> 

Seems it bit-rotted already, so here's a slightly fixed version.

1) Fixes the MSVC makefile. The list of files is sorted alphabetically,
so I've added the file at the end.

2) Fixes duplicate OID. It's a good practice to assign OIDs from the end
of the range, to prevent collisions during development.


Other than that, I wonder what's the easiest way to run all tests with
compression enabled. ISTM it'd be nice to add pg_regress option forcing
a particular compression algorithm to be used, or something similar. I'd
like a convenient way to pass this through a valgrind, for example. Or
how do we get this tested on a buildfarm?

I'm not convinced it's very user-friendly to not have a psql option
enabling compression. It's true it can be enabled in a connection
string, but I doubt many people will notice that.

The sgml docs need a bit more love / formatting. The lines in libpq.sgml
are far too long, and there are no tags whatsoever. Presumably zlib/zstd
should be marked as <literal>, and so on.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
>From 14690d8a877244fbc839f97274ccf7d84e971e71 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Mon, 11 Jan 2021 18:01:11 +0100
Subject: [PATCH] v29

---
 configure                                     |  81 +++
 configure.ac                                  |  21 +
 .../postgres_fdw/expected/postgres_fdw.out    |   2 +-
 doc/src/sgml/config.sgml                      |  16 +
 doc/src/sgml/libpq.sgml                       |  25 +
 doc/src/sgml/protocol.sgml                    |  93 +++
 src/Makefile.global.in                        |   1 +
 src/backend/Makefile                          |   8 +
 src/backend/catalog/system_views.sql          |   9 +
 src/backend/libpq/pqcomm.c                    | 247 +++++--
 src/backend/postmaster/pgstat.c               |  30 +
 src/backend/postmaster/postmaster.c           |  10 +
 src/backend/utils/adt/pgstatfuncs.c           |  50 +-
 src/backend/utils/misc/guc.c                  |  10 +
 src/common/Makefile                           |   3 +-
 src/common/zpq_stream.c                       | 684 ++++++++++++++++++
 src/include/catalog/pg_proc.dat               |  18 +-
 src/include/common/zpq_stream.h               |  94 +++
 src/include/libpq/libpq-be.h                  |   3 +
 src/include/libpq/libpq.h                     |   1 +
 src/include/libpq/pqcomm.h                    |   1 +
 src/include/pg_config.h.in                    |   3 +
 src/include/pgstat.h                          |   7 +
 src/interfaces/libpq/Makefile                 |  14 +
 src/interfaces/libpq/fe-connect.c             |  92 ++-
 src/interfaces/libpq/fe-exec.c                |   4 +-
 src/interfaces/libpq/fe-misc.c                |  55 +-
 src/interfaces/libpq/fe-protocol3.c           | 165 ++++-
 src/interfaces/libpq/libpq-int.h              |  21 +
 src/test/regress/expected/rules.out           |  14 +-
 src/tools/msvc/Mkvcbuild.pm                   |   2 +-
 31 files changed, 1701 insertions(+), 83 deletions(-)
 create mode 100644 src/common/zpq_stream.c
 create mode 100644 src/include/common/zpq_stream.h

diff --git a/configure b/configure
index b917a2a1c9..feb9420106 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ LD
 LDFLAGS_SL
 LDFLAGS_EX
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 XML2_LIBS
@@ -866,6 +867,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -8571,6 +8573,85 @@ fi
 
 
 
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
 
 #
 # Zlib
diff --git a/configure.ac b/configure.ac
index 838d47dc22..c037cd88f6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+              [use zstd])
+AC_SUBST(with_zstd)
+
 #
 # Assignments
 #
@@ -1186,6 +1193,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1401,6 +1415,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..07504388b0 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7c0a673a8d..a723251a2a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -996,6 +996,22 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+      <term><varname>libpq_compression</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>libpq_compression</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This parameter enables compression of libpq traffic between client and server.
+        The default is <literal>on</literal>.
+        This option allows rejecting compression requests even if it is supported by server
+        (for example, due to security, or CPU consumption).
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 2bb3bf77e4..ed9e7f910e 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1240,6 +1240,31 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. The client sends a request with a list of compression algorithms.
+        Compression can be requested by a client by including the "compression" option in its connection string.
+        This can either be a boolean value to enable or disable compression
+        ("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "any", or an explicit list of comma-separated compression algorithms
+        which can optionally include compression level ("zlib,zstd:5").
+        If compression is enabled but an algorithm is not explicitly specified, the client library sends its full list of
+        supported algorithms and the server chooses a preferred algorithm.
+
+        If the server accepts one of the algorithms, it replies with an acknowledgment and all future libpq messages between client and server
+        will be compressed.
+        If the server rejects the compression request, it is up to the client whether to continue without compression or to report an error.
+        Support for compression algorithms must be enabled when the server is compiled.
+        Currently, two libraries are supported: zlib (default) and zstd (if Postgres was
+        configured with --with-zstd option). In both cases, streaming mode is used.
+        By default, compression is not requested by the client.
+        Please note that using compression together with SSL may expose extra vulnerabilities:
+        <ulink url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 24f7b21ecb..a69f9bfc26 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed-up client-server interaction.
+    Compression is especially useful for importing/exporting data to/from the database using the <literal>COPY</literal> command
+    and for replication (both physical and logical). Compression can also reduce the server's response time
+    for queries returning a large amount of data (for example, JSON, BLOBs, text, ...).
+    Currently, two libraries are supported: zlib (default) and zstd (if Postgres was
+    configured with --with-zstd option).
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -262,6 +271,22 @@
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         The server accepts the client's compression request.
+         Compression is requested when a client connection includes the "compression" option, which includes
+         a list of requested compression algorithms.
+         If the server accepts one of these algorithms, it acknowledges use of compression and
+         all subsequent libpq messages between the client and server will be compressed.
+         The server chooses an algorithm from the list specified by client and responds with the index of the chosen algorithm from the client-supplied list.
+         If the server does not accept any of the requested algorithms, then it replies with an index of -1
+         and it is up to the client whether to continue without compression or to report an error.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
@@ -3416,6 +3441,59 @@ following:
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+  Acknowledge use of compression for protocol data. The client sends to the server a list of requested compression algorithms.
+  If the server supports any of these algorithms, it acknowledges use of this algorithm and all subsequent libpq messages between client and server
+  will be compressed.
+  The server selects the preferred algorithm from the list specified by client and responds with the
+  index of the chosen algorithm in this list.
+  If the server does not support any of the requested algorithms, it replies with -1
+  and it is up to the client whether to continue without compression or to report an error.
+  After receiving this message with algorithm index other than -1, both server and client switch to compressed mode
+  and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte1
+</term>
+<listitem>
+<para>
+        Index of algorithm in the list of supported algorithms specified by client or -1 if none of them are supported.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 
 <varlistentry>
 <term>
@@ -5982,6 +6060,21 @@ StartupMessage (F)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+                <literal>_pq_.compression</literal>
+</term>
+<listitem>
+<para>
+                        Request compression of libpq traffic. The value is a list of compression algorithms requested by the client with an optional
+                        specification of compression level: <literal>"zlib,zstd:5"</literal>.
+                        If the server does not accept compression, the backend will ignore the _pq_.compression
+                        parameter and will not send the CompressionAck message to the frontend.
+                        By default, compression is disabled. Please note that using compression together with SSL may expose extra vulnerabilities:
+                        <ulink url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
                 In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9aac5..9e11599002 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9672e2cb43..3ff92daf75 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5d89e77dbe..1cd343a425 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
         LEFT JOIN pg_database AS D ON (S.datid = D.oid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_network_traffic AS
+    SELECT
+	    S.pid,
+        S.rx_raw_bytes,
+        S.tx_raw_bytes,
+        S.rx_compressed_bytes,
+        S.tx_compressed_bytes
+    FROM pg_stat_get_activity(NULL) AS S;
+
 CREATE VIEW pg_stat_replication AS
     SELECT
             S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 1e6b6db540..84f9b183e8 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <grp.h>
+#include <pgstat.h>
 #include <unistd.h>
 #include <sys/file.h>
 #include <sys/socket.h>
@@ -88,11 +89,14 @@
 
 #include "common/ip.h"
 #include "libpq/libpq.h"
+#include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "port/pg_bswap.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -118,6 +122,7 @@
  */
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
+bool		libpq_compression;
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -141,6 +146,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream * PqStream;
+
+
 /*
  * Message status
  */
@@ -183,6 +191,115 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+static ssize_t
+write_compressed(void *arg, void const *data, size_t size)
+{
+	ssize_t		rc = secure_write((Port *) arg, (void *) data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, 0, rc);
+	return rc;
+}
+
+static ssize_t
+read_compressed(void *arg, void *data, size_t size)
+{
+	ssize_t		rc = secure_read((Port *) arg, data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, rc, 0);
+	return rc;
+}
+
+/*
+ * Send to the client index of chosen compression algorithm
+ */
+static void
+SendCompressionACK(int algorithm)
+{
+	StringInfoData buf;
+
+	pq_beginmessage(&buf, 'z');
+	pq_sendbyte(&buf, (uint8) algorithm);
+	pq_endmessage(&buf);
+	pq_flush();
+}
+
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port *port)
+{
+	char	   *client_compression_algorithms = port->compression_algorithms;
+
+	/*
+	 * If client request compression, it sends list of supported compression
+	 * algorithms separated by comma.
+	 */
+	if (client_compression_algorithms && libpq_compression)
+	{
+		int			compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+		int			impl = -1;
+		char	  **server_compression_algorithms = zpq_get_supported_algorithms();
+		int			index = -1;
+		char	   *protocol_extension = strchr(client_compression_algorithms, ';');
+
+		/* No protocol extension are currently supported */
+		if (protocol_extension)
+			*protocol_extension = '\0';
+
+		for (int i = 0; *client_compression_algorithms; i++)
+		{
+			char	   *sep = strchr(client_compression_algorithms, ',');
+			char	   *level;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			level = strchr(client_compression_algorithms, ':');
+			if (level != NULL)
+			{
+				*level = '\0';	/* compression level is ignored now */
+				if (sscanf(level + 1, "%d", &compression_level) != 1)
+					ereport(LOG,
+							(errmsg("Invalid compression level: %s", level + 1)));
+			}
+			for (impl = 0; server_compression_algorithms[impl] != NULL; impl++)
+			{
+				if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0)
+				{
+					index = i;
+					goto SendCompressionAck;
+				}
+			}
+
+			if (sep != NULL)
+				client_compression_algorithms = sep + 1;
+			else
+				break;
+		}
+SendCompressionAck:
+		free(server_compression_algorithms);
+		SendCompressionACK(index);
+
+		if (index >= 0)			/* Use compression */
+		{
+			PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0);
+			if (!PqStream)
+			{
+				ereport(LOG,
+						(errmsg("Failed to initialize compressor %s", server_compression_algorithms[impl])));
+				return -1;
+			}
+		}
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -283,6 +400,9 @@ socket_close(int code, Datum arg)
 		 */
 		secure_close(MyProcPort);
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Formerly we did an explicit close() here, but it seems better to
 		 * leave the socket open until the process dies.  This allows clients
@@ -931,12 +1051,15 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *      nowait parameter toggles non-blocking mode.
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -952,21 +1075,40 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		/*
+		 * If streaming compression is enabled then use correspondent
+		 * compression read function.
+		 */
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				char const *msg = zpq_decompress_error(PqStream);
+
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -987,7 +1129,8 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		pgstat_report_network_traffic(r, 0, 0, 0);
+		return r;
 	}
 }
 
@@ -1002,7 +1145,8 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1021,7 +1165,8 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1038,48 +1183,15 @@ pq_peekbyte(void)
 int
 pq_getbyte_if_available(unsigned char *c)
 {
-	int			r;
+	int			r = 0;
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1100,7 +1212,8 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1134,7 +1247,8 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1175,7 +1289,8 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1425,13 +1540,24 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+
+		/*
+		 * has more data to flush or unsent data in internal compression
+		 * buffer
+		 */
 	{
 		int			r;
+		size_t		processed = 0;
+		size_t		available = bufend - bufptr;
 
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
 
-		if (r <= 0)
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1474,12 +1600,12 @@ internal_flush(void)
 			InterruptPending = 1;
 			return EOF;
 		}
+		pgstat_report_network_traffic(0, r, 0, 0);
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
@@ -1496,7 +1622,7 @@ socket_flush_if_writable(void)
 	int			res;
 
 	/* Quick exit if nothing to do */
-	if (PqSendPointer == PqSendStart)
+	if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
 		return 0;
 
 	/* No-op if reentrant call */
@@ -1519,7 +1645,7 @@ socket_flush_if_writable(void)
 static bool
 socket_is_send_pending(void)
 {
-	return (PqSendStart < PqSendPointer);
+	return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
 }
 
 /* --------------------------------
@@ -2010,3 +2136,16 @@ pq_settcpusertimeout(int timeout, Port *port)
 
 	return STATUS_OK;
 }
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+	char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL;
+
+	if (algorithm_name)
+		PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+	else
+		PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 3f24a33ef1..f09c81bedb 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3135,6 +3135,9 @@ pgstat_bestart(void)
 	lbeentry.st_xact_start_timestamp = 0;
 	lbeentry.st_databaseid = MyDatabaseId;
 
+	lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+		lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
 	/* We have userid for client-backends, wal-sender and bgworker processes */
 	if (lbeentry.st_backendType == B_BACKEND
 		|| lbeentry.st_backendType == B_WAL_SENDER
@@ -3517,6 +3520,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*
+ * Report current transaction start timestamp as the specified value.
+ * Zero means there is no active transaction.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!pgstat_track_activities || !beentry)
+		return;
+
+	/*
+	 * Update my status entry, following the protocol of bumping
+	 * st_changecount before and after.  We use a volatile pointer here to
+	 * ensure the compiler doesn't try to get cute.
+	 */
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+	beentry->st_rx_raw_bytes += rx_raw_bytes;
+	beentry->st_tx_raw_bytes += tx_raw_bytes;
+	beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+	beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
 /* ----------
  * pgstat_read_current_status() -
  *
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 7de27ee4e0..96741ab16c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2140,6 +2140,8 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "_pq_.compression") == 0)
+				port->compression_algorithms = pstrdup(valptr);
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4439,6 +4441,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 5c12a165a1..3a80b34675 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -569,7 +569,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	30
+#define PG_STAT_GET_ACTIVITY_COLS	34
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -915,6 +915,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				values[28] = BoolGetDatum(false);	/* GSS Encryption not in
 													 * use */
 			}
+			values[30] = beentry->st_rx_raw_bytes;
+			values[31] = beentry->st_tx_raw_bytes;
+			values[32] = beentry->st_rx_compressed_bytes;
+			values[33] = beentry->st_tx_compressed_bytes;
 		}
 		else
 		{
@@ -943,6 +947,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[27] = true;
 			nulls[28] = true;
 			nulls[29] = true;
+			nulls[30] = true;
+			nulls[31] = true;
+			nulls[32] = true;
+			nulls[33] = true;
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1142,6 +1150,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
 	PG_RETURN_TIMESTAMPTZ(result);
 }
 
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS	4
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_NETWORK_TRAFFIC_COLS];
+	bool		nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+	int32		beid = PG_GETARG_INT32(0);
+	PgBackendStatus *beentry;
+
+	if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+		PG_RETURN_NULL();
+	else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Fill values and NULLs */
+	values[0] = beentry->st_rx_raw_bytes;
+	values[1] = beentry->st_tx_raw_bytes;
+	values[2] = beentry->st_rx_compressed_bytes;
+	values[3] = beentry->st_tx_compressed_bytes;
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
 
 Datum
 pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 17579eeaca..cba4fdd880 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1292,6 +1292,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+			gettext_noop("Compress client-server traffic."),
+			NULL
+		},
+		&libpq_compression,
+		true,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
 			gettext_noop("Logs each checkpoint."),
diff --git a/src/common/Makefile b/src/common/Makefile
index f624977939..6915cb1858 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -78,7 +78,8 @@ OBJS_COMMON = \
 	unicode_norm.o \
 	username.o \
 	wait_error.o \
-	wchar.o
+	wchar.o \
+	zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000000..0451d2d502
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,684 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+	/*
+	 * Name of compression algorithm.
+	 */
+	char const *(*name) (void);
+
+	/*
+	 * Create new compression stream.
+	 * level: compression level
+	 */
+	void	   *(*create_compressor) (int level);
+
+	/*
+	 * Create new decompression stream.
+	 */
+	void	   *(*create_decompressor) ();
+
+	/*
+	 * Decompress up to "src_size" compressed bytes from *src and write up to
+	 * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed
+	 * bytes written to *dst is stored in *dst_processed. Number of compressed
+	 * bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZPQ_OK if no errors were encountered during decompression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZPQ_DATA_PENDING means that there might be some data left within
+	 * decompressor internal buffers.
+	 *
+	 * ZPQ_STREAM_END if encountered end of compressed data stream.
+	 *
+	 * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression
+	 * attempt.
+	 */
+	ssize_t		(*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Compress up to "src_size" raw (non-compressed) bytes from *src and
+	 * write up to "dst_size" compressed bytes to *dst. Number of compressed
+	 * bytes written to *dst is stored in *dst_processed. Number of
+	 * non-compressed bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZPQ_OK if no errors were encountered during compression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZPQ_DATA_PENDING means that there might be some data left within
+	 * compressor internal buffers.
+	 *
+	 * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt.
+	 */
+	ssize_t		(*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Free compression stream created by create_compressor function.
+	 */
+	void		(*free_compressor) (void *cs);
+
+	/*
+	 * Free decompression stream created by create_decompressor function.
+	 */
+	void		(*free_decompressor) (void *ds);
+
+	/*
+	 * Get compressor error message.
+	 */
+	char const *(*compress_error) (void *cs);
+
+	/*
+	 * Get decompressor error message.
+	 */
+	char const *(*decompress_error) (void *ds);
+}			ZpqAlgorithm;
+
+
+#define ZPQ_BUFFER_SIZE       8192	/* We have to flush stream after each
+									 * protocol command and command is mostly
+									 * limited by record length, which in turn
+									 * is usually less than page size (except
+									 * TOAST)
+									 */
+
+struct ZpqStream
+{
+	ZpqAlgorithm const *c_algorithm;
+	void	   *c_stream;
+
+	ZpqAlgorithm const *d_algorithm;
+	void	   *d_stream;
+
+	char		tx_buf[ZPQ_BUFFER_SIZE];
+	size_t		tx_pos;
+	size_t		tx_size;
+
+	char		rx_buf[ZPQ_BUFFER_SIZE];
+	size_t		rx_pos;
+	size_t		rx_size;
+
+	zpq_tx_func tx_func;
+	zpq_rx_func rx_func;
+	void	   *arg;
+
+	size_t		tx_total;
+	size_t		tx_total_raw;
+	size_t		rx_total;
+	size_t		rx_total_raw;
+
+	bool		rx_not_flushed;
+	bool		tx_not_flushed;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+/*
+ * Maximum allowed back-reference distance, expressed as power of 2.
+ * This setting controls max compressor/decompressor window size.
+ * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536
+ */
+#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */
+
+typedef struct ZPQ_ZSTD_CStream
+{
+	ZSTD_CStream *stream;
+	char const *error;			/* error message */
+}			ZPQ_ZSTD_CStream;
+
+typedef struct ZPQ_ZSTD_DStream
+{
+	ZSTD_DStream *stream;
+	char const *error;			/* error message */
+}			ZPQ_ZSTD_DStream;
+
+static void *
+zstd_create_compressor(int level)
+{
+	ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream));
+
+	c_stream->stream = ZSTD_createCStream();
+	ZSTD_initCStream(c_stream->stream, level);
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	c_stream->error = NULL;
+	return c_stream;
+}
+
+static void *
+zstd_create_decompressor()
+{
+	ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream));
+
+	d_stream->stream = ZSTD_createDStream();
+	ZSTD_initDStream(d_stream->stream);
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	d_stream->error = NULL;
+	return d_stream;
+}
+
+static ssize_t
+zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+	size_t		rc;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	rc = ZSTD_decompressStream(ds->stream, &out, &in);
+
+	*src_processed = in.pos;
+	*dst_processed = out.pos;
+	if (ZSTD_isError(rc))
+	{
+		ds->error = ZSTD_getErrorName(rc);
+		return ZPQ_DECOMPRESS_ERROR;
+	}
+
+	if (out.pos == out.size)
+	{
+		/*
+		 * if `output.pos == output.size`, there might be some data left
+		 * within internal buffers
+		 */
+		return ZPQ_DATA_PENDING;
+	}
+	return ZPQ_OK;
+}
+
+static ssize_t
+zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	if (in.pos < src_size)		/* Has something to compress in input buffer */
+	{
+		size_t		rc = ZSTD_compressStream(cs->stream, &out, &in);
+
+		*dst_processed = out.pos;
+		*src_processed = in.pos;
+		if (ZSTD_isError(rc))
+		{
+			cs->error = ZSTD_getErrorName(rc);
+			return ZPQ_COMPRESS_ERROR;
+		}
+	}
+
+	if (in.pos == src_size)		/* All data is compressed: flush internal zstd
+								 * buffer */
+	{
+		size_t		tx_not_flushed = ZSTD_flushStream(cs->stream, &out);
+
+		*dst_processed = out.pos;
+		if (tx_not_flushed > 0)
+		{
+			return ZPQ_DATA_PENDING;
+		}
+	}
+
+	return ZPQ_OK;
+}
+
+static void
+zstd_free_compressor(void *c_stream)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+
+	if (cs != NULL)
+	{
+		ZSTD_freeCStream(cs->stream);
+		free(cs);
+	}
+}
+
+static void
+zstd_free_decompressor(void *d_stream)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+
+	if (ds != NULL)
+	{
+		ZSTD_freeDStream(ds->stream);
+		free(ds);
+	}
+}
+
+static char const *
+zstd_compress_error(void *c_stream)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+
+	return cs->error;
+}
+
+static char const *
+zstd_decompress_error(void *d_stream)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+
+	return ds->error;
+}
+
+static char const *
+zstd_name(void)
+{
+	return "zstd";
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+
+static void *
+zlib_create_compressor(int level)
+{
+	int			rc;
+	z_stream   *c_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(c_stream, 0, sizeof(*c_stream));
+	rc = deflateInit(c_stream, level);
+	if (rc != Z_OK)
+	{
+		free(c_stream);
+		return NULL;
+	}
+	return c_stream;
+}
+
+static void *
+zlib_create_decompressor()
+{
+	int			rc;
+	z_stream   *d_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(d_stream, 0, sizeof(*d_stream));
+	rc = inflateInit(d_stream);
+	if (rc != Z_OK)
+	{
+		free(d_stream);
+		return NULL;
+	}
+	return d_stream;
+}
+
+static ssize_t
+zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+	int			rc;
+
+	ds->next_in = (Bytef *) src;
+	ds->avail_in = src_size;
+	ds->next_out = (Bytef *) dst;
+	ds->avail_out = dst_size;
+
+	rc = inflate(ds, Z_SYNC_FLUSH);
+	*src_processed = src_size - ds->avail_in;
+	*dst_processed = dst_size - ds->avail_out;
+
+	if (rc == Z_STREAM_END)
+	{
+		return ZPQ_STREAM_END;
+	}
+	if (rc != Z_OK && rc != Z_BUF_ERROR)
+	{
+		return ZPQ_DECOMPRESS_ERROR;
+	}
+
+	return ZPQ_OK;
+}
+
+static ssize_t
+zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+	int			rc;
+	unsigned	deflate_pending = 0;
+
+
+	cs->next_out = (Bytef *) dst;
+	cs->avail_out = dst_size;
+	cs->next_in = (Bytef *) src;
+	cs->avail_in = src_size;
+
+	rc = deflate(cs, Z_SYNC_FLUSH);
+	Assert(rc == Z_OK);
+	*dst_processed = dst_size - cs->avail_out;
+	*src_processed = src_size - cs->avail_in;
+
+	deflatePending(cs, &deflate_pending, Z_NULL);	/* check if any data left
+													 * in deflate buffer */
+	if (deflate_pending > 0)
+	{
+		return ZPQ_DATA_PENDING;
+	}
+	return ZPQ_OK;
+}
+
+static void
+zlib_free_compressor(void *c_stream)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+
+	if (cs != NULL)
+	{
+		deflateEnd(cs);
+		free(cs);
+	}
+}
+
+static void
+zlib_free_decompressor(void *d_stream)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+
+	if (ds != NULL)
+	{
+		inflateEnd(ds);
+		free(ds);
+	}
+}
+
+static char const *
+zlib_error(void *stream)
+{
+	z_stream   *zs = (z_stream *) stream;
+
+	return zs->msg;
+}
+
+static char const *
+zlib_name(void)
+{
+	return "zlib";
+}
+
+#endif
+
+static char const *
+no_compression_name(void)
+{
+	return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error},
+#endif
+#if HAVE_LIBZ
+	{zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error},
+#endif
+	{no_compression_name}
+};
+
+/*
+ * Index of used compression algorithm in zpq_algorithms array.
+ */
+ZpqStream *
+zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size)
+{
+	ZpqStream  *zs = (ZpqStream *) malloc(sizeof(ZpqStream));
+
+	zs->tx_pos = 0;
+	zs->tx_size = 0;
+	zs->rx_pos = 0;
+	zs->rx_size = 0;
+	zs->tx_func = tx_func;
+	zs->rx_func = rx_func;
+	zs->arg = arg;
+	zs->tx_total = 0;
+	zs->tx_total_raw = 0;
+	zs->rx_total = 0;
+	zs->rx_total_raw = 0;
+	zs->tx_not_flushed = false;
+	zs->rx_not_flushed = false;
+
+	zs->rx_size = rx_data_size;
+	Assert(rx_data_size < ZPQ_BUFFER_SIZE);
+	memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+	zs->c_algorithm = &zpq_algorithms[c_alg_impl];
+	zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level);
+	if (zs->c_stream == NULL)
+	{
+		free(zs);
+		return NULL;
+	}
+	zs->d_algorithm = &zpq_algorithms[d_alg_impl];
+	zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor();
+	if (zs->d_stream == NULL)
+	{
+		free(zs);
+		return NULL;
+	}
+
+	return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream * zs, void *buf, size_t size)
+{
+	size_t		buf_pos = 0;
+	size_t		rx_processed;
+	size_t		buf_processed;
+	ssize_t		rc;
+
+	while (buf_pos == 0)
+	{							/* Read until some data fetched */
+		if (zs->rx_pos == zs->rx_size)
+		{
+			zs->rx_pos = zs->rx_size = 0;	/* Reset rx buffer */
+		}
+
+		if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed)
+		{
+			ssize_t		rc = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size);
+
+			if (rc > 0)			/* read fetches some data */
+			{
+				zs->rx_size += rc;
+				zs->rx_total += rc;
+			}
+			else				/* read failed */
+			{
+				return rc;
+			}
+		}
+
+		Assert(zs->rx_pos <= zs->rx_size);
+		rx_processed = 0;
+		buf_processed = 0;
+		rc = zs->d_algorithm->decompress(zs->d_stream,
+										 (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed,
+										 buf, size, &buf_processed);
+
+		zs->rx_pos += rx_processed;
+		zs->rx_total_raw += rx_processed;
+		buf_pos += buf_processed;
+		zs->rx_not_flushed = false;
+		if (rc == ZPQ_STREAM_END)
+		{
+			break;
+		}
+		if (rc == ZPQ_DATA_PENDING)
+		{
+			zs->rx_not_flushed = true;
+			continue;
+		}
+		if (rc != ZPQ_OK)
+		{
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+	}
+	return buf_pos;
+}
+
+ssize_t
+zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed)
+{
+	size_t		buf_pos = 0;
+
+	do
+	{
+		if (zs->tx_pos == zs->tx_size)	/* Have nothing to send */
+		{
+			size_t		tx_processed = 0;
+			size_t		buf_processed = 0;
+			ssize_t		rc;
+
+			zs->tx_pos = zs->tx_size = 0;	/* Reset pointer to the beginning of buffer */
+
+			rc = zs->c_algorithm->compress(zs->c_stream,
+										   (char *) buf + buf_pos, size - buf_pos, &buf_processed,
+										   (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed);
+
+			zs->tx_size += tx_processed;
+			buf_pos += buf_processed;
+			zs->tx_total_raw += buf_processed;
+			zs->tx_not_flushed = false;
+
+			if (rc == ZPQ_DATA_PENDING)
+			{
+				zs->tx_not_flushed = true;
+				continue;
+			}
+			if (rc != ZPQ_OK)
+			{
+				*processed = buf_pos;
+				return ZPQ_COMPRESS_ERROR;
+			}
+		}
+		while (zs->tx_pos < zs->tx_size)
+		{
+			ssize_t		rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos);
+
+			if (rc > 0)
+			{
+				zs->tx_pos += rc;
+				zs->tx_total += rc;
+			}
+			else
+			{
+				*processed = buf_pos;
+				return rc;
+			}
+		}
+
+		/*
+		 * repeat sending while there is some data in input or internal
+		 * compression buffer
+		 */
+	} while (buf_pos < size || zs->tx_not_flushed);
+
+	return buf_pos;
+}
+
+void
+zpq_free(ZpqStream * zs)
+{
+	if (zs)
+	{
+		if (zs->c_stream)
+		{
+			zs->c_algorithm->free_compressor(zs->c_stream);
+		}
+		if (zs->d_stream)
+		{
+			zs->d_algorithm->free_decompressor(zs->d_stream);
+		}
+		free(zs);
+	}
+}
+
+char const *
+zpq_compress_error(ZpqStream * zs)
+{
+	return zs->c_algorithm->compress_error(zs->c_stream);
+}
+
+char const *
+zpq_decompress_error(ZpqStream * zs)
+{
+	return zs->d_algorithm->decompress_error(zs->d_stream);
+}
+
+size_t
+zpq_buffered_rx(ZpqStream * zs)
+{
+	return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0;
+}
+
+size_t
+zpq_buffered_tx(ZpqStream * zs)
+{
+	return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0;
+}
+
+/*
+ * Get list of the supported algorithms.
+ */
+char	  **
+zpq_get_supported_algorithms(void)
+{
+	size_t		n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms);
+	char	  **algorithm_names = malloc(n_algorithms * sizeof(char *));
+
+	for (size_t i = 0; i < n_algorithms; i++)
+	{
+		algorithm_names[i] = (char *) zpq_algorithms[i].name();
+	}
+
+	return algorithm_names;
+}
+
+char const *
+zpq_compress_algorithm_name(ZpqStream * zs)
+{
+	return zs ? zs->c_algorithm->name() : NULL;
+}
+
+char const *
+zpq_decompress_algorithm_name(ZpqStream * zs)
+{
+	return zs ? zs->d_algorithm->name() : NULL;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d7b55f57ea..845d317a0d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5236,9 +5236,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
@@ -5505,6 +5505,14 @@
    proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 
+{ oid => '1137', descr => 'statistics: information about network traffic',
+  proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+  proallargtypes => '{int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+  prosrc => 'pg_stat_get_network_traffic' },
+
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5636,6 +5644,10 @@
   proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text',
   proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
 
+{ oid => '9257', descr => 'connection compression algorithm',
+  proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text',
+  proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
 { oid => '1946',
   descr => 'convert bytea value into some ascii-only text string',
   proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000000..e48b5ac930
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,94 @@
+/*
+ * zpq_stream.h
+ *     Streaming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_OK (0)
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+#define ZPQ_COMPRESS_ERROR (-3)
+#define ZPQ_STREAM_END (-4)
+#define ZPQ_DATA_PENDING (-5)
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size);
+typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size);
+
+/*
+ * Create compression stream with rx/tx function for reading/sending compressed data.
+ * c_alg_impl: index of chosen compression algorithm
+ * c_level: compression c_level
+ * d_alg_impl: index of chosen decompression algorithm
+ * tx_func: function for writing compressed data in underlying stream
+ * rx_func: function for reading compressed data from underlying stream
+ * arg: context passed to the function
+ * rx_data: received data (compressed data already fetched from input stream)
+ * rx_data_size: size of data fetched from input stream
+ */
+extern ZpqStream  *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size);
+
+/*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function.
+ */
+extern ssize_t		zpq_read(ZpqStream * zs, void *buf, size_t size);
+
+/*
+ * Write up to "size" raw (decompressed) bytes.
+ * Returns number of written raw bytes or error code.
+ * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function.
+ * In the last case number of bytes written is stored in *processed.
+ */
+extern ssize_t		zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed);
+
+/*
+ * Get decompressor error message.
+ */
+extern char const *zpq_decompress_error(ZpqStream * zs);
+
+/*
+ * Get compressor error message.
+ */
+extern char const *zpq_compress_error(ZpqStream * zs);
+
+/*
+ * Return an estimated amount of data in internal rx decompression buffer.
+ */
+extern size_t		zpq_buffered_rx(ZpqStream * zs);
+
+/*
+ * Return an estimated amount of data in internal tx compression buffer.
+ */
+extern size_t		zpq_buffered_tx(ZpqStream * zs);
+
+/*
+ * Free stream created by zpq_create function.
+ */
+extern void		zpq_free(ZpqStream * zs);
+
+/*
+ * Get the name of chosen compression algorithm.
+ */
+extern char const *zpq_compress_algorithm_name(ZpqStream * zs);
+
+/*
+ * Get the name of chosen decompression algorithm.
+ */
+extern char const *zpq_decompress_algorithm_name(ZpqStream * zs);
+
+/*
+  Returns zero terminated array with compression algorithms names
+*/
+extern char	  **zpq_get_supported_algorithms(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 66a8673d93..439904bc27 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,9 @@ typedef struct Port
 	int			keepalives_count;
 	int			tcp_user_timeout;
 
+	char	   *compression_algorithms; /* Compression algorithms supported by
+										 * client */
+
 	/*
 	 * GSSAPI structures.
 	 */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index a55898c85a..d502dbca85 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int	pq_configure(Port *port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index a86b895b26..beca9f7c35 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -159,6 +159,7 @@ typedef struct StartupPacket
 } StartupPacket;
 
 extern bool Db_user_namespace;
+extern bool libpq_compression;
 
 /*
  * In protocol 3.0 and later, the startup packet length is not fixed, but
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index f4d9f3b408..049b8f31df 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index c38b689710..366b0f4507 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1197,6 +1197,12 @@ typedef struct PgBackendStatus
 	/* application name; MUST be null-terminated */
 	char	   *st_appname;
 
+	/* client-server traffic information */
+	uint64		st_rx_raw_bytes;
+	uint64		st_tx_raw_bytes;
+	uint64		st_rx_compressed_bytes;
+	uint64		st_tx_compressed_bytes;
+
 	/*
 	 * Current command string; MUST be null-terminated. Note that this string
 	 * possibly is truncated in the middle of a multi-byte character. As
@@ -1410,6 +1416,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
 extern void pgstat_report_tempfile(size_t filesize);
 extern void pgstat_report_appname(const char *appname);
 extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
 extern const char *pgstat_get_wait_event(uint32 wait_event_info);
 extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
 extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index c4fde3f93d..21a1dededd 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
 OBJS = \
 	$(WIN32RES) \
 	fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a834ce8cf0..07495bdf13 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "common/string.h"
 #include "fe-auth.h"
 #include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", "PGCOMPRESSION", NULL, NULL,
+		"Libpq-compression", "", 16,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2148,6 +2157,12 @@ connectDBComplete(PGconn *conn)
 				return 1;		/* success! */
 
 			case PGRES_POLLING_READING:
+			    /* if there is some buffered RX data in ZpqStream
+			     * then don't proceed to pqWaitTimed */
+			    if (zpq_buffered_rx(conn->zstream)) {
+			        break;
+			    }
+
 				ret = pqWaitTimed(1, 0, conn, finish_time);
 				if (ret == -1)
 				{
@@ -3221,11 +3236,78 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						int index;
+						char resp;
+						/* Read message length word */
+						if (pqGetInt(&msgLength, 4, conn))
+						{
+							/* We'll come back when there is more data */
+							return PGRES_POLLING_READING;
+						}
+						if (msgLength != 5)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "expected compression algorithm specification message length is 5 bytes, but %d is received\n"),
+											  msgLength);
+							goto error_return;
+						}
+						pqGetc(&resp, conn);
+						index = resp;
+						if (index == (char)-1)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server does not support requested compression algorithms %s\n"),
+											  conn->compression);
+							goto error_return;
+						}
+						if ((unsigned)index >= conn->n_compressors)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server returns incorrect compression aslogirhm  index: %d\n"),
+											  index);
+							goto error_return;
+						}
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create(conn->compressors[index].impl,
+												   conn->compressors[index].level,
+                                                   conn->compressors[index].impl,
+												   (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+												   &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+						if (!conn->zstream)
+						{
+							char** supported_algorithms = zpq_get_supported_algorithms();
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "failed to initialize compressor %s\n"),
+											  supported_algorithms[conn->compressors[index].impl]);
+							free(supported_algorithms);
+							goto error_return;
+						}
+						/* reset buffer */
+						conn->inStart = conn->inCursor = conn->inEnd = 0;
+					}
+					else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol version */
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext(
+											  "server is not supporting libpq compression\n"));
+						goto error_return;
+					} else
+						break;
 				}
 
 				/*
@@ -4024,6 +4106,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index d48f0fd587..5fd099015f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn)
 		 * EOF indication.  We expect therefore that this won't result in any
 		 * undue delay in reporting a previous write failure.)
 		 */
-		if (flushResult ||
-			pqWait(true, false, conn) ||
+		if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
+			pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			/*
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 6094f048f3..18ee6053b4 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,12 +53,24 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+#include  <common/zpq_stream.h>
+
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
 static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
 						  time_t end_time);
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
+/*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn)												\
+	(conn->zstream														\
+	 ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,			\
+				conn->inBufSize - conn->inEnd)							\
+	 : pqsecure_read(conn, conn->inBuffer + conn->inEnd,				\
+					 conn->inBufSize - conn->inEnd))
+
 /*
  * PQlibVersion: return the libpq version number
  */
@@ -664,10 +676,17 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("decompress error: %s\n"),
+                              zpq_decompress_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -759,10 +778,18 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("decompress error: %s\n"),
+                              zpq_decompress_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -875,12 +902,17 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered_tx(conn->zstream))
 	{
 		int			sent;
-
+		size_t      processed = 0;
+        /*
+		 * Use zpq_write if compression is switched on
+		 */
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -888,8 +920,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -943,7 +978,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
@@ -1034,6 +1069,8 @@ pqFlush(PGconn *conn)
 int
 pqWait(int forRead, int forWrite, PGconn *conn)
 {
+	if (forRead && conn->inCursor < conn->inEnd)
+		return 0;
 	return pqWaitTimed(forRead, forWrite, conn, (time_t) -1);
 }
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index a4d6ee2674..d50dd9a8fd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 			if (async)
 				return 0;
 			/* Need to load more data */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				return -2;
 			continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
 	while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
 	{
 		/* need to load more data */
-		if (pqWait(true, false, conn) ||
+		if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			*s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
 		if (needInput)
 		{
 			/* Wait for some data to arrive (or for the channel to close) */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				break;
 		}
@@ -2134,6 +2134,154 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
 	return startpacket;
 }
 
+/*
+ * Build comma-separated list of compression algorithms requested by client.
+ * It can be either explicitly specified by user in connection string, or
+ * include all algorithms supported by client library.
+ * This function returns true if the compression string is successfully parsed and
+ * stores a comma-separated list of algorithms in *client_compressors.
+ * If compression is disabled, then NULL is assigned to *client_compressors.
+ * Also it creates an array of compressor descriptors, each element of which corresponds to
+ * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn
+ * and is used during handshake when a compression acknowledgment response is received from the server.
+ */
+static bool
+build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors)
+{
+	char** supported_algorithms = zpq_get_supported_algorithms();
+	char* value = conn->compression;
+	int n_supported_algorithms;
+	int total_len = 0;
+	int i;
+
+	for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++)
+	{
+		total_len += strlen(supported_algorithms[n_supported_algorithms])+1;
+	}
+
+	if (pg_strcasecmp(value, "true") == 0 ||
+		pg_strcasecmp(value, "yes") == 0 ||
+		pg_strcasecmp(value, "on") == 0 ||
+		pg_strcasecmp(value, "any") == 0 ||
+		pg_strcasecmp(value, "1") == 0)
+	{
+		/* Compression is enabled: choose algorithm automatically */
+		char* p;
+
+		if (n_supported_algorithms == 0)
+		{
+			*client_compressors = NULL; /* no compressors are available */
+			conn->compressors = NULL;
+			conn->n_compressors = 0;
+			return true;
+		}
+		*client_compressors = p = malloc(total_len);
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+		for (i = 0; i < n_supported_algorithms; i++)
+		{
+			strcpy(p, supported_algorithms[i]);
+			p += strlen(p);
+			*p++ = ',';
+			if (build_descriptors)
+			{
+				conn->compressors[i].impl = i;
+				conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+			}
+		}
+		p[-1] = '\0';
+		return true;
+	}
+	else if (*value == 0 ||
+			 pg_strcasecmp(value, "false") == 0 ||
+			 pg_strcasecmp(value, "no") == 0 ||
+			 pg_strcasecmp(value, "off") == 0 ||
+			 pg_strcasecmp(value, "0") == 0)
+	{
+		/* Compression is disabled */
+		*client_compressors = NULL;
+		conn->compressors = NULL;
+		conn->n_compressors = 0;
+		return true;
+	}
+	else
+	{
+		/* List of compression algorithms separated by commas */
+		char *src, *dst;
+		int n_suggested_algorithms = 0;
+		char* suggested_algorithms = strdup(value);
+		src = suggested_algorithms;
+		*client_compressors = dst = strdup(value);
+
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+
+		while (*src != '\0')
+		{
+			char* sep = strchr(src, ',');
+			char* col;
+			int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			strcpy(dst, src);
+
+			col = strchr(src, ':');
+			if (col != NULL)
+			{
+				*col = '\0';
+				if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors)
+				{
+					fprintf(stderr,
+							libpq_gettext("WARNING: invalid compression level %s in compression option '%s'\n"),
+							col+1, value);
+					return false;
+				}
+			}
+			for (i = 0; supported_algorithms[i] != NULL; i++)
+			{
+				if (pg_strcasecmp(src, supported_algorithms[i]) == 0)
+				{
+					if (build_descriptors)
+					{
+						conn->compressors[n_suggested_algorithms].impl = i;
+						conn->compressors[n_suggested_algorithms].level = compression_level;
+					}
+					n_suggested_algorithms += 1;
+					dst += strlen(dst);
+					*dst++ = ',';
+					break;
+				}
+			}
+			if (sep)
+				src = sep+1;
+			else
+				break;
+		}
+		free(suggested_algorithms);
+		conn->n_compressors = n_suggested_algorithms;
+		if (n_suggested_algorithms == 0)
+		{
+			if (!build_descriptors)
+				fprintf(stderr,
+						libpq_gettext("WARNING: none of the specified algorithms are supported by client: %s\n"),
+						value);
+			else
+			{
+				free(conn->compressors);
+				conn->compressors = NULL;
+				conn->n_compressors = 0;
+			}
+			free(*client_compressors);
+			*client_compressors = NULL;
+			return false;
+		}
+		dst[-1] = '\0';
+		return true;
+	}
+}
+
 /*
  * Build a startup packet given a filled-in PGconn structure.
  *
@@ -2180,6 +2328,17 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("replication", conn->replication);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
+	if (conn->compression && conn->compression[0])
+	{
+		char* client_compression_algorithms;
+		if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL))
+		{
+			if (client_compression_algorithms)
+			{
+				ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms);
+			}
+		}
+	}
 	if (conn->send_appname)
 	{
 		/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index e1018adb9e..88fbd0c371 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -317,6 +318,16 @@ typedef struct pg_conn_host
 								 * found in password file. */
 } pg_conn_host;
 
+
+/*
+ * Descriptors of compression algorithms chosen by client
+ */
+typedef struct pg_conn_compressor
+{
+	int			impl;			/* compression implementation index */
+	int			level;			/* compression level */
+}			pg_conn_compressor;
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -370,6 +381,13 @@ struct pg_conn
 	char	   *ssl_min_protocol_version;	/* minimum TLS protocol version */
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
 
+	char	   *compression;	/* stream compression (boolean value, "any" or
+								 * list of compression algorithms separated by
+								 * comma) */
+	pg_conn_compressor *compressors;	/* descriptors of compression
+										 * algorithms chosen by client */
+	unsigned			n_compressors;  /* size of compressors array  */
+
 	/* Type of connection to make.  Possible values: any, read-write. */
 	char	   *target_session_attrs;
 
@@ -527,6 +545,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream  *zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index a687e99d1e..93d6346a85 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid,
     s.backend_xmin,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+    s.rx_raw_bytes,
+    s.tx_raw_bytes,
+    s.rx_compressed_bytes,
+    s.tx_compressed_bytes
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
@@ -2024,7 +2030,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
@@ -2055,7 +2061,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 7f014a12c9..fd9efc1b64 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -125,7 +125,7 @@ sub mkvcbuild
 	  keywords.c kwlookup.c link-canary.c md5_common.c
 	  pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
 	  saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
-	  wait_error.c wchar.c);
+	  wait_error.c wchar.c zpq_stream.c);
 
 	if ($solution->{options}->{openssl})
 	{
-- 
2.26.2

Reply via email to