On Mon, Dec 21, 2020 at 03:02:40PM -0500, Tom Lane wrote:
> Justin Pryzby <pry...@telsasoft.com> writes:
> > I found that our largest tables are 40% smaller and 20% faster to pipe
> > pg_dump -Fc -Z0 |zstd relative to native zlib
> 
> The patch might be a tad smaller if you hadn't included a core file in it.

About 89% smaller.

This also fixes the extension (.zst)
And fixes zlib default compression.
And a bunch of cleanup.

-- 
Justin
>From c506c8a93ae70726a5b4b33a1d0098caf5665f3a Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Mon, 21 Dec 2020 00:32:32 -0600
Subject: [PATCH 1/7] fix pre-existing docs/comments

---
 doc/src/sgml/ref/pg_dump.sgml         | 2 +-
 src/bin/pg_dump/pg_backup_directory.c | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 0aa35cf0c3..dcb25dc3cd 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -621,7 +621,7 @@ PostgreSQL documentation
       <listitem>
        <para>
         Specify the compression level to use.  Zero means no compression.
-        For the custom archive format, this specifies compression of
+        For the custom and directory archive formats, this specifies compression of
         individual table-data segments, and the default is to compress
         at a moderate level.
         For plain text output, setting a nonzero compression level causes
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 48fa7cb1a3..650b542fce 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -4,7 +4,7 @@
  *
  *	A directory format dump is a directory, which contains a "toc.dat" file
  *	for the TOC, and a separate file for each data entry, named "<oid>.dat".
- *	Large objects (BLOBs) are stored in separate files named "blob_<uid>.dat",
+ *	Large objects (BLOBs) are stored in separate files named "blob_<oid>.dat",
  *	and there's a plain-text TOC file for them called "blobs.toc". If
  *	compression is used, each data file is individually compressed and the
  *	".gz" suffix is added to the filenames. The TOC files are never
-- 
2.17.0

>From 49f400b5b2e559cc2ed6be82e04a4dc4ba5c63b3 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 12 Dec 2020 00:11:37 -0600
Subject: [PATCH 2/7] Fix malformed comment

broken since bf50caf10.
---
 src/bin/pg_dump/pg_backup_archiver.h | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 177360ed6e..6a5a22637b 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -329,9 +329,12 @@ struct _archiveHandle
 	DumpId	   *tableDataId;	/* TABLE DATA ids, indexed by table dumpId */
 
 	struct _tocEntry *currToc;	/* Used when dumping data */
-	int			compression;	/* Compression requested on open Possible
-								 * values for compression: -1
-								 * Z_DEFAULT_COMPRESSION 0	COMPRESSION_NONE
+	int			compression;	/*---------
+								 * Compression requested on open
+								 * Possible values for compression:
+								 * -2	ZSTD_COMPRESSION
+								 * -1	Z_DEFAULT_COMPRESSION
+								 *  0	COMPRESSION_NONE
 								 * 1-9 levels for gzip compression */
 	bool		dosync;			/* data requested to be synced on sight */
 	ArchiveMode mode;			/* File mode - r or w */
-- 
2.17.0

>From 8a8939a3060c984af289b5fe62754d94f2675248 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 11 Dec 2020 15:01:25 -0600
Subject: [PATCH 3/7] pg_dump: zstd compression

TODO: document any change in the search for .gz files?
TODO: update the docs
---
 configure                             | 123 ++++++-
 configure.ac                          |  22 ++
 src/bin/pg_dump/compress_io.c         | 475 ++++++++++++++++++++++++--
 src/bin/pg_dump/compress_io.h         |   5 +-
 src/bin/pg_dump/pg_backup_archiver.h  |   5 +
 src/bin/pg_dump/pg_backup_directory.c |   6 +-
 src/bin/pg_dump/pg_dump.c             |   3 +-
 src/include/pg_config.h.in            |   3 +
 8 files changed, 597 insertions(+), 45 deletions(-)

diff --git a/configure b/configure
index 11a4284e5b..240e536e04 100755
--- a/configure
+++ b/configure
@@ -698,6 +698,7 @@ with_gnu_ld
 LD
 LDFLAGS_SL
 LDFLAGS_EX
+with_zstd
 with_zlib
 with_system_tzdata
 with_libxslt
@@ -798,6 +799,7 @@ infodir
 docdir
 oldincludedir
 includedir
+runstatedir
 localstatedir
 sharedstatedir
 sysconfdir
@@ -866,6 +868,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -935,6 +938,7 @@ datadir='${datarootdir}'
 sysconfdir='${prefix}/etc'
 sharedstatedir='${prefix}/com'
 localstatedir='${prefix}/var'
+runstatedir='${localstatedir}/run'
 includedir='${prefix}/include'
 oldincludedir='/usr/include'
 docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@@ -1187,6 +1191,15 @@ do
   | -silent | --silent | --silen | --sile | --sil)
     silent=yes ;;
 
+  -runstatedir | --runstatedir | --runstatedi | --runstated \
+  | --runstate | --runstat | --runsta | --runst | --runs \
+  | --run | --ru | --r)
+    ac_prev=runstatedir ;;
+  -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
+  | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
+  | --run=* | --ru=* | --r=*)
+    runstatedir=$ac_optarg ;;
+
   -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
     ac_prev=sbindir ;;
   -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@@ -1324,7 +1337,7 @@ fi
 for ac_var in	exec_prefix prefix bindir sbindir libexecdir datarootdir \
 		datadir sysconfdir sharedstatedir localstatedir includedir \
 		oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
-		libdir localedir mandir
+		libdir localedir mandir runstatedir
 do
   eval ac_val=\$$ac_var
   # Remove trailing slashes.
@@ -1477,6 +1490,7 @@ Fine tuning of the installation directories:
   --sysconfdir=DIR        read-only single-machine data [PREFIX/etc]
   --sharedstatedir=DIR    modifiable architecture-independent data [PREFIX/com]
   --localstatedir=DIR     modifiable single-machine data [PREFIX/var]
+  --runstatedir=DIR       modifiable per-process data [LOCALSTATEDIR/run]
   --libdir=DIR            object code libraries [EPREFIX/lib]
   --includedir=DIR        C header files [PREFIX/include]
   --oldincludedir=DIR     C header files for non-gcc [/usr/include]
@@ -1570,6 +1584,7 @@ Optional Packages:
   --with-system-tzdata=DIR
                           use system time zone data in DIR
   --without-zlib          do not use Zlib
+  --without-zstd          do not use Zstd
   --with-gnu-ld           assume the C compiler uses GNU ld [default=no]
 
 Some influential environment variables:
@@ -8601,6 +8616,35 @@ fi
 
 
 
+#
+# Zstd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      :
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=yes
+
+fi
+
+
+
+
 #
 # Assignments
 #
@@ -12092,6 +12136,59 @@ fi
 
 fi
 
+if test "$with_zstd" = yes; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressStream2 in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compressStream2 in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compressStream2+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compressStream2 ();
+int
+main ()
+{
+return ZSTD_compressStream2 ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compressStream2=yes
+else
+  ac_cv_lib_zstd_ZSTD_compressStream2=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compressStream2" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compressStream2" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compressStream2" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+fi
+
 if test "$enable_spinlocks" = yes; then
 
 $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
@@ -13295,6 +13392,20 @@ Use --without-zlib to disable zlib support." "$LINENO" 5
 fi
 
 
+fi
+
+if test "$with_zstd" = yes; then
+  ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default"
+if test "x$ac_cv_header_zstd_h" = xyes; then :
+
+else
+  as_fn_error $? "zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+
 fi
 
 if test "$with_gssapi" = yes ; then
@@ -14689,7 +14800,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14735,7 +14846,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14759,7 +14870,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14804,7 +14915,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14828,7 +14939,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
diff --git a/configure.ac b/configure.ac
index fc523c6aeb..7f7222159a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, yes,
+              [do not use Zstd])
+AC_SUBST(with_zstd)
+
 #
 # Assignments
 #
@@ -1186,6 +1193,14 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_compressStream2, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1415,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 1417401086..b51ba680a2 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -13,7 +13,7 @@
  * friends, providing an interface similar to those, but abstracts away
  * the possible compression. Both APIs use libz for the compression, but
  * the second API uses gzip headers, so the resulting files can be easily
- * manipulated with the gzip utility.
+ * manipulated with the gzip utility. XXX
  *
  * Compressor API
  * --------------
@@ -41,7 +41,7 @@
  *	libz's gzopen() APIs. It allows you to use the same functions for
  *	compressed and uncompressed streams. cfopen_read() first tries to open
  *	the file with given name, and if it fails, it tries to open the same
- *	file with the .gz suffix. cfopen_write() opens a file for writing, an
+ *	file with the .gz suffix. cfopen_write() opens a file for writing, an XXX
  *	extra argument specifies if the file should be compressed, and adds the
  *	.gz suffix to the filename if so. This allows you to easily handle both
  *	compressed and uncompressed files.
@@ -72,6 +72,18 @@ struct CompressorState
 	char	   *zlibOut;
 	size_t		zlibOutSize;
 #endif
+
+#ifdef HAVE_LIBZSTD
+	union {
+		struct {
+			ZSTD_outBuffer output;
+			ZSTD_inBuffer input;
+			// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+			ZSTD_CStream *cstream;
+		} zstd;
+	} u;
+#endif
+
 };
 
 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
@@ -88,6 +100,15 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
 #endif
 
+#ifdef HAVE_LIBZSTD
+static void InitCompressorZstd(CompressorState *cs, int level);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF);
+#endif
+
 /* Routines that support uncompressed data I/O */
 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
 static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
@@ -101,15 +122,25 @@ static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
 static void
 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
 {
-	if (compression == Z_DEFAULT_COMPRESSION ||
-		(compression > 0 && compression <= 9))
-		*alg = COMPR_ALG_LIBZ;
-	else if (compression == 0)
-		*alg = COMPR_ALG_NONE;
-	else
+	switch (compression)
 	{
-		fatal("invalid compression code: %d", compression);
-		*alg = COMPR_ALG_NONE;	/* keep compiler quiet */
+#ifdef HAVE_LIBZSTD
+		case ZSTD_COMPRESSION:
+			*alg = COMPR_ALG_ZSTD;
+			break;
+#endif
+#ifdef HAVE_LIBZ
+		case Z_DEFAULT_COMPRESSION:
+		case 1: case 2: case 3: case 4: case 5: case 6: case 7: case 8: case 9:
+			*alg = COMPR_ALG_LIBZ;	/* default or explicit zlib level 1-9 */
+			break;
+#endif
+		case 0:
+			*alg = COMPR_ALG_NONE;
+			break;
+		default:
+			fatal("invalid compression code: %d", compression);
+			*alg = COMPR_ALG_NONE;	/* keep compiler quiet */
 	}
 
 	/* The level is just the passed-in value. */
@@ -141,10 +172,23 @@ AllocateCompressor(int compression, WriteFunc writeF)
 	/*
 	 * Perform compression algorithm specific initialization.
 	 */
+	switch (alg)
+	{
 #ifdef HAVE_LIBZ
-	if (alg == COMPR_ALG_LIBZ)
+	case COMPR_ALG_LIBZ:
 		InitCompressorZlib(cs, level);
+		break;
+#endif
+#ifdef HAVE_LIBZSTD
+	case COMPR_ALG_ZSTD:
+		InitCompressorZstd(cs, level);
+		break;
 #endif
+	case COMPR_ALG_NONE:
+		/* Do nothing */
+		break;
+	// default:
+	}
 
 	return cs;
 }
@@ -162,12 +206,20 @@ ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
 
 	if (alg == COMPR_ALG_NONE)
 		ReadDataFromArchiveNone(AH, readF);
-	if (alg == COMPR_ALG_LIBZ)
+	else if (alg == COMPR_ALG_LIBZ)
 	{
 #ifdef HAVE_LIBZ
 		ReadDataFromArchiveZlib(AH, readF);
 #else
 		fatal("not built with zlib support");
+#endif
+	}
+	else if (alg == COMPR_ALG_ZSTD)
+	{
+#ifdef HAVE_LIBZSTD
+		ReadDataFromArchiveZstd(AH, readF);
+#else
+		fatal("not built with zstd support");
 #endif
 	}
 }
@@ -188,6 +240,15 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
 			fatal("not built with zlib support");
 #endif
 			break;
+
+		case COMPR_ALG_ZSTD:
+#ifdef HAVE_LIBZSTD
+			WriteDataToArchiveZstd(AH, cs, data, dLen);
+#else
+			fatal("not built with zstd support");
+#endif
+			break;
+
 		case COMPR_ALG_NONE:
 			WriteDataToArchiveNone(AH, cs, data, dLen);
 			break;
@@ -204,12 +265,172 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs)
 	if (cs->comprAlg == COMPR_ALG_LIBZ)
 		EndCompressorZlib(AH, cs);
 #endif
+#ifdef HAVE_LIBZSTD
+	if (cs->comprAlg == COMPR_ALG_ZSTD)
+		EndCompressorZstd(AH, cs);
+#endif
+
 	free(cs);
 }
 
 /* Private routines, specific to each compression method. */
+// XXX: put in separate files ?
+
+#ifdef HAVE_LIBZSTD
+/* Create the zstd compression stream and allocate its output buffer. */
+static void
+InitCompressorZstd(CompressorState *cs, int level)
+{
+	cs->u.zstd.cstream = ZSTD_createCStream();
+	if (cs->u.zstd.cstream == NULL)
+		fatal("could not initialize compression library");
+	/* XXX: 'level' is unused here; initialize safely like the zlib "paranoia" */
+	cs->u.zstd.output.size = ZSTD_CStreamOutSize();
+	cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size);
+	cs->u.zstd.output.pos = 0;
+}
+
+/* End the zstd frame, write out what is still buffered, free zstd state. */
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZSTD_outBuffer	*output = &cs->u.zstd.output;
+	size_t			res;
+
+	do
+	{
+		/* with ZSTD_e_end, a zero result means the frame is fully flushed */
+		res = ZSTD_compressStream2(cs->u.zstd.cstream, output,
+				&cs->u.zstd.input, ZSTD_e_end);
+		if (ZSTD_isError(res))
+			fatal("could not close compression stream: %s",
+					ZSTD_getErrorName(res));
+
+		/* skip empty writes: a zero-length chunk is the custom-format EOF */
+		if (output->pos > 0)
+			cs->writeF(AH, output->dst, output->pos);
+		output->pos = 0;	/* otherwise the next pass rewrites these bytes */
+	} while (res != 0);
+
+	// XXX: retval
+	ZSTD_freeCStream(cs->u.zstd.cstream);
+	pg_free(output->dst);	/* the buffer from InitCompressorZstd() */
+}
+
+/* Compress the pending input, writing output via writeF as the buffer fills. */
+static void
+DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZSTD_inBuffer	*input = &cs->u.zstd.input;
+	ZSTD_outBuffer	*output = &cs->u.zstd.output;
+
+	while (input->pos != input->size)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(cs->u.zstd.cstream, output,
+				input, ZSTD_e_continue);
+		/* check first, so the result of a failed call is never written */
+		if (ZSTD_isError(res))
+			fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+		if (output->pos == output->size ||
+				input->pos != input->size)
+		{
+			/*
+			 * Extra paranoia: avoid zero-length chunks, since a zero length
+			 * chunk is the EOF marker in the custom format.
+			 */
+			if (output->pos > 0)
+				cs->writeF(AH, output->dst, output->pos);
+			output->pos = 0;
+		}
+	}
+}
+
+/* Compress dLen bytes at 'data'; output may stay buffered for a later call. */
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen)
+{
+	cs->u.zstd.input.src = data;	/* src is const void *: no cast needed */
+	cs->u.zstd.input.size = dLen;
+	cs->u.zstd.input.pos = 0;
+	DeflateCompressorZstd(AH, cs);
+}
+
+/*
+ * Read data from a compressed zstd archive.
+ *
+ * Compressed chunks are fetched with readF until it returns 0; each one is
+ * decompressed and the result is handed to ahwrite().
+ */
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF)
+{
+	ZSTD_DStream	*dstream;
+	ZSTD_outBuffer	output;
+	ZSTD_inBuffer	input;
+	char			*buf;
+	size_t			buflen;
+	size_t			cnt;
+	size_t			res;
+
+	dstream = ZSTD_createDStream();
+	if (dstream == NULL)
+		fatal("could not initialize compression library");
+
+	/*
+	 * readF gets pointers to the buffer and its length and may update both
+	 * (the buffer can grow), so they live in their own variables instead of
+	 * being reset for every chunk.
+	 */
+	buflen = ZSTD_DStreamInSize();
+	buf = pg_malloc(buflen);
+
+	/*
+	 * Allocate one spare byte: the NUL terminator stored below must fit even
+	 * when the output buffer is completely full.
+	 */
+	output.size = ZSTD_DStreamOutSize();
+	output.dst = pg_malloc(output.size + 1);
+
+	/* read compressed data */
+	while ((cnt = readF(AH, &buf, &buflen)) != 0)
+	{
+		input.src = buf;
+		input.size = cnt;
+		input.pos = 0;
+
+		/*
+		 * Decompress until the chunk is consumed.  If the output buffer was
+		 * filled completely, zstd may still hold data internally, so call it
+		 * again even when no input is left.
+		 */
+		do
+		{
+			output.pos = 0;
+			res = ZSTD_decompressStream(dstream, &output, &input);
+
+			if (ZSTD_isError(res))
+				fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			/* write to output handle */
+			((char *) output.dst)[output.pos] = '\0';
+			ahwrite(output.dst, 1, output.pos, AH);
+		} while (input.pos < input.size || output.pos == output.size);
+	}
+
+	/* release the decompression stream and both buffers */
+	ZSTD_freeDStream(dstream);
+	pg_free(buf);
+	pg_free(output.dst);
+}
+
+#endif		/* HAVE_LIBZSTD */
 
 #ifdef HAVE_LIBZ
+
 /*
  * Functions for zlib compressed output.
  */
@@ -422,6 +643,19 @@ struct cfp
 #ifdef HAVE_LIBZ
 	gzFile		compressedfp;
 #endif
+
+#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg?
+	/* This is a normal file to which we read/write compressed data */
+	struct {
+		FILE			*fp;
+		// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+		ZSTD_CStream	*cstream;
+		ZSTD_DStream	*dstream;
+		ZSTD_outBuffer	output;
+		ZSTD_inBuffer	input;
+	} zstd;
+#endif
+
 };
 
 #ifdef HAVE_LIBZ
@@ -449,24 +683,25 @@ free_keep_errno(void *p)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_read(const char *path, const char *mode)
+cfopen_read(const char *path, const char *mode, int compression)
 {
 	cfp		   *fp;
 
 #ifdef HAVE_LIBZ
-	if (hasSuffix(path, ".gz"))
-		fp = cfopen(path, mode, 1);
+	if (hasSuffix(path, ".gz") || hasSuffix(path, ".zst"))
+		fp = cfopen(path, mode, compression);
 	else
 #endif
 	{
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, compression);
 #ifdef HAVE_LIBZ
 		if (fp == NULL)
 		{
 			char	   *fname;
+			char	   *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz";
 
-			fname = psprintf("%s.gz", path);
-			fp = cfopen(fname, mode, 1);
+			fname = psprintf("%s.%s", path, suffix);
+			fp = cfopen(fname, mode, compression);
 			free_keep_errno(fname);
 		}
 #endif
@@ -491,13 +726,14 @@ cfopen_write(const char *path, const char *mode, int compression)
 	cfp		   *fp;
 
 	if (compression == 0)
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, compression);
 	else
 	{
-#ifdef HAVE_LIBZ
+#ifdef HAVE_LIBZ // XXX
 		char	   *fname;
+		char	   *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz";
 
-		fname = psprintf("%s.gz", path);
+		fname = psprintf("%s.%s", path, suffix);
 		fp = cfopen(fname, mode, compression);
 		free_keep_errno(fname);
 #else
@@ -505,11 +741,12 @@ cfopen_write(const char *path, const char *mode, int compression)
 		fp = NULL;				/* keep compiler quiet */
 #endif
 	}
+
 	return fp;
 }
 
 /*
- * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
+ * Opens file 'path' in 'mode'. If 'compression' selects zlib, the file
  * is opened with libz gzopen(), otherwise with plain fopen().
  *
  * On failure, return NULL with an error code in errno.
@@ -519,9 +756,19 @@ cfopen(const char *path, const char *mode, int compression)
 {
 	cfp		   *fp = pg_malloc(sizeof(cfp));
 
-	if (compression != 0)
+	fp->uncompressedfp = NULL;
+#ifdef HAVE_LIBZ
+	fp->compressedfp = NULL;
+#endif
+#ifdef HAVE_LIBZSTD
+	fp->zstd.fp = NULL;
+#endif
+
+	switch (compression)
 	{
 #ifdef HAVE_LIBZ
+	case 1 ... 9: // XXX: nonportable
+	case Z_DEFAULT_COMPRESSION:
 		if (compression != Z_DEFAULT_COMPRESSION)
 		{
 			/* user has specified a compression level, so tell zlib to use it */
@@ -537,30 +784,57 @@ cfopen(const char *path, const char *mode, int compression)
 			fp->compressedfp = gzopen(path, mode);
 		}
 
-		fp->uncompressedfp = NULL;
 		if (fp->compressedfp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
 		}
-#else
-		fatal("not built with zlib support");
+
+		return fp;
+		break;
 #endif
-	}
-	else
-	{
-#ifdef HAVE_LIBZ
-		fp->compressedfp = NULL;
+
+#ifdef HAVE_LIBZSTD
+	case ZSTD_COMPRESSION:
+		fp->zstd.cstream = NULL;	/* cfclose() tests both stream pointers */
+		fp->zstd.dstream = NULL;
+		fp->zstd.fp = fopen(path, mode);
+		// XXX: save the compression params
+		if (fp->zstd.fp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;
+		}
+		else if (strchr(mode, 'w'))
+		{
+			memset(&fp->zstd.input, 0, sizeof(fp->zstd.input));	/* cfclose() reads it */
+			fp->zstd.output.size = ZSTD_CStreamOutSize(); // XXX
+			fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size);
+			fp->zstd.cstream = ZSTD_createCStream();
+			if (fp->zstd.cstream == NULL)
+				fatal("could not initialize compression library");
+		}
+		else if (strchr(mode, 'r'))
+		{
+			fp->zstd.input.size = ZSTD_DStreamInSize();	/* cfread() reads this many */
+			fp->zstd.input.src = pg_malloc0(fp->zstd.input.size);
+			fp->zstd.dstream = ZSTD_createDStream();
+			if (fp->zstd.dstream == NULL)
+				fatal("could not initialize compression library");
+		} // XXX else: bad mode
+		return fp;
+#endif
+
+	default:
 		fp->uncompressedfp = fopen(path, mode);
 		if (fp->uncompressedfp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
 		}
-	}
 
-	return fp;
+		return fp;
+	}
 }
 
 
@@ -587,6 +861,44 @@ cfread(void *ptr, int size, cfp *fp)
 	}
 	else
 #endif
+
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+		size_t			input_size = ZSTD_DStreamInSize();
+		size_t			res, cnt;
+
+		output->size = size;
+		output->dst = ptr;
+		output->pos = 0;
+
+		/* read compressed data */
+		while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp)))
+		{
+			input->size = cnt;
+			input->pos = 0;
+
+			for ( ; input->pos < input->size; )
+			{
+				/* decompress */
+				res = ZSTD_decompressStream(fp->zstd.dstream, output, input);
+				if (res == 0 || output->pos == output->size)
+					break;
+				if (ZSTD_isError(res))
+					fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+			}
+
+			if (output->pos == output->size)
+				break; /* We read all the data that fits */
+		}
+
+		ret = output->pos;
+	}
+	else
+#endif
+
 	{
 		ret = fread(ptr, 1, size, fp->uncompressedfp);
 		if (ret != size && !feof(fp->uncompressedfp))
@@ -603,6 +915,35 @@ cfwrite(const void *ptr, int size, cfp *fp)
 		return gzwrite(fp->compressedfp, ptr, size);
 	else
 #endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		size_t      res, cnt;
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+
+		input->src = ptr;
+		input->size = size;
+		input->pos = 0;
+
+		/* Consume all input, and flush later */
+		while (input->pos != input->size)
+		{
+			output->pos = 0;
+			res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue);
+			if (ZSTD_isError(res))
+				fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+			cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+			if (cnt != output->pos)
+				fatal("could not write data: %s", strerror(errno));
+		}
+
+		return size;
+	}
+	else
+#endif
+
 		return fwrite(ptr, 1, size, fp->uncompressedfp);
 }
 
@@ -625,6 +966,20 @@ cfgetc(cfp *fp)
 	}
 	else
 #endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		unsigned char c;	/* cfread() stores bytes, so don't read into an int */
+
+		if (cfread(&c, 1, fp) == 1)
+			ret = c;
+		else if (feof(fp->zstd.fp))
+			fatal("could not read from input file: end of file");
+		else
+			fatal("could not read from input file: %s", strerror(errno));
+	}
+	else
+#endif
 	{
 		ret = fgetc(fp->uncompressedfp);
 		if (ret == EOF)
@@ -641,6 +996,18 @@ cfgets(cfp *fp, char *buf, int len)
 	if (fp->compressedfp)
 		return gzgets(fp->compressedfp, buf, len);
 	else
+#endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		int res = cfread(buf, len - 1, fp);	/* keep room for the terminator */
+		if (res <= 0)
+			return NULL;	/* nothing read: report it like fgets() does */
+		buf[res] = 0;
+		buf[strcspn(buf, "\n")] = '\0';	/* XXX: bytes after the newline are lost */
+		return buf;
+	}
+	else
+#endif
 		return fgets(buf, len, fp->uncompressedfp);
 }
@@ -662,6 +1029,43 @@ cfclose(cfp *fp)
 		fp->compressedfp = NULL;
 	}
 	else
+#endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+		size_t res, cnt;
+
+		if (fp->zstd.cstream)
+		{
+			for (;;)
+			{
+				output->pos = 0;
+				res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end);
+				if (ZSTD_isError(res))
+					fatal("could not compress data: %s", ZSTD_getErrorName(res));
+				cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+				if (cnt != output->pos)
+					fatal("could not write data: %s", strerror(errno));
+				if (res == 0)
+					break;
+			}
+
+			ZSTD_freeCStream(fp->zstd.cstream);
+			pg_free(fp->zstd.output.dst);
+		}
+
+		if (fp->zstd.dstream)
+		{
+			ZSTD_freeDStream(fp->zstd.dstream);
+			pg_free(unconstify(void *, fp->zstd.input.src));
+		}
+
+		result = fclose(fp->zstd.fp);
+		fp->zstd.fp = NULL;
+	}
+	else
 #endif
 	{
 		result = fclose(fp->uncompressedfp);
@@ -679,6 +1083,11 @@ cfeof(cfp *fp)
 	if (fp->compressedfp)
 		return gzeof(fp->compressedfp);
 	else
+#endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+		return feof(fp->zstd.fp);
+	else
 #endif
 		return feof(fp->uncompressedfp);
 }
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index d2e6e1b854..f0ce06f176 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -24,7 +24,8 @@
 typedef enum
 {
 	COMPR_ALG_NONE,
-	COMPR_ALG_LIBZ
+	COMPR_ALG_LIBZ,
+	COMPR_ALG_ZSTD,
 } CompressionAlgorithm;
 
 /* Prototype for callback function to WriteDataToArchive() */
@@ -57,7 +58,7 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
 typedef struct cfp cfp;
 
 extern cfp *cfopen(const char *path, const char *mode, int compression);
-extern cfp *cfopen_read(const char *path, const char *mode);
+extern cfp *cfopen_read(const char *path, const char *mode, int compression);
 extern cfp *cfopen_write(const char *path, const char *mode, int compression);
 extern int	cfread(void *ptr, int size, cfp *fp);
 extern int	cfwrite(const void *ptr, int size, cfp *fp);
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 6a5a22637b..1a229ebedb 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -60,6 +60,11 @@ typedef struct _z_stream
 typedef z_stream *z_streamp;
 #endif
 
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#define ZSTD_COMPRESSION	-2
+#endif	/* HAVE_LIBZSTD */
+
 /* Data block types */
 #define BLK_DATA 1
 #define BLK_BLOBS 3
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 650b542fce..6e55cb425e 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -202,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = cfopen_read(fname, PG_BINARY_R);
+		tocFH = cfopen_read(fname, PG_BINARY_R, AH->compression);
 		if (tocFH == NULL)
 			fatal("could not open input file \"%s\": %m", fname);
 
@@ -388,7 +388,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	cfp = cfopen_read(filename, PG_BINARY_R);
+	cfp = cfopen_read(filename, PG_BINARY_R, AH->compression);
 
 	if (!cfp)
 		fatal("could not open input file \"%s\": %m", filename);
@@ -440,7 +440,7 @@ _LoadBlobs(ArchiveHandle *AH)
 
 	setFilePath(AH, fname, "blobs.toc");
 
-	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
+	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, AH->compression);
 
 	if (ctx->blobsTocFH == NULL)
 		fatal("could not open large object TOC file \"%s\" for input: %m",
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8b1e5cc2b5..f21eb021c7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -533,7 +533,8 @@ main(int argc, char **argv)
 
 			case 'Z':			/* Compression Level */
 				compressLevel = atoi(optarg);
-				if (compressLevel < 0 || compressLevel > 9)
+				if ((compressLevel < 0 || compressLevel > 9) &&
+					compressLevel != -2)
 				{
 					pg_log_error("compression level must be in range 0..9");
 					exit_nicely(1);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838e53..da35415c72 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -346,6 +346,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
-- 
2.17.0

>From 4b87bdba077da1e1609a1d7dbb40772d432c28dd Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 12 Dec 2020 00:40:39 -0600
Subject: [PATCH 4/7] cmdline parser for compression alg/level/opts

---
 src/bin/pg_dump/compress_io.c         | 235 ++++++++++++--------------
 src/bin/pg_dump/compress_io.h         |  20 +--
 src/bin/pg_dump/pg_backup.h           |  19 ++-
 src/bin/pg_dump/pg_backup_archiver.c  |  45 ++---
 src/bin/pg_dump/pg_backup_archiver.h  |  11 +-
 src/bin/pg_dump/pg_backup_custom.c    |   6 +-
 src/bin/pg_dump/pg_backup_directory.c |  19 ++-
 src/bin/pg_dump/pg_backup_tar.c       |  33 ++--
 src/bin/pg_dump/pg_dump.c             | 157 ++++++++++++++---
 9 files changed, 324 insertions(+), 221 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index b51ba680a2..421b8e04cb 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -86,12 +86,9 @@ struct CompressorState
 
 };
 
-static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
-								   int *level);
-
 /* Routines that support zlib compressed data I/O */
 #ifdef HAVE_LIBZ
-static void InitCompressorZlib(CompressorState *cs, int level);
+static void InitCompressorZlib(CompressorState *cs, Compress *compress);
 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
 								  bool flush);
 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
@@ -101,9 +98,9 @@ static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
 #endif
 
 #ifdef HAVE_LIBZSTD
-static void InitCompressorZstd(CompressorState *cs, int level);
+static ZSTD_CStream *ZstdCStreamParams(Compress *compress);
+static void InitCompressorZstd(CompressorState *cs, Compress *compress);
 static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
-static void DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
 static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
 					   const char *data, size_t dLen);
 static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF);
@@ -114,80 +111,40 @@ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
 static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
 								   const char *data, size_t dLen);
 
-/*
- * Interprets a numeric 'compression' value. The algorithm implied by the
- * value (zlib or none at the moment), is returned in *alg, and the
- * zlib compression level in *level.
- */
-static void
-ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
-{
-	switch (compression)
-	{
-#ifdef HAVE_LIBZSTD
-		case ZSTD_COMPRESSION:
-			*alg = COMPR_ALG_ZSTD;
-			break;
-#endif
-#ifdef HAVE_ZLIB
-		case Z_DEFAULT_COMPRESSION:
-		case 1..9:
-			*alg = COMPR_ALG_LIBZ;
-			break;
-#endif
-		case 0:
-			*alg = COMPR_ALG_NONE;
-			break;
-		default:
-			fatal("invalid compression code: %d", compression);
-			*alg = COMPR_ALG_NONE;	/* keep compiler quiet */
-	}
-
-	/* The level is just the passed-in value. */
-	if (level)
-		*level = compression;
-}
-
 /* Public interface routines */
 
 /* Allocate a new compressor */
 CompressorState *
-AllocateCompressor(int compression, WriteFunc writeF)
+AllocateCompressor(Compress *compression, WriteFunc writeF)
 {
 	CompressorState *cs;
-	CompressionAlgorithm alg;
-	int			level;
-
-	ParseCompressionOption(compression, &alg, &level);
-
-#ifndef HAVE_LIBZ
-	if (alg == COMPR_ALG_LIBZ)
-		fatal("not built with zlib support");
-#endif
 
 	cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
 	cs->writeF = writeF;
-	cs->comprAlg = alg;
+	cs->comprAlg = compression->alg;
 
 	/*
 	 * Perform compression algorithm specific initialization.
 	 */
-	switch (alg)
+	Assert (compression->alg != COMPR_ALG_DEFAULT);
+	switch (compression->alg)
 	{
 #ifdef HAVE_LIBZ
 	case COMPR_ALG_LIBZ:
-		InitCompressorZlib(cs, level);
+		InitCompressorZlib(cs, compression);
 		break;
 #endif
 #ifdef HAVE_LIBZSTD
 	case COMPR_ALG_ZSTD:
-		InitCompressorZstd(cs, level);
+		InitCompressorZstd(cs, compression);
 		break;
 #endif
 	case COMPR_ALG_NONE:
 		/* Do nothing */
 		break;
-	// default:
+	default:
+		/* Should not happen */
+		fatal("requested compression not available in this installation");
 	}
 
 	return cs;
@@ -198,12 +155,9 @@ AllocateCompressor(int compression, WriteFunc writeF)
  * out with ahwrite().
  */
 void
-ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
+ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF)
 {
-	CompressionAlgorithm alg;
-
-	ParseCompressionOption(compression, &alg, NULL);
-
+	CompressionAlgorithm alg = AH->compression.alg;
 	if (alg == COMPR_ALG_NONE)
 		ReadDataFromArchiveNone(AH, readF);
 	else if (alg == COMPR_ALG_LIBZ)
@@ -233,25 +187,25 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
 {
 	switch (cs->comprAlg)
 	{
-		case COMPR_ALG_LIBZ:
 #ifdef HAVE_LIBZ
+		case COMPR_ALG_LIBZ:
 			WriteDataToArchiveZlib(AH, cs, data, dLen);
-#else
-			fatal("not built with zlib support");
-#endif
 			break;
+#endif
 
-		case COMPR_ALG_ZSTD:
 #ifdef HAVE_LIBZSTD
+		case COMPR_ALG_ZSTD:
 			WriteDataToArchiveZstd(AH, cs, data, dLen);
-#else
-			fatal("not built with zstd support");
-#endif
 			break;
+#endif
 
 		case COMPR_ALG_NONE:
 			WriteDataToArchiveNone(AH, cs, data, dLen);
 			break;
+
+		default:
+			/* Should not happen */
+			fatal("requested compression not available in this installation");
 	}
 }
 
@@ -277,13 +231,42 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs)
 // XXX: put in separate files ?
 
 #ifdef HAVE_LIBZSTD
-static void
-InitCompressorZstd(CompressorState *cs, int level)
+static ZSTD_CStream*
+ZstdCStreamParams(Compress *compress)
 {
-	cs->u.zstd.cstream = ZSTD_createCStream();
-	if (cs->u.zstd.cstream == NULL)
+	size_t res;
+	ZSTD_CStream *cstream;
+
+	cstream = ZSTD_createCStream();
+	if (cstream == NULL)
 		fatal("could not initialize compression library");
 
+	if (compress->level != 0) // XXX: ZSTD_CLEVEL_DEFAULT
+	{
+		res = ZSTD_CCtx_setParameter(cstream,
+				ZSTD_c_compressionLevel, compress->level);
+		if (ZSTD_isError(res))
+			fatal("could not set compression level: %s",
+					ZSTD_getErrorName(res));
+	}
+
+	if (compress->zstdlong)
+	{
+		res = ZSTD_CCtx_setParameter(cstream,
+				ZSTD_c_enableLongDistanceMatching, 1); // XXX: allow to disable it
+		if (ZSTD_isError(res))
+			fatal("could not set compression parameter: %s",
+					ZSTD_getErrorName(res));
+	}
+
+	return cstream;
+}
+
+static void
+InitCompressorZstd(CompressorState *cs, Compress *compress)
+{
+	cs->u.zstd.cstream = ZstdCStreamParams(compress);
+
 	/* XXX: initialize safely like the corresponding zlib "paranoia" */
 	cs->u.zstd.output.size = ZSTD_CStreamOutSize();
 	cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size);
@@ -295,6 +278,8 @@ EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
 {
 	ZSTD_outBuffer	*output = &cs->u.zstd.output;
 
+
+
 	for (;;)
 	{
 		size_t res;
@@ -318,11 +303,16 @@ EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
 }
 
 static void
-DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen)
 {
 	ZSTD_inBuffer	*input = &cs->u.zstd.input;
 	ZSTD_outBuffer	*output = &cs->u.zstd.output;
 
+	input->src = (void *) unconstify(char *, data);
+	input->size = dLen;
+	input->pos = 0;
+
 	while (input->pos != input->size)
 	{
 		size_t		res;
@@ -349,16 +339,6 @@ DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
 	}
 }
 
-static void
-WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
-					   const char *data, size_t dLen)
-{
-	cs->u.zstd.input.src = (void *) unconstify(char *, data);
-	cs->u.zstd.input.size = dLen;
-	cs->u.zstd.input.pos = 0;
-	DeflateCompressorZstd(AH, cs);
-}
-
 /* Read data from a compressed zstd archive */
 static void
 ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF)
@@ -436,7 +416,7 @@ ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF)
  */
 
 static void
-InitCompressorZlib(CompressorState *cs, int level)
+InitCompressorZlib(CompressorState *cs, Compress *compress)
 {
 	z_streamp	zp;
 
@@ -453,7 +433,7 @@ InitCompressorZlib(CompressorState *cs, int level)
 	cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
 	cs->zlibOutSize = ZLIB_OUT_SIZE;
 
-	if (deflateInit(zp, level) != Z_OK)
+	if (deflateInit(zp, compress->level) != Z_OK)
 		fatal("could not initialize compression library: %s",
 			  zp->msg);
 
@@ -676,35 +656,31 @@ free_keep_errno(void *p)
  * Open a file for reading. 'path' is the file to open, and 'mode' should
  * be either "r" or "rb".
  *
- * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
- * doesn't already have it) and try again. So if you pass "foo" as 'path',
+ * If the file at 'path' does not exist and 'path' has no compression suffix, we
+ * append the suffix for 'compression' and try again. So if you pass "foo" as 'path',
  * this will open either "foo" or "foo.gz".
  *
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_read(const char *path, const char *mode, int compression)
+cfopen_read(const char *path, const char *mode, Compress *compression)
 {
 	cfp		   *fp;
 
-#ifdef HAVE_LIBZ
 	if (hasSuffix(path, ".gz") || hasSuffix(path, ".zst"))
 		fp = cfopen(path, mode, compression);
 	else
-#endif
 	{
 		fp = cfopen(path, mode, compression);
-#ifdef HAVE_LIBZ
 		if (fp == NULL)
 		{
 			char	   *fname;
-			char	   *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz";
+			const char *suffix = compress_suffix(compression);
 
-			fname = psprintf("%s.%s", path, suffix);
+			fname = psprintf("%s%s", path, suffix);
 			fp = cfopen(fname, mode, compression);
 			free_keep_errno(fname);
 		}
-#endif
 	}
 	return fp;
 }
@@ -714,32 +690,26 @@ cfopen_read(const char *path, const char *mode, int compression)
  * be a filemode as accepted by fopen() and gzopen() that indicates writing
  * ("w", "wb", "a", or "ab").
  *
- * If 'compression' is non-zero, a gzip compressed stream is opened, and
- * 'compression' indicates the compression level used. The ".gz" suffix
- * is automatically added to 'path' in that case.
+ * Use compression if specified.
+ * The appropriate suffix is automatically added to 'path' in that case.
  *
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_write(const char *path, const char *mode, int compression)
+cfopen_write(const char *path, const char *mode, Compress *compression)
 {
 	cfp		   *fp;
 
-	if (compression == 0)
+	if (compression->alg == COMPR_ALG_NONE)
 		fp = cfopen(path, mode, compression);
 	else
 	{
-#ifdef HAVE_LIBZ // XXX
 		char	   *fname;
-		char	   *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz";
+		const char *suffix = compress_suffix(compression);
 
-		fname = psprintf("%s.%s", path, suffix);
+		fname = psprintf("%s%s", path, suffix);
 		fp = cfopen(fname, mode, compression);
 		free_keep_errno(fname);
-#else
-		fatal("not built with zlib support");
-		fp = NULL;				/* keep compiler quiet */
-#endif
 	}
 
 	return fp;
@@ -752,7 +722,7 @@ cfopen_write(const char *path, const char *mode, int compression)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen(const char *path, const char *mode, int compression)
+cfopen(const char *path, const char *mode, Compress *compression)
 {
 	cfp		   *fp = pg_malloc(sizeof(cfp));
 
@@ -764,18 +734,17 @@ cfopen(const char *path, const char *mode, int compression)
 	fp->zstd.fp = NULL;
 #endif
 
-	switch (compression)
+	switch (compression->alg)
 	{
 #ifdef HAVE_LIBZ
-	case 1 ... 9: // XXX: nonportable
-	case Z_DEFAULT_COMPRESSION:
-		if (compression != Z_DEFAULT_COMPRESSION)
+	case COMPR_ALG_LIBZ:
+		if (compression->level != Z_DEFAULT_COMPRESSION)
 		{
 			/* user has specified a compression level, so tell zlib to use it */
 			char		mode_compression[32];
 
 			snprintf(mode_compression, sizeof(mode_compression), "%s%d",
-					 mode, compression);
+					 mode, compression->level);
 			fp->compressedfp = gzopen(path, mode_compression);
 		}
 		else
@@ -795,9 +764,8 @@ cfopen(const char *path, const char *mode, int compression)
 #endif
 
 #ifdef HAVE_LIBZSTD
-	case ZSTD_COMPRESSION:
+	case COMPR_ALG_ZSTD:
 		fp->zstd.fp = fopen(path, mode);
-		// XXX: save the compression params
 		if (fp->zstd.fp == NULL)
 		{
 			free_keep_errno(fp);
@@ -806,16 +774,14 @@ cfopen(const char *path, const char *mode, int compression)
 		else if (strchr(mode, 'w'))
 		{
 			fp->zstd.dstream = NULL;
-			fp->zstd.output.size = ZSTD_CStreamOutSize(); // XXX
+			fp->zstd.output.size = ZSTD_CStreamOutSize();
 			fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size);
-			fp->zstd.cstream = ZSTD_createCStream();
-			if (fp->zstd.cstream == NULL)
-				fatal("could not initialize compression library");
+			fp->zstd.cstream = ZstdCStreamParams(compression);
 		}
 		else if (strchr(mode, 'r'))
 		{
 			fp->zstd.cstream = NULL;
-			fp->zstd.input.size = ZSTD_DStreamOutSize(); // XXX
+			fp->zstd.input.size = ZSTD_DStreamInSize();
 			fp->zstd.input.src = pg_malloc0(fp->zstd.input.size);
 			fp->zstd.dstream = ZSTD_createDStream();
 			if (fp->zstd.dstream == NULL)
@@ -825,15 +791,19 @@ cfopen(const char *path, const char *mode, int compression)
 		break;
 #endif
 
-	default:
+	case COMPR_ALG_NONE:
 		fp->uncompressedfp = fopen(path, mode);
 		if (fp->uncompressedfp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
 		}
-
 		return fp;
+		break;
+
+	default:
+		/* should not happen */
+		fatal("requested compression not available in this installation");
 	}
 }
 
@@ -944,7 +914,7 @@ cfwrite(const void *ptr, int size, cfp *fp)
 	else
 #endif
 
-		return fwrite(ptr, 1, size, fp->uncompressedfp);
+	return fwrite(ptr, 1, size, fp->uncompressedfp);
 }
 
 int
@@ -1005,7 +975,7 @@ cfgets(cfp *fp, char *buf, int len)
 		buf[res] = 0;
 		if (strchr(buf, '\n'))
 			*strchr(buf, '\n') = '\0';
-		return buf;
+		return res > 0 ? buf : 0;
 	}
 	else
 #endif
@@ -1122,5 +1092,22 @@ hasSuffix(const char *filename, const char *suffix)
 				  suffix,
 				  suffixlen) == 0;
 }
-
 #endif
+
+/*
+ * Return the filename suffix (".gz", ".zst" or "") for the given compression.
+ * The string is statically allocated.
+ */
+const char *
+compress_suffix(Compress *compression)
+{
+	switch (compression->alg)
+	{
+		case COMPR_ALG_LIBZ:
+			return ".gz";
+		case COMPR_ALG_ZSTD:
+			return ".zst";
+		default:
+			return "";
+	}
+}
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index f0ce06f176..2c073676eb 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -21,13 +21,6 @@
 #define ZLIB_OUT_SIZE	4096
 #define ZLIB_IN_SIZE	4096
 
-typedef enum
-{
-	COMPR_ALG_NONE,
-	COMPR_ALG_LIBZ,
-	COMPR_ALG_ZSTD,
-} CompressionAlgorithm;
-
 /* Prototype for callback function to WriteDataToArchive() */
 typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len);
 
@@ -47,8 +40,8 @@ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen);
 /* struct definition appears in compress_io.c */
 typedef struct CompressorState CompressorState;
 
-extern CompressorState *AllocateCompressor(int compression, WriteFunc writeF);
-extern void ReadDataFromArchive(ArchiveHandle *AH, int compression,
+extern CompressorState *AllocateCompressor(Compress *compression, WriteFunc writeF);
+extern void ReadDataFromArchive(ArchiveHandle *AH,
 								ReadFunc readF);
 extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
 							   const void *data, size_t dLen);
@@ -57,9 +50,9 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
 
 typedef struct cfp cfp;
 
-extern cfp *cfopen(const char *path, const char *mode, int compression);
-extern cfp *cfopen_read(const char *path, const char *mode, int compression);
-extern cfp *cfopen_write(const char *path, const char *mode, int compression);
+extern cfp *cfopen(const char *path, const char *mode, Compress *compression);
+extern cfp *cfopen_read(const char *path, const char *mode, Compress *compression);
+extern cfp *cfopen_write(const char *path, const char *mode, Compress *compression);
 extern int	cfread(void *ptr, int size, cfp *fp);
 extern int	cfwrite(const void *ptr, int size, cfp *fp);
 extern int	cfgetc(cfp *fp);
@@ -68,4 +61,7 @@ extern int	cfclose(cfp *fp);
 extern int	cfeof(cfp *fp);
 extern const char *get_cfp_error(cfp *fp);
 
+/* also used by tar */
+extern const char * compress_suffix(Compress *compression);
+
 #endif
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 9d0056a569..1c1954dd1a 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -72,6 +72,21 @@ typedef struct _connParams
 	char	   *override_dbname;
 } ConnParams;
 
+typedef enum
+{
+		COMPR_ALG_DEFAULT = -1,
+		COMPR_ALG_NONE,
+		COMPR_ALG_LIBZ,
+		COMPR_ALG_ZSTD,
+} CompressionAlgorithm;
+
+typedef struct Compress {
+	CompressionAlgorithm	alg;
+	int			level; // XXX: default
+	bool		zstdlong;
+} Compress;
+
+
 typedef struct _restoreOptions
 {
 	int			createDB;		/* Issue commands to create the database */
@@ -125,7 +140,7 @@ typedef struct _restoreOptions
 
 	int			noDataForFailedTables;
 	int			exit_on_error;
-	int			compression;
+	Compress	compression;
 	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
 										 * to stderr */
 	bool		single_txn;
@@ -281,7 +296,7 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
 
 /* Create a new archive */
 extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
-							  const int compression, bool dosync, ArchiveMode mode,
+							  Compress *compression, bool dosync, ArchiveMode mode,
 							  SetupWorkerPtrType setupDumpWorker);
 
 /* The --list option */
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 1f82c6499b..2f1d59ce7a 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -70,7 +70,7 @@ typedef struct _parallelReadyList
 
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
-							   const int compression, bool dosync, ArchiveMode mode,
+							   Compress *compression, bool dosync, ArchiveMode mode,
 							   SetupWorkerPtrType setupWorkerPtr);
 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te);
 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
@@ -98,7 +98,7 @@ static int	_discoverArchiveFormat(ArchiveHandle *AH);
 static int	RestoringToDB(ArchiveHandle *AH);
 static void dump_lo_buf(ArchiveHandle *AH);
 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
-static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
+static void SetOutput(ArchiveHandle *AH, const char *filename, Compress *compress);
 static OutputContext SaveOutput(ArchiveHandle *AH);
 static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
 
@@ -238,7 +238,7 @@ setupRestoreWorker(Archive *AHX)
 /* Public */
 Archive *
 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
-			  const int compression, bool dosync, ArchiveMode mode,
+			  Compress *compression, bool dosync, ArchiveMode mode,
 			  SetupWorkerPtrType setupDumpWorker)
 
 {
@@ -253,7 +253,9 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 Archive *
 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
 {
-	ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
+	Compress compress = {0};
+	ArchiveHandle *AH = _allocAH(FileSpec, fmt, &compress, true, archModeRead,
+			setupRestoreWorker);
 
 	return (Archive *) AH;
 }
@@ -382,7 +384,7 @@ RestoreArchive(Archive *AHX)
 	 * Make sure we won't need (de)compression we haven't got
 	 */
 #ifndef HAVE_LIBZ
-	if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+	if (AH->compression.alg != COMPR_ALG_NONE && AH->PrintTocDataPtr != NULL)
 	{
 		for (te = AH->toc->next; te != AH->toc; te = te->next)
 		{
@@ -457,8 +459,8 @@ RestoreArchive(Archive *AHX)
 	 * Setup the output file if necessary.
 	 */
 	sav = SaveOutput(AH);
-	if (ropt->filename || ropt->compression)
-		SetOutput(AH, ropt->filename, ropt->compression);
+	if (ropt->filename || ropt->compression.alg != COMPR_ALG_NONE)
+		SetOutput(AH, ropt->filename, &ropt->compression);
 
 	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
 
@@ -738,7 +740,7 @@ RestoreArchive(Archive *AHX)
 	 */
 	AH->stage = STAGE_FINALIZING;
 
-	if (ropt->filename || ropt->compression)
+	if (ropt->filename || ropt->compression.alg != COMPR_ALG_NONE)
 		RestoreOutput(AH, sav);
 
 	if (ropt->useDB)
@@ -1121,10 +1123,11 @@ PrintTOCSummary(Archive *AHX)
 	OutputContext sav;
 	const char *fmtName;
 	char		stamp_str[64];
+	Compress	nocompression = {0};
 
 	sav = SaveOutput(AH);
 	if (ropt->filename)
-		SetOutput(AH, ropt->filename, 0 /* no compression */ );
+		SetOutput(AH, ropt->filename, &nocompression);
 
 	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
 				 localtime(&AH->createDate)) == 0)
@@ -1133,7 +1136,7 @@ PrintTOCSummary(Archive *AHX)
 	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
 	ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
 			 sanitize_line(AH->archdbname, false),
-			 AH->tocCount, AH->compression);
+			 AH->tocCount, AH->compression.alg);
 
 	switch (AH->format)
 	{
@@ -1487,7 +1490,7 @@ archprintf(Archive *AH, const char *fmt,...)
  *******************************/
 
 static void
-SetOutput(ArchiveHandle *AH, const char *filename, int compression)
+SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression)
 {
 	int			fn;
 
@@ -1510,12 +1513,12 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression)
 
 	/* If compression explicitly requested, use gzopen */
 #ifdef HAVE_LIBZ
-	if (compression != 0)
+	if (compression->alg != COMPR_ALG_NONE)
 	{
 		char		fmode[14];
 
 		/* Don't use PG_BINARY_x since this is zlib */
-		sprintf(fmode, "wb%d", compression);
+		sprintf(fmode, "wb%d", compression->level);
 		if (fn >= 0)
 			AH->OF = gzdopen(dup(fn), fmode);
 		else
@@ -2259,7 +2262,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
  */
 static ArchiveHandle *
 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
-		 const int compression, bool dosync, ArchiveMode mode,
+		 Compress *compression, bool dosync, ArchiveMode mode,
 		 SetupWorkerPtrType setupWorkerPtr)
 {
 	ArchiveHandle *AH;
@@ -2310,7 +2313,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	AH->toc->prev = AH->toc;
 
 	AH->mode = mode;
-	AH->compression = compression;
+	AH->compression = *compression;
 	AH->dosync = dosync;
 
 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
@@ -2325,7 +2328,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	 * Force stdin/stdout into binary mode if that is what we are using.
 	 */
 #ifdef WIN32
-	if ((fmt != archNull || compression != 0) &&
+	if ((fmt != archNull || compression->alg != COMPR_ALG_NONE) &&
 		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
 	{
 		if (mode == archModeWrite)
@@ -3741,7 +3744,7 @@ WriteHead(ArchiveHandle *AH)
 	AH->WriteBytePtr(AH, AH->intSize);
 	AH->WriteBytePtr(AH, AH->offSize);
 	AH->WriteBytePtr(AH, AH->format);
-	WriteInt(AH, AH->compression);
+	WriteInt(AH, AH->compression.alg);
 	crtm = *localtime(&AH->createDate);
 	WriteInt(AH, crtm.tm_sec);
 	WriteInt(AH, crtm.tm_min);
@@ -3816,15 +3819,15 @@ ReadHead(ArchiveHandle *AH)
 	if (AH->version >= K_VERS_1_2)
 	{
 		if (AH->version < K_VERS_1_4)
-			AH->compression = AH->ReadBytePtr(AH);
+			AH->compression.alg = AH->ReadBytePtr(AH);
 		else
-			AH->compression = ReadInt(AH);
+			AH->compression.alg = ReadInt(AH);
 	}
 	else
-		AH->compression = Z_DEFAULT_COMPRESSION;
+		AH->compression.alg = Z_DEFAULT_COMPRESSION;
 
 #ifndef HAVE_LIBZ
-	if (AH->compression != 0)
+	if (AH->compression.alg != COMPR_ALG_NONE)
 		pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
 #endif
 
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 1a229ebedb..641ba2d043 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -47,8 +47,6 @@
 #define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
 #define GZREAD(p, s, n, fh) fread(p, s, n, fh)
 #define GZEOF(fh)	feof(fh)
-/* this is just the redefinition of a libz constant */
-#define Z_DEFAULT_COMPRESSION (-1)
 
 typedef struct _z_stream
 {
@@ -62,7 +60,6 @@ typedef z_stream *z_streamp;
 
 #ifdef HAVE_LIBZSTD
 #include <zstd.h>
-#define ZSTD_COMPRESSION	-2
 #endif	/* HAVE_LIBZSTD */
 
 /* Data block types */
@@ -334,13 +331,7 @@ struct _archiveHandle
 	DumpId	   *tableDataId;	/* TABLE DATA ids, indexed by table dumpId */
 
 	struct _tocEntry *currToc;	/* Used when dumping data */
-	int			compression;	/*---------
-								 * Compression requested on open
-								 * Possible values for compression:
-								 * -2	ZSTD_COMPRESSION
-								 * -1	Z_DEFAULT_COMPRESSION
-								 *  0	COMPRESSION_NONE
-								 * 1-9 levels for gzip compression */
+	Compress	compression;	/* Compression requested on open */
 	bool		dosync;			/* data requested to be synced on sight */
 	ArchiveMode mode;			/* File mode - r or w */
 	void	   *formatData;		/* Header data specific to file format */
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 77d402c323..55a887a236 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -298,7 +298,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 	_WriteByte(AH, BLK_DATA);	/* Block type */
 	WriteInt(AH, te->dumpId);	/* For sanity check */
 
-	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
+	ctx->cs = AllocateCompressor(&AH->compression, _CustomWriteFunc);
 }
 
 /*
@@ -377,7 +377,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 
 	WriteInt(AH, oid);
 
-	ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
+	ctx->cs = AllocateCompressor(&AH->compression, _CustomWriteFunc);
 }
 
 /*
@@ -566,7 +566,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
 static void
 _PrintData(ArchiveHandle *AH)
 {
-	ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
+	ReadDataFromArchive(AH, _CustomReadFunc);
 }
 
 static void
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 6e55cb425e..dfa36d5dc1 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -202,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = cfopen_read(fname, PG_BINARY_R, AH->compression);
+		tocFH = cfopen_read(fname, PG_BINARY_R, &AH->compression);
 		if (tocFH == NULL)
 			fatal("could not open input file \"%s\": %m", fname);
 
@@ -327,7 +327,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression);
 	if (ctx->dataFH == NULL)
 		fatal("could not open output file \"%s\": %m", fname);
 }
@@ -388,12 +388,12 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	cfp = cfopen_read(filename, PG_BINARY_R, AH->compression);
+	cfp = cfopen_read(filename, PG_BINARY_R, &AH->compression);
 
 	if (!cfp)
 		fatal("could not open input file \"%s\": %m", filename);
 
-	buf = pg_malloc(ZLIB_OUT_SIZE);
+	buf = pg_malloc(ZLIB_OUT_SIZE); // XXX
 	buflen = ZLIB_OUT_SIZE;
 
 	while ((cnt = cfread(buf, buflen, cfp)))
@@ -435,12 +435,13 @@ _LoadBlobs(ArchiveHandle *AH)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
 	char		line[MAXPGPATH];
+	Compress	nocompression = {0};
 
 	StartRestoreBlobs(AH);
 
 	setFilePath(AH, fname, "blobs.toc");
 
-	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, AH->compression);
+	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, &nocompression);
 
 	if (ctx->blobsTocFH == NULL)
 		fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -573,6 +574,7 @@ _CloseArchive(ArchiveHandle *AH)
 	{
 		cfp		   *tocFH;
 		char		fname[MAXPGPATH];
+		Compress	nocompression = {0};
 
 		setFilePath(AH, fname, "toc.dat");
 
@@ -580,7 +582,7 @@ _CloseArchive(ArchiveHandle *AH)
 		ctx->pstate = ParallelBackupStart(AH);
 
 		/* The TOC is always created uncompressed */
-		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
+		tocFH = cfopen_write(fname, PG_BINARY_W, &nocompression);
 		if (tocFH == NULL)
 			fatal("could not open output file \"%s\": %m", fname);
 		ctx->dataFH = tocFH;
@@ -639,11 +641,12 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
+	Compress	nocompression = {0};
 
 	setFilePath(AH, fname, "blobs.toc");
 
 	/* The blob TOC file is never compressed */
-	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
+	ctx->blobsTocFH = cfopen_write(fname, "ab", &nocompression);
 	if (ctx->blobsTocFH == NULL)
 		fatal("could not open output file \"%s\": %m", fname);
 }
@@ -661,7 +664,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 
 	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
 
-	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression);
 
 	if (ctx->dataFH == NULL)
 		fatal("could not open output file \"%s\": %m", fname);
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 54e708875c..5c9d17bae7 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -39,6 +39,7 @@
 #include "pg_backup_archiver.h"
 #include "pg_backup_tar.h"
 #include "pg_backup_utils.h"
+#include "compress_io.h"
 #include "pgtar.h"
 
 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
@@ -196,10 +197,10 @@ InitArchiveFmt_Tar(ArchiveHandle *AH)
 
 		/*
 		 * We don't support compression because reading the files back is not
-		 * possible since gzdopen uses buffered IO which totally screws file
+		 * possible since gzdopen uses buffered IO which totally screws file XXX
 		 * positioning.
 		 */
-		if (AH->compression != 0)
+		if (AH->compression.alg != COMPR_ALG_NONE)
 			fatal("compression is not supported by tar archive format");
 	}
 	else
@@ -254,14 +255,8 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
 	ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
 	if (te->dataDumper != NULL)
 	{
-#ifdef HAVE_LIBZ
-		if (AH->compression == 0)
-			sprintf(fn, "%d.dat", te->dumpId);
-		else
-			sprintf(fn, "%d.dat.gz", te->dumpId);
-#else
-		sprintf(fn, "%d.dat", te->dumpId);
-#endif
+		const char *suffix = compress_suffix(&AH->compression);
+		sprintf(fn, "%d.dat%s", te->dumpId, suffix);
 		ctx->filename = pg_strdup(fn);
 	}
 	else
@@ -352,7 +347,7 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
 
 #ifdef HAVE_LIBZ
 
-		if (AH->compression == 0)
+		if (AH->compression.alg == COMPR_ALG_NONE)
 			tm->nFH = ctx->tarFH;
 		else
 			fatal("compression is not supported by tar archive format");
@@ -413,9 +408,9 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
 
 #ifdef HAVE_LIBZ
 
-		if (AH->compression != 0)
+		if (AH->compression.alg != COMPR_ALG_NONE)
 		{
-			sprintf(fmode, "wb%d", AH->compression);
+			sprintf(fmode, "wb%d", AH->compression.level);
 			tm->zFH = gzdopen(dup(fileno(tm->tmpFH)), fmode);
 			if (tm->zFH == NULL)
 				fatal("could not open temporary file");
@@ -443,7 +438,7 @@ tarClose(ArchiveHandle *AH, TAR_MEMBER *th)
 	/*
 	 * Close the GZ file since we dup'd. This will flush the buffers.
 	 */
-	if (AH->compression != 0)
+	if (AH->compression.alg != COMPR_ALG_NONE)
 		if (GZCLOSE(th->zFH) != 0)
 			fatal("could not close tar member");
 
@@ -868,7 +863,7 @@ _CloseArchive(ArchiveHandle *AH)
 		memcpy(ropt, AH->public.ropt, sizeof(RestoreOptions));
 		ropt->filename = NULL;
 		ropt->dropSchema = 1;
-		ropt->compression = 0;
+		ropt->compression.alg = COMPR_ALG_NONE;
 		ropt->superuser = NULL;
 		ropt->suppressDumpWarnings = true;
 
@@ -952,16 +947,12 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 	char		fname[255];
-	char	   *sfx;
+	const char *sfx;
 
 	if (oid == 0)
 		fatal("invalid OID for large object (%u)", oid);
 
-	if (AH->compression != 0)
-		sfx = ".gz";
-	else
-		sfx = "";
-
+	sfx = compress_suffix(&AH->compression);
 	sprintf(fname, "blob_%u.dat%s", oid, sfx);
 
 	tarPrintf(ctx->blobToc, "%u %s\n", oid, fname);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f21eb021c7..5f80d05716 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -59,6 +59,7 @@
 #include "getopt_long.h"
 #include "libpq/libpq-fs.h"
 #include "parallel.h"
+#include "compress_io.h"
 #include "pg_backup_db.h"
 #include "pg_backup_utils.h"
 #include "pg_dump.h"
@@ -297,6 +298,111 @@ static void setupDumpWorker(Archive *AHX);
 static TableInfo *getRootTableInfo(TableInfo *tbinfo);
 
 
+/* Parse the string into compression options */
+static void
+parse_compression(const char *optarg, Compress *compress)
+{
+	if (optarg[0] == '0' && optarg[1] == '\0')
+		compress->alg = COMPR_ALG_NONE;
+	else if ((optarg[0] > '0' && optarg[0] <= '9') ||
+		optarg[0] == '-')
+	{
+		compress->alg = COMPR_ALG_LIBZ;
+		compress->level = atoi(optarg);
+		if (optarg[1] != '\0')
+		{
+			pg_log_error("compression level must be in range 0..9");
+			exit_nicely(1);
+		}
+	}
+	else
+	{
+		/* Parse a more flexible string like level=3 alg=zlib opts=long */
+		for (;;)
+		{
+			char *eq = strchr(optarg, '=');
+			int len;
+
+			if (eq == NULL)
+			{
+				pg_log_error("compression options must be key=value: %s", optarg);
+				exit_nicely(1);
+			}
+
+			len = eq - optarg;
+			if (strncmp(optarg, "alg", len) == 0)
+			{
+				if (strchr(eq, ' '))
+					len = strchr(eq, ' ') - eq - 1;
+				else
+					len = strlen(eq) - len;
+				if (strncmp(1+eq, "zlib", len) == 0 ||
+						strncmp(1+eq, "libz", len) == 0)
+					compress->alg = COMPR_ALG_LIBZ;
+				else if (strncmp(1+eq, "zstd", len) == 0)
+					compress->alg = COMPR_ALG_ZSTD;
+				else
+				{
+					pg_log_error("unknown compression algorithm: %s", 1+eq);
+					exit_nicely(1);
+				}
+			}
+			else if (strncmp(optarg, "level", len) == 0)
+				compress->level = atoi(1+eq);
+			else if (strncmp(optarg, "opt", len) == 0)
+			{
+				if (strchr(eq, ' '))
+					len = strchr(eq, ' ') - eq - 1;
+				else
+					len = strlen(eq) - len;
+				if (strncmp(1+eq, "zstdlong", len) == 0)
+					compress->zstdlong = true;
+				else
+				{
+					pg_log_error("unknown compression option: %s", 1+eq);
+					exit_nicely(1);
+				}
+			}
+			else
+			{
+				pg_log_error("unknown compression setting: %s", optarg);
+				exit_nicely(1);
+			}
+
+			optarg = strchr(eq, ' ');
+			if (!optarg++)
+				break;
+		}
+
+		/* zstd will check its own compression level later */
+		if (compress->alg != COMPR_ALG_ZSTD)
+		{
+			if (compress->level < 0 || compress->level > 9)
+			{
+				pg_log_error("compression level must be in range 0..9");
+				exit_nicely(1);
+			}
+			if (compress->zstdlong)
+			{
+				pg_log_error("compression option not supported with this algorithm");
+				exit_nicely(1);
+			}
+		}
+
+		/* XXX: 0 means "unset" */
+		if (compress->level == 0)
+		{
+			const int default_compress_level[] = {
+				0,			/* COMPR_ALG_NONE */
+				Z_DEFAULT_COMPRESSION,	/* COMPR_ALG_ZLIB */
+				ZSTD_CLEVEL_DEFAULT,	/* COMPR_ALG_ZSTD */
+			};
+
+			compress->level = default_compress_level[compress->alg];
+		}
+	}
+}
+
 int
 main(int argc, char **argv)
 {
@@ -319,7 +425,7 @@ main(int argc, char **argv)
 	char	   *use_role = NULL;
 	long		rowsPerInsert;
 	int			numWorkers = 1;
-	int			compressLevel = -1;
+	Compress	compress = { .alg = COMPR_ALG_DEFAULT };
 	int			plainText = 0;
 	ArchiveFormat archiveFormat = archUnknown;
 	ArchiveMode archiveMode;
@@ -532,13 +638,7 @@ main(int argc, char **argv)
 				break;
 
 			case 'Z':			/* Compression Level */
-				compressLevel = atoi(optarg);
-				if ((compressLevel < 0 || compressLevel > 9) &&
-					compressLevel != -2)
-				{
-					pg_log_error("compression level must be in range 0..9");
-					exit_nicely(1);
-				}
+				parse_compression(optarg, &compress);
 				break;
 
 			case 0:
@@ -680,20 +780,40 @@ main(int argc, char **argv)
 		plainText = 1;
 
 	/* Custom and directory formats are compressed by default, others not */
-	if (compressLevel == -1)
+	if (compress.alg == COMPR_ALG_DEFAULT)
 	{
-#ifdef HAVE_LIBZ
 		if (archiveFormat == archCustom || archiveFormat == archDirectory)
-			compressLevel = Z_DEFAULT_COMPRESSION;
-		else
+		{
+#ifdef HAVE_LIBZ
+			compress.alg = COMPR_ALG_LIBZ;
+			compress.level = Z_DEFAULT_COMPRESSION;
+#endif
+#ifdef HAVE_LIBZSTD
+			compress.alg = COMPR_ALG_ZSTD; // Set default for testing purposes
+			compress.level = ZSTD_CLEVEL_DEFAULT;
 #endif
-			compressLevel = 0;
+		}
+		else
+		{
+			compress.alg = COMPR_ALG_NONE;
+			compress.level = 0;
+		}
 	}
 
 #ifndef HAVE_LIBZ
-	if (compressLevel != 0)
+	if (compress.alg == COMPR_ALG_LIBZ)
+	{
 		pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
-	compressLevel = 0;
+		compress.alg = 0;
+	}
+#endif
+
+#ifndef HAVE_LIBZSTD
+	if (compress.alg == COMPR_ALG_ZSTD)	/* zstd requested but not compiled in */
+	{
+		pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
+		compress.alg = COMPR_ALG_NONE;
+	}
 #endif
 
 	/*
@@ -724,7 +844,7 @@ main(int argc, char **argv)
 		fatal("option --index-collation-versions-unknown only works in binary upgrade mode");
 
 	/* Open the output file */
-	fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
+	fout = CreateArchive(filename, archiveFormat, &compress, dosync,
 						 archiveMode, setupDumpWorker);
 
 	/* Make dump options accessible right away */
@@ -958,10 +1078,7 @@ main(int argc, char **argv)
 	ropt->sequence_data = dopt.sequence_data;
 	ropt->binary_upgrade = dopt.binary_upgrade;
 
-	if (compressLevel == -1)
-		ropt->compression = 0;
-	else
-		ropt->compression = compressLevel;
+	ropt->compression = compress;
 
 	ropt->suppressDumpWarnings = true;	/* We've already shown them */
 
-- 
2.17.0

>From bc6bc514d784da4cab7262c8386b0b8ed29ed02f Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Mon, 21 Dec 2020 00:11:43 -0600
Subject: [PATCH 5/7] union{} with a CompressionAlgorithm alg

---
 src/bin/pg_dump/compress_io.c | 218 ++++++++++++++++------------------
 1 file changed, 105 insertions(+), 113 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 421b8e04cb..06005c3fba 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -619,23 +619,27 @@ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
  */
 struct cfp
 {
-	FILE	   *uncompressedfp;
+	CompressionAlgorithm alg;
+
+	union {
+		FILE	   *fp;
+
 #ifdef HAVE_LIBZ
-	gzFile		compressedfp;
+		gzFile		gzfp;
 #endif
 
-#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg?
-	/* This is a normal file to which we read/write compressed data */
-	struct {
-		FILE			*fp;
-		// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
-		ZSTD_CStream	*cstream;
-		ZSTD_DStream	*dstream;
-		ZSTD_outBuffer	output;
-		ZSTD_inBuffer	input;
-	} zstd;
+#ifdef HAVE_LIBZSTD
+		struct {
+			/* This is a normal file to which we read/write compressed data */
+			FILE			*fp;
+			// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+			ZSTD_CStream	*cstream;
+			ZSTD_DStream	*dstream;
+			ZSTD_outBuffer	output;
+			ZSTD_inBuffer	input;
+		} zstd;
 #endif
-
+	} u;
 };
 
 #ifdef HAVE_LIBZ
@@ -726,13 +730,7 @@ cfopen(const char *path, const char *mode, Compress *compression)
 {
 	cfp		   *fp = pg_malloc(sizeof(cfp));
 
-	fp->uncompressedfp = NULL;
-#ifdef HAVE_LIBZ
-	fp->compressedfp = NULL;
-#endif
-#ifdef HAVE_LIBZSTD
-	fp->zstd.fp = NULL;
-#endif
+	fp->alg = compression->alg;
 
 	switch (compression->alg)
 	{
@@ -745,15 +743,15 @@ cfopen(const char *path, const char *mode, Compress *compression)
 
 			snprintf(mode_compression, sizeof(mode_compression), "%s%d",
 					 mode, compression->level);
-			fp->compressedfp = gzopen(path, mode_compression);
+			fp->u.gzfp = gzopen(path, mode_compression);
 		}
 		else
 		{
 			/* don't specify a level, just use the zlib default */
-			fp->compressedfp = gzopen(path, mode);
+			fp->u.gzfp = gzopen(path, mode);
 		}
 
-		if (fp->compressedfp == NULL)
+		if (fp->u.gzfp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
@@ -765,26 +763,26 @@ cfopen(const char *path, const char *mode, Compress *compression)
 
 #ifdef HAVE_LIBZSTD
 	case COMPR_ALG_ZSTD:
-		fp->zstd.fp = fopen(path, mode);
-		if (fp->zstd.fp == NULL)
+		fp->u.zstd.fp = fopen(path, mode);
+		if (fp->u.zstd.fp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
 		}
 		else if (strchr(mode, 'w'))
 		{
-			fp->zstd.dstream = NULL;
-			fp->zstd.output.size = ZSTD_CStreamOutSize();
-			fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size);
-			fp->zstd.cstream = ZstdCStreamParams(compression);
+			fp->u.zstd.dstream = NULL;
+			fp->u.zstd.output.size = ZSTD_CStreamOutSize();
+			fp->u.zstd.output.dst = pg_malloc0(fp->u.zstd.output.size);
+			fp->u.zstd.cstream = ZstdCStreamParams(compression);
 		}
 		else if (strchr(mode, 'r'))
 		{
-			fp->zstd.cstream = NULL;
-			fp->zstd.input.size = ZSTD_DStreamInSize();
-			fp->zstd.input.src = pg_malloc0(fp->zstd.input.size);
-			fp->zstd.dstream = ZSTD_createDStream();
-			if (fp->zstd.dstream == NULL)
+			fp->u.zstd.cstream = NULL;
+			fp->u.zstd.input.size = ZSTD_DStreamInSize();
+			fp->u.zstd.input.src = pg_malloc0(fp->u.zstd.input.size);
+			fp->u.zstd.dstream = ZSTD_createDStream();
+			if (fp->u.zstd.dstream == NULL)
 				fatal("could not initialize compression library");
 		} // XXX else: bad mode
 		return fp;
@@ -792,8 +790,8 @@ cfopen(const char *path, const char *mode, Compress *compression)
 #endif
 
 	case COMPR_ALG_NONE:
-		fp->uncompressedfp = fopen(path, mode);
-		if (fp->uncompressedfp == NULL)
+		fp->u.fp = fopen(path, mode);
+		if (fp->u.fp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
@@ -817,26 +815,26 @@ cfread(void *ptr, int size, cfp *fp)
 		return 0;
 
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
+	if (fp->alg == COMPR_ALG_LIBZ)
 	{
-		ret = gzread(fp->compressedfp, ptr, size);
-		if (ret != size && !gzeof(fp->compressedfp))
+		ret = gzread(fp->u.gzfp, ptr, size);
+		if (ret != size && !gzeof(fp->u.gzfp))
 		{
 			int			errnum;
-			const char *errmsg = gzerror(fp->compressedfp, &errnum);
+			const char *errmsg = gzerror(fp->u.gzfp, &errnum);
 
 			fatal("could not read from input file: %s",
 				  errnum == Z_ERRNO ? strerror(errno) : errmsg);
 		}
+		return ret;
 	}
-	else
 #endif
 
 #ifdef HAVE_LIBZSTD
-	if (fp->zstd.fp)
+	if (fp->alg == COMPR_ALG_ZSTD)
 	{
-		ZSTD_outBuffer	*output = &fp->zstd.output;
-		ZSTD_inBuffer	*input = &fp->zstd.input;
+		ZSTD_outBuffer	*output = &fp->u.zstd.output;
+		ZSTD_inBuffer	*input = &fp->u.zstd.input;
 		size_t			input_size = ZSTD_DStreamInSize();
 		size_t			res, cnt;
 
@@ -845,7 +843,7 @@ cfread(void *ptr, int size, cfp *fp)
 		output->pos = 0;
 
 		/* read compressed data */
-		while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp)))
+		while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->u.zstd.fp)))
 		{
 			input->size = cnt;
 			input->pos = 0;
@@ -853,7 +851,7 @@ cfread(void *ptr, int size, cfp *fp)
 			for ( ; input->pos < input->size; )
 			{
 				/* decompress */
-				res = ZSTD_decompressStream(fp->zstd.dstream, output, input);
+				res = ZSTD_decompressStream(fp->u.zstd.dstream, output, input);
 				if (res == 0 || output->pos == output->size)
 					break;
 				if (ZSTD_isError(res))
@@ -864,16 +862,13 @@ cfread(void *ptr, int size, cfp *fp)
 				break; /* We read all the data that fits */
 		}
 
-		ret = output->pos;
+		return output->pos;
 	}
-	else
 #endif
 
-	{
-		ret = fread(ptr, 1, size, fp->uncompressedfp);
-		if (ret != size && !feof(fp->uncompressedfp))
-			READ_ERROR_EXIT(fp->uncompressedfp);
-	}
+	ret = fread(ptr, 1, size, fp->u.fp);
+	if (ret != size && !feof(fp->u.fp))
+		READ_ERROR_EXIT(fp->u.fp);
 	return ret;
 }
 
@@ -881,16 +876,16 @@ int
 cfwrite(const void *ptr, int size, cfp *fp)
 {
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
-		return gzwrite(fp->compressedfp, ptr, size);
-	else
+	if (fp->alg == COMPR_ALG_LIBZ)
+		return gzwrite(fp->u.gzfp, ptr, size);
 #endif
+
 #ifdef HAVE_LIBZSTD
-	if (fp->zstd.fp)
+	if (fp->alg == COMPR_ALG_ZSTD)
 	{
 		size_t      res, cnt;
-		ZSTD_outBuffer	*output = &fp->zstd.output;
-		ZSTD_inBuffer	*input = &fp->zstd.input;
+		ZSTD_outBuffer	*output = &fp->u.zstd.output;
+		ZSTD_inBuffer	*input = &fp->u.zstd.input;
 
 		input->src = ptr;
 		input->size = size;
@@ -900,21 +895,20 @@ cfwrite(const void *ptr, int size, cfp *fp)
 		while (input->pos != input->size)
 		{
 			output->pos = 0;
-			res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue);
+			res = ZSTD_compressStream2(fp->u.zstd.cstream, output, input, ZSTD_e_continue);
 			if (ZSTD_isError(res))
 				fatal("could not compress data: %s", ZSTD_getErrorName(res));
 
-			cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+			cnt = fwrite(output->dst, 1, output->pos, fp->u.zstd.fp);
 			if (cnt != output->pos)
 				fatal("could not write data: %s", strerror(errno));
 		}
 
 		return size;
 	}
-	else
 #endif
 
-	return fwrite(ptr, 1, size, fp->uncompressedfp);
+	return fwrite(ptr, 1, size, fp->u.fp);
 }
 
 int
@@ -923,39 +917,38 @@ cfgetc(cfp *fp)
 	int			ret;
 
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
+	if (fp->alg == COMPR_ALG_LIBZ)
 	{
-		ret = gzgetc(fp->compressedfp);
+		ret = gzgetc(fp->u.gzfp);
 		if (ret == EOF)
 		{
-			if (!gzeof(fp->compressedfp))
+			if (!gzeof(fp->u.gzfp))
 				fatal("could not read from input file: %s", strerror(errno));
 			else
 				fatal("could not read from input file: end of file");
 		}
+		return ret;
 	}
-	else
 #endif
+
 #ifdef HAVE_LIBZSTD
-	if (fp->zstd.fp)
+	if (fp->alg == COMPR_ALG_ZSTD)
 	{
 		if (cfread(&ret, 1, fp) != 1)
 		{
-			if (feof(fp->zstd.fp))
+			if (feof(fp->u.zstd.fp))
 				fatal("could not read from input file: end of file");
 			else
 				fatal("could not read from input file: %s", strerror(errno));
 		}
 fprintf(stderr, "cfgetc %d\n", ret);
+		return ret;
 	}
 #endif
 
-	{
-		ret = fgetc(fp->uncompressedfp);
-		if (ret == EOF)
-			READ_ERROR_EXIT(fp->uncompressedfp);
-	}
-
+	ret = fgetc(fp->u.fp);
+	if (ret == EOF)
+		READ_ERROR_EXIT(fp->u.fp);
 	return ret;
 }
 
@@ -963,12 +956,12 @@ char *
 cfgets(cfp *fp, char *buf, int len)
 {
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
-		return gzgets(fp->compressedfp, buf, len);
-	else
+	if (fp->alg == COMPR_ALG_LIBZ)
+		return gzgets(fp->u.gzfp, buf, len);
 #endif
+
 #ifdef HAVE_LIBZSTD
-	if (fp->zstd.fp)
+	if (fp->alg == COMPR_ALG_ZSTD)
 	{
 		int res;
 		res = cfread(buf, len, fp);
@@ -977,9 +970,9 @@ cfgets(cfp *fp, char *buf, int len)
 			*strchr(buf, '\n') = '\0';
 		return res > 0 ? buf : 0;
 	}
-	else
 #endif
-		return fgets(buf, len, fp->uncompressedfp);
+
+		return fgets(buf, len, fp->u.fp);
 }
 
 int
@@ -993,56 +986,55 @@ cfclose(cfp *fp)
 		return EOF;
 	}
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
+	if (fp->alg == COMPR_ALG_LIBZ)
 	{
-		result = gzclose(fp->compressedfp);
-		fp->compressedfp = NULL;
+		result = gzclose(fp->u.gzfp);
+		free_keep_errno(fp);	/* this branch returns early, so free the handle here */
+		return result;
 	}
-	else
 #endif
+
 #ifdef HAVE_LIBZSTD
-	if (fp->zstd.fp)
+	if (fp->alg == COMPR_ALG_ZSTD)
 	{
-		ZSTD_outBuffer	*output = &fp->zstd.output;
-		ZSTD_inBuffer	*input = &fp->zstd.input;
+		ZSTD_outBuffer	*output = &fp->u.zstd.output;
+		ZSTD_inBuffer	*input = &fp->u.zstd.input;
 		size_t res, cnt;
 
-		if (fp->zstd.cstream)
+		if (fp->u.zstd.cstream)
 		{
 			for (;;)
 			{
 				output->pos = 0;
-				res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end);
+				res = ZSTD_compressStream2(fp->u.zstd.cstream, output, input, ZSTD_e_end);
 				if (ZSTD_isError(res))
 					fatal("could not compress data: %s", ZSTD_getErrorName(res));
-				cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+				cnt = fwrite(output->dst, 1, output->pos, fp->u.zstd.fp);
 				if (cnt != output->pos)
 					fatal("could not write data: %s", strerror(errno));
 				if (res == 0)
 					break;
 			}
 
-			ZSTD_freeCStream(fp->zstd.cstream);
-			pg_free(fp->zstd.output.dst);
+			ZSTD_freeCStream(fp->u.zstd.cstream);
+			pg_free(fp->u.zstd.output.dst);
 		}
 
-		if (fp->zstd.dstream)
+		if (fp->u.zstd.dstream)
 		{
-			ZSTD_freeDStream(fp->zstd.dstream);
-			pg_free(unconstify(void *, fp->zstd.input.src));
+			ZSTD_freeDStream(fp->u.zstd.dstream);
+			pg_free(unconstify(void *, fp->u.zstd.input.src));
 		}
 
-		result = fclose(fp->zstd.fp);
-		fp->zstd.fp = NULL;
+		result = fclose(fp->u.zstd.fp);
+		free_keep_errno(fp);	/* this branch returns early, so free the handle here */
+		return result;
 	}
-	else
 #endif
-	{
-		result = fclose(fp->uncompressedfp);
-		fp->uncompressedfp = NULL;
-	}
-	free_keep_errno(fp);
 
+	result = fclose(fp->u.fp);
+	fp->u.fp = NULL;
+	free_keep_errno(fp);
 	return result;
 }
 
@@ -1050,26 +1042,26 @@ int
 cfeof(cfp *fp)
 {
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
-		return gzeof(fp->compressedfp);
-	else
+	if (fp->alg == COMPR_ALG_LIBZ)
+		return gzeof(fp->u.gzfp);
 #endif
+
 #ifdef HAVE_LIBZSTD
-	if (fp->zstd.fp)
-		return feof(fp->zstd.fp);
-	else
+	if (fp->alg == COMPR_ALG_ZSTD)
+		return feof(fp->u.zstd.fp);
 #endif
-		return feof(fp->uncompressedfp);
+
+	return feof(fp->u.fp);
 }
 
 const char *
 get_cfp_error(cfp *fp)
 {
 #ifdef HAVE_LIBZ
-	if (fp->compressedfp)
+	if (fp->alg == COMPR_ALG_LIBZ)
 	{
 		int			errnum;
-		const char *errmsg = gzerror(fp->compressedfp, &errnum);
+		const char *errmsg = gzerror(fp->u.gzfp, &errnum);
 
 		if (errnum != Z_ERRNO)
 			return errmsg;
-- 
2.17.0

>From 15cd65b4d650009cf18f48357e9dfe9173d32446 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 11 Dec 2020 22:22:31 -0600
Subject: [PATCH 6/7] Move zlib into the union{}

---
 src/bin/pg_dump/compress_io.c | 54 ++++++++++++++++++-----------------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 06005c3fba..a94efbea4e 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -67,23 +67,25 @@ struct CompressorState
 	CompressionAlgorithm comprAlg;
 	WriteFunc	writeF;
 
+	union {
 #ifdef HAVE_LIBZ
-	z_streamp	zp;
-	char	   *zlibOut;
-	size_t		zlibOutSize;
+		struct {
+			z_streamp	zp;
+			char	   *zlibOut;
+			size_t		zlibOutSize;
+		} zlib;
 #endif
 
 #ifdef HAVE_LIBZSTD
-	union {
+		/* This is used for compression but not decompression */
 		struct {
-			ZSTD_outBuffer output;
-			ZSTD_inBuffer input;
 			// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
-			ZSTD_CStream *cstream;
+			ZSTD_CStream	*cstream;
+			ZSTD_outBuffer	output;
+			ZSTD_inBuffer	input;
 		} zstd;
-	} u;
 #endif
-
+	} u;
 };
 
 /* Routines that support zlib compressed data I/O */
@@ -420,7 +422,7 @@ InitCompressorZlib(CompressorState *cs, Compress *compress)
 {
 	z_streamp	zp;
 
-	zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+	zp = cs->u.zlib.zp = (z_streamp) pg_malloc(sizeof(z_stream));
 	zp->zalloc = Z_NULL;
 	zp->zfree = Z_NULL;
 	zp->opaque = Z_NULL;
@@ -430,22 +432,22 @@ InitCompressorZlib(CompressorState *cs, Compress *compress)
 	 * actually allocate one extra byte because some routines want to append a
 	 * trailing zero byte to the zlib output.
 	 */
-	cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
-	cs->zlibOutSize = ZLIB_OUT_SIZE;
+	cs->u.zlib.zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+	cs->u.zlib.zlibOutSize = ZLIB_OUT_SIZE;
 
 	if (deflateInit(zp, compress->level) != Z_OK)
 		fatal("could not initialize compression library: %s",
 			  zp->msg);
 
 	/* Just be paranoid - maybe End is called after Start, with no Write */
-	zp->next_out = (void *) cs->zlibOut;
-	zp->avail_out = cs->zlibOutSize;
+	zp->next_out = (void *) cs->u.zlib.zlibOut;
+	zp->avail_out = cs->u.zlib.zlibOutSize;
 }
 
 static void
 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
 {
-	z_streamp	zp = cs->zp;
+	z_streamp	zp = cs->u.zlib.zp;
 
 	zp->next_in = NULL;
 	zp->avail_in = 0;
@@ -456,23 +458,23 @@ EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
 	if (deflateEnd(zp) != Z_OK)
 		fatal("could not close compression stream: %s", zp->msg);
 
-	free(cs->zlibOut);
-	free(cs->zp);
+	free(cs->u.zlib.zlibOut);
+	free(cs->u.zlib.zp);
 }
 
 static void
 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
 {
-	z_streamp	zp = cs->zp;
-	char	   *out = cs->zlibOut;
+	z_streamp	zp = cs->u.zlib.zp;
+	char	   *out = cs->u.zlib.zlibOut;
 	int			res = Z_OK;
 
-	while (cs->zp->avail_in != 0 || flush)
+	while (cs->u.zlib.zp->avail_in != 0 || flush)
 	{
 		res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
 		if (res == Z_STREAM_ERROR)
 			fatal("could not compress data: %s", zp->msg);
-		if ((flush && (zp->avail_out < cs->zlibOutSize))
+		if ((flush && (zp->avail_out < cs->u.zlib.zlibOutSize))
 			|| (zp->avail_out == 0)
 			|| (zp->avail_in != 0)
 			)
@@ -482,18 +484,18 @@ DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
 			 * chunk is the EOF marker in the custom format. This should never
 			 * happen but...
 			 */
-			if (zp->avail_out < cs->zlibOutSize)
+			if (zp->avail_out < cs->u.zlib.zlibOutSize)
 			{
 				/*
 				 * Any write function should do its own error checking but to
 				 * make sure we do a check here as well...
 				 */
-				size_t		len = cs->zlibOutSize - zp->avail_out;
+				size_t		len = cs->u.zlib.zlibOutSize - zp->avail_out;
 
 				cs->writeF(AH, out, len);
 			}
 			zp->next_out = (void *) out;
-			zp->avail_out = cs->zlibOutSize;
+			zp->avail_out = cs->u.zlib.zlibOutSize;
 		}
 
 		if (res == Z_STREAM_END)
@@ -505,8 +507,8 @@ static void
 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
 					   const char *data, size_t dLen)
 {
-	cs->zp->next_in = (void *) unconstify(char *, data);
-	cs->zp->avail_in = dLen;
+	cs->u.zlib.zp->next_in = (void *) unconstify(char *, data);
+	cs->u.zlib.zp->avail_in = dLen;
 	DeflateCompressorZlib(AH, cs, false);
 }
 
-- 
2.17.0

Reply via email to