On 18.06.2018 23:34, Robbie Harwood wrote:
t


Konstantin Knizhnik <k.knizh...@postgrespro.ru> writes:

On 06.06.2018 02:03, Thomas Munro wrote:
On Wed, Jun 6, 2018 at 2:06 AM, Konstantin Knizhnik
<k.knizh...@postgrespro.ru> wrote:
Thank you for review. Updated version of the patch fixing all reported
problems is attached.
Small problem on Windows[1]:

    C:\projects\postgresql\src\include\common/zpq_stream.h(17): error
C2143: syntax error : missing ')' before '*'
[C:\projects\postgresql\libpq.vcxproj]
2395

You used ssize_t in zpq_stream.h, but Windows doesn't have that type.
We have our own typedef in win32_port.h.  Perhaps zpq_stream.c should
include postgres.h/postgres_fe.h (depending on FRONTEND) like the
other .c files in src/common, before it includes zpq_stream.h?
Instead of "c.h".

[1] https://ci.appveyor.com/project/postgresql-cfbot/postgresql/build/1.0.1106

Thank you very much for reporting the problem.
I attached new patch with include of postgres_fe.h added to zpq_stream.c
Hello!

Due to being in a similar place, I'm offering some code review.  I'm
excited that you're looking at throughput on the network stack - it's
not usually what we think of in database performance.  Here are some
first thoughts, which have some overlap with what others have said on
this thread already:

###

This build still doesn't pass Windows:
https://ci.appveyor.com/project/postgresql-cfbot/postgresql/build/1.0.2277

You can find more about what the bot is doing here:
http://cfbot.cputube.org/
Thank you.
Looks like I found the reason: Mkvsbuild.pm has to be patched.



###

I have a few misgivings about pq_configure(), starting with the name.
The *only* thing this function does is set up compression, so it's
mis-named.  (There's no point in making something generic unless it's
needed - it's just confusing.)
Well, my intention was that this function *may* in future perform some other configuration setting not related with compression. And it is better to encapsulate this knowledge in pqcomm, rather than makeĀ  postmaster (BackendStartup) worry about it. But I can rename this function to pq_cofigure_compression() or whatever your prefer.


I also don't like that you've injected into the *startup* path - before
authentication takes place.  Fundamentally, authentication (if it
happens) consists of exchanging some combination of short strings (e.g.,
usernames) and binary blobs (e.g., keys).  None of this will compress
well, so I don't see it as worth performing this negotiation there - it
can wait.  It's also another message in every startup.  I'd leave it to
connection parameters, personally, but up to you.

From my point of view compression of libpq traffic is similar with SSL and should be toggled at the same stage. Definitely authentication parameter are not so large to be efficiently compressed, by compression (may be in future password protected) can somehow protect this data. In any case I do not think that compression of authentication data may have any influence on negotiation speed. So I am not 100% sure that toggling compression just after receiving startup package is the only right solution. But I am not also convinced in that there is some better place where compressor should be configured. Do you have some concrete suggestions for it? In InitPostgres just after PerformAuthentication ? Also please notice that compression is useful not only for client-server communication, but also for replication channels. Right now it is definitely used in both cases, but if we move pq_configure somewhere else, we should check that this code is invoked in both for normal backends and walsender.


###

Documentation!  You're going to need it.  There needs to be enough
around for other people to implement the protocol (or if you prefer,
enough for us to debug the protocol as it exists).

In conjunction with that, please add information on how to set up
compressed vs. uncompressed connections - similarly to how we've
documentation on setting up TLS connection (though presumably compressed
connection documentation will be shorter).

Sorry, definitely I will add documentation for configuring compression.


###

Using terminology from https://facebook.github.io/zstd/zstd_manual.html :

Right now you use streaming (ZSTD_{compress,decompress}Stream()) as the
basis for your API.  I think this is probably a mismatch for what we're
doing here - we're doing one-shot compression/decompression of packets,
not sending video or something.

I think our use case is better served by the non-streaming interface, or
what they call the "Simple API" (ZSTD_{decompress,compress}()).
Documentation suggests it may be worth it to keep an explicit context
around and use that interface instead (i.e.,
ZSTD_{compressCCTx,decompressDCtx}()), but that's something you'll have
to figure out.

You may find making this change helpful for addressing the next issue.

Sorry, but here I completely disagree with you.
What we are doing is really streaming compression, not compression of individual messages/packages. Yes, it is not a video, but actually COPY data has the same nature as video data. The main benefit of streaming compression is that we can use the same dictionary for compressing all messages (and adjust this dictionary based on new data). We do not need to write dictionary and separate header for each record. Otherwize compression of libpq messages will be completely useless: typical size of message is too short to be efficiently compressed. The main drawback of streaming compression is that you can not decompress some particular message without decompression of all previous messages. This is why streaming compression can not be used to compress database pagesĀ  (as it is done in CFS, provided in PostgresPro EE). But for libpq it is no needed.


###

I don't like where you've put the entry points to the compression logic:
it's a layering violation.  A couple people have expressed similar
reservations I think, so let me see if I can explain using
`pqsecure_read()` as an example.  In pseudocode, `pqsecure_read()` looks
like this:

     if conn->is_tls:
         n = tls_read(conn, ptr, len)
     else:
         n = pqsecure_raw_read(conn, ptr, len)
     return n

I want to see this extended by your code to something like:

     if conn->is_tls:
         n = tls_read(conn, ptr, len)
     else:
         n = pqsecure_raw_read(conn, ptr, len)

     if conn->is_compressed:
         n = decompress(ptr, n)

     return n

In conjunction with the above change, this should also significantly
reduce the size of the patch (I think).

Yes, it will simplify patch. But make libpq compression completely useless (see my explanation above). We need to use streaming compression, and to be able to use streaming compression I have to pass function for fetching more data to compression library.


###

The compression flag has proven somewhat contentious, as you've already
seen on this thread.  I recommend removing it for now and putting it in
a separate patch to be merged later, since it's separable.

###

It's not worth flagging style violations in your code right now, but you
should be aware that there are quite a few style and whitespace
problems.  In particular, please be sure that you're using hard tabs
when appropriate, and that line lengths are fewer than 80 characters
(unless they contain error messages), and that pointers are correctly
declared (`void *arg`, not `void* arg`).

###

Ok, I will fix it.


Thanks,
--Robbie

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/configure b/configure
index 0aafd9c..fc5685c 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ ELF_SYS
 EGREP
 GREP
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 with_libxml
@@ -863,6 +864,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 enable_float4_byval
@@ -8017,6 +8019,86 @@ fi
 
 
 #
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
+#
 # Elf
 #
 
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 95d090e..b59629c 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 25af514..a8e461a 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -51,6 +51,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..c963657 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -95,6 +95,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include  "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -143,6 +144,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream* PqStream;
+
+
 /*
  * Message status
  */
@@ -185,6 +189,31 @@ PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only conpression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+	if (port->use_compression)
+	{
+		char compression = 'z'; /* Request compression message */
+		int rc;
+		/* Switch on compression at client side */
+		socket_set_nonblocking(false);
+		while ((rc = secure_write(MyProcPort, &compression, 1)) < 0 && errno == EINTR);
+		if (rc != 1)
+			return -1;
+
+		/* initialize compression */
+		PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -225,6 +254,7 @@ pq_init(void)
 					  NULL, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+
 }
 
 /* --------------------------------
@@ -282,6 +312,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -932,12 +965,14 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			   r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -953,21 +988,37 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		size_t processed = 0;
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		PqRecvLength += processed;
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				char const* msg = zpq_error(PqStream);
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -988,7 +1039,7 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		return r;
 	}
 }
 
@@ -1003,7 +1054,7 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1022,7 +1073,7 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1043,44 +1094,11 @@ pq_getbyte_if_available(unsigned char *c)
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1101,7 +1119,7 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1135,7 +1153,7 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1176,7 +1194,7 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1426,13 +1444,18 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
 	{
-		int			r;
-
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
-		if (r <= 0)
+		int		r;
+		size_t  processed = 0;
+		size_t  available = bufend - bufptr;
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
+
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1480,7 +1503,6 @@ internal_flush(void)
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b3..3928e89 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2053,6 +2053,16 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "compression") == 0)
+			{
+				if (!parse_bool(valptr, &port->use_compression))
+					ereport(FATAL,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid boolean value for parameter \"%s\": \"%s\"",
+									"compression",
+									valptr),
+							 errhint("Valid values are: \"false\", \"off\", 0, \"true\", \"on\", 1.")));
+			}
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4257,6 +4267,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index f0c5149..efc4ac5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -188,6 +188,7 @@ int			nclients = 1;		/* number of clients */
 int			nthreads = 1;		/* number of threads */
 bool		is_connect;			/* establish connection for each transaction */
 bool		is_latencies;		/* report per-command latencies */
+bool        libpq_compression;  /* use libpq compression */
 int			main_pid;			/* main process id used in log filename */
 
 char	   *pghost = "";
@@ -590,6 +591,7 @@ usage(void)
 		   "  -h, --host=HOSTNAME      database server host or socket directory\n"
 		   "  -p, --port=PORT          database server port number\n"
 		   "  -U, --username=USERNAME  connect as specified database user\n"
+		   "  -Z, --compression        use libpq compression\n"
 		   "  -V, --version            output version information, then exit\n"
 		   "  -?, --help               show this help, then exit\n"
 		   "\n"
@@ -1107,7 +1109,7 @@ doConnect(void)
 	 */
 	do
 	{
-#define PARAMS_ARRAY_SIZE	7
+#define PARAMS_ARRAY_SIZE	8
 
 		const char *keywords[PARAMS_ARRAY_SIZE];
 		const char *values[PARAMS_ARRAY_SIZE];
@@ -1124,8 +1126,10 @@ doConnect(void)
 		values[4] = dbName;
 		keywords[5] = "fallback_application_name";
 		values[5] = progname;
-		keywords[6] = NULL;
-		values[6] = NULL;
+		keywords[6] = "compression";
+		values[6] = libpq_compression ? "on" : "off";
+		keywords[7] = NULL;
+		values[7] = NULL;
 
 		new_pass = false;
 
@@ -4759,6 +4763,7 @@ main(int argc, char **argv)
 		{"builtin", required_argument, NULL, 'b'},
 		{"client", required_argument, NULL, 'c'},
 		{"connect", no_argument, NULL, 'C'},
+		{"compression", no_argument, NULL, 'Z'},
 		{"debug", no_argument, NULL, 'd'},
 		{"define", required_argument, NULL, 'D'},
 		{"file", required_argument, NULL, 'f'},
@@ -4868,12 +4873,15 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
-	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:Z", long_options, &optindex)) != -1)
 	{
 		char	   *script;
 
 		switch (c)
 		{
+			case 'Z':
+				libpq_compression = true;
+				break;
 			case 'i':
 				is_init_mode = true;
 				break;
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index 702e742..ae7a14c 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -139,6 +139,7 @@ usage(unsigned short int pager)
 	fprintf(output, _("  -U, --username=USERNAME  database user name (default: \"%s\")\n"), env);
 	fprintf(output, _("  -w, --no-password        never prompt for password\n"));
 	fprintf(output, _("  -W, --password           force password prompt (should happen automatically)\n"));
+	fprintf(output, _("  -Z, --compression        compress traffic with server\n"));
 
 	fprintf(output, _("\nFor more information, type \"\\?\" (for internal commands) or \"\\help\" (for SQL\n"
 					  "commands) from within psql, or consult the psql section in the PostgreSQL\n"
diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c
index be57574..9271716 100644
--- a/src/bin/psql/startup.c
+++ b/src/bin/psql/startup.c
@@ -75,6 +75,7 @@ struct adhoc_opts
 	bool		no_psqlrc;
 	bool		single_txn;
 	bool		list_dbs;
+	bool        compression;
 	SimpleActionList actions;
 };
 
@@ -237,8 +238,10 @@ main(int argc, char *argv[])
 		values[5] = pset.progname;
 		keywords[6] = "client_encoding";
 		values[6] = (pset.notty || getenv("PGCLIENTENCODING")) ? NULL : "auto";
-		keywords[7] = NULL;
-		values[7] = NULL;
+		keywords[7] = "compression";
+		values[7] = options.compression ? "on" : "off";
+		keywords[8] = NULL;
+		values[8] = NULL;
 
 		new_pass = false;
 		pset.db = PQconnectdbParams(keywords, values, true);
@@ -436,6 +439,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 		{"echo-all", no_argument, NULL, 'a'},
 		{"no-align", no_argument, NULL, 'A'},
 		{"command", required_argument, NULL, 'c'},
+		{"compression", no_argument, NULL, 'Z'},
 		{"dbname", required_argument, NULL, 'd'},
 		{"echo-queries", no_argument, NULL, 'e'},
 		{"echo-errors", no_argument, NULL, 'b'},
@@ -476,7 +480,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 
 	memset(options, 0, sizeof *options);
 
-	while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01",
+	while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01Z",
 							long_options, &optindex)) != -1)
 	{
 		switch (c)
@@ -540,6 +544,9 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 			case 'p':
 				options->port = pg_strdup(optarg);
 				break;
+			case 'Z':
+			    options->compression = true;
+				break;
 			case 'P':
 				{
 					char	   *value;
diff --git a/src/bin/scripts/pg_isready.c b/src/bin/scripts/pg_isready.c
index f7ad7b4..5d035cd 100644
--- a/src/bin/scripts/pg_isready.c
+++ b/src/bin/scripts/pg_isready.c
@@ -34,7 +34,7 @@ main(int argc, char **argv)
 	const char *pghostaddr_str = NULL;
 	const char *pgport_str = NULL;
 
-#define PARAMS_ARRAY_SIZE	7
+#define PARAMS_ARRAY_SIZE	8
 
 	const char *keywords[PARAMS_ARRAY_SIZE];
 	const char *values[PARAMS_ARRAY_SIZE];
diff --git a/src/common/Makefile b/src/common/Makefile
index 1fc2c66..f804fe1 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -43,7 +43,7 @@ LIBS += $(PTHREAD_LIBS)
 OBJS_COMMON = base64.o config_info.o controldata_utils.o exec.o file_perm.o \
 	ip.o keywords.o md5.o pg_lzcompress.o pgfnames.o psprintf.o relpath.o \
 	rmtree.o saslprep.o scram-common.o string.o unicode_norm.o \
-	username.o wait_error.o
+	username.o wait_error.o zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += sha2_openssl.o
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..dbb05f2
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,368 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+#if HAVE_LIBZSTD
+
+#include <malloc.h>
+#include <zstd.h>
+
+#define ZPQ_BUFFER_SIZE (8*1024)
+#define ZSTD_COMPRESSION_LEVEL 1
+
+struct ZpqStream
+{
+	ZSTD_CStream*  tx_stream;
+	ZSTD_DStream*  rx_stream;
+	ZSTD_outBuffer tx;
+	ZSTD_inBuffer  rx;
+	size_t         tx_not_flushed; /* Amount of datas in internal zstd buffer */
+	size_t         tx_buffered;    /* Data which is consumed by zpq_read but not yet sent */
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+	char const*    rx_error;    /* Decompress error message */
+	size_t         tx_total;
+	size_t         tx_total_raw;
+	size_t         rx_total;
+	size_t         rx_total_raw;
+	char           tx_buf[ZPQ_BUFFER_SIZE];
+	char           rx_buf[ZPQ_BUFFER_SIZE];
+};
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+	ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream));
+	zs->tx_stream = ZSTD_createCStream();
+	ZSTD_initCStream(zs->tx_stream, ZSTD_COMPRESSION_LEVEL);
+	zs->rx_stream = ZSTD_createDStream();
+	ZSTD_initDStream(zs->rx_stream);
+	zs->tx.dst = zs->tx_buf;
+	zs->tx.pos = 0;
+	zs->tx.size = ZPQ_BUFFER_SIZE;
+	zs->rx.src = zs->rx_buf;
+	zs->rx.pos = 0;
+	zs->rx.size = 0;
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->tx_buffered = 0;
+	zs->tx_not_flushed = 0;
+	zs->rx_error = NULL;
+	zs->arg = arg;
+	zs->tx_total = zs->tx_total_raw = 0;
+	zs->rx_total = zs->rx_total_raw = 0;
+	return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size, size_t *processed)
+{
+	ssize_t rc;
+	ZSTD_outBuffer out;
+	out.dst = buf;
+	out.pos = 0;
+	out.size = size;
+
+	while (1)
+	{
+		rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+		if (ZSTD_isError(rc))
+		{
+			zs->rx_error = ZSTD_getErrorName(rc);
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+		/* Return result if we fill requested amount of bytes or read operation was performed */
+		if (out.pos != 0)
+		{
+			zs->rx_total_raw += out.pos;
+			return out.pos;
+		}
+		if (zs->rx.pos == zs->rx.size)
+		{
+			zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+		}
+		rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZPQ_BUFFER_SIZE - zs->rx.size);
+		if (rc > 0) /* read fetches some data */
+		{
+			zs->rx.size += rc;
+			zs->rx_total += rc;
+		}
+		else /* read failed */
+		{
+			*processed = out.pos;
+			zs->rx_total_raw += out.pos;
+			return rc;
+		}
+	}
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t *processed)
+{
+	ssize_t rc;
+	ZSTD_inBuffer in_buf;
+	in_buf.src = buf;
+	in_buf.pos = 0;
+	in_buf.size = size;
+
+	do
+	{
+		if (zs->tx.pos == 0) /* Compress buffer is empty */
+		{
+			zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+			if (in_buf.pos < size) /* Has something to compress in input buffer */
+				ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+			if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+			{
+				zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+		if (rc > 0)
+		{
+			zs->tx.pos -= rc;
+			zs->tx.dst = (char*)zs->tx.dst + rc;
+			zs->tx_total += rc;
+		}
+		else
+		{
+			*processed = in_buf.pos;
+			zs->tx_buffered = zs->tx.pos;
+			zs->tx_total_raw += in_buf.pos;
+			return rc;
+		}
+	} while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
+
+	zs->tx_total_raw += in_buf.pos;
+	zs->tx_buffered = zs->tx.pos;
+	return in_buf.pos;
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+	if (zs != NULL)
+	{
+		ZSTD_freeCStream(zs->tx_stream);
+		ZSTD_freeDStream(zs->rx_stream);
+		free(zs);
+	}
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+	return zs->rx_error;
+}
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+	return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+#elif HAVE_LIBZ
+
+#include <malloc.h>
+#include <zlib.h>
+
+#define ZPQ_BUFFER_SIZE 8192
+#define ZLIB_COMPRESSION_LEVEL 1
+
+struct ZpqStream
+{
+	z_stream tx;
+	z_stream rx;
+
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+
+	size_t         tx_buffered;
+
+	Bytef          tx_buf[ZPQ_BUFFER_SIZE];
+	Bytef          rx_buf[ZPQ_BUFFER_SIZE];
+};
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+	int rc;
+	ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream));
+	memset(&zs->tx, 0, sizeof(zs->tx));
+	zs->tx.next_out = zs->tx_buf;
+	zs->tx.avail_out = ZPQ_BUFFER_SIZE;
+	zs->tx_buffered = 0;
+	rc = deflateInit(&zs->tx, ZLIB_COMPRESSION_LEVEL);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZPQ_BUFFER_SIZE);
+
+	memset(&zs->rx, 0, sizeof(zs->tx));
+	zs->rx.next_in = zs->rx_buf;
+	zs->rx.avail_in = ZPQ_BUFFER_SIZE;
+	rc = inflateInit(&zs->rx);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZPQ_BUFFER_SIZE);
+	zs->rx.avail_in = 0;
+
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->arg = arg;
+
+	return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size, size_t *processed)
+{
+	int rc;
+	zs->rx.next_out = (Bytef *)buf;
+	zs->rx.avail_out = size;
+
+	while (1)
+	{
+		if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+		{
+			rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+			if (rc != Z_OK)
+			{
+				return ZPQ_DECOMPRESS_ERROR;
+			}
+			if (zs->rx.avail_out != size)
+			{
+				return size - zs->rx.avail_out;
+			}
+			if (zs->rx.avail_in == 0)
+			{
+				zs->rx.next_in = zs->rx_buf;
+			}
+		}
+		else
+		{
+			zs->rx.next_in = zs->rx_buf;
+		}
+		rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZPQ_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+		if (rc > 0)
+		{
+			zs->rx.avail_in += rc;
+		}
+		else
+		{
+			*processed = size - zs->rx.avail_out;
+			return rc;
+		}
+	}
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t *processed)
+{
+    int rc;
+	zs->tx.next_in = (Bytef *)buf;
+	zs->tx.avail_in = size;
+	do
+	{
+		if (zs->tx.avail_out == ZPQ_BUFFER_SIZE) /* Compress buffer is empty */
+		{
+			zs->tx.next_out = zs->tx_buf; /* Reset pointer to the  beginning of buffer */
+
+			if (zs->tx.avail_in != 0) /* Has something in input buffer */
+			{
+				rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+				Assert(rc == Z_OK);
+				zs->tx.next_out = zs->tx_buf; /* Reset pointer to the  beginning of buffer */
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.next_out, ZPQ_BUFFER_SIZE - zs->tx.avail_out);
+		if (rc > 0)
+		{
+			zs->tx.next_out += rc;
+			zs->tx.avail_out += rc;
+		}
+		else
+		{
+			*processed = size - zs->tx.avail_in;
+			zs->tx_buffered = ZPQ_BUFFER_SIZE - zs->tx.avail_out;
+			return rc;
+		}
+	} while (zs->tx.avail_out == ZPQ_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */
+
+	zs->tx_buffered = ZPQ_BUFFER_SIZE - zs->tx.avail_out;
+
+	return size - zs->tx.avail_in;
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+	if (zs != NULL)
+	{
+		inflateEnd(&zs->rx);
+		deflateEnd(&zs->tx);
+		free(zs);
+	}
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+	return zs->rx.msg;
+}
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+	return zs != NULL ? zs->tx_buffered : 0;
+}
+
+#else
+
+ZpqStream*
+zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
+{
+	return NULL;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size)
+{
+	return -1;
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size)
+{
+	return -1;
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+	return NULL;
+}
+
+
+size_t
+zpq_buffered(ZpqStream *zs)
+{
+	return 0;
+}
+
+#endif
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..dc765af
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,28 @@
+/*
+ * zpq_stream.h
+ *     Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+
+ZpqStream* zpq_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size, size_t* processed);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered(ZpqStream* zs);
+void zpq_free(ZpqStream* zs);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 7698cd1..5203f2d 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -182,6 +182,8 @@ typedef struct Port
 	char	   *peer_cn;
 	bool		peer_cert_valid;
 
+	bool        use_compression;
+	
 	/*
 	 * OpenSSL structures. (Keep these last so that the locations of other
 	 * fields are the same whether or not you build with OpenSSL.)
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 7bf06c6..cb0b69e 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -61,6 +61,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int  pq_configure(Port* port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 9411f48..c0ea383 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -365,6 +365,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index abe0a50..2dad4fb 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,16 @@ endif
 # platforms require special flags.
 LIBS := $(LIBS:-lpgport=)
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
 # We can't use Makefile variables here because the MSVC build system scrapes
 # OBJS from this file.
 OBJS=	fe-auth.o fe-auth-scram.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \
@@ -49,7 +59,7 @@ endif
 # src/backend/utils/mb
 OBJS += encnames.o wchar.o
 # src/common
-OBJS += base64.o ip.o md5.o scram-common.o saslprep.o unicode_norm.o
+OBJS += base64.o ip.o md5.o scram-common.o saslprep.o unicode_norm.o zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS += fe-secure-openssl.o fe-secure-common.o sha2_openssl.o
@@ -106,7 +116,7 @@ backend_src = $(top_srcdir)/src/backend
 chklocale.c crypt.c erand48.c getaddrinfo.c getpeereid.c inet_aton.c inet_net_ntop.c noblock.c open.c system.c pgsleep.c pg_strong_random.c pgstrcasecmp.c pqsignal.c snprintf.c strerror.c strlcpy.c strnlen.c thread.c win32error.c win32setlocale.c: % : $(top_srcdir)/src/port/%
 	rm -f $@ && $(LN_S) $< .
 
-ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c: % : $(top_srcdir)/src/common/%
+ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c zpq_stream.c: % : $(top_srcdir)/src/common/%
 	rm -f $@ && $(LN_S) $< .
 
 encnames.c wchar.c: % : $(backend_src)/utils/mb/%
@@ -156,7 +166,7 @@ clean distclean: clean-lib
 	rm -f pg_config_paths.h
 # Remove files we (may have) symlinked in from src/port and other places
 	rm -f chklocale.c crypt.c erand48.c getaddrinfo.c getpeereid.c inet_aton.c inet_net_ntop.c noblock.c open.c system.c pgsleep.c pg_strong_random.c pgstrcasecmp.c pqsignal.c snprintf.c strerror.c strlcpy.c strnlen.c thread.c win32error.c win32setlocale.c
-	rm -f ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c
+	rm -f ip.c md5.c base64.c scram-common.c sha2.c sha2_openssl.c saslprep.c unicode_norm.c zpq_stream.c
 	rm -f encnames.c wchar.c
 
 maintainer-clean: distclean maintainer-clean-lib
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a7e969d..6fabc5b 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -72,6 +72,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
 
 #include "common/ip.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "mb/pg_wchar.h"
 #include "port/pg_bswap.h"
 
@@ -269,6 +270,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		21,						/* sizeof("tls-server-end-point") == 21 */
 	offsetof(struct pg_conn, scram_channel_binding)},
 
+	{"compression", "COMPRESSION", "0", NULL,
+		"ZSTD-Compression", "", 1,
+	offsetof(struct pg_conn, compression)},
+
 	/*
 	 * ssl options are allowed even without client SSL support because the
 	 * client can still handle SSL modes "disable" and "allow". Other
@@ -325,6 +330,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", NULL, NULL, NULL,
+		"Compression", "Z", 5,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -430,6 +439,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2648,11 +2661,23 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						/* mark byte consumed */
+						conn->inStart = conn->inCursor;
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn);
+					} else
+						break;
 				}
 
 				/*
@@ -3462,6 +3487,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 2a6637f..488f8d2 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,11 +53,12 @@
 #include "port/pg_bswap.h"
 #include "pg_config_paths.h"
 
+#include  <common/zpq_stream.h>
 
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
-static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
-			  time_t end_time);
+static int  pqSocketCheck(PGconn *conn, int forRead, int forWrite,
+						  time_t end_time);
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 /*
@@ -630,6 +631,7 @@ pqReadData(PGconn *conn)
 {
 	int			someread = 0;
 	int			nread;
+	size_t      processed;
 
 	if (conn->sock == PGINVALID_SOCKET)
 	{
@@ -678,10 +680,23 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	processed = 0;
+	nread = conn->zstream
+		? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+				   conn->inBufSize - conn->inEnd, &processed)
+		: pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+						conn->inBufSize - conn->inEnd);
+	conn->inEnd += processed;
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry3;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -768,10 +783,24 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	processed = 0;
+	nread = conn->zstream
+		? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+				   conn->inBufSize - conn->inEnd, &processed)
+		: pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+						conn->inBufSize - conn->inEnd);
+	conn->inEnd += processed;
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry4;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -842,12 +871,14 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered(conn->zstream))
 	{
 		int			sent;
-
+		size_t      processed = 0;
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -855,8 +886,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -896,7 +930,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8345faf..3942be1 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2179,6 +2179,8 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("database", conn->dbName);
 	if (conn->replication && conn->replication[0])
 		ADD_STARTUP_OPTION("replication", conn->replication);
+	if (conn->compression && conn->compression[0])
+		ADD_STARTUP_OPTION("compression", conn->compression);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
 	if (conn->send_appname)
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 9a586ff..6344eb6 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -357,6 +358,7 @@ struct pg_conn
 	char	   *sslrootcert;	/* root certificate filename */
 	char	   *sslcrl;			/* certificate revocation list filename */
 	char	   *requirepeer;	/* required peer credentials for local sockets */
+	char	   *compression;    /* stream compression (0 or 1) */
 
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 	char	   *krbsrvname;		/* Kerberos service name */
@@ -495,6 +497,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream* zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 593732f..ab2bad5 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -116,7 +116,7 @@ sub mkvcbuild
 
 	our @pgcommonallfiles = qw(
 	  base64.c config_info.c controldata_utils.c exec.c file_perm.c ip.c
-	  keywords.c md5.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+	  keywords.c md5.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
 	  saslprep.c scram-common.c string.c unicode_norm.c username.c
 	  wait_error.c);
 

Reply via email to