On Mon, Dec 21, 2020 at 03:02:40PM -0500, Tom Lane wrote: > Justin Pryzby <pry...@telsasoft.com> writes: > > I found that our largest tables are 40% smaller and 20% faster to pipe > > pg_dump -Fc -Z0 |zstd relative to native zlib > > The patch might be a tad smaller if you hadn't included a core file in it.
About 89% smaller. This version also fixes the file extension (.zst), fixes the zlib default compression, and includes a bunch of cleanup. -- Justin
>From c506c8a93ae70726a5b4b33a1d0098caf5665f3a Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Mon, 21 Dec 2020 00:32:32 -0600 Subject: [PATCH 1/7] fix pre-existing docs/comments --- doc/src/sgml/ref/pg_dump.sgml | 2 +- src/bin/pg_dump/pg_backup_directory.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index 0aa35cf0c3..dcb25dc3cd 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -621,7 +621,7 @@ PostgreSQL documentation <listitem> <para> Specify the compression level to use. Zero means no compression. - For the custom archive format, this specifies compression of + For the custom and directory archive formats, this specifies compression of individual table-data segments, and the default is to compress at a moderate level. For plain text output, setting a nonzero compression level causes diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 48fa7cb1a3..650b542fce 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -4,7 +4,7 @@ * * A directory format dump is a directory, which contains a "toc.dat" file * for the TOC, and a separate file for each data entry, named "<oid>.dat". - * Large objects (BLOBs) are stored in separate files named "blob_<uid>.dat", + * Large objects (BLOBs) are stored in separate files named "blob_<oid>.dat", * and there's a plain-text TOC file for them called "blobs.toc". If * compression is used, each data file is individually compressed and the * ".gz" suffix is added to the filenames. The TOC files are never -- 2.17.0
>From 49f400b5b2e559cc2ed6be82e04a4dc4ba5c63b3 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Sat, 12 Dec 2020 00:11:37 -0600 Subject: [PATCH 2/7] Fix malformed comment.. broken since bf50caf10. --- src/bin/pg_dump/pg_backup_archiver.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 177360ed6e..6a5a22637b 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -329,9 +329,12 @@ struct _archiveHandle DumpId *tableDataId; /* TABLE DATA ids, indexed by table dumpId */ struct _tocEntry *currToc; /* Used when dumping data */ - int compression; /* Compression requested on open Possible - * values for compression: -1 - * Z_DEFAULT_COMPRESSION 0 COMPRESSION_NONE + int compression; /*--------- + * Compression requested on open + * Possible values for compression: + * -2 ZSTD_COMPRESSION + * -1 Z_DEFAULT_COMPRESSION + * 0 COMPRESSION_NONE * 1-9 levels for gzip compression */ bool dosync; /* data requested to be synced on sight */ ArchiveMode mode; /* File mode - r or w */ -- 2.17.0
>From 8a8939a3060c984af289b5fe62754d94f2675248 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Fri, 11 Dec 2020 15:01:25 -0600 Subject: [PATCH 3/7] pg_dump: zstd compression document any change in search for .gz? docs --- configure | 123 ++++++- configure.ac | 22 ++ src/bin/pg_dump/compress_io.c | 475 ++++++++++++++++++++++++-- src/bin/pg_dump/compress_io.h | 5 +- src/bin/pg_dump/pg_backup_archiver.h | 5 + src/bin/pg_dump/pg_backup_directory.c | 6 +- src/bin/pg_dump/pg_dump.c | 3 +- src/include/pg_config.h.in | 3 + 8 files changed, 597 insertions(+), 45 deletions(-) diff --git a/configure b/configure index 11a4284e5b..240e536e04 100755 --- a/configure +++ b/configure @@ -698,6 +698,7 @@ with_gnu_ld LD LDFLAGS_SL LDFLAGS_EX +with_zstd with_zlib with_system_tzdata with_libxslt @@ -798,6 +799,7 @@ infodir docdir oldincludedir includedir +runstatedir localstatedir sharedstatedir sysconfdir @@ -866,6 +868,7 @@ with_libxml with_libxslt with_system_tzdata with_zlib +with_zstd with_gnu_ld enable_largefile ' @@ -935,6 +938,7 @@ datadir='${datarootdir}' sysconfdir='${prefix}/etc' sharedstatedir='${prefix}/com' localstatedir='${prefix}/var' +runstatedir='${localstatedir}/run' includedir='${prefix}/include' oldincludedir='/usr/include' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' @@ -1187,6 +1191,15 @@ do | -silent | --silent | --silen | --sile | --sil) silent=yes ;; + -runstatedir | --runstatedir | --runstatedi | --runstated \ + | --runstate | --runstat | --runsta | --runst | --runs \ + | --run | --ru | --r) + ac_prev=runstatedir ;; + -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \ + | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \ + | --run=* | --ru=* | --r=*) + runstatedir=$ac_optarg ;; + -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) ac_prev=sbindir ;; -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ @@ -1324,7 +1337,7 @@ fi for ac_var in exec_prefix prefix bindir sbindir 
libexecdir datarootdir \ datadir sysconfdir sharedstatedir localstatedir includedir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ - libdir localedir mandir + libdir localedir mandir runstatedir do eval ac_val=\$$ac_var # Remove trailing slashes. @@ -1477,6 +1490,7 @@ Fine tuning of the installation directories: --sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --localstatedir=DIR modifiable single-machine data [PREFIX/var] + --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run] --libdir=DIR object code libraries [EPREFIX/lib] --includedir=DIR C header files [PREFIX/include] --oldincludedir=DIR C header files for non-gcc [/usr/include] @@ -1570,6 +1584,7 @@ Optional Packages: --with-system-tzdata=DIR use system time zone data in DIR --without-zlib do not use Zlib + --without-zstd do not use Zstd --with-gnu-ld assume the C compiler uses GNU ld [default=no] Some influential environment variables: @@ -8601,6 +8616,35 @@ fi +# +# Zstd +# + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + : + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=yes + +fi + + + + # # Assignments # @@ -12092,6 +12136,59 @@ fi fi +if test "$with_zstd" = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressStream2 in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compressStream2 in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compressStream2+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. 
*/ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compressStream2 (); +int +main () +{ +return ZSTD_compressStream2 (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compressStream2=yes +else + ac_cv_lib_zstd_ZSTD_compressStream2=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compressStream2" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compressStream2" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compressStream2" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support." "$LINENO" 5 +fi + +fi + if test "$enable_spinlocks" = yes; then $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h @@ -13295,6 +13392,20 @@ Use --without-zlib to disable zlib support." "$LINENO" 5 fi +fi + +if test "$with_zstd" = yes; then + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + +else + as_fn_error $? "zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support." "$LINENO" 5 +fi + + fi if test "$with_gssapi" = yes ; then @@ -14689,7 +14800,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. 
*/ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14735,7 +14846,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14759,7 +14870,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14804,7 +14915,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14828,7 +14939,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. 
*/ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; diff --git a/configure.ac b/configure.ac index fc523c6aeb..7f7222159a 100644 --- a/configure.ac +++ b/configure.ac @@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes, [do not use Zlib]) AC_SUBST(with_zlib) +# +# Zstd +# +PGAC_ARG_BOOL(with, zstd, yes, + [do not use Zstd]) +AC_SUBST(with_zstd) + # # Assignments # @@ -1186,6 +1193,14 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_LIB(zstd, ZSTD_compressStream2, [], + [AC_MSG_ERROR([zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support.])]) +fi + if test "$enable_spinlocks" = yes; then AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.]) else @@ -1400,6 +1415,13 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. 
+Use --without-zstd to disable zstd support.])]) +fi + if test "$with_gssapi" = yes ; then AC_CHECK_HEADERS(gssapi/gssapi.h, [], [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])]) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 1417401086..b51ba680a2 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -13,7 +13,7 @@ * friends, providing an interface similar to those, but abstracts away * the possible compression. Both APIs use libz for the compression, but * the second API uses gzip headers, so the resulting files can be easily - * manipulated with the gzip utility. + * manipulated with the gzip utility. XXX * * Compressor API * -------------- @@ -41,7 +41,7 @@ * libz's gzopen() APIs. It allows you to use the same functions for * compressed and uncompressed streams. cfopen_read() first tries to open * the file with given name, and if it fails, it tries to open the same - * file with the .gz suffix. cfopen_write() opens a file for writing, an + * file with the .gz suffix. cfopen_write() opens a file for writing, an XXX * extra argument specifies if the file should be compressed, and adds the * .gz suffix to the filename if so. This allows you to easily handle both * compressed and uncompressed files. @@ -72,6 +72,18 @@ struct CompressorState char *zlibOut; size_t zlibOutSize; #endif + +#ifdef HAVE_LIBZSTD + union { + struct { + ZSTD_outBuffer output; + ZSTD_inBuffer input; + // XXX: use one separate ZSTD_CStream per thread: disable on windows ? 
+ ZSTD_CStream *cstream; + } zstd; + } u; +#endif + }; static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, @@ -88,6 +100,15 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); #endif +#ifdef HAVE_LIBZSTD +static void InitCompressorZstd(CompressorState *cs, int level); +static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs); +static void DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs); +static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen); +static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF); +#endif + /* Routines that support uncompressed data I/O */ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, @@ -101,15 +122,25 @@ static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level) { - if (compression == Z_DEFAULT_COMPRESSION || - (compression > 0 && compression <= 9)) - *alg = COMPR_ALG_LIBZ; - else if (compression == 0) - *alg = COMPR_ALG_NONE; - else + switch (compression) { - fatal("invalid compression code: %d", compression); - *alg = COMPR_ALG_NONE; /* keep compiler quiet */ +#ifdef HAVE_LIBZSTD + case ZSTD_COMPRESSION: + *alg = COMPR_ALG_ZSTD; + break; +#endif +#ifdef HAVE_ZLIB + case Z_DEFAULT_COMPRESSION: + case 1..9: + *alg = COMPR_ALG_LIBZ; + break; +#endif + case 0: + *alg = COMPR_ALG_NONE; + break; + default: + fatal("invalid compression code: %d", compression); + *alg = COMPR_ALG_NONE; /* keep compiler quiet */ } /* The level is just the passed-in value. */ @@ -141,10 +172,23 @@ AllocateCompressor(int compression, WriteFunc writeF) /* * Perform compression algorithm specific initialization. 
*/ + switch (alg) + { #ifdef HAVE_LIBZ - if (alg == COMPR_ALG_LIBZ) + case COMPR_ALG_LIBZ: InitCompressorZlib(cs, level); + break; +#endif +#ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: + InitCompressorZstd(cs, level); + break; #endif + case COMPR_ALG_NONE: + /* Do nothing */ + break; + // default: + } return cs; } @@ -162,12 +206,20 @@ ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF) if (alg == COMPR_ALG_NONE) ReadDataFromArchiveNone(AH, readF); - if (alg == COMPR_ALG_LIBZ) + else if (alg == COMPR_ALG_LIBZ) { #ifdef HAVE_LIBZ ReadDataFromArchiveZlib(AH, readF); #else fatal("not built with zlib support"); +#endif + } + else if (alg == COMPR_ALG_ZSTD) + { +#ifdef HAVE_LIBZSTD + ReadDataFromArchiveZstd(AH, readF); +#else + fatal("not built with zstd support"); #endif } } @@ -188,6 +240,15 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, fatal("not built with zlib support"); #endif break; + + case COMPR_ALG_ZSTD: +#ifdef HAVE_LIBZSTD + WriteDataToArchiveZstd(AH, cs, data, dLen); +#else + fatal("not built with zstd support"); +#endif + break; + case COMPR_ALG_NONE: WriteDataToArchiveNone(AH, cs, data, dLen); break; @@ -204,12 +265,172 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs) if (cs->comprAlg == COMPR_ALG_LIBZ) EndCompressorZlib(AH, cs); #endif +#ifdef HAVE_LIBZSTD + if (cs->comprAlg == COMPR_ALG_ZSTD) + EndCompressorZstd(AH, cs); +#endif + free(cs); } /* Private routines, specific to each compression method. */ +// XXX: put in separate files ? 
+ +#ifdef HAVE_LIBZSTD +static void +InitCompressorZstd(CompressorState *cs, int level) +{ + cs->u.zstd.cstream = ZSTD_createCStream(); + if (cs->u.zstd.cstream == NULL) + fatal("could not initialize compression library"); + + /* XXX: initialize safely like the corresponding zlib "paranoia" */ + cs->u.zstd.output.size = ZSTD_CStreamOutSize(); + cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size); + cs->u.zstd.output.pos = 0; +} + +static void +EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZSTD_outBuffer *output = &cs->u.zstd.output; + + for (;;) + { + size_t res; + + res = ZSTD_compressStream2(cs->u.zstd.cstream, output, + &cs->u.zstd.input, ZSTD_e_end); + + if (output->pos > 0) + cs->writeF(AH, output->dst, output->pos); + + if (res == 0) + break; + + if (ZSTD_isError(res)) + fatal("could not close compression stream: %s", + ZSTD_getErrorName(res)); + } + + // XXX: retval + ZSTD_freeCStream(cs->u.zstd.cstream); +} + +static void +DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZSTD_inBuffer *input = &cs->u.zstd.input; + ZSTD_outBuffer *output = &cs->u.zstd.output; + + while (input->pos != input->size) + { + size_t res; + + res = ZSTD_compressStream2(cs->u.zstd.cstream, output, + input, ZSTD_e_continue); + + if (output->pos == output->size || + input->pos != input->size) + { + /* + * Extra paranoia: avoid zero-length chunks, since a zero length + * chunk is the EOF marker in the custom format. This should never + * happen but... 
+ */ + if (output->pos > 0) + cs->writeF(AH, output->dst, output->pos); + + output->pos = 0; + } + + if (ZSTD_isError(res)) + fatal("could not compress data: %s", ZSTD_getErrorName(res)); + } +} + +static void +WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen) +{ + cs->u.zstd.input.src = (void *) unconstify(char *, data); + cs->u.zstd.input.size = dLen; + cs->u.zstd.input.pos = 0; + DeflateCompressorZstd(AH, cs); +} + +/* Read data from a compressed zstd archive */ +static void +ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF) +{ + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + size_t res; + size_t input_size; + + dstream = ZSTD_createDStream(); + if (dstream == NULL) + fatal("could not initialize compression library"); + + input_size = ZSTD_DStreamInSize(); + input.src = pg_malloc(input_size); + + output.size = ZSTD_DStreamOutSize(); + output.dst = pg_malloc(output.size); + + /* read compressed data */ + for (;;) + { + size_t cnt; + + input.size = input_size; // XXX: the buffer can grow, we shouldn't keep resetting it to the original value.. + cnt = readF(AH, (char **)unconstify(void **, &input.src), &input.size); + input.pos = 0; + input.size = cnt; + + if (cnt == 0) + break; + + while (input.pos < input.size) + { + /* decompress */ + output.pos = 0; + res = ZSTD_decompressStream(dstream, &output, &input); + + if (ZSTD_isError(res)) + fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + + /* write to output handle */ + ((char *)output.dst)[output.pos] = '\0'; + ahwrite(output.dst, 1, output.pos, AH); + } + } + + /* write any remainder to output handle */ + /* XXX: is it needed? */ + /* If `input.pos < input.size`, some input has not been consumed. 
*/ + /* But if `output.pos == output.size`, there might be some data left within internal buffers., */ +#if 0 + while (input.pos < input.size || output.pos == output.size) + { + output.pos = 0; + res = ZSTD_decompressStream(dstream, &output, &input); + if (ZSTD_isError(res)) + fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + ((char *)output.dst)[output.pos] = '\0'; + ahwrite(output.dst, 1, output.pos, AH); + } +#endif + + pg_free(unconstify(void *, input.src)); + pg_free(output.dst); +} + +#endif /* HAVE_LIBZSTD */ #ifdef HAVE_LIBZ + /* * Functions for zlib compressed output. */ @@ -422,6 +643,19 @@ struct cfp #ifdef HAVE_LIBZ gzFile compressedfp; #endif + +#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg? + /* This is a normal file to which we read/write compressed data */ + struct { + FILE *fp; + // XXX: use one separate ZSTD_CStream per thread: disable on windows ? + ZSTD_CStream *cstream; + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + } zstd; +#endif + }; #ifdef HAVE_LIBZ @@ -449,24 +683,25 @@ free_keep_errno(void *p) * On failure, return NULL with an error code in errno. */ cfp * -cfopen_read(const char *path, const char *mode) +cfopen_read(const char *path, const char *mode, int compression) { cfp *fp; #ifdef HAVE_LIBZ - if (hasSuffix(path, ".gz")) - fp = cfopen(path, mode, 1); + if (hasSuffix(path, ".gz") || hasSuffix(path, ".zst")) + fp = cfopen(path, mode, compression); else #endif { - fp = cfopen(path, mode, 0); + fp = cfopen(path, mode, compression); #ifdef HAVE_LIBZ if (fp == NULL) { char *fname; + char *suffix = compression == ZSTD_COMPRESSION ? 
"zst" : "gz"; - fname = psprintf("%s.gz", path); - fp = cfopen(fname, mode, 1); + fname = psprintf("%s.%s", path, suffix); + fp = cfopen(fname, mode, compression); free_keep_errno(fname); } #endif @@ -491,13 +726,14 @@ cfopen_write(const char *path, const char *mode, int compression) cfp *fp; if (compression == 0) - fp = cfopen(path, mode, 0); + fp = cfopen(path, mode, compression); else { -#ifdef HAVE_LIBZ +#ifdef HAVE_LIBZ // XXX char *fname; + char *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz"; - fname = psprintf("%s.gz", path); + fname = psprintf("%s.%s", path, suffix); fp = cfopen(fname, mode, compression); free_keep_errno(fname); #else @@ -505,11 +741,12 @@ cfopen_write(const char *path, const char *mode, int compression) fp = NULL; /* keep compiler quiet */ #endif } + return fp; } /* - * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file + * Opens file 'path' in 'mode'. If 'alg' is COMPR_ALG_ZLIB, the file * is opened with libz gzopen(), otherwise with plain fopen(). * * On failure, return NULL with an error code in errno. @@ -519,9 +756,19 @@ cfopen(const char *path, const char *mode, int compression) { cfp *fp = pg_malloc(sizeof(cfp)); - if (compression != 0) + fp->uncompressedfp = NULL; +#ifdef HAVE_LIBZ + fp->compressedfp = NULL; +#endif +#ifdef HAVE_LIBZSTD + fp->zstd.fp = NULL; +#endif + + switch (compression) { #ifdef HAVE_LIBZ + case 1 ... 
9: // XXX: nonportable + case Z_DEFAULT_COMPRESSION: if (compression != Z_DEFAULT_COMPRESSION) { /* user has specified a compression level, so tell zlib to use it */ @@ -537,30 +784,57 @@ cfopen(const char *path, const char *mode, int compression) fp->compressedfp = gzopen(path, mode); } - fp->uncompressedfp = NULL; if (fp->compressedfp == NULL) { free_keep_errno(fp); fp = NULL; } -#else - fatal("not built with zlib support"); + + return fp; + break; #endif - } - else - { -#ifdef HAVE_LIBZ - fp->compressedfp = NULL; + +#ifdef HAVE_LIBZSTD + case ZSTD_COMPRESSION: + fp->zstd.fp = fopen(path, mode); + // XXX: save the compression params + if (fp->zstd.fp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } + else if (strchr(mode, 'w')) + { + fp->zstd.dstream = NULL; + fp->zstd.output.size = ZSTD_CStreamOutSize(); // XXX + fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); + fp->zstd.cstream = ZSTD_createCStream(); + if (fp->zstd.cstream == NULL) + fatal("could not initialize compression library"); + } + else if (strchr(mode, 'r')) + { + fp->zstd.cstream = NULL; + fp->zstd.input.size = ZSTD_DStreamOutSize(); // XXX + fp->zstd.input.src = pg_malloc0(fp->zstd.input.size); + fp->zstd.dstream = ZSTD_createDStream(); + if (fp->zstd.dstream == NULL) + fatal("could not initialize compression library"); + } // XXX else: bad mode + return fp; + break; #endif + + default: fp->uncompressedfp = fopen(path, mode); if (fp->uncompressedfp == NULL) { free_keep_errno(fp); fp = NULL; } - } - return fp; + return fp; + } } @@ -587,6 +861,44 @@ cfread(void *ptr, int size, cfp *fp) } else #endif + +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + ZSTD_outBuffer *output = &fp->zstd.output; + ZSTD_inBuffer *input = &fp->zstd.input; + size_t input_size = ZSTD_DStreamInSize(); + size_t res, cnt; + + output->size = size; + output->dst = ptr; + output->pos = 0; + + /* read compressed data */ + while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp))) + { + input->size = 
cnt; + input->pos = 0; + + for ( ; input->pos < input->size; ) + { + /* decompress */ + res = ZSTD_decompressStream(fp->zstd.dstream, output, input); + if (res == 0 || output->pos == output->size) + break; + if (ZSTD_isError(res)) + fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + } + + if (output->pos == output->size) + break; /* We read all the data that fits */ + } + + ret = output->pos; + } + else +#endif + { ret = fread(ptr, 1, size, fp->uncompressedfp); if (ret != size && !feof(fp->uncompressedfp)) @@ -603,6 +915,35 @@ cfwrite(const void *ptr, int size, cfp *fp) return gzwrite(fp->compressedfp, ptr, size); else #endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + size_t res, cnt; + ZSTD_outBuffer *output = &fp->zstd.output; + ZSTD_inBuffer *input = &fp->zstd.input; + + input->src = ptr; + input->size = size; + input->pos = 0; + + /* Consume all input, and flush later */ + while (input->pos != input->size) + { + output->pos = 0; + res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue); + if (ZSTD_isError(res)) + fatal("could not compress data: %s", ZSTD_getErrorName(res)); + + cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + if (cnt != output->pos) + fatal("could not write data: %s", strerror(errno)); + } + + return size; + } + else +#endif + return fwrite(ptr, 1, size, fp->uncompressedfp); } @@ -625,6 +966,20 @@ cfgetc(cfp *fp) } else #endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + if (cfread(&ret, 1, fp) != 1) + { + if (feof(fp->zstd.fp)) + fatal("could not read from input file: end of file"); + else + fatal("could not read from input file: %s", strerror(errno)); + } +fprintf(stderr, "cfgetc %d\n", ret); + } +#endif + { ret = fgetc(fp->uncompressedfp); if (ret == EOF) @@ -641,6 +996,18 @@ cfgets(cfp *fp, char *buf, int len) if (fp->compressedfp) return gzgets(fp->compressedfp, buf, len); else +#endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + int res; + res = cfread(buf, len, fp); + buf[res] = 0; + if 
(strchr(buf, '\n')) + *strchr(buf, '\n') = '\0'; + return buf; + } + else #endif return fgets(buf, len, fp->uncompressedfp); } @@ -662,6 +1029,43 @@ cfclose(cfp *fp) fp->compressedfp = NULL; } else +#endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + ZSTD_outBuffer *output = &fp->zstd.output; + ZSTD_inBuffer *input = &fp->zstd.input; + size_t res, cnt; + + if (fp->zstd.cstream) + { + for (;;) + { + output->pos = 0; + res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end); + if (ZSTD_isError(res)) + fatal("could not compress data: %s", ZSTD_getErrorName(res)); + cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + if (cnt != output->pos) + fatal("could not write data: %s", strerror(errno)); + if (res == 0) + break; + } + + ZSTD_freeCStream(fp->zstd.cstream); + pg_free(fp->zstd.output.dst); + } + + if (fp->zstd.dstream) + { + ZSTD_freeDStream(fp->zstd.dstream); + pg_free(unconstify(void *, fp->zstd.input.src)); + } + + result = fclose(fp->zstd.fp); + fp->zstd.fp = NULL; + } + else #endif { result = fclose(fp->uncompressedfp); @@ -679,6 +1083,11 @@ cfeof(cfp *fp) if (fp->compressedfp) return gzeof(fp->compressedfp); else +#endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + return feof(fp->zstd.fp); + else #endif return feof(fp->uncompressedfp); } diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index d2e6e1b854..f0ce06f176 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -24,7 +24,8 @@ typedef enum { COMPR_ALG_NONE, - COMPR_ALG_LIBZ + COMPR_ALG_LIBZ, + COMPR_ALG_ZSTD, } CompressionAlgorithm; /* Prototype for callback function to WriteDataToArchive() */ @@ -57,7 +58,7 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); typedef struct cfp cfp; extern cfp *cfopen(const char *path, const char *mode, int compression); -extern cfp *cfopen_read(const char *path, const char *mode); +extern cfp *cfopen_read(const char *path, const char *mode, int compression); extern cfp 
*cfopen_write(const char *path, const char *mode, int compression); extern int cfread(void *ptr, int size, cfp *fp); extern int cfwrite(const void *ptr, int size, cfp *fp); diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 6a5a22637b..1a229ebedb 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -60,6 +60,11 @@ typedef struct _z_stream typedef z_stream *z_streamp; #endif +#ifdef HAVE_LIBZSTD +#include <zstd.h> +#define ZSTD_COMPRESSION -2 +#endif /* HAVE_LIBZSTD */ + /* Data block types */ #define BLK_DATA 1 #define BLK_BLOBS 3 diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 650b542fce..6e55cb425e 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -202,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) setFilePath(AH, fname, "toc.dat"); - tocFH = cfopen_read(fname, PG_BINARY_R); + tocFH = cfopen_read(fname, PG_BINARY_R, AH->compression); if (tocFH == NULL) fatal("could not open input file \"%s\": %m", fname); @@ -388,7 +388,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename) if (!filename) return; - cfp = cfopen_read(filename, PG_BINARY_R); + cfp = cfopen_read(filename, PG_BINARY_R, AH->compression); if (!cfp) fatal("could not open input file \"%s\": %m", filename); @@ -440,7 +440,7 @@ _LoadBlobs(ArchiveHandle *AH) setFilePath(AH, fname, "blobs.toc"); - ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R); + ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, AH->compression); if (ctx->blobsTocFH == NULL) fatal("could not open large object TOC file \"%s\" for input: %m", diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 8b1e5cc2b5..f21eb021c7 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -533,7 +533,8 @@ main(int argc, char **argv) case 'Z': /* Compression Level */ compressLevel = atoi(optarg); - if (compressLevel < 0 || 
compressLevel > 9) + if ((compressLevel < 0 || compressLevel > 9) && + compressLevel != -2) { pg_log_error("compression level must be in range 0..9"); exit_nicely(1); diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index de8f838e53..da35415c72 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -346,6 +346,9 @@ /* Define to 1 if you have the `z' library (-lz). */ #undef HAVE_LIBZ +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if you have the `link' function. */ #undef HAVE_LINK -- 2.17.0
>From 4b87bdba077da1e1609a1d7dbb40772d432c28dd Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Sat, 12 Dec 2020 00:40:39 -0600 Subject: [PATCH 4/7] cmdline parser for compression alg/level/opts --- src/bin/pg_dump/compress_io.c | 235 ++++++++++++-------------- src/bin/pg_dump/compress_io.h | 20 +-- src/bin/pg_dump/pg_backup.h | 19 ++- src/bin/pg_dump/pg_backup_archiver.c | 45 ++--- src/bin/pg_dump/pg_backup_archiver.h | 11 +- src/bin/pg_dump/pg_backup_custom.c | 6 +- src/bin/pg_dump/pg_backup_directory.c | 19 ++- src/bin/pg_dump/pg_backup_tar.c | 33 ++-- src/bin/pg_dump/pg_dump.c | 157 ++++++++++++++--- 9 files changed, 324 insertions(+), 221 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index b51ba680a2..421b8e04cb 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -86,12 +86,9 @@ struct CompressorState }; -static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, - int *level); - /* Routines that support zlib compressed data I/O */ #ifdef HAVE_LIBZ -static void InitCompressorZlib(CompressorState *cs, int level); +static void InitCompressorZlib(CompressorState *cs, Compress *compress); static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush); static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF); @@ -101,9 +98,9 @@ static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); #endif #ifdef HAVE_LIBZSTD -static void InitCompressorZstd(CompressorState *cs, int level); +static ZSTD_CStream *ZstdCStreamParams(Compress *compress); +static void InitCompressorZstd(CompressorState *cs, Compress *compress); static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs); -static void DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs); static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen); static void 
ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF); @@ -114,80 +111,40 @@ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen); -/* - * Interprets a numeric 'compression' value. The algorithm implied by the - * value (zlib or none at the moment), is returned in *alg, and the - * zlib compression level in *level. - */ -static void -ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level) -{ - switch (compression) - { -#ifdef HAVE_LIBZSTD - case ZSTD_COMPRESSION: - *alg = COMPR_ALG_ZSTD; - break; -#endif -#ifdef HAVE_ZLIB - case Z_DEFAULT_COMPRESSION: - case 1..9: - *alg = COMPR_ALG_LIBZ; - break; -#endif - case 0: - *alg = COMPR_ALG_NONE; - break; - default: - fatal("invalid compression code: %d", compression); - *alg = COMPR_ALG_NONE; /* keep compiler quiet */ - } - - /* The level is just the passed-in value. */ - if (level) - *level = compression; -} - /* Public interface routines */ /* Allocate a new compressor */ CompressorState * -AllocateCompressor(int compression, WriteFunc writeF) +AllocateCompressor(Compress *compression, WriteFunc writeF) { CompressorState *cs; - CompressionAlgorithm alg; - int level; - - ParseCompressionOption(compression, &alg, &level); - -#ifndef HAVE_LIBZ - if (alg == COMPR_ALG_LIBZ) - fatal("not built with zlib support"); -#endif cs = (CompressorState *) pg_malloc0(sizeof(CompressorState)); cs->writeF = writeF; - cs->comprAlg = alg; + cs->comprAlg = compression->alg; /* * Perform compression algorithm specific initialization. 
*/ - switch (alg) + Assert (compression->alg != COMPR_ALG_DEFAULT); + switch (compression->alg) { #ifdef HAVE_LIBZ case COMPR_ALG_LIBZ: - InitCompressorZlib(cs, level); + InitCompressorZlib(cs, compression); break; #endif #ifdef HAVE_LIBZSTD case COMPR_ALG_ZSTD: - InitCompressorZstd(cs, level); + InitCompressorZstd(cs, compression); break; #endif case COMPR_ALG_NONE: /* Do nothing */ break; - // default: + default: + /* Should not happen */ + fatal("requested compression not available in this installation"); } return cs; @@ -198,12 +155,9 @@ AllocateCompressor(int compression, WriteFunc writeF) * out with ahwrite(). */ void -ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF) +ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF) { - CompressionAlgorithm alg; - - ParseCompressionOption(compression, &alg, NULL); - + CompressionAlgorithm alg = AH->compression.alg; if (alg == COMPR_ALG_NONE) ReadDataFromArchiveNone(AH, readF); else if (alg == COMPR_ALG_LIBZ) @@ -233,25 +187,25 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, { switch (cs->comprAlg) { - case COMPR_ALG_LIBZ: #ifdef HAVE_LIBZ + case COMPR_ALG_LIBZ: WriteDataToArchiveZlib(AH, cs, data, dLen); -#else - fatal("not built with zlib support"); -#endif break; +#endif - case COMPR_ALG_ZSTD: #ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: WriteDataToArchiveZstd(AH, cs, data, dLen); -#else - fatal("not built with zstd support"); -#endif break; +#endif case COMPR_ALG_NONE: WriteDataToArchiveNone(AH, cs, data, dLen); break; + + default: + /* Should not happen */ + fatal("requested compression not available in this installation"); } } @@ -277,13 +231,42 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs) // XXX: put in separate files ? 
#ifdef HAVE_LIBZSTD -static void -InitCompressorZstd(CompressorState *cs, int level) +static ZSTD_CStream* +ZstdCStreamParams(Compress *compress) { - cs->u.zstd.cstream = ZSTD_createCStream(); - if (cs->u.zstd.cstream == NULL) + size_t res; + ZSTD_CStream *cstream; + + cstream = ZSTD_createCStream(); + if (cstream == NULL) fatal("could not initialize compression library"); + if (compress->level != 0) // XXX: ZSTD_CLEVEL_DEFAULT + { + res = ZSTD_CCtx_setParameter(cstream, + ZSTD_c_compressionLevel, compress->level); + if (ZSTD_isError(res)) + fatal("could not set compression level: %s", + ZSTD_getErrorName(res)); + } + + if (compress->zstdlong) + { + res = ZSTD_CCtx_setParameter(cstream, + ZSTD_c_enableLongDistanceMatching, 1); // XXX: allow to disable it + if (ZSTD_isError(res)) + fatal("could not set compression parameter: %s", + ZSTD_getErrorName(res)); + } + + return cstream; +} + +static void +InitCompressorZstd(CompressorState *cs, Compress *compress) +{ + cs->u.zstd.cstream = ZstdCStreamParams(compress); + /* XXX: initialize safely like the corresponding zlib "paranoia" */ cs->u.zstd.output.size = ZSTD_CStreamOutSize(); cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size); @@ -295,6 +278,8 @@ EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs) { ZSTD_outBuffer *output = &cs->u.zstd.output; + + for (;;) { size_t res; @@ -318,11 +303,16 @@ EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs) } static void -DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs) +WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen) { ZSTD_inBuffer *input = &cs->u.zstd.input; ZSTD_outBuffer *output = &cs->u.zstd.output; + input->src = (void *) unconstify(char *, data); + input->size = dLen; + input->pos = 0; + while (input->pos != input->size) { size_t res; @@ -349,16 +339,6 @@ DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs) } } -static void -WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState 
*cs, - const char *data, size_t dLen) -{ - cs->u.zstd.input.src = (void *) unconstify(char *, data); - cs->u.zstd.input.size = dLen; - cs->u.zstd.input.pos = 0; - DeflateCompressorZstd(AH, cs); -} - /* Read data from a compressed zstd archive */ static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF) @@ -436,7 +416,7 @@ ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF) */ static void -InitCompressorZlib(CompressorState *cs, int level) +InitCompressorZlib(CompressorState *cs, Compress *compress) { z_streamp zp; @@ -453,7 +433,7 @@ InitCompressorZlib(CompressorState *cs, int level) cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); cs->zlibOutSize = ZLIB_OUT_SIZE; - if (deflateInit(zp, level) != Z_OK) + if (deflateInit(zp, compress->level) != Z_OK) fatal("could not initialize compression library: %s", zp->msg); @@ -676,35 +656,31 @@ free_keep_errno(void *p) * Open a file for reading. 'path' is the file to open, and 'mode' should * be either "r" or "rb". * - * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path' - * doesn't already have it) and try again. So if you pass "foo" as 'path', + * If the file at 'path' does not exist, we append the compression suffix (if 'path' + * doesn't already have one) and try again. So if you pass "foo" as 'path', * this will open either "foo" or "foo.gz". * * On failure, return NULL with an error code in errno. */ cfp * -cfopen_read(const char *path, const char *mode, int compression) +cfopen_read(const char *path, const char *mode, Compress *compression) { cfp *fp; -#ifdef HAVE_LIBZ if (hasSuffix(path, ".gz") || hasSuffix(path, ".zst")) fp = cfopen(path, mode, compression); else -#endif { fp = cfopen(path, mode, compression); -#ifdef HAVE_LIBZ if (fp == NULL) { char *fname; - char *suffix = compression == ZSTD_COMPRESSION ? 
"zst" : "gz"; + const char *suffix = compress_suffix(compression); - fname = psprintf("%s.%s", path, suffix); + fname = psprintf("%s%s", path, suffix); fp = cfopen(fname, mode, compression); free_keep_errno(fname); } -#endif } return fp; } @@ -714,32 +690,26 @@ cfopen_read(const char *path, const char *mode, int compression) * be a filemode as accepted by fopen() and gzopen() that indicates writing * ("w", "wb", "a", or "ab"). * - * If 'compression' is non-zero, a gzip compressed stream is opened, and - * 'compression' indicates the compression level used. The ".gz" suffix - * is automatically added to 'path' in that case. + * Use compression if specified. + * The appropriate suffix is automatically added to 'path' in that case. * * On failure, return NULL with an error code in errno. */ cfp * -cfopen_write(const char *path, const char *mode, int compression) +cfopen_write(const char *path, const char *mode, Compress *compression) { cfp *fp; - if (compression == 0) + if (compression->alg == COMPR_ALG_NONE) fp = cfopen(path, mode, compression); else { -#ifdef HAVE_LIBZ // XXX char *fname; - char *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz"; + const char *suffix = compress_suffix(compression); - fname = psprintf("%s.%s", path, suffix); + fname = psprintf("%s%s", path, suffix); fp = cfopen(fname, mode, compression); free_keep_errno(fname); -#else - fatal("not built with zlib support"); - fp = NULL; /* keep compiler quiet */ -#endif } return fp; @@ -752,7 +722,7 @@ cfopen_write(const char *path, const char *mode, int compression) * On failure, return NULL with an error code in errno. */ cfp * -cfopen(const char *path, const char *mode, int compression) +cfopen(const char *path, const char *mode, Compress *compression) { cfp *fp = pg_malloc(sizeof(cfp)); @@ -764,18 +734,17 @@ cfopen(const char *path, const char *mode, int compression) fp->zstd.fp = NULL; #endif - switch (compression) + switch (compression->alg) { #ifdef HAVE_LIBZ - case 1 ... 
9: // XXX: nonportable - case Z_DEFAULT_COMPRESSION: - if (compression != Z_DEFAULT_COMPRESSION) + case COMPR_ALG_LIBZ: + if (compression->level != Z_DEFAULT_COMPRESSION) { /* user has specified a compression level, so tell zlib to use it */ char mode_compression[32]; snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compression); + mode, compression->level); fp->compressedfp = gzopen(path, mode_compression); } else @@ -795,9 +764,8 @@ cfopen(const char *path, const char *mode, int compression) #endif #ifdef HAVE_LIBZSTD - case ZSTD_COMPRESSION: + case COMPR_ALG_ZSTD: fp->zstd.fp = fopen(path, mode); - // XXX: save the compression params if (fp->zstd.fp == NULL) { free_keep_errno(fp); @@ -806,16 +774,14 @@ cfopen(const char *path, const char *mode, int compression) else if (strchr(mode, 'w')) { fp->zstd.dstream = NULL; - fp->zstd.output.size = ZSTD_CStreamOutSize(); // XXX + fp->zstd.output.size = ZSTD_CStreamOutSize(); fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); - fp->zstd.cstream = ZSTD_createCStream(); - if (fp->zstd.cstream == NULL) - fatal("could not initialize compression library"); + fp->zstd.cstream = ZstdCStreamParams(compression); } else if (strchr(mode, 'r')) { fp->zstd.cstream = NULL; - fp->zstd.input.size = ZSTD_DStreamOutSize(); // XXX + fp->zstd.input.size = ZSTD_DStreamInSize(); fp->zstd.input.src = pg_malloc0(fp->zstd.input.size); fp->zstd.dstream = ZSTD_createDStream(); if (fp->zstd.dstream == NULL) @@ -825,15 +791,19 @@ cfopen(const char *path, const char *mode, int compression) break; #endif - default: + case COMPR_ALG_NONE: fp->uncompressedfp = fopen(path, mode); if (fp->uncompressedfp == NULL) { free_keep_errno(fp); fp = NULL; } - return fp; + break; + + default: + /* should not happen */ + fatal("requested compression not available in this installation"); } } @@ -944,7 +914,7 @@ cfwrite(const void *ptr, int size, cfp *fp) else #endif - return fwrite(ptr, 1, size, fp->uncompressedfp); + return fwrite(ptr, 1, 
size, fp->uncompressedfp); } int @@ -1005,7 +975,7 @@ cfgets(cfp *fp, char *buf, int len) buf[res] = 0; if (strchr(buf, '\n')) *strchr(buf, '\n') = '\0'; - return buf; + return res > 0 ? buf : 0; } else #endif @@ -1122,5 +1092,22 @@ hasSuffix(const char *filename, const char *suffix) suffix, suffixlen) == 0; } - #endif + +/* + * Return the filename suffix (e.g. ".gz") for the given compression algorithm. + * The string is statically allocated. + */ +const char * +compress_suffix(Compress *compression) +{ + switch (compression->alg) + { + case COMPR_ALG_LIBZ: + return ".gz"; + case COMPR_ALG_ZSTD: + return ".zst"; + default: + return ""; + } +} diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index f0ce06f176..2c073676eb 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -21,13 +21,6 @@ #define ZLIB_OUT_SIZE 4096 #define ZLIB_IN_SIZE 4096 -typedef enum -{ - COMPR_ALG_NONE, - COMPR_ALG_LIBZ, - COMPR_ALG_ZSTD, -} CompressionAlgorithm; - /* Prototype for callback function to WriteDataToArchive() */ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len); @@ -47,8 +40,8 @@ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen); /* struct definition appears in compress_io.c */ typedef struct CompressorState CompressorState; -extern CompressorState *AllocateCompressor(int compression, WriteFunc writeF); -extern void ReadDataFromArchive(ArchiveHandle *AH, int compression, +extern CompressorState *AllocateCompressor(Compress *compression, WriteFunc writeF); +extern void ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF); extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen); @@ -57,9 +50,9 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); typedef struct cfp cfp; -extern cfp *cfopen(const char *path, const char *mode, int compression); -extern cfp *cfopen_read(const char *path, const char *mode, int compression); -extern cfp 
*cfopen_write(const char *path, const char *mode, int compression); +extern cfp *cfopen(const char *path, const char *mode, Compress *compression); +extern cfp *cfopen_read(const char *path, const char *mode, Compress *compression); +extern cfp *cfopen_write(const char *path, const char *mode, Compress *compression); extern int cfread(void *ptr, int size, cfp *fp); extern int cfwrite(const void *ptr, int size, cfp *fp); extern int cfgetc(cfp *fp); @@ -68,4 +61,7 @@ extern int cfclose(cfp *fp); extern int cfeof(cfp *fp); extern const char *get_cfp_error(cfp *fp); +/* also used by tar */ +extern const char * compress_suffix(Compress *compression); + #endif diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 9d0056a569..1c1954dd1a 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -72,6 +72,21 @@ typedef struct _connParams char *override_dbname; } ConnParams; +typedef enum +{ + COMPR_ALG_DEFAULT = -1, + COMPR_ALG_NONE, + COMPR_ALG_LIBZ, + COMPR_ALG_ZSTD, +} CompressionAlgorithm; + +typedef struct Compress { + CompressionAlgorithm alg; + int level; // XXX: default + bool zstdlong; +} Compress; + + typedef struct _restoreOptions { int createDB; /* Issue commands to create the database */ @@ -125,7 +140,7 @@ typedef struct _restoreOptions int noDataForFailedTables; int exit_on_error; - int compression; + Compress compression; int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; @@ -281,7 +296,7 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); /* Create a new archive */ extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker); /* The --list option */ diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 1f82c6499b..2f1d59ce7a 100644 --- 
a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -70,7 +70,7 @@ typedef struct _parallelReadyList static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr); static void _getObjectDescription(PQExpBuffer buf, TocEntry *te); static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData); @@ -98,7 +98,7 @@ static int _discoverArchiveFormat(ArchiveHandle *AH); static int RestoringToDB(ArchiveHandle *AH); static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); -static void SetOutput(ArchiveHandle *AH, const char *filename, int compression); +static void SetOutput(ArchiveHandle *AH, const char *filename, Compress *compress); static OutputContext SaveOutput(ArchiveHandle *AH); static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext); @@ -238,7 +238,7 @@ setupRestoreWorker(Archive *AHX) /* Public */ Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker) { @@ -253,7 +253,9 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt, Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt) { - ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker); + Compress compress = {0}; + ArchiveHandle *AH = _allocAH(FileSpec, fmt, &compress, true, archModeRead, + setupRestoreWorker); return (Archive *) AH; } @@ -382,7 +384,7 @@ RestoreArchive(Archive *AHX) * Make sure we won't need (de)compression we haven't got */ #ifndef HAVE_LIBZ - if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + if (AH->compression.alg != COMPR_ALG_NONE && AH->PrintTocDataPtr != NULL) { for (te = AH->toc->next; 
te != AH->toc; te = te->next) { @@ -457,8 +459,8 @@ RestoreArchive(Archive *AHX) * Setup the output file if necessary. */ sav = SaveOutput(AH); - if (ropt->filename || ropt->compression) - SetOutput(AH, ropt->filename, ropt->compression); + if (ropt->filename || ropt->compression.alg != COMPR_ALG_NONE) + SetOutput(AH, ropt->filename, &ropt->compression); ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n"); @@ -738,7 +740,7 @@ RestoreArchive(Archive *AHX) */ AH->stage = STAGE_FINALIZING; - if (ropt->filename || ropt->compression) + if (ropt->filename || ropt->compression.alg != COMPR_ALG_NONE) RestoreOutput(AH, sav); if (ropt->useDB) @@ -1121,10 +1123,11 @@ PrintTOCSummary(Archive *AHX) OutputContext sav; const char *fmtName; char stamp_str[64]; + Compress nocompression = {0}; sav = SaveOutput(AH); if (ropt->filename) - SetOutput(AH, ropt->filename, 0 /* no compression */ ); + SetOutput(AH, ropt->filename, &nocompression); if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT, localtime(&AH->createDate)) == 0) @@ -1133,7 +1136,7 @@ PrintTOCSummary(Archive *AHX) ahprintf(AH, ";\n; Archive created at %s\n", stamp_str); ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n", sanitize_line(AH->archdbname, false), - AH->tocCount, AH->compression); + AH->tocCount, AH->compression.alg); switch (AH->format) { @@ -1487,7 +1490,7 @@ archprintf(Archive *AH, const char *fmt,...) 
*******************************/ static void -SetOutput(ArchiveHandle *AH, const char *filename, int compression) +SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression) { int fn; @@ -1510,12 +1513,12 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression) /* If compression explicitly requested, use gzopen */ #ifdef HAVE_LIBZ - if (compression != 0) + if (compression->alg != COMPR_ALG_NONE) { char fmode[14]; /* Don't use PG_BINARY_x since this is zlib */ - sprintf(fmode, "wb%d", compression); + sprintf(fmode, "wb%d", compression->level); if (fn >= 0) AH->OF = gzdopen(dup(fn), fmode); else @@ -2259,7 +2262,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; @@ -2310,7 +2313,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->toc->prev = AH->toc; AH->mode = mode; - AH->compression = compression; + AH->compression = *compression; AH->dosync = dosync; memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse)); @@ -2325,7 +2328,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, * Force stdin/stdout into binary mode if that is what we are using. 
*/ #ifdef WIN32 - if ((fmt != archNull || compression != 0) && + if ((fmt != archNull || compression->alg != COMPR_ALG_NONE) && (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)) { if (mode == archModeWrite) @@ -3741,7 +3744,7 @@ WriteHead(ArchiveHandle *AH) AH->WriteBytePtr(AH, AH->intSize); AH->WriteBytePtr(AH, AH->offSize); AH->WriteBytePtr(AH, AH->format); - WriteInt(AH, AH->compression); + WriteInt(AH, AH->compression.alg); crtm = *localtime(&AH->createDate); WriteInt(AH, crtm.tm_sec); WriteInt(AH, crtm.tm_min); @@ -3816,15 +3819,15 @@ ReadHead(ArchiveHandle *AH) if (AH->version >= K_VERS_1_2) { if (AH->version < K_VERS_1_4) - AH->compression = AH->ReadBytePtr(AH); + AH->compression.alg = AH->ReadBytePtr(AH); else - AH->compression = ReadInt(AH); + AH->compression.alg = ReadInt(AH); } else - AH->compression = Z_DEFAULT_COMPRESSION; + AH->compression.alg = Z_DEFAULT_COMPRESSION; #ifndef HAVE_LIBZ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available"); #endif diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 1a229ebedb..641ba2d043 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -47,8 +47,6 @@ #define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s)) #define GZREAD(p, s, n, fh) fread(p, s, n, fh) #define GZEOF(fh) feof(fh) -/* this is just the redefinition of a libz constant */ -#define Z_DEFAULT_COMPRESSION (-1) typedef struct _z_stream { @@ -62,7 +60,6 @@ typedef z_stream *z_streamp; #ifdef HAVE_LIBZSTD #include <zstd.h> -#define ZSTD_COMPRESSION -2 #endif /* HAVE_LIBZSTD */ /* Data block types */ @@ -334,13 +331,7 @@ struct _archiveHandle DumpId *tableDataId; /* TABLE DATA ids, indexed by table dumpId */ struct _tocEntry *currToc; /* Used when dumping data */ - int compression; /*--------- - * Compression requested on open - * 
Possible values for compression: - * -2 ZSTD_COMPRESSION - * -1 Z_DEFAULT_COMPRESSION - * 0 COMPRESSION_NONE - * 1-9 levels for gzip compression */ + Compress compression; /* Compression requested on open */ bool dosync; /* data requested to be synced on sight */ ArchiveMode mode; /* File mode - r or w */ void *formatData; /* Header data specific to file format */ diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 77d402c323..55a887a236 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression, _CustomWriteFunc); } /* @@ -377,7 +377,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) WriteInt(AH, oid); - ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression, _CustomWriteFunc); } /* @@ -566,7 +566,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) static void _PrintData(ArchiveHandle *AH) { - ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); + ReadDataFromArchive(AH, _CustomReadFunc); } static void diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 6e55cb425e..dfa36d5dc1 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -202,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) setFilePath(AH, fname, "toc.dat"); - tocFH = cfopen_read(fname, PG_BINARY_R, AH->compression); + tocFH = cfopen_read(fname, PG_BINARY_R, &AH->compression); if (tocFH == NULL) fatal("could not open input file \"%s\": %m", fname); @@ -327,7 +327,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, 
AH->compression); + ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression); if (ctx->dataFH == NULL) fatal("could not open output file \"%s\": %m", fname); } @@ -388,12 +388,12 @@ _PrintFileData(ArchiveHandle *AH, char *filename) if (!filename) return; - cfp = cfopen_read(filename, PG_BINARY_R, AH->compression); + cfp = cfopen_read(filename, PG_BINARY_R, &AH->compression); if (!cfp) fatal("could not open input file \"%s\": %m", filename); - buf = pg_malloc(ZLIB_OUT_SIZE); + buf = pg_malloc(ZLIB_OUT_SIZE); // XXX buflen = ZLIB_OUT_SIZE; while ((cnt = cfread(buf, buflen, cfp))) @@ -435,12 +435,13 @@ _LoadBlobs(ArchiveHandle *AH) lclContext *ctx = (lclContext *) AH->formatData; char fname[MAXPGPATH]; char line[MAXPGPATH]; + Compress nocompression = {0}; StartRestoreBlobs(AH); setFilePath(AH, fname, "blobs.toc"); - ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, AH->compression); + ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, &nocompression); if (ctx->blobsTocFH == NULL) fatal("could not open large object TOC file \"%s\" for input: %m", @@ -573,6 +574,7 @@ _CloseArchive(ArchiveHandle *AH) { cfp *tocFH; char fname[MAXPGPATH]; + Compress nocompression = {0}; setFilePath(AH, fname, "toc.dat"); @@ -580,7 +582,7 @@ _CloseArchive(ArchiveHandle *AH) ctx->pstate = ParallelBackupStart(AH); /* The TOC is always created uncompressed */ - tocFH = cfopen_write(fname, PG_BINARY_W, 0); + tocFH = cfopen_write(fname, PG_BINARY_W, &nocompression); if (tocFH == NULL) fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -639,11 +641,12 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; char fname[MAXPGPATH]; + Compress nocompression = {0}; setFilePath(AH, fname, "blobs.toc"); /* The blob TOC file is never compressed */ - ctx->blobsTocFH = cfopen_write(fname, "ab", 0); + ctx->blobsTocFH = cfopen_write(fname, "ab", &nocompression); if (ctx->blobsTocFH == NULL) fatal("could not open output file \"%s\": 
%m", fname); } @@ -661,7 +664,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression); + ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression); if (ctx->dataFH == NULL) fatal("could not open output file \"%s\": %m", fname); diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index 54e708875c..5c9d17bae7 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -39,6 +39,7 @@ #include "pg_backup_archiver.h" #include "pg_backup_tar.h" #include "pg_backup_utils.h" +#include "compress_io.h" #include "pgtar.h" static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te); @@ -196,10 +197,10 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) /* * We don't support compression because reading the files back is not - * possible since gzdopen uses buffered IO which totally screws file + * possible since gzdopen uses buffered IO which totally screws file XXX * positioning. 
*/ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) fatal("compression is not supported by tar archive format"); } else @@ -254,14 +255,8 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te) ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); if (te->dataDumper != NULL) { -#ifdef HAVE_LIBZ - if (AH->compression == 0) - sprintf(fn, "%d.dat", te->dumpId); - else - sprintf(fn, "%d.dat.gz", te->dumpId); -#else - sprintf(fn, "%d.dat", te->dumpId); -#endif + const char *suffix = compress_suffix(&AH->compression); + sprintf(fn, "%d.dat%s", te->dumpId, suffix); ctx->filename = pg_strdup(fn); } else @@ -352,7 +347,7 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) #ifdef HAVE_LIBZ - if (AH->compression == 0) + if (AH->compression.alg == COMPR_ALG_NONE) tm->nFH = ctx->tarFH; else fatal("compression is not supported by tar archive format"); @@ -413,9 +408,9 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) #ifdef HAVE_LIBZ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) { - sprintf(fmode, "wb%d", AH->compression); + sprintf(fmode, "wb%d", AH->compression.level); tm->zFH = gzdopen(dup(fileno(tm->tmpFH)), fmode); if (tm->zFH == NULL) fatal("could not open temporary file"); @@ -443,7 +438,7 @@ tarClose(ArchiveHandle *AH, TAR_MEMBER *th) /* * Close the GZ file since we dup'd. This will flush the buffers. 
*/ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) if (GZCLOSE(th->zFH) != 0) fatal("could not close tar member"); @@ -868,7 +863,7 @@ _CloseArchive(ArchiveHandle *AH) memcpy(ropt, AH->public.ropt, sizeof(RestoreOptions)); ropt->filename = NULL; ropt->dropSchema = 1; - ropt->compression = 0; + ropt->compression.alg = COMPR_ALG_NONE; ropt->superuser = NULL; ropt->suppressDumpWarnings = true; @@ -952,16 +947,12 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) lclContext *ctx = (lclContext *) AH->formatData; lclTocEntry *tctx = (lclTocEntry *) te->formatData; char fname[255]; - char *sfx; + const char *sfx; if (oid == 0) fatal("invalid OID for large object (%u)", oid); - if (AH->compression != 0) - sfx = ".gz"; - else - sfx = ""; - + sfx = compress_suffix(&AH->compression); sprintf(fname, "blob_%u.dat%s", oid, sfx); tarPrintf(ctx->blobToc, "%u %s\n", oid, fname); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index f21eb021c7..5f80d05716 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -59,6 +59,7 @@ #include "getopt_long.h" #include "libpq/libpq-fs.h" #include "parallel.h" +#include "compress_io.h" #include "pg_backup_db.h" #include "pg_backup_utils.h" #include "pg_dump.h" @@ -297,6 +298,111 @@ static void setupDumpWorker(Archive *AHX); static TableInfo *getRootTableInfo(TableInfo *tbinfo); +/* Parse the string into compression options */ +static void +parse_compression(const char *optarg, Compress *compress) +{ + if (optarg[0] == '0' && optarg[1] == '\0') + compress->alg = COMPR_ALG_NONE; + else if ((optarg[0] > '0' && optarg[0] <= '9') || + optarg[0] == '-') + { + compress->alg = COMPR_ALG_LIBZ; + compress->level = atoi(optarg); + if (optarg[1] != '\0') + { + pg_log_error("compression level must be in range 0..9"); + exit_nicely(1); + } + } + else + { + /* Parse a more flexible string like level=3 alg=zlib opts=long */ + for (;;) + { + char *eq = strchr(optarg, '='); + int len; + + if (eq 
== NULL) + { + pg_log_error("compression options must be key=value: %s", optarg); + exit_nicely(1); + } + + len = eq - optarg; + if (strncmp(optarg, "alg", len) == 0) + { + if (strchr(eq, ' ')) + len = strchr(eq, ' ') - eq - 1; + else + len = strlen(eq) - len; + if (strncmp(1+eq, "zlib", len) == 0 || + strncmp(1+eq, "libz", len) == 0) + compress->alg = COMPR_ALG_LIBZ; + else if (strncmp(1+eq, "zstd", len) == 0) + compress->alg = COMPR_ALG_ZSTD; + else + { + pg_log_error("unknown compression algorithm: %s", 1+eq); + exit_nicely(1); + } + } + else if (strncmp(optarg, "level", len) == 0) + compress->level = atoi(1+eq); + else if (strncmp(optarg, "opt", len) == 0) + { + if (strchr(eq, ' ')) + len = strchr(eq, ' ') - eq - 1; + else + len = strlen(eq) - len; + if (strncmp(1+eq, "zstdlong", len) == 0) + compress->zstdlong = true; + else + { + pg_log_error("unknown compression option: %s", 1+eq); + exit_nicely(1); + } + } + else + { + pg_log_error("unknown compression setting: %s", optarg); + exit_nicely(1); + } + + optarg = strchr(eq, ' '); + if (!optarg++) + break; + } + + /* zstd will check its own compression level later */ + if (compress->alg != COMPR_ALG_ZSTD) + { + if (compress->level < 0 || compress->level > 9) + { + pg_log_error("compression level must be in range 0..9"); + exit_nicely(1); + } + if (compress->zstdlong) + { + pg_log_error("compression option not supported with this algorithm"); + exit_nicely(1); + } + } + + /* XXX: 0 means "unset" */ + if (compress->level == 0) + { + const int default_compress_level[] = { + 0, /* COMPR_ALG_NONE */ + Z_DEFAULT_COMPRESSION, /* COMPR_ALG_LIBZ */ + ZSTD_CLEVEL_DEFAULT, /* COMPR_ALG_ZSTD */ + }; + + compress->level = default_compress_level[compress->alg]; + } + } +} + int main(int argc, char **argv) { @@ -319,7 +425,7 @@ main(int argc, char **argv) char *use_role = NULL; long rowsPerInsert; int numWorkers = 1; - int compressLevel = -1; + Compress compress = { .alg = COMPR_ALG_DEFAULT }; int plainText = 0; ArchiveFormat 
archiveFormat = archUnknown; ArchiveMode archiveMode; @@ -532,13 +638,7 @@ main(int argc, char **argv) break; case 'Z': /* Compression Level */ - compressLevel = atoi(optarg); - if ((compressLevel < 0 || compressLevel > 9) && - compressLevel != -2) - { - pg_log_error("compression level must be in range 0..9"); - exit_nicely(1); - } + parse_compression(optarg, &compress); break; case 0: @@ -680,20 +780,40 @@ main(int argc, char **argv) plainText = 1; /* Custom and directory formats are compressed by default, others not */ - if (compressLevel == -1) + if (compress.alg == COMPR_ALG_DEFAULT) { -#ifdef HAVE_LIBZ if (archiveFormat == archCustom || archiveFormat == archDirectory) - compressLevel = Z_DEFAULT_COMPRESSION; - else + { +#ifdef HAVE_LIBZ + compress.alg = COMPR_ALG_LIBZ; + compress.level = Z_DEFAULT_COMPRESSION; +#endif +#ifdef HAVE_LIBZSTD + compress.alg = COMPR_ALG_ZSTD; // Set default for testing purposes + compress.level = ZSTD_CLEVEL_DEFAULT; #endif - compressLevel = 0; + } + else + { + compress.alg = COMPR_ALG_NONE; + compress.level = 0; + } } #ifndef HAVE_LIBZ - if (compressLevel != 0) + if (compress.alg == COMPR_ALG_LIBZ) + { pg_log_warning("requested compression not available in this installation -- archive will be uncompressed"); - compressLevel = 0; + compress.alg = COMPR_ALG_NONE; + } +#endif + +#ifndef HAVE_LIBZSTD + if (compress.alg == COMPR_ALG_ZSTD) + { + pg_log_warning("requested compression not available in this installation -- archive will be uncompressed"); + compress.alg = COMPR_ALG_NONE; + } #endif /* @@ -724,7 +844,7 @@ main(int argc, char **argv) fatal("option --index-collation-versions-unknown only works in binary upgrade mode"); /* Open the output file */ - fout = CreateArchive(filename, archiveFormat, compressLevel, dosync, + fout = CreateArchive(filename, archiveFormat, &compress, dosync, archiveMode, setupDumpWorker); /* Make dump options accessible right away */ @@ -958,10 +1078,7 @@ main(int argc, char **argv) ropt->sequence_data = dopt.sequence_data; 
ropt->binary_upgrade = dopt.binary_upgrade; - if (compressLevel == -1) - ropt->compression = 0; - else - ropt->compression = compressLevel; + ropt->compression = compress; ropt->suppressDumpWarnings = true; /* We've already shown them */ -- 2.17.0
>From bc6bc514d784da4cab7262c8386b0b8ed29ed02f Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Mon, 21 Dec 2020 00:11:43 -0600 Subject: [PATCH 5/7] union{} with a CompressionAlgorithm alg --- src/bin/pg_dump/compress_io.c | 218 ++++++++++++++++------------------ 1 file changed, 105 insertions(+), 113 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 421b8e04cb..06005c3fba 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -619,23 +619,27 @@ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, */ struct cfp { - FILE *uncompressedfp; + CompressionAlgorithm alg; + + union { + FILE *fp; + #ifdef HAVE_LIBZ - gzFile compressedfp; + gzFile gzfp; #endif -#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg? - /* This is a normal file to which we read/write compressed data */ - struct { - FILE *fp; - // XXX: use one separate ZSTD_CStream per thread: disable on windows ? - ZSTD_CStream *cstream; - ZSTD_DStream *dstream; - ZSTD_outBuffer output; - ZSTD_inBuffer input; - } zstd; +#ifdef HAVE_LIBZSTD + struct { + /* This is a normal file to which we read/write compressed data */ + FILE *fp; + // XXX: use one separate ZSTD_CStream per thread: disable on windows ? 
+ ZSTD_CStream *cstream; + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + } zstd; #endif - + } u; }; #ifdef HAVE_LIBZ @@ -726,13 +730,7 @@ cfopen(const char *path, const char *mode, Compress *compression) { cfp *fp = pg_malloc(sizeof(cfp)); - fp->uncompressedfp = NULL; -#ifdef HAVE_LIBZ - fp->compressedfp = NULL; -#endif -#ifdef HAVE_LIBZSTD - fp->zstd.fp = NULL; -#endif + fp->alg = compression->alg; switch (compression->alg) { @@ -745,15 +743,15 @@ cfopen(const char *path, const char *mode, Compress *compression) snprintf(mode_compression, sizeof(mode_compression), "%s%d", mode, compression->level); - fp->compressedfp = gzopen(path, mode_compression); + fp->u.gzfp = gzopen(path, mode_compression); } else { /* don't specify a level, just use the zlib default */ - fp->compressedfp = gzopen(path, mode); + fp->u.gzfp = gzopen(path, mode); } - if (fp->compressedfp == NULL) + if (fp->u.gzfp == NULL) { free_keep_errno(fp); fp = NULL; @@ -765,26 +763,26 @@ cfopen(const char *path, const char *mode, Compress *compression) #ifdef HAVE_LIBZSTD case COMPR_ALG_ZSTD: - fp->zstd.fp = fopen(path, mode); - if (fp->zstd.fp == NULL) + fp->u.zstd.fp = fopen(path, mode); + if (fp->u.zstd.fp == NULL) { free_keep_errno(fp); fp = NULL; } else if (strchr(mode, 'w')) { - fp->zstd.dstream = NULL; - fp->zstd.output.size = ZSTD_CStreamOutSize(); - fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); - fp->zstd.cstream = ZstdCStreamParams(compression); + fp->u.zstd.dstream = NULL; + fp->u.zstd.output.size = ZSTD_CStreamOutSize(); + fp->u.zstd.output.dst = pg_malloc0(fp->u.zstd.output.size); + fp->u.zstd.cstream = ZstdCStreamParams(compression); } else if (strchr(mode, 'r')) { - fp->zstd.cstream = NULL; - fp->zstd.input.size = ZSTD_DStreamInSize(); - fp->zstd.input.src = pg_malloc0(fp->zstd.input.size); - fp->zstd.dstream = ZSTD_createDStream(); - if (fp->zstd.dstream == NULL) + fp->u.zstd.cstream = NULL; + fp->u.zstd.input.size = ZSTD_DStreamInSize(); + 
fp->u.zstd.input.src = pg_malloc0(fp->u.zstd.input.size); + fp->u.zstd.dstream = ZSTD_createDStream(); + if (fp->u.zstd.dstream == NULL) fatal("could not initialize compression library"); } // XXX else: bad mode return fp; @@ -792,8 +790,8 @@ cfopen(const char *path, const char *mode, Compress *compression) #endif case COMPR_ALG_NONE: - fp->uncompressedfp = fopen(path, mode); - if (fp->uncompressedfp == NULL) + fp->u.fp = fopen(path, mode); + if (fp->u.fp == NULL) { free_keep_errno(fp); fp = NULL; @@ -817,26 +815,26 @@ cfread(void *ptr, int size, cfp *fp) return 0; #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { - ret = gzread(fp->compressedfp, ptr, size); - if (ret != size && !gzeof(fp->compressedfp)) + ret = gzread(fp->u.gzfp, ptr, size); + if (ret != size && !gzeof(fp->u.gzfp)) { int errnum; - const char *errmsg = gzerror(fp->compressedfp, &errnum); + const char *errmsg = gzerror(fp->u.gzfp, &errnum); fatal("could not read from input file: %s", errnum == Z_ERRNO ? 
strerror(errno) : errmsg); } + return ret; } - else #endif #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { - ZSTD_outBuffer *output = &fp->zstd.output; - ZSTD_inBuffer *input = &fp->zstd.input; + ZSTD_outBuffer *output = &fp->u.zstd.output; + ZSTD_inBuffer *input = &fp->u.zstd.input; size_t input_size = ZSTD_DStreamInSize(); size_t res, cnt; @@ -845,7 +843,7 @@ cfread(void *ptr, int size, cfp *fp) output->pos = 0; /* read compressed data */ - while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp))) + while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->u.zstd.fp))) { input->size = cnt; input->pos = 0; @@ -853,7 +851,7 @@ cfread(void *ptr, int size, cfp *fp) for ( ; input->pos < input->size; ) { /* decompress */ - res = ZSTD_decompressStream(fp->zstd.dstream, output, input); + res = ZSTD_decompressStream(fp->u.zstd.dstream, output, input); if (res == 0 || output->pos == output->size) break; if (ZSTD_isError(res)) @@ -864,16 +862,13 @@ cfread(void *ptr, int size, cfp *fp) break; /* We read all the data that fits */ } - ret = output->pos; + return output->pos; } - else #endif - { - ret = fread(ptr, 1, size, fp->uncompressedfp); - if (ret != size && !feof(fp->uncompressedfp)) - READ_ERROR_EXIT(fp->uncompressedfp); - } + ret = fread(ptr, 1, size, fp->u.fp); + if (ret != size && !feof(fp->u.fp)) + READ_ERROR_EXIT(fp->u.fp); return ret; } @@ -881,16 +876,16 @@ int cfwrite(const void *ptr, int size, cfp *fp) { #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzwrite(fp->compressedfp, ptr, size); - else + if (fp->alg == COMPR_ALG_LIBZ) + return gzwrite(fp->u.gzfp, ptr, size); #endif + #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { size_t res, cnt; - ZSTD_outBuffer *output = &fp->zstd.output; - ZSTD_inBuffer *input = &fp->zstd.input; + ZSTD_outBuffer *output = &fp->u.zstd.output; + ZSTD_inBuffer *input = &fp->u.zstd.input; input->src = ptr; input->size = size; @@ -900,21 +895,20 
@@ cfwrite(const void *ptr, int size, cfp *fp) while (input->pos != input->size) { output->pos = 0; - res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue); + res = ZSTD_compressStream2(fp->u.zstd.cstream, output, input, ZSTD_e_continue); if (ZSTD_isError(res)) fatal("could not compress data: %s", ZSTD_getErrorName(res)); - cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + cnt = fwrite(output->dst, 1, output->pos, fp->u.zstd.fp); if (cnt != output->pos) fatal("could not write data: %s", strerror(errno)); } return size; } - else #endif - return fwrite(ptr, 1, size, fp->uncompressedfp); + return fwrite(ptr, 1, size, fp->u.fp); } int @@ -923,39 +917,38 @@ cfgetc(cfp *fp) int ret; #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { - ret = gzgetc(fp->compressedfp); + ret = gzgetc(fp->u.gzfp); if (ret == EOF) { - if (!gzeof(fp->compressedfp)) + if (!gzeof(fp->u.gzfp)) fatal("could not read from input file: %s", strerror(errno)); else fatal("could not read from input file: end of file"); } + return ret; } - else #endif + #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { if (cfread(&ret, 1, fp) != 1) { - if (feof(fp->zstd.fp)) + if (feof(fp->u.zstd.fp)) fatal("could not read from input file: end of file"); else fatal("could not read from input file: %s", strerror(errno)); } fprintf(stderr, "cfgetc %d\n", ret); + return ret; } #endif - { - ret = fgetc(fp->uncompressedfp); - if (ret == EOF) - READ_ERROR_EXIT(fp->uncompressedfp); - } - + ret = fgetc(fp->u.fp); + if (ret == EOF) + READ_ERROR_EXIT(fp->u.fp); return ret; } @@ -963,12 +956,12 @@ char * cfgets(cfp *fp, char *buf, int len) { #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzgets(fp->compressedfp, buf, len); - else + if (fp->alg == COMPR_ALG_LIBZ) + return gzgets(fp->u.gzfp, buf, len); #endif + #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { int res; res = cfread(buf, len, fp); @@ -977,9 +970,9 @@ cfgets(cfp 
*fp, char *buf, int len) *strchr(buf, '\n') = '\0'; return res > 0 ? buf : 0; } - else #endif - return fgets(buf, len, fp->uncompressedfp); + + return fgets(buf, len, fp->u.fp); } int @@ -993,56 +986,55 @@ cfclose(cfp *fp) return EOF; } #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { - result = gzclose(fp->compressedfp); - fp->compressedfp = NULL; + result = gzclose(fp->u.gzfp); + fp->u.gzfp = NULL; + return result; } - else #endif + #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { - ZSTD_outBuffer *output = &fp->zstd.output; - ZSTD_inBuffer *input = &fp->zstd.input; + ZSTD_outBuffer *output = &fp->u.zstd.output; + ZSTD_inBuffer *input = &fp->u.zstd.input; size_t res, cnt; - if (fp->zstd.cstream) + if (fp->u.zstd.cstream) { for (;;) { output->pos = 0; - res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end); + res = ZSTD_compressStream2(fp->u.zstd.cstream, output, input, ZSTD_e_end); if (ZSTD_isError(res)) fatal("could not compress data: %s", ZSTD_getErrorName(res)); - cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + cnt = fwrite(output->dst, 1, output->pos, fp->u.zstd.fp); if (cnt != output->pos) fatal("could not write data: %s", strerror(errno)); if (res == 0) break; } - ZSTD_freeCStream(fp->zstd.cstream); - pg_free(fp->zstd.output.dst); + ZSTD_freeCStream(fp->u.zstd.cstream); + pg_free(fp->u.zstd.output.dst); } - if (fp->zstd.dstream) + if (fp->u.zstd.dstream) { - ZSTD_freeDStream(fp->zstd.dstream); - pg_free(unconstify(void *, fp->zstd.input.src)); + ZSTD_freeDStream(fp->u.zstd.dstream); + pg_free(unconstify(void *, fp->u.zstd.input.src)); } - result = fclose(fp->zstd.fp); - fp->zstd.fp = NULL; + result = fclose(fp->u.zstd.fp); + fp->u.zstd.fp = NULL; + return result; } - else #endif - { - result = fclose(fp->uncompressedfp); - fp->uncompressedfp = NULL; - } - free_keep_errno(fp); + result = fclose(fp->u.fp); + fp->u.fp = NULL; + free_keep_errno(fp); return result; } @@ -1050,26 
+1042,26 @@ int cfeof(cfp *fp) { #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzeof(fp->compressedfp); - else + if (fp->alg == COMPR_ALG_LIBZ) + return gzeof(fp->u.gzfp); #endif + #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) - return feof(fp->zstd.fp); - else + if (fp->alg == COMPR_ALG_ZSTD) + return feof(fp->u.zstd.fp); #endif - return feof(fp->uncompressedfp); + + return feof(fp->u.fp); } const char * get_cfp_error(cfp *fp) { #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { int errnum; - const char *errmsg = gzerror(fp->compressedfp, &errnum); + const char *errmsg = gzerror(fp->u.gzfp, &errnum); if (errnum != Z_ERRNO) return errmsg; -- 2.17.0
>From 15cd65b4d650009cf18f48357e9dfe9173d32446 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Fri, 11 Dec 2020 22:22:31 -0600 Subject: [PATCH 6/7] Move zlib into the union{} --- src/bin/pg_dump/compress_io.c | 54 ++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 06005c3fba..a94efbea4e 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -67,23 +67,25 @@ struct CompressorState CompressionAlgorithm comprAlg; WriteFunc writeF; + union { #ifdef HAVE_LIBZ - z_streamp zp; - char *zlibOut; - size_t zlibOutSize; + struct { + z_streamp zp; + char *zlibOut; + size_t zlibOutSize; + } zlib; #endif #ifdef HAVE_LIBZSTD - union { + /* This is used for compression but not decompression */ struct { - ZSTD_outBuffer output; - ZSTD_inBuffer input; // XXX: use one separate ZSTD_CStream per thread: disable on windows ? - ZSTD_CStream *cstream; + ZSTD_CStream *cstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; } zstd; - } u; #endif - + } u; }; /* Routines that support zlib compressed data I/O */ @@ -420,7 +422,7 @@ InitCompressorZlib(CompressorState *cs, Compress *compress) { z_streamp zp; - zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp = cs->u.zlib.zp = (z_streamp) pg_malloc(sizeof(z_stream)); zp->zalloc = Z_NULL; zp->zfree = Z_NULL; zp->opaque = Z_NULL; @@ -430,22 +432,22 @@ InitCompressorZlib(CompressorState *cs, Compress *compress) * actually allocate one extra byte because some routines want to append a * trailing zero byte to the zlib output. 
*/ - cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); - cs->zlibOutSize = ZLIB_OUT_SIZE; + cs->u.zlib.zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); + cs->u.zlib.zlibOutSize = ZLIB_OUT_SIZE; if (deflateInit(zp, compress->level) != Z_OK) fatal("could not initialize compression library: %s", zp->msg); /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) cs->zlibOut; - zp->avail_out = cs->zlibOutSize; + zp->next_out = (void *) cs->u.zlib.zlibOut; + zp->avail_out = cs->u.zlib.zlibOutSize; } static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) { - z_streamp zp = cs->zp; + z_streamp zp = cs->u.zlib.zp; zp->next_in = NULL; zp->avail_in = 0; @@ -456,23 +458,23 @@ EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) if (deflateEnd(zp) != Z_OK) fatal("could not close compression stream: %s", zp->msg); - free(cs->zlibOut); - free(cs->zp); + free(cs->u.zlib.zlibOut); + free(cs->u.zlib.zp); } static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) { - z_streamp zp = cs->zp; - char *out = cs->zlibOut; + z_streamp zp = cs->u.zlib.zp; + char *out = cs->u.zlib.zlibOut; int res = Z_OK; - while (cs->zp->avail_in != 0 || flush) + while (cs->u.zlib.zp->avail_in != 0 || flush) { res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); if (res == Z_STREAM_ERROR) fatal("could not compress data: %s", zp->msg); - if ((flush && (zp->avail_out < cs->zlibOutSize)) + if ((flush && (zp->avail_out < cs->u.zlib.zlibOutSize)) || (zp->avail_out == 0) || (zp->avail_in != 0) ) @@ -482,18 +484,18 @@ DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) * chunk is the EOF marker in the custom format. This should never * happen but... */ - if (zp->avail_out < cs->zlibOutSize) + if (zp->avail_out < cs->u.zlib.zlibOutSize) { /* * Any write function should do its own error checking but to * make sure we do a check here as well... 
*/ - size_t len = cs->zlibOutSize - zp->avail_out; + size_t len = cs->u.zlib.zlibOutSize - zp->avail_out; cs->writeF(AH, out, len); } zp->next_out = (void *) out; - zp->avail_out = cs->zlibOutSize; + zp->avail_out = cs->u.zlib.zlibOutSize; } if (res == Z_STREAM_END) @@ -505,8 +507,8 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen) { - cs->zp->next_in = (void *) unconstify(char *, data); - cs->zp->avail_in = dLen; + cs->u.zlib.zp->next_in = (void *) unconstify(char *, data); + cs->u.zlib.zp->avail_in = dLen; DeflateCompressorZlib(AH, cs, false); } -- 2.17.0