On Mon, Dec 21, 2020 at 01:49:24PM -0600, Justin Pryzby wrote: > a big disadvantage of piping through zstd is that it's not identified as a > PGDMP file, and, /usr/bin/file on centos7 fails to even identify zstd by its > magic number..
Other reasons are that pg_dump |zstd >output.zst loses the exit status of pg_dump, and that it's not "transparent" (one needs to type "zstd -dq |pg_restore"). On Mon, Dec 21, 2020 at 08:32:35PM -0600, Justin Pryzby wrote: > On Mon, Dec 21, 2020 at 03:02:40PM -0500, Tom Lane wrote: > > Justin Pryzby <pry...@telsasoft.com> writes: > > > I found that our largest tables are 40% smaller and 20% faster to pipe > > > pg_dump -Fc -Z0 |zstd relative to native zlib > > > > The patch might be a tad smaller if you hadn't included a core file in it. > > About 89% smaller. > > This also fixes the extension (.zst) > And fixes zlib default compression. > And a bunch of cleanup. I rebased so the "typedef struct compression" patch is first and zstd on top of that (say, in case someone wants to bikeshed about which compression algorithm to support). And made a central struct with all the compression-specific info to further isolate the compression-specific changes. And handle compression of "plain" archive format. And fix compilation for MSVC and make --without-zstd the default. And fix cfgets() (which I think is actually unused code for the code paths for compressed FP). And add a fix for a pre-existing problem: ftello() on unseekable input. I also started a patch to allow compression of "tar" format, but I didn't include that here yet. Note, there are currently several "compression" patches in the CF app. This patch seems to be independent of the others, but probably shouldn't be totally uncoordinated (like adding lz4 in one and zstd in another might be poor execution). https://commitfest.postgresql.org/31/2897/ - Faster pglz compression https://commitfest.postgresql.org/31/2813/ - custom compression methods for toast https://commitfest.postgresql.org/31/2773/ - libpq compression -- Justin
>From d2fc2673e19a95629edfe9a0f4ead75e1f1f2754 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Wed, 23 Dec 2020 23:56:54 -0600 Subject: [PATCH 01/20] fix!preeexisting --- src/bin/pg_dump/compress_io.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 1417401086..6a428978d4 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -28,7 +28,7 @@ * The interface for reading an archive consists of just one function: * ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the - * compressed data chunk at a time, and ReadDataFromArchive decompresses it + * compressed data one chunk at a time, and ReadDataFromArchive decompresses it * and passes the decompressed data to ahwrite(), until ReadFunc returns 0 * to signal EOF. * -- 2.17.0
>From e8bb61dd633aefb2cc7a14887a15cc60e05cd9c5 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Tue, 22 Dec 2020 15:40:08 -0600 Subject: [PATCH 02/20] Fix broken error message on unseekable input.. pg_dump -Fd -f dir.dir cat dir.dir/toc.dat | pg_restore -l # I realize this isn't how it's intended to be used pg_restore: error: corrupt tar header found in PGDMP (expected 0, computed 18577) file position 18446744073709551615 See also 929c69aa19 --- src/bin/pg_dump/pg_backup_tar.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index 54e708875c..61c9c87a9f 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -1280,12 +1280,14 @@ _tarGetHeader(ArchiveHandle *AH, TAR_MEMBER *th) if (chk != sum) { - char posbuf[32]; + off_t off = ftello(ctx->tarFH); - snprintf(posbuf, sizeof(posbuf), UINT64_FORMAT, - (uint64) ftello(ctx->tarFH)); - fatal("corrupt tar header found in %s (expected %d, computed %d) file position %s", - tag, sum, chk, posbuf); + if (off == -1) + fatal("corrupt tar header found in %s (expected %d, computed %d)", + tag, sum, chk); + else + fatal("corrupt tar header found in %s (expected %d, computed %d) file position " UINT64_FORMAT, + tag, sum, chk, off); } th->targetFile = pg_strdup(tag); -- 2.17.0
>From bbf91d13a25bcb2f0718f40984362f81741ca66a Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Tue, 22 Dec 2020 00:23:43 -0600 Subject: [PATCH 03/20] Support multiple compression algs/levels/opts.. The existing implementation abtracts compressed and noncompressed I/O. This preliminary commit is intended to also allow for multiple compression algorithms. --- src/bin/pg_dump/compress_io.c | 220 +++++++++++--------------- src/bin/pg_dump/compress_io.h | 19 +-- src/bin/pg_dump/pg_backup.h | 23 ++- src/bin/pg_dump/pg_backup_archiver.c | 45 +++--- src/bin/pg_dump/pg_backup_archiver.h | 12 +- src/bin/pg_dump/pg_backup_custom.c | 6 +- src/bin/pg_dump/pg_backup_directory.c | 17 +- src/bin/pg_dump/pg_backup_tar.c | 33 ++-- src/bin/pg_dump/pg_dump.c | 115 +++++++++++--- 9 files changed, 272 insertions(+), 218 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 6a428978d4..db16fd33f2 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -74,12 +74,9 @@ struct CompressorState #endif }; -static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, - int *level); - /* Routines that support zlib compressed data I/O */ #ifdef HAVE_LIBZ -static void InitCompressorZlib(CompressorState *cs, int level); +static void InitCompressorZlib(CompressorState *cs, Compress *compress); static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush); static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF); @@ -93,58 +90,36 @@ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen); -/* - * Interprets a numeric 'compression' value. The algorithm implied by the - * value (zlib or none at the moment), is returned in *alg, and the - * zlib compression level in *level. 
- */ -static void -ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level) -{ - if (compression == Z_DEFAULT_COMPRESSION || - (compression > 0 && compression <= 9)) - *alg = COMPR_ALG_LIBZ; - else if (compression == 0) - *alg = COMPR_ALG_NONE; - else - { - fatal("invalid compression code: %d", compression); - *alg = COMPR_ALG_NONE; /* keep compiler quiet */ - } - - /* The level is just the passed-in value. */ - if (level) - *level = compression; -} - /* Public interface routines */ /* Allocate a new compressor */ CompressorState * -AllocateCompressor(int compression, WriteFunc writeF) +AllocateCompressor(Compress *compression, WriteFunc writeF) { CompressorState *cs; - CompressionAlgorithm alg; - int level; - - ParseCompressionOption(compression, &alg, &level); - -#ifndef HAVE_LIBZ - if (alg == COMPR_ALG_LIBZ) - fatal("not built with zlib support"); -#endif cs = (CompressorState *) pg_malloc0(sizeof(CompressorState)); cs->writeF = writeF; - cs->comprAlg = alg; + cs->comprAlg = compression->alg; /* * Perform compression algorithm specific initialization. */ + Assert (compression->alg != COMPR_ALG_DEFAULT); + switch (compression->alg) + { #ifdef HAVE_LIBZ - if (alg == COMPR_ALG_LIBZ) - InitCompressorZlib(cs, level); + case COMPR_ALG_LIBZ: + InitCompressorZlib(cs, compression); + break; #endif + case COMPR_ALG_NONE: + /* Do nothing */ + break; + default: + /* Should not happen */ + fatal("requested compression not available in this installation"); + } return cs; } @@ -154,21 +129,21 @@ AllocateCompressor(int compression, WriteFunc writeF) * out with ahwrite(). 
*/ void -ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF) +ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF) { - CompressionAlgorithm alg; - - ParseCompressionOption(compression, &alg, NULL); - - if (alg == COMPR_ALG_NONE) - ReadDataFromArchiveNone(AH, readF); - if (alg == COMPR_ALG_LIBZ) + switch (AH->compression.alg) { + case COMPR_ALG_NONE: + ReadDataFromArchiveNone(AH, readF); + break; #ifdef HAVE_LIBZ + case COMPR_ALG_LIBZ: ReadDataFromArchiveZlib(AH, readF); -#else - fatal("not built with zlib support"); + break; #endif + default: + /* Should not happen */ + fatal("requested compression not available in this installation"); } } @@ -181,16 +156,18 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, { switch (cs->comprAlg) { - case COMPR_ALG_LIBZ: #ifdef HAVE_LIBZ + case COMPR_ALG_LIBZ: WriteDataToArchiveZlib(AH, cs, data, dLen); -#else - fatal("not built with zlib support"); -#endif break; +#endif case COMPR_ALG_NONE: WriteDataToArchiveNone(AH, cs, data, dLen); break; + + default: + /* Should not happen */ + fatal("requested compression not available in this installation"); } } @@ -215,7 +192,7 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs) */ static void -InitCompressorZlib(CompressorState *cs, int level) +InitCompressorZlib(CompressorState *cs, Compress *compress) { z_streamp zp; @@ -232,7 +209,7 @@ InitCompressorZlib(CompressorState *cs, int level) cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); cs->zlibOutSize = ZLIB_OUT_SIZE; - if (deflateInit(zp, level) != Z_OK) + if (deflateInit(zp, compress->level) != Z_OK) fatal("could not initialize compression library: %s", zp->msg); @@ -424,9 +401,7 @@ struct cfp #endif }; -#ifdef HAVE_LIBZ static int hasSuffix(const char *filename, const char *suffix); -#endif /* free() without changing errno; useful in several places below */ static void @@ -442,34 +417,31 @@ free_keep_errno(void *p) * Open a file for reading. 
'path' is the file to open, and 'mode' should * be either "r" or "rb". * - * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path' - * doesn't already have it) and try again. So if you pass "foo" as 'path', + * If the file at 'path' does not exist, we search with compressed suffix (if 'path' + * doesn't already have one) and try again. So if you pass "foo" as 'path', * this will open either "foo" or "foo.gz". * * On failure, return NULL with an error code in errno. */ cfp * -cfopen_read(const char *path, const char *mode) +cfopen_read(const char *path, const char *mode, Compress *compression) { cfp *fp; -#ifdef HAVE_LIBZ if (hasSuffix(path, ".gz")) - fp = cfopen(path, mode, 1); + fp = cfopen(path, mode, compression); else -#endif { - fp = cfopen(path, mode, 0); -#ifdef HAVE_LIBZ + fp = cfopen(path, mode, compression); if (fp == NULL) { char *fname; + const char *suffix = compress_suffix(compression); - fname = psprintf("%s.gz", path); - fp = cfopen(fname, mode, 1); + fname = psprintf("%s%s", path, suffix); + fp = cfopen(fname, mode, compression); free_keep_errno(fname); } -#endif } return fp; } @@ -479,31 +451,26 @@ cfopen_read(const char *path, const char *mode) * be a filemode as accepted by fopen() and gzopen() that indicates writing * ("w", "wb", "a", or "ab"). * - * If 'compression' is non-zero, a gzip compressed stream is opened, and - * 'compression' indicates the compression level used. The ".gz" suffix - * is automatically added to 'path' in that case. + * Use compression if specified. + * The appropriate suffix is automatically added to 'path' in that case. * * On failure, return NULL with an error code in errno. 
*/ cfp * -cfopen_write(const char *path, const char *mode, int compression) +cfopen_write(const char *path, const char *mode, Compress *compression) { cfp *fp; - if (compression == 0) - fp = cfopen(path, mode, 0); + if (compression->alg == COMPR_ALG_NONE) + fp = cfopen(path, mode, compression); else { -#ifdef HAVE_LIBZ char *fname; + const char *suffix = compress_suffix(compression); - fname = psprintf("%s.gz", path); + fname = psprintf("%s%s", path, suffix); fp = cfopen(fname, mode, compression); free_keep_errno(fname); -#else - fatal("not built with zlib support"); - fp = NULL; /* keep compiler quiet */ -#endif } return fp; } @@ -515,20 +482,21 @@ cfopen_write(const char *path, const char *mode, int compression) * On failure, return NULL with an error code in errno. */ cfp * -cfopen(const char *path, const char *mode, int compression) +cfopen(const char *path, const char *mode, Compress *compression) { - cfp *fp = pg_malloc(sizeof(cfp)); + cfp *fp = pg_malloc0(sizeof(cfp)); - if (compression != 0) + switch (compression->alg) { #ifdef HAVE_LIBZ - if (compression != Z_DEFAULT_COMPRESSION) + case COMPR_ALG_LIBZ: + if (compression->level != Z_DEFAULT_COMPRESSION) { /* user has specified a compression level, so tell zlib to use it */ char mode_compression[32]; snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compression); + mode, compression->level); fp->compressedfp = gzopen(path, mode_compression); } else @@ -537,30 +505,27 @@ cfopen(const char *path, const char *mode, int compression) fp->compressedfp = gzopen(path, mode); } - fp->uncompressedfp = NULL; if (fp->compressedfp == NULL) { free_keep_errno(fp); fp = NULL; } -#else - fatal("not built with zlib support"); -#endif - } - else - { -#ifdef HAVE_LIBZ - fp->compressedfp = NULL; + return fp; #endif + + case COMPR_ALG_NONE: fp->uncompressedfp = fopen(path, mode); if (fp->uncompressedfp == NULL) { free_keep_errno(fp); fp = NULL; } - } + return fp; - return fp; + default: + /* Should not happen 
*/ + fatal("requested compression not available in this installation"); + } } @@ -584,14 +549,13 @@ cfread(void *ptr, int size, cfp *fp) fatal("could not read from input file: %s", errnum == Z_ERRNO ? strerror(errno) : errmsg); } + return ret; } - else #endif - { - ret = fread(ptr, 1, size, fp->uncompressedfp); - if (ret != size && !feof(fp->uncompressedfp)) - READ_ERROR_EXIT(fp->uncompressedfp); - } + + ret = fread(ptr, 1, size, fp->uncompressedfp); + if (ret != size && !feof(fp->uncompressedfp)) + READ_ERROR_EXIT(fp->uncompressedfp); return ret; } @@ -601,9 +565,8 @@ cfwrite(const void *ptr, int size, cfp *fp) #ifdef HAVE_LIBZ if (fp->compressedfp) return gzwrite(fp->compressedfp, ptr, size); - else #endif - return fwrite(ptr, 1, size, fp->uncompressedfp); + return fwrite(ptr, 1, size, fp->uncompressedfp); } int @@ -622,15 +585,12 @@ cfgetc(cfp *fp) else fatal("could not read from input file: end of file"); } + return ret; } - else #endif - { - ret = fgetc(fp->uncompressedfp); - if (ret == EOF) - READ_ERROR_EXIT(fp->uncompressedfp); - } - + ret = fgetc(fp->uncompressedfp); + if (ret == EOF) + READ_ERROR_EXIT(fp->uncompressedfp); return ret; } @@ -640,9 +600,8 @@ cfgets(cfp *fp, char *buf, int len) #ifdef HAVE_LIBZ if (fp->compressedfp) return gzgets(fp->compressedfp, buf, len); - else #endif - return fgets(buf, len, fp->uncompressedfp); + return fgets(buf, len, fp->uncompressedfp); } int @@ -660,15 +619,13 @@ cfclose(cfp *fp) { result = gzclose(fp->compressedfp); fp->compressedfp = NULL; + return result; } - else #endif - { - result = fclose(fp->uncompressedfp); - fp->uncompressedfp = NULL; - } - free_keep_errno(fp); + result = fclose(fp->uncompressedfp); + fp->uncompressedfp = NULL; + free_keep_errno(fp); return result; } @@ -678,9 +635,9 @@ cfeof(cfp *fp) #ifdef HAVE_LIBZ if (fp->compressedfp) return gzeof(fp->compressedfp); - else #endif - return feof(fp->uncompressedfp); + + return feof(fp->uncompressedfp); } const char * @@ -699,7 +656,6 @@ get_cfp_error(cfp 
*fp) return strerror(errno); } -#ifdef HAVE_LIBZ static int hasSuffix(const char *filename, const char *suffix) { @@ -714,4 +670,18 @@ hasSuffix(const char *filename, const char *suffix) suffixlen) == 0; } -#endif +/* + * Return a string for the given AH's compression. + * The string is statically allocated. + */ +const char * +compress_suffix(Compress *compression) +{ + switch (compression->alg) + { + case COMPR_ALG_LIBZ: + return ".gz"; + default: + return ""; + } +} diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index d2e6e1b854..2c073676eb 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -21,12 +21,6 @@ #define ZLIB_OUT_SIZE 4096 #define ZLIB_IN_SIZE 4096 -typedef enum -{ - COMPR_ALG_NONE, - COMPR_ALG_LIBZ -} CompressionAlgorithm; - /* Prototype for callback function to WriteDataToArchive() */ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len); @@ -46,8 +40,8 @@ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen); /* struct definition appears in compress_io.c */ typedef struct CompressorState CompressorState; -extern CompressorState *AllocateCompressor(int compression, WriteFunc writeF); -extern void ReadDataFromArchive(ArchiveHandle *AH, int compression, +extern CompressorState *AllocateCompressor(Compress *compression, WriteFunc writeF); +extern void ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF); extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen); @@ -56,9 +50,9 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); typedef struct cfp cfp; -extern cfp *cfopen(const char *path, const char *mode, int compression); -extern cfp *cfopen_read(const char *path, const char *mode); -extern cfp *cfopen_write(const char *path, const char *mode, int compression); +extern cfp *cfopen(const char *path, const char *mode, Compress *compression); +extern cfp *cfopen_read(const char *path, const 
char *mode, Compress *compression); +extern cfp *cfopen_write(const char *path, const char *mode, Compress *compression); extern int cfread(void *ptr, int size, cfp *fp); extern int cfwrite(const void *ptr, int size, cfp *fp); extern int cfgetc(cfp *fp); @@ -67,4 +61,7 @@ extern int cfclose(cfp *fp); extern int cfeof(cfp *fp); extern const char *get_cfp_error(cfp *fp); +/* also used by tar */ +extern const char * compress_suffix(Compress *compression); + #endif diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 9d0056a569..f2390b7937 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -72,6 +72,25 @@ typedef struct _connParams char *override_dbname; } ConnParams; +typedef enum +{ + COMPR_ALG_DEFAULT = -1, + COMPR_ALG_NONE, + COMPR_ALG_LIBZ, +} CompressionAlgorithm; +/* Should be called "method" or "library" ? */ + +typedef struct Compress { + CompressionAlgorithm alg; + int level; + /* Is a nondefault level set ? This is useful since different compression + * methods have different "default" levels. For now we assume the levels + * are all integer, though. 
+ */ + bool level_set; +} Compress; + + typedef struct _restoreOptions { int createDB; /* Issue commands to create the database */ @@ -125,7 +144,7 @@ typedef struct _restoreOptions int noDataForFailedTables; int exit_on_error; - int compression; + Compress compression; int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; @@ -281,7 +300,7 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); /* Create a new archive */ extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker); /* The --list option */ diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 1f82c6499b..3eb6c55600 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -70,7 +70,7 @@ typedef struct _parallelReadyList static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr); static void _getObjectDescription(PQExpBuffer buf, TocEntry *te); static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData); @@ -98,7 +98,7 @@ static int _discoverArchiveFormat(ArchiveHandle *AH); static int RestoringToDB(ArchiveHandle *AH); static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); -static void SetOutput(ArchiveHandle *AH, const char *filename, int compression); +static void SetOutput(ArchiveHandle *AH, const char *filename, Compress *compress); static OutputContext SaveOutput(ArchiveHandle *AH); static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext); @@ -238,7 +238,7 @@ setupRestoreWorker(Archive *AHX) /* Public */ Archive * CreateArchive(const char 
*FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker) { @@ -253,7 +253,9 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt, Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt) { - ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker); + Compress compress = {0}; + ArchiveHandle *AH = _allocAH(FileSpec, fmt, &compress, true, archModeRead, + setupRestoreWorker); return (Archive *) AH; } @@ -382,7 +384,7 @@ RestoreArchive(Archive *AHX) * Make sure we won't need (de)compression we haven't got */ #ifndef HAVE_LIBZ - if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + if (AH->compression.alg != COMPR_ALG_NONE && AH->PrintTocDataPtr != NULL) { for (te = AH->toc->next; te != AH->toc; te = te->next) { @@ -457,8 +459,8 @@ RestoreArchive(Archive *AHX) * Setup the output file if necessary. */ sav = SaveOutput(AH); - if (ropt->filename || ropt->compression) - SetOutput(AH, ropt->filename, ropt->compression); + if (ropt->filename || ropt->compression.alg != COMPR_ALG_NONE) + SetOutput(AH, ropt->filename, &ropt->compression); ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n"); @@ -738,7 +740,7 @@ RestoreArchive(Archive *AHX) */ AH->stage = STAGE_FINALIZING; - if (ropt->filename || ropt->compression) + if (ropt->filename || ropt->compression.alg != COMPR_ALG_NONE) RestoreOutput(AH, sav); if (ropt->useDB) @@ -1123,8 +1125,9 @@ PrintTOCSummary(Archive *AHX) char stamp_str[64]; sav = SaveOutput(AH); + Assert(ropt->compression.alg == COMPR_ALG_NONE); if (ropt->filename) - SetOutput(AH, ropt->filename, 0 /* no compression */ ); + SetOutput(AH, ropt->filename, &ropt->compression); if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT, localtime(&AH->createDate)) == 0) @@ -1133,7 +1136,7 @@ PrintTOCSummary(Archive *AHX) ahprintf(AH, ";\n; Archive created at %s\n", stamp_str); 
ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n", sanitize_line(AH->archdbname, false), - AH->tocCount, AH->compression); + AH->tocCount, AH->compression.alg); switch (AH->format) { @@ -1487,7 +1490,7 @@ archprintf(Archive *AH, const char *fmt,...) *******************************/ static void -SetOutput(ArchiveHandle *AH, const char *filename, int compression) +SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression) { int fn; @@ -1510,12 +1513,12 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression) /* If compression explicitly requested, use gzopen */ #ifdef HAVE_LIBZ - if (compression != 0) + if (compression->alg != COMPR_ALG_NONE) { char fmode[14]; /* Don't use PG_BINARY_x since this is zlib */ - sprintf(fmode, "wb%d", compression); + sprintf(fmode, "wb%d", compression->level); if (fn >= 0) AH->OF = gzdopen(dup(fn), fmode); else @@ -2259,7 +2262,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, - const int compression, bool dosync, ArchiveMode mode, + Compress *compression, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; @@ -2310,7 +2313,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->toc->prev = AH->toc; AH->mode = mode; - AH->compression = compression; + AH->compression = *compression; AH->dosync = dosync; memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse)); @@ -2325,7 +2328,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, * Force stdin/stdout into binary mode if that is what we are using. 
*/ #ifdef WIN32 - if ((fmt != archNull || compression != 0) && + if ((fmt != archNull || compression->alg != COMPR_ALG_NONE) && (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)) { if (mode == archModeWrite) @@ -3741,7 +3744,7 @@ WriteHead(ArchiveHandle *AH) AH->WriteBytePtr(AH, AH->intSize); AH->WriteBytePtr(AH, AH->offSize); AH->WriteBytePtr(AH, AH->format); - WriteInt(AH, AH->compression); + WriteInt(AH, AH->compression.alg); crtm = *localtime(&AH->createDate); WriteInt(AH, crtm.tm_sec); WriteInt(AH, crtm.tm_min); @@ -3816,15 +3819,15 @@ ReadHead(ArchiveHandle *AH) if (AH->version >= K_VERS_1_2) { if (AH->version < K_VERS_1_4) - AH->compression = AH->ReadBytePtr(AH); + AH->compression.alg = AH->ReadBytePtr(AH); else - AH->compression = ReadInt(AH); + AH->compression.alg = ReadInt(AH); } else - AH->compression = Z_DEFAULT_COMPRESSION; + AH->compression.alg = Z_DEFAULT_COMPRESSION; #ifndef HAVE_LIBZ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available"); #endif diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index a8ea5c7eae..6e033d040e 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -47,7 +47,8 @@ #define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s)) #define GZREAD(p, s, n, fh) fread(p, s, n, fh) #define GZEOF(fh) feof(fh) -/* this is just the redefinition of a libz constant */ +/* this is just the redefinition of a libz constant, in case zlib isn't + * available */ #define Z_DEFAULT_COMPRESSION (-1) typedef struct _z_stream @@ -329,14 +330,7 @@ struct _archiveHandle DumpId *tableDataId; /* TABLE DATA ids, indexed by table dumpId */ struct _tocEntry *currToc; /* Used when dumping data */ - int compression; /*--------- - * Compression requested on open(). 
- * Possible values for compression: - * -1 Z_DEFAULT_COMPRESSION - * 0 COMPRESSION_NONE - * 1-9 levels for gzip compression - *--------- - */ + Compress compression; /* Compression requested on open */ bool dosync; /* data requested to be synced on sight */ ArchiveMode mode; /* File mode - r or w */ void *formatData; /* Header data specific to file format */ diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 77d402c323..55a887a236 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression, _CustomWriteFunc); } /* @@ -377,7 +377,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) WriteInt(AH, oid); - ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression, _CustomWriteFunc); } /* @@ -566,7 +566,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) static void _PrintData(ArchiveHandle *AH) { - ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); + ReadDataFromArchive(AH, _CustomReadFunc); } static void diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 650b542fce..8bf869c6ca 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -202,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) setFilePath(AH, fname, "toc.dat"); - tocFH = cfopen_read(fname, PG_BINARY_R); + tocFH = cfopen_read(fname, PG_BINARY_R, &AH->compression); if (tocFH == NULL) fatal("could not open input file \"%s\": %m", fname); @@ -327,7 +327,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression); + ctx->dataFH 
= cfopen_write(fname, PG_BINARY_W, &AH->compression); if (ctx->dataFH == NULL) fatal("could not open output file \"%s\": %m", fname); } @@ -388,7 +388,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename) if (!filename) return; - cfp = cfopen_read(filename, PG_BINARY_R); + cfp = cfopen_read(filename, PG_BINARY_R, &AH->compression); if (!cfp) fatal("could not open input file \"%s\": %m", filename); @@ -435,12 +435,13 @@ _LoadBlobs(ArchiveHandle *AH) lclContext *ctx = (lclContext *) AH->formatData; char fname[MAXPGPATH]; char line[MAXPGPATH]; + Compress nocompression = {0}; StartRestoreBlobs(AH); setFilePath(AH, fname, "blobs.toc"); - ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R); + ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, &nocompression); if (ctx->blobsTocFH == NULL) fatal("could not open large object TOC file \"%s\" for input: %m", @@ -573,6 +574,7 @@ _CloseArchive(ArchiveHandle *AH) { cfp *tocFH; char fname[MAXPGPATH]; + Compress nocompression = {0}; setFilePath(AH, fname, "toc.dat"); @@ -580,7 +582,7 @@ _CloseArchive(ArchiveHandle *AH) ctx->pstate = ParallelBackupStart(AH); /* The TOC is always created uncompressed */ - tocFH = cfopen_write(fname, PG_BINARY_W, 0); + tocFH = cfopen_write(fname, PG_BINARY_W, &nocompression); if (tocFH == NULL) fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -639,11 +641,12 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; char fname[MAXPGPATH]; + Compress nocompression = {0}; setFilePath(AH, fname, "blobs.toc"); /* The blob TOC file is never compressed */ - ctx->blobsTocFH = cfopen_write(fname, "ab", 0); + ctx->blobsTocFH = cfopen_write(fname, "ab", &nocompression); if (ctx->blobsTocFH == NULL) fatal("could not open output file \"%s\": %m", fname); } @@ -661,7 +664,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, 
AH->compression); + ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression); if (ctx->dataFH == NULL) fatal("could not open output file \"%s\": %m", fname); diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index 61c9c87a9f..4ba79ab924 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -39,6 +39,7 @@ #include "pg_backup_archiver.h" #include "pg_backup_tar.h" #include "pg_backup_utils.h" +#include "compress_io.h" #include "pgtar.h" static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te); @@ -196,10 +197,10 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) /* * We don't support compression because reading the files back is not - * possible since gzdopen uses buffered IO which totally screws file + * possible since gzdopen uses buffered IO which totally screws file XXX * positioning. */ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) fatal("compression is not supported by tar archive format"); } else @@ -254,14 +255,8 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te) ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); if (te->dataDumper != NULL) { -#ifdef HAVE_LIBZ - if (AH->compression == 0) - sprintf(fn, "%d.dat", te->dumpId); - else - sprintf(fn, "%d.dat.gz", te->dumpId); -#else - sprintf(fn, "%d.dat", te->dumpId); -#endif + const char *suffix = compress_suffix(&AH->compression); + sprintf(fn, "%d.dat%s", te->dumpId, suffix); ctx->filename = pg_strdup(fn); } else @@ -352,7 +347,7 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) #ifdef HAVE_LIBZ - if (AH->compression == 0) + if (AH->compression.alg == COMPR_ALG_NONE) tm->nFH = ctx->tarFH; else fatal("compression is not supported by tar archive format"); @@ -413,9 +408,9 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) #ifdef HAVE_LIBZ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) { - sprintf(fmode, "wb%d", AH->compression); + sprintf(fmode, "wb%d", 
AH->compression.level); tm->zFH = gzdopen(dup(fileno(tm->tmpFH)), fmode); if (tm->zFH == NULL) fatal("could not open temporary file"); @@ -443,7 +438,7 @@ tarClose(ArchiveHandle *AH, TAR_MEMBER *th) /* * Close the GZ file since we dup'd. This will flush the buffers. */ - if (AH->compression != 0) + if (AH->compression.alg != COMPR_ALG_NONE) if (GZCLOSE(th->zFH) != 0) fatal("could not close tar member"); @@ -868,7 +863,7 @@ _CloseArchive(ArchiveHandle *AH) memcpy(ropt, AH->public.ropt, sizeof(RestoreOptions)); ropt->filename = NULL; ropt->dropSchema = 1; - ropt->compression = 0; + ropt->compression.alg = COMPR_ALG_NONE; ropt->superuser = NULL; ropt->suppressDumpWarnings = true; @@ -952,16 +947,12 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) lclContext *ctx = (lclContext *) AH->formatData; lclTocEntry *tctx = (lclTocEntry *) te->formatData; char fname[255]; - char *sfx; + const char *sfx; if (oid == 0) fatal("invalid OID for large object (%u)", oid); - if (AH->compression != 0) - sfx = ".gz"; - else - sfx = ""; - + sfx = compress_suffix(&AH->compression); sprintf(fname, "blob_%u.dat%s", oid, sfx); tarPrintf(ctx->blobToc, "%u %s\n", oid, fname); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 1ab98a2286..4cbc79aedc 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -59,6 +59,7 @@ #include "getopt_long.h" #include "libpq/libpq-fs.h" #include "parallel.h" +#include "compress_io.h" #include "pg_backup_db.h" #include "pg_backup_utils.h" #include "pg_dump.h" @@ -297,6 +298,82 @@ static void setupDumpWorker(Archive *AHX); static TableInfo *getRootTableInfo(TableInfo *tbinfo); +/* Parse the string into compression options */ +static void +parse_compression(const char *optarg, Compress *compress) +{ + if (optarg[0] == '0' && optarg[1] == '\0') + compress->alg = COMPR_ALG_NONE; + else if ((optarg[0] > '0' && optarg[0] <= '9') || + optarg[0] == '-') + { + compress->alg = COMPR_ALG_LIBZ; + compress->level_set = true; + 
compress->level = atoi(optarg); + if (optarg[1] != '\0') + { + pg_log_error("compression level must be in range 0..9"); + exit_nicely(1); + } + } + else + { + /* Parse a more flexible string like level=3 alg=zlib opts=long */ + for (;;) + { + char *eq = strchr(optarg, '='); + int len; + + if (eq == NULL) + { + pg_log_error("compression options must be key=value: %s", optarg); + exit_nicely(1); + } + + len = eq - optarg; + if (strncmp(optarg, "alg", len) == 0) + { + if (strchr(eq, ' ')) + len = strchr(eq, ' ') - eq - 1; + else + len = strlen(eq) - len; + if (strncmp(1+eq, "zlib", len) == 0 || + strncmp(1+eq, "libz", len) == 0) + compress->alg = COMPR_ALG_LIBZ; + else + { + pg_log_error("unknown compression algorithm: %s", 1+eq); + exit_nicely(1); + } + } + else if (strncmp(optarg, "level", len) == 0) + { + compress->level = atoi(1+eq); + compress->level_set = true; + } + else + { + pg_log_error("unknown compression setting: %s", optarg); + exit_nicely(1); + } + + optarg = strchr(eq, ' '); + if (!optarg++) + break; + } + + if (!compress->level_set) + { + const int default_compress_level[] = { + 0, /* COMPR_ALG_NONE */ + Z_DEFAULT_COMPRESSION, /* COMPR_ALG_ZLIB */ + }; + + compress->level = default_compress_level[compress->alg]; + } + } +} + int main(int argc, char **argv) { @@ -319,7 +396,7 @@ main(int argc, char **argv) char *use_role = NULL; long rowsPerInsert; int numWorkers = 1; - int compressLevel = -1; + Compress compress = { .alg = COMPR_ALG_DEFAULT }; int plainText = 0; ArchiveFormat archiveFormat = archUnknown; ArchiveMode archiveMode; @@ -532,12 +609,7 @@ main(int argc, char **argv) break; case 'Z': /* Compression Level */ - compressLevel = atoi(optarg); - if (compressLevel < 0 || compressLevel > 9) - { - pg_log_error("compression level must be in range 0..9"); - exit_nicely(1); - } + parse_compression(optarg, &compress); break; case 0: @@ -679,20 +751,28 @@ main(int argc, char **argv) plainText = 1; /* Custom and directory formats are compressed by 
default, others not */ - if (compressLevel == -1) + if (compress.alg == COMPR_ALG_DEFAULT) { -#ifdef HAVE_LIBZ if (archiveFormat == archCustom || archiveFormat == archDirectory) - compressLevel = Z_DEFAULT_COMPRESSION; - else + { +#ifdef HAVE_LIBZ + compress.alg = COMPR_ALG_LIBZ; + compress.level = Z_DEFAULT_COMPRESSION; #endif - compressLevel = 0; + } + else + { + compress.alg = COMPR_ALG_NONE; + compress.level = 0; + } } #ifndef HAVE_LIBZ - if (compressLevel != 0) + if (compress.alg == COMPR_ALG_LIBZ) + { pg_log_warning("requested compression not available in this installation -- archive will be uncompressed"); - compressLevel = 0; + compress.alg = 0; + } #endif /* @@ -723,7 +803,7 @@ main(int argc, char **argv) fatal("option --index-collation-versions-unknown only works in binary upgrade mode"); /* Open the output file */ - fout = CreateArchive(filename, archiveFormat, compressLevel, dosync, + fout = CreateArchive(filename, archiveFormat, &compress, dosync, archiveMode, setupDumpWorker); /* Make dump options accessible right away */ @@ -957,10 +1037,7 @@ main(int argc, char **argv) ropt->sequence_data = dopt.sequence_data; ropt->binary_upgrade = dopt.binary_upgrade; - if (compressLevel == -1) - ropt->compression = 0; - else - ropt->compression = compressLevel; + ropt->compression = compress; ropt->suppressDumpWarnings = true; /* We've already shown them */ -- 2.17.0
>From b26d3fa15723ab38057276471942e79cf4c7789b Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Thu, 24 Dec 2020 22:08:43 -0600 Subject: [PATCH 04/20] struct compressLibs --- src/bin/pg_dump/compress_io.c | 55 +++++++++++++++++++-------- src/bin/pg_dump/compress_io.h | 7 ++++ src/bin/pg_dump/pg_backup_directory.c | 20 ++++++---- src/bin/pg_dump/pg_dump.c | 25 +++++++----- 4 files changed, 75 insertions(+), 32 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index db16fd33f2..21957d68f3 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -56,6 +56,18 @@ #include "compress_io.h" #include "pg_backup_utils.h" +const struct compressLibs +compresslibs[] = { + { COMPR_ALG_NONE, "no", "", 0, }, + { COMPR_ALG_NONE, "none", "", 0, }, /* Alternate name */ + +// #ifdef HAVE_LIBZ? + { COMPR_ALG_LIBZ, "libz", ".gz", Z_DEFAULT_COMPRESSION }, + { COMPR_ALG_LIBZ, "zlib", ".gz", Z_DEFAULT_COMPRESSION }, /* Alternate name */ + + { 0, NULL, } /* sentinel */ +}; + /*---------------------- * Compressor API *---------------------- @@ -401,7 +413,7 @@ struct cfp #endif }; -static int hasSuffix(const char *filename, const char *suffix); +static int hasSuffix(const char *filename); /* free() without changing errno; useful in several places below */ static void @@ -428,7 +440,7 @@ cfopen_read(const char *path, const char *mode, Compress *compression) { cfp *fp; - if (hasSuffix(path, ".gz")) + if (hasSuffix(path)) fp = cfopen(path, mode, compression); else { @@ -656,18 +668,29 @@ get_cfp_error(cfp *fp) return strerror(errno); } +/* Return true iff the filename has a known compression suffix */ static int -hasSuffix(const char *filename, const char *suffix) +hasSuffix(const char *filename) { - int filenamelen = strlen(filename); - int suffixlen = strlen(suffix); + for (int i = 0; compresslibs[i].name != NULL; ++i) + { + const char *suffix = compresslibs[i].suffix; + int filenamelen = 
strlen(filename); + int suffixlen = strlen(suffix); - if (filenamelen < suffixlen) - return 0; + /* COMPR_ALG_NONE has an empty "suffix", which doesn't count */ + if (suffixlen == 0) + continue; + + if (filenamelen < suffixlen) + continue; - return memcmp(&filename[filenamelen - suffixlen], - suffix, - suffixlen) == 0; + if (memcmp(&filename[filenamelen - suffixlen], + suffix, suffixlen) == 0) + return true; + } + + return false; } /* @@ -677,11 +700,13 @@ hasSuffix(const char *filename, const char *suffix) const char * compress_suffix(Compress *compression) { - switch (compression->alg) + for (int i = 0; compresslibs[i].name != NULL; ++i) { - case COMPR_ALG_LIBZ: - return ".gz"; - default: - return ""; + if (compression->alg != compresslibs[i].alg) + continue; + + return compresslibs[i].suffix; } + + return ""; } diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index 2c073676eb..fb9d659acc 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -47,6 +47,13 @@ extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen); extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); +struct compressLibs { + const CompressionAlgorithm alg; + const char *name; /* Name in -Z alg= */ + const char *suffix; /* file extension */ + const int defaultlevel; /* Default compression level */ +}; +extern const struct compressLibs compresslibs[]; typedef struct cfp cfp; diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 8bf869c6ca..75c1bf22e4 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -768,14 +768,20 @@ _PrepParallelRestore(ArchiveHandle *AH) */ setFilePath(AH, fname, tctx->filename); - if (stat(fname, &st) == 0) - te->dataLength = st.st_size; - else + for (int i = 0; compresslibs[i].name != NULL; ++i) { - /* It might be compressed */ - strlcat(fname, ".gz", sizeof(fname)); - if 
(stat(fname, &st) == 0) - te->dataLength = st.st_size; + char filename[MAXPGPATH]; + int ret; + + snprintf(filename, sizeof(filename), "%s%s", fname, + compresslibs[i].suffix); + + ret = stat(fname, &st); + if (ret < 0) // && errno == ENOENT) + continue; + + te->dataLength = st.st_size; + break; } /* diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 4cbc79aedc..75985fd4d3 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -318,7 +318,7 @@ parse_compression(const char *optarg, Compress *compress) } else { - /* Parse a more flexible string like level=3 alg=zlib opts=long */ + /* Parse a more flexible string like -Z level=3 -Z alg=zlib -Z checksum=1 */ for (;;) { char *eq = strchr(optarg, '='); @@ -333,14 +333,19 @@ parse_compression(const char *optarg, Compress *compress) len = eq - optarg; if (strncmp(optarg, "alg", len) == 0) { - if (strchr(eq, ' ')) - len = strchr(eq, ' ') - eq - 1; - else - len = strlen(eq) - len; - if (strncmp(1+eq, "zlib", len) == 0 || - strncmp(1+eq, "libz", len) == 0) - compress->alg = COMPR_ALG_LIBZ; - else + len = strlen(eq) - len; + + for (int i = 0; compresslibs[i].name != NULL; ++i) + { + if (strlen(1+eq) != strlen(compresslibs[i].name)) + continue; + if (strncmp(1+eq, compresslibs[i].name, len) != 0) + continue; + compress->alg = compresslibs[i].alg; + break; + } + + if (compress->alg == COMPR_ALG_DEFAULT) { pg_log_error("unknown compression algorithm: %s", 1+eq); exit_nicely(1); @@ -363,7 +368,7 @@ parse_compression(const char *optarg, Compress *compress) } if (!compress->level_set) - { + { // XXX const int default_compress_level[] = { 0, /* COMPR_ALG_NONE */ Z_DEFAULT_COMPRESSION, /* COMPR_ALG_ZLIB */ -- 2.17.0
>From 1bac5d02bbcc9c34dcb44b358bc27f3d204bb584 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Fri, 25 Dec 2020 00:34:01 -0600 Subject: [PATCH 05/20] Use cf* abstraction in archiver and tar ..rather than direct, conditional calls to gzopen/fopen. See also: bf9aa490db24b2334b3595ee33653bf2fe39208c --- src/bin/pg_dump/compress_io.c | 53 +++++++++++++ src/bin/pg_dump/compress_io.h | 5 ++ src/bin/pg_dump/pg_backup_archiver.c | 109 +++++++++------------------ src/bin/pg_dump/pg_backup_archiver.h | 16 ++-- src/bin/pg_dump/pg_backup_tar.c | 85 +++++---------------- 5 files changed, 117 insertions(+), 151 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 21957d68f3..d66d6f60f5 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -540,6 +540,58 @@ cfopen(const char *path, const char *mode, Compress *compression) } } +/* + * Open a file descriptor, with specified compression. + * Returns an opaque cfp object. 
+ */ +cfp * +cfdopen(int fd, const char *mode, Compress *compression) +{ + cfp *fp = pg_malloc0(sizeof(cfp)); + + switch (compression->alg) + { +#ifdef HAVE_LIBZ + case COMPR_ALG_LIBZ: + if (compression->level != Z_DEFAULT_COMPRESSION) + { + /* user has specified a compression level, so tell zlib to use it */ + char mode_compression[32]; + + snprintf(mode_compression, sizeof(mode_compression), "%s%d", + mode, compression->level); + fp->compressedfp = gzdopen(fd, mode_compression); + } + else + { + /* don't specify a level, just use the zlib default */ + fp->compressedfp = gzdopen(fd, mode); + } + + if (fp->compressedfp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } + return fp; +#endif + + case COMPR_ALG_NONE: + fp->uncompressedfp = fdopen(fd, mode); + if (fp->uncompressedfp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } + else + setvbuf(fp->uncompressedfp, NULL, _IONBF, 0); + return fp; + + default: + /* Should not happen */ + fatal("requested compression not available in this installation"); + } +} int cfread(void *ptr, int size, cfp *fp) @@ -616,6 +668,7 @@ cfgets(cfp *fp, char *buf, int len) return fgets(buf, len, fp->uncompressedfp); } +/* Close the given compressed or uncompressed stream; return 0 on success. 
*/ int cfclose(cfp *fp) { diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index fb9d659acc..318a6b5340 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -21,6 +21,10 @@ #define ZLIB_OUT_SIZE 4096 #define ZLIB_IN_SIZE 4096 +/* Forward declaration */ +struct ArchiveHandle; +typedef struct _archiveHandle ArchiveHandle; + /* Prototype for callback function to WriteDataToArchive() */ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len); @@ -58,6 +62,7 @@ extern const struct compressLibs compresslibs[]; typedef struct cfp cfp; extern cfp *cfopen(const char *path, const char *mode, Compress *compression); +extern cfp *cfdopen(int fd, const char *mode, Compress *compression); extern cfp *cfopen_read(const char *path, const char *mode, Compress *compression); extern cfp *cfopen_write(const char *path, const char *mode, Compress *compression); extern int cfread(void *ptr, int size, cfp *fp); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 3eb6c55600..bd06fbb787 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -39,17 +39,11 @@ #include "pg_backup_archiver.h" #include "pg_backup_db.h" #include "pg_backup_utils.h" +#include "compress_io.h" #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" -/* state needed to save/restore an archive's output target */ -typedef struct _outputContext -{ - void *OF; - int gzOut; -} OutputContext; - /* * State for tracking TocEntrys that are ready to process during a parallel * restore. 
(This used to be a list, and we still call it that, though now @@ -99,8 +93,8 @@ static int RestoringToDB(ArchiveHandle *AH); static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static void SetOutput(ArchiveHandle *AH, const char *filename, Compress *compress); -static OutputContext SaveOutput(ArchiveHandle *AH); -static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext); +static cfp *SaveOutput(ArchiveHandle *AH); +static void RestoreOutput(ArchiveHandle *AH, cfp *fp); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel); static void restore_toc_entries_prefork(ArchiveHandle *AH, @@ -270,10 +264,8 @@ CloseArchive(Archive *AHX) AH->ClosePtr(AH); /* Close the output */ - if (AH->gzOut) - res = GZCLOSE(AH->OF); - else if (AH->OF != stdout) - res = fclose(AH->OF); + if ((FILE *)AH->OF != stdout) + res = cfclose(AH->OF); if (res != 0) fatal("could not close output file: %m"); @@ -355,7 +347,7 @@ RestoreArchive(Archive *AHX) RestoreOptions *ropt = AH->public.ropt; bool parallel_mode; TocEntry *te; - OutputContext sav; + cfp *sav; AH->stage = STAGE_INITIALIZING; @@ -1120,7 +1112,7 @@ PrintTOCSummary(Archive *AHX) RestoreOptions *ropt = AH->public.ropt; TocEntry *te; teSection curSection; - OutputContext sav; + cfp *sav; const char *fmtName; char stamp_str[64]; @@ -1492,6 +1484,7 @@ archprintf(Archive *AH, const char *fmt,...) static void SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression) { + char fmode[14]; int fn; if (filename) @@ -1511,38 +1504,22 @@ SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression) else fn = fileno(stdout); - /* If compression explicitly requested, use gzopen */ -#ifdef HAVE_LIBZ - if (compression->alg != COMPR_ALG_NONE) + if (fn >= 0) { - char fmode[14]; + /* Handle output to stdout */ + sprintf(fmode, "%sb%d", + AH->mode == archModeAppend ? 
PG_BINARY_A : PG_BINARY_W, + compression->level); - /* Don't use PG_BINARY_x since this is zlib */ - sprintf(fmode, "wb%d", compression->level); - if (fn >= 0) - AH->OF = gzdopen(dup(fn), fmode); - else - AH->OF = gzopen(filename, fmode); - AH->gzOut = 1; + AH->OF = cfdopen(dup(fn), fmode, compression); } else -#endif - { /* Use fopen */ - if (AH->mode == archModeAppend) - { - if (fn >= 0) - AH->OF = fdopen(dup(fn), PG_BINARY_A); - else - AH->OF = fopen(filename, PG_BINARY_A); - } - else - { - if (fn >= 0) - AH->OF = fdopen(dup(fn), PG_BINARY_W); - else - AH->OF = fopen(filename, PG_BINARY_W); - } - AH->gzOut = 0; + { + Assert(filename != NULL); + sprintf(fmode, "%cb%d", + AH->mode == archModeAppend ? 'a' : 'w', + compression->level); + AH->OF = cfopen(filename, fmode, compression); } if (!AH->OF) @@ -1554,32 +1531,22 @@ SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression) } } -static OutputContext +/* Return a pointer to the old FP */ +static cfp * SaveOutput(ArchiveHandle *AH) { - OutputContext sav; - - sav.OF = AH->OF; - sav.gzOut = AH->gzOut; - - return sav; + return AH->OF; } static void -RestoreOutput(ArchiveHandle *AH, OutputContext savedContext) +RestoreOutput(ArchiveHandle *AH, cfp *savedContext) { int res; - - if (AH->gzOut) - res = GZCLOSE(AH->OF); - else - res = fclose(AH->OF); - + res = cfclose(AH->OF); if (res != 0) fatal("could not close output file: %m"); - AH->gzOut = savedContext.gzOut; - AH->OF = savedContext.OF; + AH->OF = savedContext; } @@ -1703,22 +1670,14 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) bytes_written = size * nmemb; } - else if (AH->gzOut) - bytes_written = GZWRITE(ptr, size, nmemb, AH->OF); else if (AH->CustomOutPtr) bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb); - + else if (RestoringToDB(AH)) + /* If we're doing a restore, and it's direct to DB, and we're + * connected then send it to the DB. 
*/ + bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); else - { - /* - * If we're doing a restore, and it's direct to DB, and we're - * connected then send it to the DB. - */ - if (RestoringToDB(AH)) - bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); - else - bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size; - } + bytes_written = cfwrite(ptr, size * nmemb, AH->OF); if (bytes_written != size * nmemb) WRITE_ERROR_EXIT; @@ -2127,6 +2086,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) fh = stdin; if (!fh) fatal("could not open input file: %m"); + setvbuf(fh, NULL, _IONBF, 0); } if ((cnt = fread(sig, 1, 5, fh)) != 5) @@ -2266,6 +2226,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; + Compress nocompression = {0}; pg_log_debug("allocating AH for %s, format %d", FileSpec ? FileSpec : "(stdio)", fmt); @@ -2319,8 +2280,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse)); /* Open stdout with no compression for AH output handle */ - AH->gzOut = 0; - AH->OF = stdout; + AH->OF = cfdopen(fileno(stdout), "w", &nocompression); + // AH->OF = cfdopen(STDOUT_FILENO, "w", compression); // XXX /* * On Windows, we need to use binary mode to read/write non-text files, diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 6e033d040e..9f511b49b9 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -30,6 +30,9 @@ #include "pg_backup.h" #include "pqexpbuffer.h" +/* Forward declaration XXX: CIRCULAR */ +typedef struct cfp cfp; + #define LOBBUFSIZE 16384 /* @@ -38,19 +41,11 @@ */ #ifdef HAVE_LIBZ #include <zlib.h> -#define GZCLOSE(fh) gzclose(fh) -#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s)) -#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s)) -#define GZEOF(fh) gzeof(fh) #else -#define GZCLOSE(fh) 
fclose(fh) -#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s)) -#define GZREAD(p, s, n, fh) fread(p, s, n, fh) -#define GZEOF(fh) feof(fh) + /* this is just the redefinition of a libz constant, in case zlib isn't * available */ #define Z_DEFAULT_COMPRESSION (-1) - typedef struct _z_stream { void *next_in; @@ -318,8 +313,7 @@ struct _archiveHandle char *fSpec; /* Archive File Spec */ FILE *FH; /* General purpose file handle */ - void *OF; - int gzOut; /* Output file */ + cfp *OF; /* Output file (compressed or not) */ struct _tocEntry *toc; /* Header of circular list of TOC entries */ int tocCount; /* Number of TOC entries */ diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index 4ba79ab924..16f4e0792a 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -66,12 +66,7 @@ static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); typedef struct { -#ifdef HAVE_LIBZ - gzFile zFH; -#else - FILE *zFH; -#endif - FILE *nFH; + cfp *FH; FILE *tarFH; FILE *tmpFH; char *targetFile; @@ -191,7 +186,7 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) * Make unbuffered since we will dup() it, and the buffers screw each * other */ - /* setvbuf(ctx->tarFH, NULL, _IONBF, 0); */ + // setvbuf(ctx->tarFH, NULL, _IONBF, 0); ctx->hasSeek = checkSeek(ctx->tarFH); @@ -223,7 +218,7 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) * Make unbuffered since we will dup() it, and the buffers screw each * other */ - /* setvbuf(ctx->tarFH, NULL, _IONBF, 0); */ + setvbuf(ctx->tarFH, NULL, _IONBF, 0); ctx->tarFHpos = 0; @@ -321,10 +316,6 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) lclContext *ctx = (lclContext *) AH->formatData; TAR_MEMBER *tm; -#ifdef HAVE_LIBZ - char fmode[14]; -#endif - if (mode == 'r') { tm = _tarPositionTo(AH, filename); @@ -345,16 +336,10 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) } } -#ifdef HAVE_LIBZ - if (AH->compression.alg == COMPR_ALG_NONE) - tm->nFH = ctx->tarFH; + tm->FH = 
cfdopen(dup(fileno(ctx->tarFH)), "rb", &AH->compression); else fatal("compression is not supported by tar archive format"); - /* tm->zFH = gzdopen(dup(fileno(ctx->tarFH)), "rb"); */ -#else - tm->nFH = ctx->tarFH; -#endif } else { @@ -406,21 +391,11 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) umask(old_umask); -#ifdef HAVE_LIBZ - - if (AH->compression.alg != COMPR_ALG_NONE) - { - sprintf(fmode, "wb%d", AH->compression.level); - tm->zFH = gzdopen(dup(fileno(tm->tmpFH)), fmode); - if (tm->zFH == NULL) - fatal("could not open temporary file"); - } - else - tm->nFH = tm->tmpFH; -#else - - tm->nFH = tm->tmpFH; -#endif + tm->FH = cfdopen(dup(fileno(tm->tmpFH)), + mode == 'r' ? "r" : "w", + &AH->compression); + if (tm->FH == NULL) + fatal("could not open temporary file"); tm->AH = AH; tm->targetFile = pg_strdup(filename); @@ -435,12 +410,14 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode) static void tarClose(ArchiveHandle *AH, TAR_MEMBER *th) { + int res; + /* * Close the GZ file since we dup'd. This will flush the buffers. */ - if (AH->compression.alg != COMPR_ALG_NONE) - if (GZCLOSE(th->zFH) != 0) - fatal("could not close tar member"); + res = cfclose(th->FH); + if (res != 0) + fatal("could not close tar member"); if (th->mode == 'w') _tarAddFile(AH, th); /* This will close the temp file */ @@ -453,8 +430,7 @@ tarClose(ArchiveHandle *AH, TAR_MEMBER *th) if (th->targetFile) free(th->targetFile); - th->nFH = NULL; - th->zFH = NULL; + th->FH = NULL; } #ifdef __NOT_USED__ @@ -540,29 +516,9 @@ _tarReadRaw(ArchiveHandle *AH, void *buf, size_t len, TAR_MEMBER *th, FILE *fh) } else if (th) { - if (th->zFH) - { - res = GZREAD(&((char *) buf)[used], 1, len, th->zFH); - if (res != len && !GZEOF(th->zFH)) - { -#ifdef HAVE_LIBZ - int errnum; - const char *errmsg = gzerror(th->zFH, &errnum); - - fatal("could not read from input file: %s", - errnum == Z_ERRNO ? 
strerror(errno) : errmsg); -#else - fatal("could not read from input file: %s", - strerror(errno)); -#endif - } - } - else - { - res = fread(&((char *) buf)[used], 1, len, th->nFH); - if (res != len && !feof(th->nFH)) - READ_ERROR_EXIT(th->nFH); - } + res = cfread(&((char *) buf)[used], len, th->FH); + if (res != len && !cfeof(th->FH)) + fatal("could not read from input file: %m"); } } @@ -594,10 +550,7 @@ tarWrite(const void *buf, size_t len, TAR_MEMBER *th) { size_t res; - if (th->zFH != NULL) - res = GZWRITE(buf, 1, len, th->zFH); - else - res = fwrite(buf, 1, len, th->nFH); + res = cfwrite(buf, len, th->FH); th->pos += res; return res; -- 2.17.0
>From b8eb13b5bd4114a9860cfa83f8240ab09db588b4 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Tue, 22 Dec 2020 01:06:26 -0600 Subject: [PATCH 06/20] pg_dump: zstd compression document any change in search for .gz? docs Maybe compress_io should be split so all the library-specific stuff are in separate files, like compress_{zlib/zstd}.c --- configure | 123 ++++++- configure.ac | 22 ++ src/bin/pg_dump/compress_io.c | 480 ++++++++++++++++++++++++++ src/bin/pg_dump/pg_backup.h | 14 + src/bin/pg_dump/pg_backup_archiver.h | 4 + src/bin/pg_dump/pg_backup_directory.c | 8 +- src/bin/pg_dump/pg_dump.c | 39 +++ src/include/pg_config.h.in | 3 + src/tools/msvc/Solution.pm | 1 + 9 files changed, 686 insertions(+), 8 deletions(-) diff --git a/configure b/configure index 11a4284e5b..fe739879af 100755 --- a/configure +++ b/configure @@ -698,6 +698,7 @@ with_gnu_ld LD LDFLAGS_SL LDFLAGS_EX +with_zstd with_zlib with_system_tzdata with_libxslt @@ -798,6 +799,7 @@ infodir docdir oldincludedir includedir +runstatedir localstatedir sharedstatedir sysconfdir @@ -866,6 +868,7 @@ with_libxml with_libxslt with_system_tzdata with_zlib +with_zstd with_gnu_ld enable_largefile ' @@ -935,6 +938,7 @@ datadir='${datarootdir}' sysconfdir='${prefix}/etc' sharedstatedir='${prefix}/com' localstatedir='${prefix}/var' +runstatedir='${localstatedir}/run' includedir='${prefix}/include' oldincludedir='/usr/include' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' @@ -1187,6 +1191,15 @@ do | -silent | --silent | --silen | --sile | --sil) silent=yes ;; + -runstatedir | --runstatedir | --runstatedi | --runstated \ + | --runstate | --runstat | --runsta | --runst | --runs \ + | --run | --ru | --r) + ac_prev=runstatedir ;; + -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \ + | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \ + | --run=* | --ru=* | --r=*) + runstatedir=$ac_optarg ;; + -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) 
ac_prev=sbindir ;; -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ @@ -1324,7 +1337,7 @@ fi for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \ datadir sysconfdir sharedstatedir localstatedir includedir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ - libdir localedir mandir + libdir localedir mandir runstatedir do eval ac_val=\$$ac_var # Remove trailing slashes. @@ -1477,6 +1490,7 @@ Fine tuning of the installation directories: --sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --localstatedir=DIR modifiable single-machine data [PREFIX/var] + --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run] --libdir=DIR object code libraries [EPREFIX/lib] --includedir=DIR C header files [PREFIX/include] --oldincludedir=DIR C header files for non-gcc [/usr/include] @@ -1570,6 +1584,7 @@ Optional Packages: --with-system-tzdata=DIR use system time zone data in DIR --without-zlib do not use Zlib + --with-zstd use Zstd compression library --with-gnu-ld assume the C compiler uses GNU ld [default=no] Some influential environment variables: @@ -8601,6 +8616,35 @@ fi +# +# Zstd +# + + + +# Check whether --with-zstd was given. +if test "${with_zstd+set}" = set; then : + withval=$with_zstd; + case $withval in + yes) + : + ;; + no) + : + ;; + *) + as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5 + ;; + esac + +else + with_zstd=no + +fi + + + + # # Assignments # @@ -12092,6 +12136,59 @@ fi fi +if test "$with_zstd" = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressStream2 in -lzstd" >&5 +$as_echo_n "checking for ZSTD_compressStream2 in -lzstd... " >&6; } +if ${ac_cv_lib_zstd_ZSTD_compressStream2+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzstd $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. 
*/ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compressStream2 (); +int +main () +{ +return ZSTD_compressStream2 (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zstd_ZSTD_compressStream2=yes +else + ac_cv_lib_zstd_ZSTD_compressStream2=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compressStream2" >&5 +$as_echo "$ac_cv_lib_zstd_ZSTD_compressStream2" >&6; } +if test "x$ac_cv_lib_zstd_ZSTD_compressStream2" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZSTD 1 +_ACEOF + + LIBS="-lzstd $LIBS" + +else + as_fn_error $? "zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support." "$LINENO" 5 +fi + +fi + if test "$enable_spinlocks" = yes; then $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h @@ -13295,6 +13392,20 @@ Use --without-zlib to disable zlib support." "$LINENO" 5 fi +fi + +if test "$with_zstd" = yes; then + ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default" +if test "x$ac_cv_header_zstd_h" = xyes; then : + +else + as_fn_error $? "zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support." "$LINENO" 5 +fi + + fi if test "$with_gssapi" = yes ; then @@ -14689,7 +14800,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. 
*/ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14735,7 +14846,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14759,7 +14870,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14804,7 +14915,7 @@ else We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. */ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; @@ -14828,7 +14939,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext We can't simply define LARGE_OFF_T to be 9223372036854775807, since some C++ compilers masquerading as C compilers incorrectly reject 9223372036854775807. 
*/ -#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62)) +#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31)) int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721 && LARGE_OFF_T % 2147483647 == 1) ? 1 : -1]; diff --git a/configure.ac b/configure.ac index fc523c6aeb..744836ea7f 100644 --- a/configure.ac +++ b/configure.ac @@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes, [do not use Zlib]) AC_SUBST(with_zlib) +# +# Zstd +# +PGAC_ARG_BOOL(with, zstd, no, + [use Zstd compression library]) +AC_SUBST(with_zstd) + # # Assignments # @@ -1186,6 +1193,14 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_LIB(zstd, ZSTD_compressStream2, [], + [AC_MSG_ERROR([zstd library not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. +Use --without-zstd to disable zstd support.])]) +fi + if test "$enable_spinlocks" = yes; then AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.]) else @@ -1400,6 +1415,13 @@ failure. It is possible the compiler isn't looking in the proper directory. Use --without-zlib to disable zlib support.])]) fi +if test "$with_zstd" = yes; then + AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found +If you have zstd already installed, see config.log for details on the +failure. It is possible the compiler isn't looking in the proper directory. 
+Use --without-zstd to disable zstd support.])]) +fi + if test "$with_gssapi" = yes ; then AC_CHECK_HEADERS(gssapi/gssapi.h, [], [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])]) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index d66d6f60f5..285f554c1a 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -65,6 +65,18 @@ compresslibs[] = { { COMPR_ALG_LIBZ, "libz", ".gz", Z_DEFAULT_COMPRESSION }, { COMPR_ALG_LIBZ, "zlib", ".gz", Z_DEFAULT_COMPRESSION }, /* Alternate name */ +#ifdef HAVE_LIBZSTD + /* + * ZSTD doesn't have a #define for it, but 0 means "the current default". + * Note that ZSTD_CLEVEL_DEFAULT is currently defined to 3. + * + * Block size should be ZSTD_DStreamOutSize(), but needs to be + * constant, so use ZSTD_BLOCKSIZE_MAX (128kB) + */ + { COMPR_ALG_ZSTD, "zst", ".zst", 0 }, + { COMPR_ALG_ZSTD, "zstd", ".zst", 0 }, /* Alternate name */ +#endif /* HAVE_LIBZSTD */ + { 0, NULL, } /* sentinel */ }; @@ -84,6 +96,18 @@ struct CompressorState char *zlibOut; size_t zlibOutSize; #endif + +#ifdef HAVE_LIBZSTD + union { + struct { + ZSTD_outBuffer output; + ZSTD_inBuffer input; + // XXX: use one separate ZSTD_CStream per thread: disable on windows ? 
+ ZSTD_CStream *cstream; + } zstd; + } u; +#endif + }; /* Routines that support zlib compressed data I/O */ @@ -97,6 +121,15 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); #endif +#ifdef HAVE_LIBZSTD +static ZSTD_CStream *ZstdCStreamParams(Compress *compress); +static void InitCompressorZstd(CompressorState *cs, Compress *compress); +static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs); +static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen); +static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF); +#endif + /* Routines that support uncompressed data I/O */ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, @@ -125,6 +158,13 @@ AllocateCompressor(Compress *compression, WriteFunc writeF) InitCompressorZlib(cs, compression); break; #endif + +#ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: + InitCompressorZstd(cs, compression); + break; +#endif + case COMPR_ALG_NONE: /* Do nothing */ break; @@ -153,6 +193,13 @@ ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF) ReadDataFromArchiveZlib(AH, readF); break; #endif + +#ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: + ReadDataFromArchiveZstd(AH, readF); + break; +#endif + default: /* Should not happen */ fatal("requested compression not available in this installation"); @@ -173,6 +220,12 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, WriteDataToArchiveZlib(AH, cs, data, dLen); break; #endif + +#ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: + WriteDataToArchiveZstd(AH, cs, data, dLen); + break; +#endif case COMPR_ALG_NONE: WriteDataToArchiveNone(AH, cs, data, dLen); break; @@ -193,11 +246,202 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs) if (cs->comprAlg == COMPR_ALG_LIBZ) EndCompressorZlib(AH, cs); #endif + +#ifdef HAVE_LIBZSTD + 
if (cs->comprAlg == COMPR_ALG_ZSTD) + EndCompressorZstd(AH, cs); +#endif + free(cs); } /* Private routines, specific to each compression method. */ +#ifdef HAVE_LIBZSTD + +static void ZSTD_CCtx_setParam_or_die(ZSTD_CStream *cstream, + ZSTD_cParameter param, int value) + +{ + size_t res; + res = ZSTD_CCtx_setParameter(cstream, param, value); + if (ZSTD_isError(res)) + fatal("could not set compression parameter: %s", + ZSTD_getErrorName(res)); +} + +/* Return a compression stream with parameters set per argument */ +static ZSTD_CStream* +ZstdCStreamParams(Compress *compress) +{ + ZSTD_CStream *cstream; + cstream = ZSTD_createCStream(); + if (cstream == NULL) + fatal("could not initialize compression library"); + + if (compress->level != 0) // XXX: ZSTD_CLEVEL_DEFAULT + { + size_t res; + res = ZSTD_CCtx_setParameter(cstream, + ZSTD_c_compressionLevel, compress->level); + if (ZSTD_isError(res)) + fatal("could not set compression level: %s", + ZSTD_getErrorName(res)); + } + + if (compress->zstd.longdistance) // XXX: ternary + ZSTD_CCtx_setParam_or_die(cstream, + ZSTD_c_enableLongDistanceMatching, + compress->zstd.longdistance); + + if (compress->zstd.checksum) + ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_checksumFlag, + compress->zstd.checksum); + +// not supported in my library ? 
+ if (compress->zstd.threads) + ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers, + compress->zstd.threads); + +#if 0 + /* Still marked as experimental */ + if (compress->zstd.rsyncable) + ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_rsyncable, 1); +#endif + + return cstream; +} + +static void +InitCompressorZstd(CompressorState *cs, Compress *compress) +{ + cs->u.zstd.cstream = ZstdCStreamParams(compress); + /* XXX: initialize safely like the corresponding zlib "paranoia" */ + cs->u.zstd.output.size = ZSTD_CStreamOutSize(); + cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size); + cs->u.zstd.output.pos = 0; +} + +static void +EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZSTD_outBuffer *output = &cs->u.zstd.output; + + for (;;) + { + size_t res; + + res = ZSTD_compressStream2(cs->u.zstd.cstream, output, + &cs->u.zstd.input, ZSTD_e_end); + + if (output->pos > 0) + cs->writeF(AH, output->dst, output->pos); + + if (res == 0) + break; + + if (ZSTD_isError(res)) + fatal("could not close compression stream: %s", + ZSTD_getErrorName(res)); + } + + // XXX: retval + ZSTD_freeCStream(cs->u.zstd.cstream); +} + +static void +WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const char *data, size_t dLen) +{ + ZSTD_inBuffer *input = &cs->u.zstd.input; + ZSTD_outBuffer *output = &cs->u.zstd.output; + + input->src = (void *) unconstify(char *, data); + input->size = dLen; + input->pos = 0; + + while (input->pos != input->size) + { + size_t res; + + res = ZSTD_compressStream2(cs->u.zstd.cstream, output, + input, ZSTD_e_continue); + + if (output->pos == output->size || + input->pos != input->size) + { + /* + * Extra paranoia: avoid zero-length chunks, since a zero length + * chunk is the EOF marker in the custom format. This should never + * happen but... 
+ */ + if (output->pos > 0) + cs->writeF(AH, output->dst, output->pos); + + output->pos = 0; + } + + if (ZSTD_isError(res)) + fatal("could not compress data: %s", ZSTD_getErrorName(res)); + } +} + +/* Read data from a compressed zstd archive */ +static void +ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF) +{ + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + size_t res; + size_t input_size; + + dstream = ZSTD_createDStream(); + if (dstream == NULL) + fatal("could not initialize compression library"); + + input_size = ZSTD_DStreamInSize(); + input.src = pg_malloc(input_size); + + output.size = ZSTD_DStreamOutSize(); + output.dst = pg_malloc(output.size); + + /* read compressed data */ + for (;;) + { + size_t cnt; + + input.size = input_size; // XXX: the buffer can grow, we shouldn't keep resetting it to the original value.. + cnt = readF(AH, (char **)unconstify(void **, &input.src), &input.size); + input.pos = 0; + input.size = cnt; + + if (cnt == 0) + break; + + while (input.pos < input.size) + { + /* decompress */ + output.pos = 0; + res = ZSTD_decompressStream(dstream, &output, &input); + + if (ZSTD_isError(res)) + fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + + /* write to output handle */ + ((char *)output.dst)[output.pos] = '\0'; + ahwrite(output.dst, 1, output.pos, AH); + // if (res == 0) + // break; + } + } + + pg_free(unconstify(void *, input.src)); + pg_free(output.dst); +} + +#endif /* HAVE_LIBZSTD */ + #ifdef HAVE_LIBZ /* * Functions for zlib compressed output. @@ -411,6 +655,19 @@ struct cfp #ifdef HAVE_LIBZ gzFile compressedfp; #endif + +#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg? + /* This is a normal file to which we read/write compressed data */ + struct { + FILE *fp; + // XXX: use one separate ZSTD_CStream per thread: disable on windows ? 
+ ZSTD_CStream *cstream; + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + } zstd; +#endif + }; static int hasSuffix(const char *filename); @@ -525,6 +782,31 @@ cfopen(const char *path, const char *mode, Compress *compression) return fp; #endif +#ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: + fp->zstd.fp = fopen(path, mode); + if (fp->zstd.fp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } + else if (mode[0] == 'w' || mode[0] == 'a' || + strchr(mode, '+') != NULL) + { + fp->zstd.output.size = ZSTD_CStreamOutSize(); + fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); + fp->zstd.cstream = ZstdCStreamParams(compression); + } + else if (strchr(mode, 'r')) + { + fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize()); + fp->zstd.dstream = ZSTD_createDStream(); + if (fp->zstd.dstream == NULL) + fatal("could not initialize compression library"); + } // XXX else: bad mode + return fp; +#endif + case COMPR_ALG_NONE: fp->uncompressedfp = fopen(path, mode); if (fp->uncompressedfp == NULL) @@ -576,6 +858,31 @@ cfdopen(int fd, const char *mode, Compress *compression) return fp; #endif +#ifdef HAVE_LIBZSTD + case COMPR_ALG_ZSTD: + fp->zstd.fp = fdopen(fd, mode); + if (fp->zstd.fp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } + else if (mode[0] == 'w' || mode[0] == 'a' || + strchr(mode, '+') != NULL) + { + fp->zstd.output.size = ZSTD_CStreamOutSize(); + fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); + fp->zstd.cstream = ZstdCStreamParams(compression); + } + else if (strchr(mode, 'r')) + { + fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize()); + fp->zstd.dstream = ZSTD_createDStream(); + if (fp->zstd.dstream == NULL) + fatal("could not initialize compression library"); + } // XXX else: bad mode + return fp; +#endif + case COMPR_ALG_NONE: fp->uncompressedfp = fdopen(fd, mode); if (fp->uncompressedfp == NULL) @@ -617,6 +924,68 @@ cfread(void *ptr, int size, cfp *fp) } #endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + ZSTD_outBuffer 
*output = &fp->zstd.output; + ZSTD_inBuffer *input = &fp->zstd.input; + size_t input_size = ZSTD_DStreamInSize(); + /* input_size is the allocated size */ + size_t res, cnt; + + output->size = size; + output->dst = ptr; + output->pos = 0; + + for (;;) + { + Assert(input->pos <= input->size); + Assert(input->size <= input_size); + + /* If the input is completely consumed, start back at the beginning */ + if (input->pos == input->size) + { + /* input->size is size produced by "fread" */ + input->size = 0; + /* input->pos is position consumed by decompress */ + input->pos = 0; + } + + /* read compressed data if we must produce more input */ + if (input->pos == input->size) + { + cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp); + input->size = cnt; + + /* If we have no input to consume, we're done */ + if (cnt == 0) + break; + } + + Assert(cnt >= 0); + Assert(input->size <= input_size); + + /* Now consume as much as possible */ + for ( ; input->pos < input->size; ) + { + /* decompress */ + res = ZSTD_decompressStream(fp->zstd.dstream, output, input); + if (res == 0) + break; /* End of frame */ + if (output->pos == output->size) + break; /* No more room for output */ + if (ZSTD_isError(res)) + fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + } + + if (output->pos == output->size) + break; /* We read all the data that fits */ + } + + return output->pos; + } +#endif + ret = fread(ptr, 1, size, fp->uncompressedfp); if (ret != size && !feof(fp->uncompressedfp)) READ_ERROR_EXIT(fp->uncompressedfp); @@ -630,6 +999,35 @@ cfwrite(const void *ptr, int size, cfp *fp) if (fp->compressedfp) return gzwrite(fp->compressedfp, ptr, size); #endif + +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + size_t res, cnt; + ZSTD_outBuffer *output = &fp->zstd.output; + ZSTD_inBuffer *input = &fp->zstd.input; + + input->src = ptr; + input->size = size; + input->pos = 0; + + /* Consume all input, and flush later */ + while (input->pos != input->size) + { + 
output->pos = 0; + res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue); + if (ZSTD_isError(res)) + fatal("could not compress data: %s", ZSTD_getErrorName(res)); + + cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + if (cnt != output->pos) + fatal("could not write data: %s", strerror(errno)); + } + + return size; + } +#endif + return fwrite(ptr, 1, size, fp->uncompressedfp); } @@ -652,6 +1050,21 @@ cfgetc(cfp *fp) return ret; } #endif + +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + if (cfread(&ret, 1, fp) != 1) + { + if (feof(fp->zstd.fp)) + fatal("could not read from input file: end of file"); + else + fatal("could not read from input file: %s", strerror(errno)); + } + return ret; + } +#endif + ret = fgetc(fp->uncompressedfp); if (ret == EOF) READ_ERROR_EXIT(fp->uncompressedfp); @@ -665,6 +1078,31 @@ cfgets(cfp *fp, char *buf, int len) if (fp->compressedfp) return gzgets(fp->compressedfp, buf, len); #endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + /* + * Read one byte at a time until newline or EOF. + * This is only used to read the list of blobs, and the I/O is + * buffered anyway. + */ + int i, res; + for (i = 0; i < len - 1; ++i) + { + res = cfread(&buf[i], 1, fp); + if (res != 1) + break; + if (buf[i] == '\n') + { + ++i; + break; + } + } + buf[i] = '\0'; + return i > 0 ? 
buf : 0; + } +#endif + return fgets(buf, len, fp->uncompressedfp); } @@ -688,6 +1126,44 @@ cfclose(cfp *fp) } #endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + { + ZSTD_outBuffer *output = &fp->zstd.output; + ZSTD_inBuffer *input = &fp->zstd.input; + size_t res, cnt; + + if (fp->zstd.cstream) + { + for (;;) + { + output->pos = 0; + res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end); + if (ZSTD_isError(res)) + fatal("could not compress data: %s", ZSTD_getErrorName(res)); + cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + if (cnt != output->pos) + fatal("could not write data: %s", strerror(errno)); + if (res == 0) + break; + } + + ZSTD_freeCStream(fp->zstd.cstream); + pg_free(fp->zstd.output.dst); + } + + if (fp->zstd.dstream) + { + ZSTD_freeDStream(fp->zstd.dstream); + pg_free(unconstify(void *, fp->zstd.input.src)); + } + + result = fclose(fp->zstd.fp); + fp->zstd.fp = NULL; + return result; + } +#endif + result = fclose(fp->uncompressedfp); fp->uncompressedfp = NULL; free_keep_errno(fp); @@ -702,6 +1178,10 @@ cfeof(cfp *fp) return gzeof(fp->compressedfp); #endif +#ifdef HAVE_LIBZSTD + if (fp->zstd.fp) + return feof(fp->zstd.fp); +#endif return feof(fp->uncompressedfp); } diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index f2390b7937..19ff6248d5 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -77,6 +77,7 @@ typedef enum COMPR_ALG_DEFAULT = -1, COMPR_ALG_NONE, COMPR_ALG_LIBZ, + COMPR_ALG_ZSTD, } CompressionAlgorithm; /* Should be called "method" or "library" ? */ @@ -88,6 +89,19 @@ typedef struct Compress { * are all integer, though. */ bool level_set; + + /* + * This could be a union across all compress algorithms, but + * keeping as separate structs allows checking that options are + * not specified for a different algorithm than selected. 
+ */ + + struct { + bool longdistance; + bool checksum; + bool rsyncable; + int threads; + } zstd; } Compress; diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 9f511b49b9..da2eb53277 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -56,6 +56,10 @@ typedef struct _z_stream typedef z_stream *z_streamp; #endif +#ifdef HAVE_LIBZSTD +#include <zstd.h> +#endif /* HAVE_LIBZSTD */ + /* Data block types */ #define BLK_DATA 1 #define BLK_BLOBS 3 diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 75c1bf22e4..b8efeb8ca7 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -393,8 +393,12 @@ _PrintFileData(ArchiveHandle *AH, char *filename) if (!cfp) fatal("could not open input file \"%s\": %m", filename); - buf = pg_malloc(ZLIB_OUT_SIZE); - buflen = ZLIB_OUT_SIZE; + /* + * zstd prefers a 128kB buffer. The allocation cannot happen in + * cfread, since the "cfp" is an opaque type. 
+ */ + buf = pg_malloc(128*1024); + buflen = 128*1024; while ((cnt = cfread(buf, buflen, cfp))) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 75985fd4d3..7c2f7a9ca3 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -356,6 +356,12 @@ parse_compression(const char *optarg, Compress *compress) compress->level = atoi(1+eq); compress->level_set = true; } + else if (strncmp(optarg, "zstdlong", len) == 0) + compress->zstd.longdistance = atoi(1+eq); + else if (strncmp(optarg, "checksum", len) == 0) + compress->zstd.checksum = atoi(1+eq); + else if (strncmp(optarg, "threads", len) == 0) + compress->zstd.threads = atoi(1+eq); else { pg_log_error("unknown compression setting: %s", optarg); @@ -367,11 +373,31 @@ parse_compression(const char *optarg, Compress *compress) break; } + /* XXX: zstd will check its own compression level later */ + if (compress->alg != COMPR_ALG_ZSTD) + { + Compress nullopts = {0}; + + if (compress->level < 0 || compress->level > 9) + { + pg_log_error("compression level must be in range 0..9"); + exit_nicely(1); + } + +// XXX: needs to set default alg first + if (memcmp(&compress->zstd, &nullopts.zstd, sizeof(nullopts.zstd)) != 0) + { + pg_log_error("compression option not supported with this algorithm"); + exit_nicely(1); + } + } + if (!compress->level_set) { // XXX const int default_compress_level[] = { 0, /* COMPR_ALG_NONE */ Z_DEFAULT_COMPRESSION, /* COMPR_ALG_ZLIB */ + 0, // XXX: ZSTD_CLEVEL_DEFAULT, /* COMPR_ALG_ZSTD */ }; compress->level = default_compress_level[compress->alg]; @@ -764,6 +790,11 @@ main(int argc, char **argv) compress.alg = COMPR_ALG_LIBZ; compress.level = Z_DEFAULT_COMPRESSION; #endif + +#ifdef HAVE_LIBZSTD + compress.alg = COMPR_ALG_ZSTD; // Set default for testing purposes + compress.level = ZSTD_CLEVEL_DEFAULT; +#endif } else { @@ -780,6 +811,14 @@ main(int argc, char **argv) } #endif +#ifndef HAVE_LIBZSTD + if (compress.alg == COMPR_ALG_ZSTD) + { + 
pg_log_warning("requested compression not available in this installation -- archive will be uncompressed"); + compress.alg = 0; + } +#endif + /* * If emitting an archive format, we always want to emit a DATABASE item, * in case --create is specified at pg_restore time. diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index de8f838e53..da35415c72 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -346,6 +346,9 @@ /* Define to 1 if you have the `z' library (-lz). */ #undef HAVE_LIBZ +/* Define to 1 if you have the `zstd' library (-lzstd). */ +#undef HAVE_LIBZSTD + /* Define to 1 if you have the `link' function. */ #undef HAVE_LINK diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm index 22d6abd367..a101366b4c 100644 --- a/src/tools/msvc/Solution.pm +++ b/src/tools/msvc/Solution.pm @@ -307,6 +307,7 @@ sub GenerateFiles HAVE_LIBXML2 => undef, HAVE_LIBXSLT => undef, HAVE_LIBZ => $self->{options}->{zlib} ? 1 : undef, + HAVE_LIBZSTD => $self->{options}->{zstd} ? 1 : undef, HAVE_LINK => undef, HAVE_LOCALE_T => 1, HAVE_LONG_INT_64 => undef, -- 2.17.0
>From fe12ba8f0ec663f4778e6b46053739ad4ec20514 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Thu, 24 Dec 2020 00:01:43 -0600 Subject: [PATCH 07/20] fix comments --- src/bin/pg_dump/compress_io.c | 15 +++++++-------- src/bin/pg_dump/pg_backup_directory.c | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 285f554c1a..fa94148cdf 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -13,7 +13,7 @@ * friends, providing an interface similar to those, but abstracts away * the possible compression. Both APIs use libz for the compression, but * the second API uses gzip headers, so the resulting files can be easily - * manipulated with the gzip utility. + * manipulated with the gzip utility. XXX * * Compressor API * -------------- @@ -41,9 +41,9 @@ * libz's gzopen() APIs. It allows you to use the same functions for * compressed and uncompressed streams. cfopen_read() first tries to open * the file with given name, and if it fails, it tries to open the same - * file with the .gz suffix. cfopen_write() opens a file for writing, an + * file with a compressed suffix. cfopen_write() opens a file for writing, an * extra argument specifies if the file should be compressed, and adds the - * .gz suffix to the filename if so. This allows you to easily handle both + * compressed suffix to the filename if so. This allows you to easily handle both * compressed and uncompressed files. * * IDENTIFICATION @@ -646,8 +646,8 @@ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, */ /* - * cfp represents an open stream, wrapping the underlying FILE or gzFile - * pointer. This is opaque to the callers. + * cfp represents an open stream, wrapping the underlying compressed or + * uncompressed file object. This is opaque to the callers. */ struct cfp { @@ -687,8 +687,7 @@ free_keep_errno(void *p) * be either "r" or "rb". 
* * If the file at 'path' does not exist, we search with compressed suffix (if 'path' - * doesn't already have one) and try again. So if you pass "foo" as 'path', - * this will open either "foo" or "foo.gz". + * doesn't already have one) and try again. * * On failure, return NULL with an error code in errno. */ @@ -745,7 +744,7 @@ cfopen_write(const char *path, const char *mode, Compress *compression) } /* - * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file + * Opens file 'path' in 'mode'. If 'alg' is COMPR_ALG_LIBZ, the file * is opened with libz gzopen(), otherwise with plain fopen(). * * On failure, return NULL with an error code in errno. diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index b8efeb8ca7..f0ded2421d 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -6,8 +6,8 @@ * for the TOC, and a separate file for each data entry, named "<oid>.dat". * Large objects (BLOBs) are stored in separate files named "blob_<oid>.dat", * and there's a plain-text TOC file for them called "blobs.toc". If - * compression is used, each data file is individually compressed and the - * ".gz" suffix is added to the filenames. The TOC files are never + * compression is used, each data file is individually compressed and a + * suffix is added to the filenames. The TOC files are never * compressed by pg_dump, however they are accepted with the .gz suffix too, * in case the user has manually compressed them with 'gzip'. * -- 2.17.0
>From 44b3ed951859072b8d814d0439565187bf960b7b Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Mon, 21 Dec 2020 00:11:43 -0600 Subject: [PATCH 08/20] union{} with a CompressionAlgorithm alg --- src/bin/pg_dump/compress_io.c | 200 ++++++++++++++++++---------------- src/bin/pg_dump/pg_dump.c | 2 +- 2 files changed, 106 insertions(+), 96 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index fa94148cdf..e07436bc21 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -651,23 +651,27 @@ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, */ struct cfp { - FILE *uncompressedfp; + CompressionAlgorithm alg; + + union { + FILE *fp; + #ifdef HAVE_LIBZ - gzFile compressedfp; + gzFile gzfp; #endif -#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg? - /* This is a normal file to which we read/write compressed data */ - struct { - FILE *fp; - // XXX: use one separate ZSTD_CStream per thread: disable on windows ? - ZSTD_CStream *cstream; - ZSTD_DStream *dstream; - ZSTD_outBuffer output; - ZSTD_inBuffer input; - } zstd; +#ifdef HAVE_LIBZSTD + struct { + /* This is a normal file to which we read/write compressed data */ + FILE *fp; + // XXX: use one separate ZSTD_CStream per thread: disable on windows ? 
+ ZSTD_CStream *cstream; + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + } zstd; #endif - + } u; }; static int hasSuffix(const char *filename); @@ -754,6 +758,8 @@ cfopen(const char *path, const char *mode, Compress *compression) { cfp *fp = pg_malloc0(sizeof(cfp)); + fp->alg = compression->alg; + switch (compression->alg) { #ifdef HAVE_LIBZ @@ -765,15 +771,15 @@ cfopen(const char *path, const char *mode, Compress *compression) snprintf(mode_compression, sizeof(mode_compression), "%s%d", mode, compression->level); - fp->compressedfp = gzopen(path, mode_compression); + fp->u.gzfp = gzopen(path, mode_compression); } else { /* don't specify a level, just use the zlib default */ - fp->compressedfp = gzopen(path, mode); + fp->u.gzfp = gzopen(path, mode); } - if (fp->compressedfp == NULL) + if (fp->u.gzfp == NULL) { free_keep_errno(fp); fp = NULL; @@ -783,8 +789,8 @@ cfopen(const char *path, const char *mode, Compress *compression) #ifdef HAVE_LIBZSTD case COMPR_ALG_ZSTD: - fp->zstd.fp = fopen(path, mode); - if (fp->zstd.fp == NULL) + fp->u.zstd.fp = fopen(path, mode); + if (fp->u.zstd.fp == NULL) { free_keep_errno(fp); fp = NULL; @@ -792,23 +798,23 @@ cfopen(const char *path, const char *mode, Compress *compression) else if (mode[0] == 'w' || mode[0] == 'a' || strchr(mode, '+') != NULL) { - fp->zstd.output.size = ZSTD_CStreamOutSize(); - fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); - fp->zstd.cstream = ZstdCStreamParams(compression); + fp->u.zstd.output.size = ZSTD_CStreamOutSize(); + fp->u.zstd.output.dst = pg_malloc0(fp->u.zstd.output.size); + fp->u.zstd.cstream = ZstdCStreamParams(compression); } else if (strchr(mode, 'r')) { - fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize()); - fp->zstd.dstream = ZSTD_createDStream(); - if (fp->zstd.dstream == NULL) + fp->u.zstd.input.src = pg_malloc0(ZSTD_DStreamInSize()); + fp->u.zstd.dstream = ZSTD_createDStream(); + if (fp->u.zstd.dstream == NULL) fatal("could not initialize compression 
library"); } // XXX else: bad mode return fp; #endif case COMPR_ALG_NONE: - fp->uncompressedfp = fopen(path, mode); - if (fp->uncompressedfp == NULL) + fp->u.fp = fopen(path, mode); + if (fp->u.fp == NULL) { free_keep_errno(fp); fp = NULL; @@ -830,6 +836,8 @@ cfdopen(int fd, const char *mode, Compress *compression) { cfp *fp = pg_malloc0(sizeof(cfp)); + fp->alg = compression->alg; + switch (compression->alg) { #ifdef HAVE_LIBZ @@ -841,15 +849,15 @@ cfdopen(int fd, const char *mode, Compress *compression) snprintf(mode_compression, sizeof(mode_compression), "%s%d", mode, compression->level); - fp->compressedfp = gzdopen(fd, mode_compression); + fp->u.gzfp = gzdopen(fd, mode_compression); } else { /* don't specify a level, just use the zlib default */ - fp->compressedfp = gzdopen(fd, mode); + fp->u.gzfp = gzdopen(fd, mode); } - if (fp->compressedfp == NULL) + if (fp->u.gzfp == NULL) { free_keep_errno(fp); fp = NULL; @@ -859,8 +867,8 @@ cfdopen(int fd, const char *mode, Compress *compression) #ifdef HAVE_LIBZSTD case COMPR_ALG_ZSTD: - fp->zstd.fp = fdopen(fd, mode); - if (fp->zstd.fp == NULL) + fp->u.zstd.fp = fdopen(fd, mode); + if (fp->u.zstd.fp == NULL) { free_keep_errno(fp); fp = NULL; @@ -868,23 +876,23 @@ cfdopen(int fd, const char *mode, Compress *compression) else if (mode[0] == 'w' || mode[0] == 'a' || strchr(mode, '+') != NULL) { - fp->zstd.output.size = ZSTD_CStreamOutSize(); - fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size); - fp->zstd.cstream = ZstdCStreamParams(compression); + fp->u.zstd.output.size = ZSTD_CStreamOutSize(); + fp->u.zstd.output.dst = pg_malloc0(fp->u.zstd.output.size); + fp->u.zstd.cstream = ZstdCStreamParams(compression); } else if (strchr(mode, 'r')) { - fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize()); - fp->zstd.dstream = ZSTD_createDStream(); - if (fp->zstd.dstream == NULL) + fp->u.zstd.input.src = pg_malloc0(ZSTD_DStreamInSize()); + fp->u.zstd.dstream = ZSTD_createDStream(); + if (fp->u.zstd.dstream == NULL) fatal("could 
not initialize compression library"); } // XXX else: bad mode return fp; #endif case COMPR_ALG_NONE: - fp->uncompressedfp = fdopen(fd, mode); - if (fp->uncompressedfp == NULL) + fp->u.fp = fdopen(fd, mode); + if (fp->u.fp == NULL) { free_keep_errno(fp); fp = NULL; @@ -908,13 +916,13 @@ cfread(void *ptr, int size, cfp *fp) return 0; #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { - ret = gzread(fp->compressedfp, ptr, size); - if (ret != size && !gzeof(fp->compressedfp)) + ret = gzread(fp->u.gzfp, ptr, size); + if (ret != size && !gzeof(fp->u.gzfp)) { int errnum; - const char *errmsg = gzerror(fp->compressedfp, &errnum); + const char *errmsg = gzerror(fp->u.gzfp, &errnum); fatal("could not read from input file: %s", errnum == Z_ERRNO ? strerror(errno) : errmsg); @@ -924,10 +932,10 @@ cfread(void *ptr, int size, cfp *fp) #endif #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { - ZSTD_outBuffer *output = &fp->zstd.output; - ZSTD_inBuffer *input = &fp->zstd.input; + ZSTD_outBuffer *output = &fp->u.zstd.output; + ZSTD_inBuffer *input = &fp->u.zstd.input; size_t input_size = ZSTD_DStreamInSize(); /* input_size is the allocated size */ size_t res, cnt; @@ -953,7 +961,7 @@ cfread(void *ptr, int size, cfp *fp) /* read compressed data if we must produce more input */ if (input->pos == input->size) { - cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp); + cnt = fread(unconstify(void *, input->src), 1, input_size, fp->u.zstd.fp); input->size = cnt; /* If we have no input to consume, we're done */ @@ -968,7 +976,7 @@ cfread(void *ptr, int size, cfp *fp) for ( ; input->pos < input->size; ) { /* decompress */ - res = ZSTD_decompressStream(fp->zstd.dstream, output, input); + res = ZSTD_decompressStream(fp->u.zstd.dstream, output, input); if (res == 0) break; /* End of frame */ if (output->pos == output->size) @@ -985,9 +993,9 @@ cfread(void *ptr, int size, cfp *fp) } #endif - ret = fread(ptr, 1, size, 
fp->uncompressedfp); - if (ret != size && !feof(fp->uncompressedfp)) - READ_ERROR_EXIT(fp->uncompressedfp); + ret = fread(ptr, 1, size, fp->u.fp); + if (ret != size && !feof(fp->u.fp)) + READ_ERROR_EXIT(fp->u.fp); return ret; } @@ -995,16 +1003,16 @@ int cfwrite(const void *ptr, int size, cfp *fp) { #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzwrite(fp->compressedfp, ptr, size); + if (fp->alg == COMPR_ALG_LIBZ) + return gzwrite(fp->u.gzfp, ptr, size); #endif #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { size_t res, cnt; - ZSTD_outBuffer *output = &fp->zstd.output; - ZSTD_inBuffer *input = &fp->zstd.input; + ZSTD_outBuffer *output = &fp->u.zstd.output; + ZSTD_inBuffer *input = &fp->u.zstd.input; input->src = ptr; input->size = size; @@ -1014,11 +1022,11 @@ cfwrite(const void *ptr, int size, cfp *fp) while (input->pos != input->size) { output->pos = 0; - res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue); + res = ZSTD_compressStream2(fp->u.zstd.cstream, output, input, ZSTD_e_continue); if (ZSTD_isError(res)) fatal("could not compress data: %s", ZSTD_getErrorName(res)); - cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + cnt = fwrite(output->dst, 1, output->pos, fp->u.zstd.fp); if (cnt != output->pos) fatal("could not write data: %s", strerror(errno)); } @@ -1027,7 +1035,7 @@ cfwrite(const void *ptr, int size, cfp *fp) } #endif - return fwrite(ptr, 1, size, fp->uncompressedfp); + return fwrite(ptr, 1, size, fp->u.fp); } int @@ -1036,12 +1044,12 @@ cfgetc(cfp *fp) int ret; #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { - ret = gzgetc(fp->compressedfp); + ret = gzgetc(fp->u.gzfp); if (ret == EOF) { - if (!gzeof(fp->compressedfp)) + if (!gzeof(fp->u.gzfp)) fatal("could not read from input file: %s", strerror(errno)); else fatal("could not read from input file: end of file"); @@ -1051,11 +1059,11 @@ cfgetc(cfp *fp) #endif #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg 
== COMPR_ALG_ZSTD) { if (cfread(&ret, 1, fp) != 1) { - if (feof(fp->zstd.fp)) + if (feof(fp->u.zstd.fp)) fatal("could not read from input file: end of file"); else fatal("could not read from input file: %s", strerror(errno)); @@ -1064,9 +1072,9 @@ cfgetc(cfp *fp) } #endif - ret = fgetc(fp->uncompressedfp); + ret = fgetc(fp->u.fp); if (ret == EOF) - READ_ERROR_EXIT(fp->uncompressedfp); + READ_ERROR_EXIT(fp->u.fp); return ret; } @@ -1074,11 +1082,12 @@ char * cfgets(cfp *fp, char *buf, int len) { #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzgets(fp->compressedfp, buf, len); + if (fp->alg == COMPR_ALG_LIBZ) + return gzgets(fp->u.gzfp, buf, len); #endif + #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { /* * Read one byte at a time until newline or EOF. @@ -1102,7 +1111,7 @@ cfgets(cfp *fp, char *buf, int len) } #endif - return fgets(buf, len, fp->uncompressedfp); + return fgets(buf, len, fp->u.fp); } /* Close the given compressed or uncompressed stream; return 0 on success. 
*/ @@ -1117,54 +1126,54 @@ cfclose(cfp *fp) return EOF; } #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { - result = gzclose(fp->compressedfp); - fp->compressedfp = NULL; + result = gzclose(fp->u.gzfp); + fp->u.gzfp = NULL; return result; } #endif #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) + if (fp->alg == COMPR_ALG_ZSTD) { - ZSTD_outBuffer *output = &fp->zstd.output; - ZSTD_inBuffer *input = &fp->zstd.input; + ZSTD_outBuffer *output = &fp->u.zstd.output; + ZSTD_inBuffer *input = &fp->u.zstd.input; size_t res, cnt; - if (fp->zstd.cstream) + if (fp->u.zstd.cstream) { for (;;) { output->pos = 0; - res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end); + res = ZSTD_compressStream2(fp->u.zstd.cstream, output, input, ZSTD_e_end); if (ZSTD_isError(res)) fatal("could not compress data: %s", ZSTD_getErrorName(res)); - cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp); + cnt = fwrite(output->dst, 1, output->pos, fp->u.zstd.fp); if (cnt != output->pos) fatal("could not write data: %s", strerror(errno)); if (res == 0) break; } - ZSTD_freeCStream(fp->zstd.cstream); - pg_free(fp->zstd.output.dst); + ZSTD_freeCStream(fp->u.zstd.cstream); + pg_free(fp->u.zstd.output.dst); } - if (fp->zstd.dstream) + if (fp->u.zstd.dstream) { - ZSTD_freeDStream(fp->zstd.dstream); - pg_free(unconstify(void *, fp->zstd.input.src)); + ZSTD_freeDStream(fp->u.zstd.dstream); + pg_free(unconstify(void *, fp->u.zstd.input.src)); } - result = fclose(fp->zstd.fp); - fp->zstd.fp = NULL; + result = fclose(fp->u.zstd.fp); + fp->u.zstd.fp = NULL; return result; } #endif - result = fclose(fp->uncompressedfp); - fp->uncompressedfp = NULL; + result = fclose(fp->u.fp); + fp->u.fp = NULL; free_keep_errno(fp); return result; } @@ -1173,25 +1182,26 @@ int cfeof(cfp *fp) { #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzeof(fp->compressedfp); + if (fp->alg == COMPR_ALG_LIBZ) + return gzeof(fp->u.gzfp); #endif #ifdef HAVE_LIBZSTD - if (fp->zstd.fp) - return 
feof(fp->zstd.fp); + if (fp->alg == COMPR_ALG_ZSTD) + return feof(fp->u.zstd.fp); #endif - return feof(fp->uncompressedfp); + + return feof(fp->u.fp); } const char * get_cfp_error(cfp *fp) { #ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->alg == COMPR_ALG_LIBZ) { int errnum; - const char *errmsg = gzerror(fp->compressedfp, &errnum); + const char *errmsg = gzerror(fp->u.gzfp, &errnum); if (errnum != Z_ERRNO) return errmsg; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 7c2f7a9ca3..5e009e5854 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -397,7 +397,7 @@ parse_compression(const char *optarg, Compress *compress) const int default_compress_level[] = { 0, /* COMPR_ALG_NONE */ Z_DEFAULT_COMPRESSION, /* COMPR_ALG_ZLIB */ - 0, // XXX: ZSTD_CLEVEL_DEFAULT, /* COMPR_ALG_ZSTD */ + 0, // #ifdef LIBZSTD ZSTD_CLEVEL_DEFAULT, /* COMPR_ALG_ZSTD */ }; compress->level = default_compress_level[compress->alg]; -- 2.17.0
>From c43384ebc1ff47536b207374ed472182ff4ae0e8 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Fri, 11 Dec 2020 22:22:31 -0600 Subject: [PATCH 09/20] Move zlib into the union{} --- src/bin/pg_dump/compress_io.c | 56 ++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index e07436bc21..ad085a7d54 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -91,23 +91,25 @@ struct CompressorState CompressionAlgorithm comprAlg; WriteFunc writeF; + union { #ifdef HAVE_LIBZ - z_streamp zp; - char *zlibOut; - size_t zlibOutSize; + struct { + z_streamp zp; + char *zlibOut; + size_t zlibOutSize; + } zlib; #endif #ifdef HAVE_LIBZSTD - union { + /* This is used for compression but not decompression */ struct { - ZSTD_outBuffer output; - ZSTD_inBuffer input; // XXX: use one separate ZSTD_CStream per thread: disable on windows ? - ZSTD_CStream *cstream; + ZSTD_CStream *cstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; } zstd; - } u; #endif - + } u; }; /* Routines that support zlib compressed data I/O */ @@ -452,7 +454,7 @@ InitCompressorZlib(CompressorState *cs, Compress *compress) { z_streamp zp; - zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp = cs->u.zlib.zp = (z_streamp) pg_malloc(sizeof(z_stream)); zp->zalloc = Z_NULL; zp->zfree = Z_NULL; zp->opaque = Z_NULL; @@ -462,22 +464,22 @@ InitCompressorZlib(CompressorState *cs, Compress *compress) * actually allocate one extra byte because some routines want to append a * trailing zero byte to the zlib output. 
*/ - cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); - cs->zlibOutSize = ZLIB_OUT_SIZE; + cs->u.zlib.zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); + cs->u.zlib.zlibOutSize = ZLIB_OUT_SIZE; if (deflateInit(zp, compress->level) != Z_OK) fatal("could not initialize compression library: %s", zp->msg); /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) cs->zlibOut; - zp->avail_out = cs->zlibOutSize; + zp->next_out = (void *) cs->u.zlib.zlibOut; + zp->avail_out = cs->u.zlib.zlibOutSize; } static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) { - z_streamp zp = cs->zp; + z_streamp zp = cs->u.zlib.zp; zp->next_in = NULL; zp->avail_in = 0; @@ -488,23 +490,23 @@ EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) if (deflateEnd(zp) != Z_OK) fatal("could not close compression stream: %s", zp->msg); - free(cs->zlibOut); - free(cs->zp); + free(cs->u.zlib.zlibOut); + free(cs->u.zlib.zp); } static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) { - z_streamp zp = cs->zp; - char *out = cs->zlibOut; + z_streamp zp = cs->u.zlib.zp; + char *out = cs->u.zlib.zlibOut; int res = Z_OK; - while (cs->zp->avail_in != 0 || flush) + while (cs->u.zlib.zp->avail_in != 0 || flush) { res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); if (res == Z_STREAM_ERROR) fatal("could not compress data: %s", zp->msg); - if ((flush && (zp->avail_out < cs->zlibOutSize)) + if ((flush && (zp->avail_out < cs->u.zlib.zlibOutSize)) || (zp->avail_out == 0) || (zp->avail_in != 0) ) @@ -514,18 +516,18 @@ DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) * chunk is the EOF marker in the custom format. This should never * happen but... */ - if (zp->avail_out < cs->zlibOutSize) + if (zp->avail_out < cs->u.zlib.zlibOutSize) { /* * Any write function should do its own error checking but to * make sure we do a check here as well... 
*/ - size_t len = cs->zlibOutSize - zp->avail_out; + size_t len = cs->u.zlib.zlibOutSize - zp->avail_out; cs->writeF(AH, out, len); } zp->next_out = (void *) out; - zp->avail_out = cs->zlibOutSize; + zp->avail_out = cs->u.zlib.zlibOutSize; } if (res == Z_STREAM_END) @@ -537,8 +539,8 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen) { - cs->zp->next_in = (void *) unconstify(char *, data); - cs->zp->avail_in = dLen; + cs->u.zlib.zp->next_in = (void *) unconstify(char *, data); + cs->u.zlib.zp->avail_in = dLen; DeflateCompressorZlib(AH, cs, false); } @@ -898,7 +900,7 @@ cfdopen(int fd, const char *mode, Compress *compression) fp = NULL; } else - setvbuf(fp->uncompressedfp, NULL, _IONBF, 0); + setvbuf(fp->u.fp, NULL, _IONBF, 0); return fp; default: -- 2.17.0