On 17.03.2018 17:18, Peter Eisentraut wrote:
On 3/11/18 13:28, Tom Lane wrote:
My proposal is the attached patch that sets the default in libpq to off
and adjusts the documentation a bit so it doesn't sound like we have
missed the news altogether.
Seems reasonable as far as it goes, but do we need to make corresponding
server-side changes?
We could add a setting to disable SSL compression on the server, as some
web servers have been doing, but the niche of version combinations where
that would be applicable seems pretty low.
One of our customers managed to improve speed about 10 times by
using SSL compression on a system where the client and servers are
located in different geographical regions
and query results are very large because of JSON columns. They actually
do not need encryption, just compression.
I expect that this is not the only case where compression of the libpq
protocol can be useful. Please note that logical replication also
uses the libpq protocol.
If SSL compression is deprecated, should we provide our own compression?
I have implemented a prototype of it (patch is attached).
I have added a compression=on/off parameter to the connection string and a -Z
option to the psql and pgbench utilities.
Below are some results:
Compression ratio (raw->compressed):

                     | libz (level=1)    | libzstd (level=1)
---------------------+-------------------+-------------------
pgbench -i -s 10     | 16997209->2536330 | 16997209->268077
pgbench -t 100000 -S | 6289036->1523862  | 6288933->1777400
                     | 6600338<-900293   | 6600338<-1000318
This is not a typo: libzstd compresses COPY data about 10 times better
than libz, with a wonderful compression ratio of 63.
The influence on execution time is minimal (I tested a local
configuration where the client and server are on the same host):

                     | no compression | libz (level=1) | libzstd (level=1)
---------------------+----------------+----------------+-------------------
pgbench -i -s 10     | 1.552          | 1.572          | 1.611
pgbench -t 100000 -S | 4.482          | 4.926          | 4.877
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/configure b/configure
index 6f659a5..f2ef62d 100755
--- a/configure
+++ b/configure
@@ -698,6 +698,7 @@ ELF_SYS
EGREP
GREP
with_zlib
+with_zstd
with_system_tzdata
with_libxslt
with_libxml
@@ -862,6 +863,7 @@ with_libxml
with_libxslt
with_system_tzdata
with_zlib
+with_zstd
with_gnu_ld
enable_largefile
enable_float4_byval
@@ -8016,6 +8018,86 @@ fi
#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+ withval=$with_zstd;
+ case $withval in
+ yes)
+ ;;
+ no)
+ :
+ ;;
+ *)
+ as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+ ;;
+ esac
+
+else
+ with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+ { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_lib_zstd_ZSTD_compress=yes
+else
+ ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+ cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+ LIBS="-lzstd $LIBS"
+
+else
+ as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
+#
# Elf
#
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 3bbdf17..08b8ab2 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -195,6 +195,7 @@ with_llvm = @with_llvm@
with_system_tzdata = @with_system_tzdata@
with_uuid = @with_uuid@
with_zlib = @with_zlib@
+with_zstd = @with_zstd@
enable_rpath = @enable_rpath@
enable_nls = @enable_nls@
enable_debug = @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 6450c5a..1522c70 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -50,6 +50,14 @@ ifeq ($(with_systemd),yes)
LIBS += -lsystemd
endif
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
##########################################################################
all: submake-libpgport submake-schemapg postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..3a6f54b 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -95,6 +95,7 @@
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
+#include "common/zpq_stream.h"
/*
* Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -143,6 +144,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
+static ZpqStream* PqStream;
+
+
/*
* Message status
*/
@@ -185,6 +189,31 @@ PQcommMethods *PqCommMethods = &PqCommSocketMethods;
WaitEventSet *FeBeWaitSet;
+/* --------------------------------
+ * pq_configure - configure connection using port settings
+ * Right now only compression is toggled here: if port->use_compression is set,
+ * a 'z' byte is sent to the client and PqStream is created. Returns 0 on success
+ * (or if compression was not requested), -1 if that byte could not be written.
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+ if (port->use_compression)
+ {
+ char compression = 'z'; /* one-byte message asking the client to enable compression */
+ int rc;
+ /* Switch on compression at client side: blocking write, retried on EINTR */
+ socket_set_nonblocking(false);
+ while ((rc = secure_write(MyProcPort, &compression, 1)) < 0 && errno == EINTR);
+ if (rc != 1)
+ return -1;
+
+ /* Initialize compression. NOTE(review): uses MyProcPort, not the port argument - confirm they are the same */
+ PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
+ }
+ return 0;
+}
/* --------------------------------
* pq_init - initialize libpq at backend startup
@@ -225,6 +254,7 @@ pq_init(void)
NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+
}
/* --------------------------------
@@ -282,6 +312,9 @@ socket_close(int code, Datum arg)
free(MyProcPort->gss);
#endif /* ENABLE_GSS || ENABLE_SSPI */
+ /* Release compression streams */
+ zpq_free(PqStream);
+
/*
* Cleanly shut down SSL layer. Nowhere else does a postmaster child
* call this, so this is safe when interrupting BackendInitialize().
@@ -932,12 +965,14 @@ socket_set_nonblocking(bool nonblocking)
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
*
- * returns 0 if OK, EOF if trouble
+ * returns number of read bytes, EOF if trouble
* --------------------------------
*/
static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
{
+ int r;
+
if (PqRecvPointer > 0)
{
if (PqRecvLength > PqRecvPointer)
@@ -953,21 +988,34 @@ pq_recvbuf(void)
}
/* Ensure that we're in blocking mode */
- socket_set_nonblocking(false);
+ socket_set_nonblocking(nowait);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
- int r;
-
- r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
- PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ size_t processed = 0;
+ r = PqStream
+ ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
+ : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+ PQ_RECV_BUFFER_SIZE - PqRecvLength);
+ PqRecvLength += processed;
if (r < 0)
{
+ if (r == ZPQ_DECOMPRESS_ERROR)
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("Failed to decompress data: %s", zpq_error(PqStream))));
+ return EOF;
+ }
if (errno == EINTR)
continue; /* Ok if interrupted */
+ if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+ return 0;
+
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
@@ -988,7 +1036,7 @@ pq_recvbuf(void)
}
/* r contains number of bytes read, so just incr length */
PqRecvLength += r;
- return 0;
+ return r;
}
}
@@ -1003,7 +1051,7 @@ pq_getbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1022,7 +1070,7 @@ pq_peekbyte(void)
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1043,44 +1091,11 @@ pq_getbyte_if_available(unsigned char *c)
Assert(PqCommReadingMsg);
- if (PqRecvPointer < PqRecvLength)
+ if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
{
*c = PqRecvBuffer[PqRecvPointer++];
return 1;
}
-
- /* Put the socket into non-blocking mode */
- socket_set_nonblocking(true);
-
- r = secure_read(MyProcPort, c, 1);
- if (r < 0)
- {
- /*
- * Ok if no data available without blocking or interrupted (though
- * EINTR really shouldn't happen with a non-blocking socket). Report
- * other errors.
- */
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
- r = 0;
- else
- {
- /*
- * Careful: an ereport() that tries to write to the client would
- * cause recursion to here, leading to stack overflow and core
- * dump! This message must go *only* to the postmaster log.
- */
- ereport(COMMERROR,
- (errcode_for_socket_access(),
- errmsg("could not receive data from client: %m")));
- r = EOF;
- }
- }
- else if (r == 0)
- {
- /* EOF detected */
- r = EOF;
- }
-
return r;
}
@@ -1101,7 +1116,7 @@ pq_getbytes(char *s, size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1135,7 +1150,7 @@ pq_discardbytes(size_t len)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
amount = PqRecvLength - PqRecvPointer;
@@ -1176,7 +1191,7 @@ pq_getstring(StringInfo s)
{
while (PqRecvPointer >= PqRecvLength)
{
- if (pq_recvbuf()) /* If nothing in buffer, then recv some */
+ if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */
return EOF; /* Failed to recv data */
}
@@ -1426,13 +1441,18 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
- while (bufptr < bufend)
+ while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
{
- int r;
-
- r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
- if (r <= 0)
+ int r;
+ size_t processed = 0;
+ size_t available = bufend - bufptr;
+ r = PqStream
+ ? zpq_write(PqStream, bufptr, available, &processed)
+ : secure_write(MyProcPort, bufptr, available);
+ bufptr += processed;
+ PqSendStart += processed;
+
+ if (r < 0 || (r == 0 && available))
{
if (errno == EINTR)
continue; /* Ok if we were interrupted */
@@ -1480,7 +1500,6 @@ internal_flush(void)
bufptr += r;
PqSendStart += r;
}
-
PqSendStart = PqSendPointer = 0;
return 0;
}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 660f318..295d756 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2101,6 +2101,16 @@ retry1:
port->database_name = pstrdup(valptr);
else if (strcmp(nameptr, "user") == 0)
port->user_name = pstrdup(valptr);
+ else if (strcmp(nameptr, "compression") == 0)
+ {
+ if (!parse_bool(valptr, &port->use_compression))
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid boolean value for parameter \"%s\": \"%s\"",
+ "compression",
+ valptr),
+ errhint("Valid values are: \"false\", \"off\", 0, \"true\", \"on\", 1.")));
+ }
else if (strcmp(nameptr, "options") == 0)
port->cmdline_options = pstrdup(valptr);
else if (strcmp(nameptr, "replication") == 0)
@@ -4305,6 +4315,14 @@ BackendInitialize(Port *port)
if (status != STATUS_OK)
proc_exit(0);
+ if (pq_configure(port))
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("failed to send compression message: %m")));
+ proc_exit(0);
+ }
+
/*
* Now that we have the user and database name, we can set the process
* title for ps. It's good to do this as early as possible in startup.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a15aa06..769a06d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -183,6 +183,7 @@ int nclients = 1; /* number of clients */
int nthreads = 1; /* number of threads */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
+bool libpq_compression; /* use libpq compression */
int main_pid; /* main process id used in log filename */
char *pghost = "";
@@ -575,6 +576,7 @@ usage(void)
" -h, --host=HOSTNAME database server host or socket directory\n"
" -p, --port=PORT database server port number\n"
" -U, --username=USERNAME connect as specified database user\n"
+ " -Z, --compression use libpq compression\n"
" -V, --version output version information, then exit\n"
" -?, --help show this help, then exit\n"
"\n"
@@ -1092,7 +1094,7 @@ doConnect(void)
*/
do
{
-#define PARAMS_ARRAY_SIZE 7
+#define PARAMS_ARRAY_SIZE 8
const char *keywords[PARAMS_ARRAY_SIZE];
const char *values[PARAMS_ARRAY_SIZE];
@@ -1109,8 +1111,10 @@ doConnect(void)
values[4] = dbName;
keywords[5] = "fallback_application_name";
values[5] = progname;
- keywords[6] = NULL;
- values[6] = NULL;
+ keywords[6] = "compression";
+ values[6] = libpq_compression ? "on" : "off";
+ keywords[7] = NULL;
+ values[7] = NULL;
new_pass = false;
@@ -4442,6 +4446,7 @@ main(int argc, char **argv)
{"builtin", required_argument, NULL, 'b'},
{"client", required_argument, NULL, 'c'},
{"connect", no_argument, NULL, 'C'},
+ {"compression", no_argument, NULL, 'Z'},
{"debug", no_argument, NULL, 'd'},
{"define", required_argument, NULL, 'D'},
{"file", required_argument, NULL, 'f'},
@@ -4543,12 +4548,15 @@ main(int argc, char **argv)
state = (CState *) pg_malloc(sizeof(CState));
memset(state, 0, sizeof(CState));
- while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:Z", long_options, &optindex)) != -1)
{
char *script;
switch (c)
{
+ case 'Z':
+ libpq_compression = true;
+ break;
case 'i':
is_init_mode = true;
break;
diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c
index be57574..9271716 100644
--- a/src/bin/psql/startup.c
+++ b/src/bin/psql/startup.c
@@ -75,6 +75,7 @@ struct adhoc_opts
bool no_psqlrc;
bool single_txn;
bool list_dbs;
+ bool compression;
SimpleActionList actions;
};
@@ -237,8 +238,10 @@ main(int argc, char *argv[])
values[5] = pset.progname;
keywords[6] = "client_encoding";
values[6] = (pset.notty || getenv("PGCLIENTENCODING")) ? NULL : "auto";
- keywords[7] = NULL;
- values[7] = NULL;
+ keywords[7] = "compression";
+ values[7] = options.compression ? "on" : "off";
+ keywords[8] = NULL;
+ values[8] = NULL;
new_pass = false;
pset.db = PQconnectdbParams(keywords, values, true);
@@ -436,6 +439,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
{"echo-all", no_argument, NULL, 'a'},
{"no-align", no_argument, NULL, 'A'},
{"command", required_argument, NULL, 'c'},
+ {"compression", no_argument, NULL, 'Z'},
{"dbname", required_argument, NULL, 'd'},
{"echo-queries", no_argument, NULL, 'e'},
{"echo-errors", no_argument, NULL, 'b'},
@@ -476,7 +480,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
memset(options, 0, sizeof *options);
- while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01",
+ while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01Z",
long_options, &optindex)) != -1)
{
switch (c)
@@ -540,6 +544,9 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
case 'p':
options->port = pg_strdup(optarg);
break;
+ case 'Z':
+ options->compression = true;
+ break;
case 'P':
{
char *value;
diff --git a/src/bin/scripts/pg_isready.c b/src/bin/scripts/pg_isready.c
index f7ad7b4..5d035cd 100644
--- a/src/bin/scripts/pg_isready.c
+++ b/src/bin/scripts/pg_isready.c
@@ -34,7 +34,7 @@ main(int argc, char **argv)
const char *pghostaddr_str = NULL;
const char *pgport_str = NULL;
-#define PARAMS_ARRAY_SIZE 7
+#define PARAMS_ARRAY_SIZE 8
const char *keywords[PARAMS_ARRAY_SIZE];
const char *values[PARAMS_ARRAY_SIZE];
diff --git a/src/common/Makefile b/src/common/Makefile
index 80e78d7..1548196 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -43,7 +43,7 @@ override CPPFLAGS += -DVAL_LIBS="\"$(LIBS)\""
OBJS_COMMON = base64.o config_info.o controldata_utils.o exec.o ip.o \
keywords.o md5.o pg_lzcompress.o pgfnames.o psprintf.o relpath.o \
rmtree.o saslprep.o scram-common.o string.o unicode_norm.o \
- username.o wait_error.o
+ username.o wait_error.o zpq_stream.o
ifeq ($(with_openssl),yes)
OBJS_COMMON += sha2_openssl.o
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 7698cd1..5203f2d 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -182,6 +182,8 @@ typedef struct Port
char *peer_cn;
bool peer_cert_valid;
+ bool use_compression;
+
/*
* OpenSSL structures. (Keep these last so that the locations of other
* fields are the same whether or not you build with OpenSSL.)
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 997947b..4ae548a 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -61,6 +61,7 @@ extern void StreamClose(pgsocket sock);
extern void TouchSocketFiles(void);
extern void RemoveSocketFiles(void);
extern void pq_init(void);
+extern int pq_configure(Port* port);
extern int pq_getbytes(char *s, size_t len);
extern int pq_getstring(StringInfo s);
extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 625cd21..305a53b 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -365,6 +365,9 @@
/* Define to 1 if you have the `z' library (-lz). */
#undef HAVE_LIBZ
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
/* Define to 1 if the system has the type `locale_t'. */
#undef HAVE_LOCALE_T
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index abe0a50..e14c1f7 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,10 @@ endif
# platforms require special flags.
LIBS := $(LIBS:-lpgport=)
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
# We can't use Makefile variables here because the MSVC build system scrapes
# OBJS from this file.
OBJS= fe-auth.o fe-auth-scram.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 39c1999..67462eb 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -72,6 +72,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
#include "common/ip.h"
#include "common/scram-common.h"
+#include "common/zpq_stream.h"
#include "mb/pg_wchar.h"
#include "port/pg_bswap.h"
@@ -269,6 +270,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
21, /* sizeof("tls-server-end-point") == 21 */
offsetof(struct pg_conn, scram_channel_binding)},
+ {"compression", "COMPRESSION", "0", NULL,
+ "ZSTD-Compression", "", 1,
+ offsetof(struct pg_conn, compression)},
+
/*
* ssl options are allowed even without client SSL support because the
* client can still handle SSL modes "disable" and "allow". Other
@@ -325,6 +330,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
"Replication", "D", 5,
offsetof(struct pg_conn, replication)},
+ {"compression", NULL, NULL, NULL,
+ "Compression", "Z", 5,
+ offsetof(struct pg_conn, compression)},
+
{"target_session_attrs", "PGTARGETSESSIONATTRS",
DefaultTargetSessionAttrs, NULL,
"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -430,6 +439,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
void
pqDropConnection(PGconn *conn, bool flushInput)
{
+ /* Release compression streams */
+ zpq_free(conn->zstream);
+ conn->zstream = NULL;
+
/* Drop any SSL state */
pqsecure_close(conn);
@@ -2648,11 +2661,23 @@ keep_going: /* We will come back to here until there is
*/
conn->inCursor = conn->inStart;
- /* Read type byte */
- if (pqGetc(&beresp, conn))
+ while (1)
{
- /* We'll come back when there is more data */
- return PGRES_POLLING_READING;
+ /* Read type byte */
+ if (pqGetc(&beresp, conn))
+ {
+ /* We'll come back when there is more data */
+ return PGRES_POLLING_READING;
+ }
+
+ if (beresp == 'z') /* Switch on compression */
+ {
+ /* mark byte consumed */
+ conn->inStart = conn->inCursor;
+ Assert(!conn->zstream);
+ conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn);
+ } else
+ break;
}
/*
@@ -3462,6 +3487,8 @@ freePGconn(PGconn *conn)
free(conn->dbName);
if (conn->replication)
free(conn->replication);
+ if (conn->compression)
+ free(conn->compression);
if (conn->pguser)
free(conn->pguser);
if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 2a6637f..488f8d2 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,11 +53,12 @@
#include "port/pg_bswap.h"
#include "pg_config_paths.h"
+#include <common/zpq_stream.h>
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
-static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
- time_t end_time);
+static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
+ time_t end_time);
static int pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
/*
@@ -630,6 +631,7 @@ pqReadData(PGconn *conn)
{
int someread = 0;
int nread;
+ size_t processed;
if (conn->sock == PGINVALID_SOCKET)
{
@@ -678,10 +680,23 @@ pqReadData(PGconn *conn)
/* OK, try to read some data */
retry3:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ processed = 0;
+ nread = conn->zstream
+ ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd, &processed)
+ : pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd);
+ conn->inEnd += processed;
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
if (SOCK_ERRNO == EINTR)
goto retry3;
/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -768,10 +783,24 @@ retry3:
* arrived.
*/
retry4:
- nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
- conn->inBufSize - conn->inEnd);
+ processed = 0;
+ nread = conn->zstream
+ ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd, &processed)
+ : pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+ conn->inBufSize - conn->inEnd);
+ conn->inEnd += processed;
+
if (nread < 0)
{
+ if (nread == ZPQ_DECOMPRESS_ERROR)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("decompress error: %s\n"),
+ zpq_error(conn->zstream));
+ return -1;
+ }
+
if (SOCK_ERRNO == EINTR)
goto retry4;
/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -842,12 +871,14 @@ pqSendSome(PGconn *conn, int len)
}
/* while there's still data to send */
- while (len > 0)
+ while (len > 0 || zpq_buffered(conn->zstream))
{
int sent;
-
+ size_t processed = 0;
+ sent = conn->zstream
+ ? zpq_write(conn->zstream, ptr, len, &processed)
#ifndef WIN32
- sent = pqsecure_write(conn, ptr, len);
+ : pqsecure_write(conn, ptr, len);
#else
/*
@@ -855,8 +886,11 @@ pqSendSome(PGconn *conn, int len)
* failure-point appears to be different in different versions of
* Windows, but 64k should always be safe.
*/
- sent = pqsecure_write(conn, ptr, Min(len, 65536));
+ : pqsecure_write(conn, ptr, Min(len, 65536));
#endif
+ ptr += processed;
+ len -= processed;
+ remaining -= processed;
if (sent < 0)
{
@@ -896,7 +930,7 @@ pqSendSome(PGconn *conn, int len)
remaining -= sent;
}
- if (len > 0)
+ if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
{
/*
* We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index d3ca5d2..fe82457 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2171,6 +2171,8 @@ build_startup_packet(const PGconn *conn, char *packet,
ADD_STARTUP_OPTION("database", conn->dbName);
if (conn->replication && conn->replication[0])
ADD_STARTUP_OPTION("replication", conn->replication);
+ if (conn->compression && conn->compression[0])
+ ADD_STARTUP_OPTION("compression", conn->compression);
if (conn->pgoptions && conn->pgoptions[0])
ADD_STARTUP_OPTION("options", conn->pgoptions);
if (conn->send_appname)
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index eba23dc..c9bf84c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
/* include stuff common to fe and be */
#include "getaddrinfo.h"
#include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
/* include stuff found in fe only */
#include "pqexpbuffer.h"
@@ -357,6 +358,7 @@ struct pg_conn
char *sslrootcert; /* root certificate filename */
char *sslcrl; /* certificate revocation list filename */
char *requirepeer; /* required peer credentials for local sockets */
+ char *compression; /* stream compression (0 or 1) */
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
char *krbsrvname; /* Kerberos service name */
@@ -495,6 +497,9 @@ struct pg_conn
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
+
+ /* Compression stream */
+ ZpqStream* zstream;
};
/* PGcancel stores all data necessary to cancel a connection. A copy of this