On Sun, Jan 08, 2023 at 01:45:25PM -0600, Justin Pryzby wrote:
> On Thu, Dec 22, 2022 at 11:08:59AM -0600, Justin Pryzby wrote:
> > There's a couple of lz4 bits which shouldn't be present in 002: file
> > extension and comments.
>
> BTW I noticed that cfdopen() was accidentally committed to compress_io.h
> in master without being defined anywhere.

This was resolved in 69fb29d1a (so it now needs to be re-added for this
patch series).

> pg_compress_specification is being passed by value, but I think it
> should be passed as a pointer, as is done everywhere else.

ISTM that was an issue with 5e73a6048, affecting a few public and private
functions.  I wrote a pre-preparatory patch which changes these to be
passed by reference.  I also addressed a handful of other issues I had
reported as separate fixup commits, and changed CI to use LZ4 by default.

I also rebased my two-year-old patch to support zstd in pg_dump.  I hope
it can finally be added for v16.  I'll send it for the next CF if these
patches progress.

One more thing: some comments still refer to the cfopen API, which this
patch removes.

> There were "LZ4" comments and file extension stuff in the preparatory
> commit.  But now it seems like you *removed* them in the LZ4 commit
> (where it actually belongs) rather than *moving* it from the
> prior/parent commit *to* the lz4 commit.  I recommend running something
> like "git diff @{1}" whenever doing this kind of patch surgery.

TODO

> Maybe other places that check if (compression==PG_COMPRESSION_GZIP)
> should change to say compression!=NONE?
>
> _PrepParallelRestore() references ".gz", so I think it needs to be
> retrofitted to handle .lz4.  Ideally, that's built into a struct or list
> of file extensions to try.  Maybe compression.h should have a function
> to return the file extension of a given algorithm.  I'm planning to send
> a patch for zstd, and hoping its changes will be minimized by these
> preparatory commits.

TODO

> I think it's confusing to have two functions, one named
> InitCompressLZ4() and InitCompressorLZ4().

TODO

> pg_compress_algorithm is being written directly into the pg_dump header.
> Currently, I think that's not an externally-visible value (it could be
> renumbered, theoretically even in a minor release).  Maybe there should
> be a "private" enum for encoding the pg_dump header, similar to
> WAL_COMPRESSION_LZ4 vs BKPIMAGE_COMPRESS_LZ4?  Or else a comment there
> should warn that the values are encoded in pg_dump, and must never be
> changed.

Michael, WDYT?

-- 
Justin
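To make the file-extension suggestion above concrete, a helper along the
following lines is the sort of thing being proposed for compression.h.
This is a sketch only; the function name compress_file_extension() and its
placement are hypothetical and not part of the attached patches:

/*
 * Hypothetical helper for src/include/common/compression.h: map a
 * pg_compress_algorithm to the file suffix that callers such as
 * _PrepParallelRestore() could probe for.  Illustrative only.
 */
const char *
compress_file_extension(pg_compress_algorithm algorithm)
{
	switch (algorithm)
	{
		case PG_COMPRESSION_NONE:
			return "";
		case PG_COMPRESSION_GZIP:
			return ".gz";
		case PG_COMPRESSION_LZ4:
			return ".lz4";
		case PG_COMPRESSION_ZSTD:
			return ".zst";
	}
	return "";					/* keep compilers quiet */
}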
>From 3105d480ab82093ca2873e423782f5b2edd9fbb7 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Sat, 14 Jan 2023 10:58:23 -0600 Subject: [PATCH 1/7] pg_dump: pass pg_compress_specification as a pointer.. ..as is done everywhere else. --- src/bin/pg_dump/compress_io.c | 30 +++++++++++++-------------- src/bin/pg_dump/compress_io.h | 8 +++---- src/bin/pg_dump/pg_backup.h | 2 +- src/bin/pg_dump/pg_backup_archiver.c | 12 +++++------ src/bin/pg_dump/pg_backup_custom.c | 6 +++--- src/bin/pg_dump/pg_backup_directory.c | 8 +++---- src/bin/pg_dump/pg_dump.c | 2 +- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 7a2c80bbc4c..62a9527fa48 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -94,19 +94,19 @@ static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, /* Allocate a new compressor */ CompressorState * -AllocateCompressor(const pg_compress_specification compression_spec, +AllocateCompressor(const pg_compress_specification *compression_spec, WriteFunc writeF) { CompressorState *cs; #ifndef HAVE_LIBZ - if (compression_spec.algorithm == PG_COMPRESSION_GZIP) + if (compression_spec->algorithm == PG_COMPRESSION_GZIP) pg_fatal("this build does not support compression with %s", "gzip"); #endif cs = (CompressorState *) pg_malloc0(sizeof(CompressorState)); cs->writeF = writeF; - cs->compression_spec = compression_spec; + cs->compression_spec = *compression_spec; // XXX /* * Perform compression algorithm specific initialization. @@ -125,12 +125,12 @@ AllocateCompressor(const pg_compress_specification compression_spec, */ void ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification compression_spec, + const pg_compress_specification *compression_spec, ReadFunc readF) { - if (compression_spec.algorithm == PG_COMPRESSION_NONE) + if (compression_spec->algorithm == PG_COMPRESSION_NONE) ReadDataFromArchiveNone(AH, readF); - if (compression_spec.algorithm == PG_COMPRESSION_GZIP) + if (compression_spec->algorithm == PG_COMPRESSION_GZIP) { #ifdef HAVE_LIBZ ReadDataFromArchiveZlib(AH, readF); @@ -432,13 +432,13 @@ cfopen_read(const char *path, const char *mode) if (hasSuffix(path, ".gz")) { compression_spec.algorithm = PG_COMPRESSION_GZIP; - fp = cfopen(path, mode, compression_spec); + fp = cfopen(path, mode, &compression_spec); } else #endif { compression_spec.algorithm = PG_COMPRESSION_NONE; - fp = cfopen(path, mode, compression_spec); + fp = cfopen(path, mode, &compression_spec); #ifdef HAVE_LIBZ if (fp == NULL) { @@ -446,7 +446,7 @@ cfopen_read(const char *path, const char *mode) fname = psprintf("%s.gz", path); compression_spec.algorithm = PG_COMPRESSION_GZIP; - fp = cfopen(fname, mode, compression_spec); + fp = cfopen(fname, mode, &compression_spec); free_keep_errno(fname); } #endif @@ -467,11 +467,11 @@ cfopen_read(const char *path, const char *mode) */ cfp * cfopen_write(const char *path, const char *mode, - const pg_compress_specification compression_spec) + const pg_compress_specification *compression_spec) { cfp *fp; - if (compression_spec.algorithm == PG_COMPRESSION_NONE) + if (compression_spec->algorithm == PG_COMPRESSION_NONE) fp = cfopen(path, mode, compression_spec); else { @@ -497,20 +497,20 @@ cfopen_write(const char *path, const char *mode, */ cfp * cfopen(const char *path, const char *mode, - const pg_compress_specification compression_spec) + const pg_compress_specification *compression_spec) { cfp *fp = 
pg_malloc(sizeof(cfp)); - if (compression_spec.algorithm == PG_COMPRESSION_GZIP) + if (compression_spec->algorithm == PG_COMPRESSION_GZIP) { #ifdef HAVE_LIBZ - if (compression_spec.level != Z_DEFAULT_COMPRESSION) + if (compression_spec->level != Z_DEFAULT_COMPRESSION) { /* user has specified a compression level, so tell zlib to use it */ char mode_compression[32]; snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compression_spec.level); + mode, compression_spec->level); fp->compressedfp = gzopen(path, mode_compression); } else diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index a429dc4789d..34f4e5e1e14 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -40,10 +40,10 @@ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen); /* struct definition appears in compress_io.c */ typedef struct CompressorState CompressorState; -extern CompressorState *AllocateCompressor(const pg_compress_specification compression_spec, +extern CompressorState *AllocateCompressor(const pg_compress_specification *compression_spec, WriteFunc writeF); extern void ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification compression_spec, + const pg_compress_specification *compression_spec, ReadFunc readF); extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen); @@ -53,10 +53,10 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); typedef struct cfp cfp; extern cfp *cfopen(const char *path, const char *mode, - const pg_compress_specification compression_spec); + const pg_compress_specification *compression_spec); extern cfp *cfopen_read(const char *path, const char *mode); extern cfp *cfopen_write(const char *path, const char *mode, - const pg_compress_specification compression_spec); + const pg_compress_specification *compression_spec); extern int cfread(void *ptr, int size, cfp *fp); extern int cfwrite(const void *ptr, int size, cfp *fp); extern int cfgetc(cfp *fp); diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index aba780ef4b1..216e24e7ec5 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -305,7 +305,7 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt); /* Create a new archive */ extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const pg_compress_specification compression_spec, + const pg_compress_specification *compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 7f7a0f1ce7b..0d91b75c748 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -70,7 +70,7 @@ typedef struct _parallelReadyList static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, - const pg_compress_specification compression_spec, + const pg_compress_specification *compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr); static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te); @@ -241,7 +241,7 @@ setupRestoreWorker(Archive *AHX) /* Public */ Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const pg_compress_specification compression_spec, + const pg_compress_specification *compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker) @@ -261,7 +261,7 @@ OpenArchive(const char *FileSpec, 
const ArchiveFormat fmt) pg_compress_specification compression_spec = {0}; compression_spec.algorithm = PG_COMPRESSION_NONE; - AH = _allocAH(FileSpec, fmt, compression_spec, true, + AH = _allocAH(FileSpec, fmt, &compression_spec, true, archModeRead, setupRestoreWorker); return (Archive *) AH; @@ -2214,7 +2214,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, - const pg_compress_specification compression_spec, + const pg_compress_specification *compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr) { @@ -2266,7 +2266,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->toc->prev = AH->toc; AH->mode = mode; - AH->compression_spec = compression_spec; + AH->compression_spec = *compression_spec; // XXX AH->dosync = dosync; memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse)); @@ -2281,7 +2281,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, * Force stdin/stdout into binary mode if that is what we are using. */ #ifdef WIN32 - if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) && + if ((fmt != archNull || compression_spec->algorithm != PG_COMPRESSION_NONE) && (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)) { if (mode == archModeWrite) diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index d1e54644a94..0e87444de85 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - ctx->cs = AllocateCompressor(AH->compression_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression_spec, _CustomWriteFunc); } /* @@ -377,7 +377,7 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) WriteInt(AH, oid); - ctx->cs = AllocateCompressor(AH->compression_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression_spec, _CustomWriteFunc); } /* @@ -566,7 +566,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) static void _PrintData(ArchiveHandle *AH) { - ReadDataFromArchive(AH, AH->compression_spec, _CustomReadFunc); + ReadDataFromArchive(AH, &AH->compression_spec, _CustomReadFunc); } static void diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 6800c3cceef..ffb8a0e4dd7 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -328,7 +328,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); ctx->dataFH = cfopen_write(fname, PG_BINARY_W, - AH->compression_spec); + &AH->compression_spec); if (ctx->dataFH == NULL) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -584,7 +584,7 @@ _CloseArchive(ArchiveHandle *AH) /* The TOC is always created uncompressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - tocFH = cfopen_write(fname, PG_BINARY_W, compression_spec); + tocFH = cfopen_write(fname, PG_BINARY_W, &compression_spec); if (tocFH == NULL) pg_fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -649,7 +649,7 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te) /* The LO TOC file is never compressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - ctx->LOsTocFH = cfopen_write(fname, "ab", compression_spec); + ctx->LOsTocFH = cfopen_write(fname, "ab", &compression_spec); if (ctx->LOsTocFH == NULL) pg_fatal("could not open output file \"%s\": %m", fname); } 
@@ -667,7 +667,7 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression_spec); + ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression_spec); if (ctx->dataFH == NULL) pg_fatal("could not open output file \"%s\": %m", fname); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2c0a9699729..20f73729fac 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -751,7 +751,7 @@ main(int argc, char **argv) pg_fatal("parallel backup only supported by the directory format"); /* Open the output file */ - fout = CreateArchive(filename, archiveFormat, compression_spec, + fout = CreateArchive(filename, archiveFormat, &compression_spec, dosync, archiveMode, setupDumpWorker); /* Make dump options accessible right away */ -- 2.25.1
>From 6a8f2cd926be4f0b83f2d2d5170cf02a2a825036 Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos <gkokola...@pm.me> Date: Wed, 21 Dec 2022 09:49:23 +0000 Subject: [PATCH 2/7] Prepare pg_dump internals for additional compression methods. Commit bf9aa490db introduced cfp in compress_io.{c,h} with the intent of unifying compression related code and allow for the introduction of additional archive formats. However, pg_backup_archiver.c was not using that API. This commit teaches pg_backup_archiver.c about it and is using it throughout. --- src/bin/pg_dump/compress_io.c | 391 +++++++++++++++++++-------- src/bin/pg_dump/compress_io.h | 2 + src/bin/pg_dump/pg_backup_archiver.c | 136 ++++------ src/bin/pg_dump/pg_backup_archiver.h | 27 +- 4 files changed, 326 insertions(+), 230 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 62a9527fa48..97b18337578 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -56,6 +56,10 @@ #include "compress_io.h" #include "pg_backup_utils.h" +#ifdef HAVE_LIBZ +#include <zlib.h> +#endif + /*---------------------- * Compressor API *---------------------- @@ -128,15 +132,24 @@ ReadDataFromArchive(ArchiveHandle *AH, const pg_compress_specification *compression_spec, ReadFunc readF) { - if (compression_spec->algorithm == PG_COMPRESSION_NONE) - ReadDataFromArchiveNone(AH, readF); - if (compression_spec->algorithm == PG_COMPRESSION_GZIP) + switch (compression_spec->algorithm) { + case PG_COMPRESSION_NONE: + ReadDataFromArchiveNone(AH, readF); + break; + case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ - ReadDataFromArchiveZlib(AH, readF); + ReadDataFromArchiveZlib(AH, readF); #else - pg_fatal("this build does not support compression with %s", "gzip"); + pg_fatal("this build does not support compression with %s", "gzip"); #endif + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; } } @@ -149,6 +162,9 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, { switch (cs->compression_spec.algorithm) { + case PG_COMPRESSION_NONE: + WriteDataToArchiveNone(AH, cs, data, dLen); + break; case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ WriteDataToArchiveZlib(AH, cs, data, dLen); @@ -156,13 +172,11 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, pg_fatal("this build does not support compression with %s", "gzip"); #endif break; - case PG_COMPRESSION_NONE: - WriteDataToArchiveNone(AH, cs, data, dLen); - break; case PG_COMPRESSION_LZ4: - /* fallthrough */ + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; case PG_COMPRESSION_ZSTD: - pg_fatal("invalid compression method"); + pg_fatal("compression with %s is not yet supported", "ZSTD"); break; } } @@ -173,10 +187,26 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, void EndCompressor(ArchiveHandle *AH, CompressorState *cs) { + switch (cs->compression_spec.algorithm) + { + case PG_COMPRESSION_NONE: + break; + case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ - if (cs->compression_spec.algorithm == PG_COMPRESSION_GZIP) - EndCompressorZlib(AH, cs); + EndCompressorZlib(AH, cs); +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; + } + free(cs); 
} @@ -391,10 +421,8 @@ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, */ struct cfp { - FILE *uncompressedfp; -#ifdef HAVE_LIBZ - gzFile compressedfp; -#endif + pg_compress_specification compression_spec; + void *fp; }; #ifdef HAVE_LIBZ @@ -490,127 +518,204 @@ cfopen_write(const char *path, const char *mode, } /* - * Opens file 'path' in 'mode'. If compression is GZIP, the file - * is opened with libz gzopen(), otherwise with plain fopen(). + * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or + * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The + * descriptor is not dup'ed and it is the caller's responsibility to do so. + * The caller must verify that the 'compress_algorithm' is supported by the + * current build. * * On failure, return NULL with an error code in errno. */ -cfp * -cfopen(const char *path, const char *mode, - const pg_compress_specification *compression_spec) +static cfp * +cfopen_internal(const char *path, int fd, const char *mode, + const pg_compress_specification *compression_spec) { cfp *fp = pg_malloc(sizeof(cfp)); - if (compression_spec->algorithm == PG_COMPRESSION_GZIP) + fp->compression_spec = *compression_spec; + + switch (compression_spec->algorithm) { -#ifdef HAVE_LIBZ - if (compression_spec->level != Z_DEFAULT_COMPRESSION) - { - /* user has specified a compression level, so tell zlib to use it */ - char mode_compression[32]; + case PG_COMPRESSION_NONE: + if (fd >= 0) + fp->fp = fdopen(fd, mode); + else + fp->fp = fopen(path, mode); + if (fp->fp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } - snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compression_spec->level); - fp->compressedfp = gzopen(path, mode_compression); - } - else - { - /* don't specify a level, just use the zlib default */ - fp->compressedfp = gzopen(path, mode); - } + break; + case PG_COMPRESSION_GZIP: +#ifdef HAVE_LIBZ + if (compression_spec->level != Z_DEFAULT_COMPRESSION) + { + /* + * user has specified a compression level, so tell zlib to use + * it + */ + char mode_compression[32]; + + snprintf(mode_compression, sizeof(mode_compression), "%s%d", + mode, compression_spec->level); + if (fd >= 0) + fp->fp = gzdopen(fd, mode_compression); + // fp->compressedfp = gzopen(path, mode_compression); + else + fp->fp = gzopen(path, mode_compression); + } + else + { + /* don't specify a level, just use the zlib default */ + if (fd >= 0) + fp->fp = gzdopen(fd, mode); + else + fp->fp = gzopen(path, mode); + // fp->compressedfp = gzopen(path, mode); + } - fp->uncompressedfp = NULL; - if (fp->compressedfp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } + if (fp->fp == NULL) + { + free_keep_errno(fp); + fp = NULL; + } #else - pg_fatal("this build does not support compression with %s", "gzip"); -#endif - } - else - { -#ifdef HAVE_LIBZ - fp->compressedfp = NULL; + pg_fatal("this build does not support compression with %s", "gzip"); #endif - fp->uncompressedfp = fopen(path, mode); - if (fp->uncompressedfp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; } return fp; } +cfp * +cfopen(const char *path, const char *mode, + const pg_compress_specification *compression_spec) +{ + return cfopen_internal(path, -1, mode, compression_spec); +} + +cfp * +cfdopen(int fd, const char *mode, + const 
pg_compress_specification *compression_spec) +{ + return cfopen_internal(NULL, fd, mode, compression_spec); +} int cfread(void *ptr, int size, cfp *fp) { - int ret; + int ret = 0; if (size == 0) return 0; -#ifdef HAVE_LIBZ - if (fp->compressedfp) + switch (fp->compression_spec.algorithm) { - ret = gzread(fp->compressedfp, ptr, size); - if (ret != size && !gzeof(fp->compressedfp)) - { - int errnum; - const char *errmsg = gzerror(fp->compressedfp, &errnum); + case PG_COMPRESSION_NONE: + ret = fread(ptr, 1, size, (FILE *) fp->fp); + if (ret != size && !feof((FILE *) fp->fp)) + READ_ERROR_EXIT((FILE *) fp->fp); - pg_fatal("could not read from input file: %s", - errnum == Z_ERRNO ? strerror(errno) : errmsg); - } - } - else + break; + case PG_COMPRESSION_GZIP: +#ifdef HAVE_LIBZ + ret = gzread((gzFile) fp->fp, ptr, size); + if (ret != size && !gzeof((gzFile) fp->fp)) + { + int errnum; + const char *errmsg = gzerror((gzFile) fp->fp, &errnum); + + pg_fatal("could not read from input file: %s", + errnum == Z_ERRNO ? strerror(errno) : errmsg); + } +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif - { - ret = fread(ptr, 1, size, fp->uncompressedfp); - if (ret != size && !feof(fp->uncompressedfp)) - READ_ERROR_EXIT(fp->uncompressedfp); + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; } + return ret; } int cfwrite(const void *ptr, int size, cfp *fp) { + int ret = 0; + + switch (fp->compression_spec.algorithm) + { + case PG_COMPRESSION_NONE: + ret = fwrite(ptr, 1, size, (FILE *) fp->fp); + break; + case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzwrite(fp->compressedfp, ptr, size); - else + ret = gzwrite((gzFile) fp->fp, ptr, size); +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif - return fwrite(ptr, 1, size, fp->uncompressedfp); + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; + } + + return ret; } int cfgetc(cfp *fp) { - int ret; + int ret = 0; -#ifdef HAVE_LIBZ - if (fp->compressedfp) + switch (fp->compression_spec.algorithm) { - ret = gzgetc(fp->compressedfp); - if (ret == EOF) - { - if (!gzeof(fp->compressedfp)) - pg_fatal("could not read from input file: %s", strerror(errno)); - else - pg_fatal("could not read from input file: end of file"); - } - } - else + case PG_COMPRESSION_NONE: + ret = fgetc((FILE *) fp->fp); + if (ret == EOF) + READ_ERROR_EXIT((FILE *) fp->fp); + + break; + case PG_COMPRESSION_GZIP: +#ifdef HAVE_LIBZ + ret = gzgetc((gzFile) fp->fp); + if (ret == EOF) + { + if (!gzeof((gzFile) fp->fp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); + } +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif - { - ret = fgetc(fp->uncompressedfp); - if (ret == EOF) - READ_ERROR_EXIT(fp->uncompressedfp); + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; } return ret; @@ -619,65 +724,119 @@ cfgetc(cfp *fp) char * cfgets(cfp *fp, char *buf, int len) { + char *ret = NULL; + + switch 
(fp->compression_spec.algorithm) + { + case PG_COMPRESSION_NONE: + ret = fgets(buf, len, (FILE *) fp->fp); + + break; + case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzgets(fp->compressedfp, buf, len); - else + ret = gzgets((gzFile) fp->fp, buf, len); +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif - return fgets(buf, len, fp->uncompressedfp); + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; + } + + return ret; } int cfclose(cfp *fp) { - int result; + int ret = 0; if (fp == NULL) { errno = EBADF; return EOF; } -#ifdef HAVE_LIBZ - if (fp->compressedfp) + + switch (fp->compression_spec.algorithm) { - result = gzclose(fp->compressedfp); - fp->compressedfp = NULL; - } - else + case PG_COMPRESSION_NONE: + ret = fclose((FILE *) fp->fp); + fp->fp = NULL; + + break; + case PG_COMPRESSION_GZIP: +#ifdef HAVE_LIBZ + ret = gzclose((gzFile) fp->fp); + fp->fp = NULL; +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif - { - result = fclose(fp->uncompressedfp); - fp->uncompressedfp = NULL; + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; } + free_keep_errno(fp); - return result; + return ret; } int cfeof(cfp *fp) { + int ret = 0; + + switch (fp->compression_spec.algorithm) + { + case PG_COMPRESSION_NONE: + ret = feof((FILE *) fp->fp); + + break; + case PG_COMPRESSION_GZIP: #ifdef HAVE_LIBZ - if (fp->compressedfp) - return gzeof(fp->compressedfp); - else + ret = gzeof((gzFile) fp->fp); +#else + pg_fatal("this build does not support compression with %s", + "gzip"); #endif - return feof(fp->uncompressedfp); + break; + case PG_COMPRESSION_LZ4: + pg_fatal("compression with %s is not yet supported", "LZ4"); + break; + case PG_COMPRESSION_ZSTD: + pg_fatal("compression with %s is not yet supported", "ZSTD"); + break; + } + + return ret; } const char * get_cfp_error(cfp *fp) { -#ifdef HAVE_LIBZ - if (fp->compressedfp) + if (fp->compression_spec.algorithm == PG_COMPRESSION_GZIP) { +#ifdef HAVE_LIBZ int errnum; - const char *errmsg = gzerror(fp->compressedfp, &errnum); + const char *errmsg = gzerror((gzFile) fp->fp, &errnum); if (errnum != Z_ERRNO) return errmsg; - } +#else + pg_fatal("this build does not support compression with %s", "gzip"); #endif + } + return strerror(errno); } diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index 34f4e5e1e14..768096c820d 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -54,6 +54,8 @@ typedef struct cfp cfp; extern cfp *cfopen(const char *path, const char *mode, const pg_compress_specification *compression_spec); +extern cfp *cfdopen(int fd, const char *mode, + const pg_compress_specification *compression_spec); extern cfp *cfopen_read(const char *path, const char *mode); extern cfp *cfopen_write(const char *path, const char *mode, const pg_compress_specification *compression_spec); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 0d91b75c748..cbe110c917a 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -31,6 +31,7 @@ #endif #include "common/string.h" +#include "compress_io.h" #include "dumputils.h" #include 
"fe_utils/string_utils.h" #include "lib/stringinfo.h" @@ -43,13 +44,6 @@ #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" -/* state needed to save/restore an archive's output target */ -typedef struct _outputContext -{ - void *OF; - int gzOut; -} OutputContext; - /* * State for tracking TocEntrys that are ready to process during a parallel * restore. (This used to be a list, and we still call it that, though now @@ -100,9 +94,9 @@ static int RestoringToDB(ArchiveHandle *AH); static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static void SetOutput(ArchiveHandle *AH, const char *filename, - const pg_compress_specification compression_spec); -static OutputContext SaveOutput(ArchiveHandle *AH); -static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext); + const pg_compress_specification *compression_spec); +static cfp *SaveOutput(ArchiveHandle *AH); +static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel); static void restore_toc_entries_prefork(ArchiveHandle *AH, @@ -277,11 +271,8 @@ CloseArchive(Archive *AHX) AH->ClosePtr(AH); /* Close the output */ - errno = 0; /* in case gzclose() doesn't set it */ - if (AH->gzOut) - res = GZCLOSE(AH->OF); - else if (AH->OF != stdout) - res = fclose(AH->OF); + errno = 0; + res = cfclose(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -362,8 +353,9 @@ RestoreArchive(Archive *AHX) ArchiveHandle *AH = (ArchiveHandle *) AHX; RestoreOptions *ropt = AH->public.ropt; bool parallel_mode; + bool supports_compression; TocEntry *te; - OutputContext sav; + cfp *sav; AH->stage = STAGE_INITIALIZING; @@ -391,17 +383,24 @@ RestoreArchive(Archive *AHX) /* * Make sure we won't need (de)compression we haven't got */ -#ifndef HAVE_LIBZ - if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP && + supports_compression = true; + if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE && + AH->compression_spec.algorithm == PG_COMPRESSION_GZIP && AH->PrintTocDataPtr != NULL) { for (te = AH->toc->next; te != AH->toc; te = te->next) { if (te->hadDumper && (te->reqs & REQ_DATA) != 0) - pg_fatal("cannot restore from compressed archive (compression not supported in this installation)"); + { +#ifndef HAVE_LIBZ + if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) + supports_compression = false; +#endif + if (supports_compression == false) + pg_fatal("cannot restore from compressed archive (compression not supported in this installation)"); + } } } -#endif /* * Prepare index arrays, so we can assume we have them throughout restore. 
@@ -469,7 +468,7 @@ RestoreArchive(Archive *AHX) */ sav = SaveOutput(AH); if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE) - SetOutput(AH, ropt->filename, ropt->compression_spec); + SetOutput(AH, ropt->filename, &ropt->compression_spec); ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n"); @@ -1128,7 +1127,7 @@ PrintTOCSummary(Archive *AHX) TocEntry *te; pg_compress_specification out_compression_spec = {0}; teSection curSection; - OutputContext sav; + cfp *sav; const char *fmtName; char stamp_str[64]; @@ -1137,7 +1136,7 @@ PrintTOCSummary(Archive *AHX) sav = SaveOutput(AH); if (ropt->filename) - SetOutput(AH, ropt->filename, out_compression_spec); + SetOutput(AH, ropt->filename, &out_compression_spec); if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT, localtime(&AH->createDate)) == 0) @@ -1501,60 +1500,34 @@ archprintf(Archive *AH, const char *fmt,...) static void SetOutput(ArchiveHandle *AH, const char *filename, - const pg_compress_specification compression_spec) + const pg_compress_specification *compression_spec) { - int fn; + const char *mode; + int fn = -1; if (filename) { if (strcmp(filename, "-") == 0) fn = fileno(stdout); - else - fn = -1; } else if (AH->FH) fn = fileno(AH->FH); else if (AH->fSpec) { - fn = -1; filename = AH->fSpec; } else fn = fileno(stdout); - /* If compression explicitly requested, use gzopen */ -#ifdef HAVE_LIBZ - if (compression_spec.algorithm == PG_COMPRESSION_GZIP) - { - char fmode[14]; + if (AH->mode == archModeAppend) + mode = PG_BINARY_A; + else + mode = PG_BINARY_W; - /* Don't use PG_BINARY_x since this is zlib */ - sprintf(fmode, "wb%d", compression_spec.level); - if (fn >= 0) - AH->OF = gzdopen(dup(fn), fmode); - else - AH->OF = gzopen(filename, fmode); - AH->gzOut = 1; - } + if (fn >= 0) + AH->OF = cfdopen(dup(fn), mode, compression_spec); else -#endif - { /* Use fopen */ - if (AH->mode == archModeAppend) - { - if (fn >= 0) - AH->OF = fdopen(dup(fn), PG_BINARY_A); - else - AH->OF = fopen(filename, PG_BINARY_A); - } - else - { - if (fn >= 0) - AH->OF = fdopen(dup(fn), PG_BINARY_W); - else - AH->OF = fopen(filename, PG_BINARY_W); - } - AH->gzOut = 0; - } + AH->OF = cfopen(filename, mode, compression_spec); if (!AH->OF) { @@ -1565,33 +1538,24 @@ SetOutput(ArchiveHandle *AH, const char *filename, } } -static OutputContext +static cfp * SaveOutput(ArchiveHandle *AH) { - OutputContext sav; - - sav.OF = AH->OF; - sav.gzOut = AH->gzOut; - - return sav; + return (cfp *) AH->OF; } static void -RestoreOutput(ArchiveHandle *AH, OutputContext savedContext) +RestoreOutput(ArchiveHandle *AH, cfp *savedOutput) { int res; - errno = 0; /* in case gzclose() doesn't set it */ - if (AH->gzOut) - res = GZCLOSE(AH->OF); - else - res = fclose(AH->OF); + errno = 0; + res = cfclose(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); - AH->gzOut = savedContext.gzOut; - AH->OF = savedContext.OF; + AH->OF = savedOutput; } @@ -1715,22 +1679,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) bytes_written = size * nmemb; } - else if (AH->gzOut) - bytes_written = GZWRITE(ptr, size, nmemb, AH->OF); else if (AH->CustomOutPtr) bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb); + /* + * If we're doing a restore, and it's direct to DB, and we're connected + * then send it to the DB. 
+ */ + else if (RestoringToDB(AH)) + bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); else - { - /* - * If we're doing a restore, and it's direct to DB, and we're - * connected then send it to the DB. - */ - if (RestoringToDB(AH)) - bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); - else - bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size; - } + bytes_written = cfwrite(ptr, size * nmemb, AH->OF); if (bytes_written != size * nmemb) WRITE_ERROR_EXIT; @@ -2219,6 +2178,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; + pg_compress_specification out_compress_spec = {0}; pg_log_debug("allocating AH for %s, format %d", FileSpec ? FileSpec : "(stdio)", fmt); @@ -2272,8 +2232,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse)); /* Open stdout with no compression for AH output handle */ - AH->gzOut = 0; - AH->OF = stdout; + out_compress_spec.algorithm = PG_COMPRESSION_NONE; + AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, &out_compress_spec); /* * On Windows, we need to use binary mode to read/write non-text files, diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index f72446ed5b4..4725e49747b 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -32,30 +32,6 @@ #define LOBBUFSIZE 16384 -#ifdef HAVE_LIBZ -#include <zlib.h> -#define GZCLOSE(fh) gzclose(fh) -#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s)) -#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s)) -#define GZEOF(fh) gzeof(fh) -#else -#define GZCLOSE(fh) fclose(fh) -#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s)) -#define GZREAD(p, s, n, fh) fread(p, s, n, fh) -#define GZEOF(fh) feof(fh) -/* this is just the redefinition of a libz constant */ -#define Z_DEFAULT_COMPRESSION (-1) - -typedef struct _z_stream -{ - void *next_in; - void *next_out; - size_t avail_in; - size_t avail_out; -} z_stream; -typedef z_stream *z_streamp; -#endif - /* Data block types */ #define BLK_DATA 1 #define BLK_BLOBS 3 @@ -319,8 +295,7 @@ struct _archiveHandle char *fSpec; /* Archive File Spec */ FILE *FH; /* General purpose file handle */ - void *OF; - int gzOut; /* Output file */ + void *OF; /* Output file */ struct _tocEntry *toc; /* Header of circular list of TOC entries */ int tocCount; /* Number of TOC entries */ -- 2.25.1
>From 6bcd16aa5d94b2fee99ed34fc8a76a757f569cb6 Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos <gkokola...@pm.me> Date: Wed, 21 Dec 2022 09:49:31 +0000 Subject: [PATCH 3/7] Introduce Compressor API in pg_dump The purpose of this API is to allow for easier addition of new compression methods. CompressFileHandle is substituting the cfp* family of functions under a struct of function pointers for opening, writing, etc. The implementor of a new compression method is now able to "simply" just add those definitions. Custom compressed archives now need to store the compression algorithm in their header. This requires a bump in the version number. The level of compression is no longer stored in the dump as it is irrelevant. --- src/bin/pg_dump/Makefile | 1 + src/bin/pg_dump/compress_gzip.c | 405 ++++++++++++ src/bin/pg_dump/compress_gzip.h | 22 + src/bin/pg_dump/compress_io.c | 916 +++++++------------------- src/bin/pg_dump/compress_io.h | 71 +- src/bin/pg_dump/meson.build | 1 + src/bin/pg_dump/pg_backup_archiver.c | 102 +-- src/bin/pg_dump/pg_backup_archiver.h | 5 +- src/bin/pg_dump/pg_backup_custom.c | 23 +- src/bin/pg_dump/pg_backup_directory.c | 94 +-- src/bin/pg_dump/t/002_pg_dump.pl | 10 +- src/tools/pginclude/cpluspluscheck | 1 + src/tools/pgindent/typedefs.list | 2 + 13 files changed, 852 insertions(+), 801 deletions(-) create mode 100644 src/bin/pg_dump/compress_gzip.c create mode 100644 src/bin/pg_dump/compress_gzip.h diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index ef1ed0f3e51..7a19f5d6172 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -24,6 +24,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport) OBJS = \ $(WIN32RES) \ + compress_gzip.o \ compress_io.o \ dumputils.o \ parallel.o \ diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c new file mode 100644 index 00000000000..37c841c5a9b --- /dev/null +++ b/src/bin/pg_dump/compress_gzip.c @@ -0,0 +1,405 @@ +/*------------------------------------------------------------------------- + * + * compress_gzip.c + * Routines for archivers to write an uncompressed or compressed data + * stream. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_gzip.c + * + *------------------------------------------------------------------------- + */ +#include "postgres_fe.h" +#include <unistd.h> + +#include "compress_gzip.h" +#include "pg_backup_utils.h" + +#ifdef HAVE_LIBZ +#include "zlib.h" + +/*---------------------- + * Compressor API + *---------------------- + */ +typedef struct GzipCompressorState +{ + z_streamp zp; + + void *outbuf; + size_t outsize; +} GzipCompressorState; + +/* Private routines that support gzip compressed data I/O */ +static void +DeflateCompressorGzip(ArchiveHandle *AH, CompressorState *cs, bool flush) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private_data; + z_streamp zp = gzipcs->zp; + void *out = gzipcs->outbuf; + int res = Z_OK; + + while (gzipcs->zp->avail_in != 0 || flush) + { + res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); + if (res == Z_STREAM_ERROR) + pg_fatal("could not compress data: %s", zp->msg); + if ((flush && (zp->avail_out < gzipcs->outsize)) + || (zp->avail_out == 0) + || (zp->avail_in != 0) + ) + { + /* + * Extra paranoia: avoid zero-length chunks, since a zero length + * chunk is the EOF marker in the custom format. 
This should never + * happen but... + */ + if (zp->avail_out < gzipcs->outsize) + { + /* + * Any write function should do its own error checking but to + * make sure we do a check here as well... + */ + size_t len = gzipcs->outsize - zp->avail_out; + + cs->writeF(AH, (char *) out, len); + } + zp->next_out = out; + zp->avail_out = gzipcs->outsize; + } + + if (res == Z_STREAM_END) + break; + } +} + +static void +EndCompressorGzip(ArchiveHandle *AH, CompressorState *cs) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private_data; + z_streamp zp; + + if (gzipcs->zp) + { + zp = gzipcs->zp; + zp->next_in = NULL; + zp->avail_in = 0; + + /* Flush any remaining data from zlib buffer */ + DeflateCompressorGzip(AH, cs, true); + + if (deflateEnd(zp) != Z_OK) + pg_fatal("could not close compression stream: %s", zp->msg); + + pg_free(gzipcs->outbuf); + pg_free(gzipcs->zp); + } + + pg_free(gzipcs); + cs->private_data = NULL; +} + +static void +WriteDataToArchiveGzip(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private_data; + z_streamp zp; + + if (!gzipcs->zp) + { + zp = gzipcs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + /* + * outsize is the buffer size we tell zlib it can output to. We + * actually allocate one extra byte because some routines want to + * append a trailing zero byte to the zlib output. + */ + gzipcs->outbuf = pg_malloc(ZLIB_OUT_SIZE + 1); + gzipcs->outsize = ZLIB_OUT_SIZE; + + /* + * A level of zero simply copies the input one block at the time. This + * is probably not what the user wanted when calling this interface. + */ + if (cs->compression_spec.level == 0) + pg_fatal("requested to compress the archive yet no level was specified"); + + if (deflateInit(zp, cs->compression_spec.level) != Z_OK) + pg_fatal("could not initialize compression library: %s", zp->msg); + + /* Just be paranoid - maybe End is called after Start, with no Write */ + zp->next_out = gzipcs->outbuf; + zp->avail_out = gzipcs->outsize; + } + + gzipcs->zp->next_in = (void *) unconstify(void *, data); + gzipcs->zp->avail_in = dLen; + DeflateCompressorGzip(AH, cs, false); +} + +static void +ReadDataFromArchiveGzip(ArchiveHandle *AH, CompressorState *cs) +{ + z_streamp zp; + char *out; + int res = Z_OK; + size_t cnt; + char *buf; + size_t buflen; + + zp = (z_streamp) pg_malloc(sizeof(z_stream)); + zp->zalloc = Z_NULL; + zp->zfree = Z_NULL; + zp->opaque = Z_NULL; + + buf = pg_malloc(ZLIB_IN_SIZE); + buflen = ZLIB_IN_SIZE; + + out = pg_malloc(ZLIB_OUT_SIZE + 1); + + if (inflateInit(zp) != Z_OK) + pg_fatal("could not initialize compression library: %s", + zp->msg); + + /* no minimal chunk size for zlib */ + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + zp->next_in = (void *) buf; + zp->avail_in = cnt; + + while (zp->avail_in > 0) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + pg_fatal("could not uncompress data: %s", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + } + + zp->next_in = NULL; + zp->avail_in = 0; + while (res != Z_STREAM_END) + { + zp->next_out = (void *) out; + zp->avail_out = ZLIB_OUT_SIZE; + res = inflate(zp, 0); + if (res != Z_OK && res != Z_STREAM_END) + pg_fatal("could not uncompress data: %s", zp->msg); + + out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; + 
ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + } + + if (inflateEnd(zp) != Z_OK) + pg_fatal("could not close compression library: %s", zp->msg); + + free(buf); + free(out); + free(zp); +} + +/* Public routines that support gzip compressed data I/O */ +void +InitCompressorGzip(CompressorState *cs, const pg_compress_specification *compression_spec) +{ + GzipCompressorState *gzipcs; + + cs->readData = ReadDataFromArchiveGzip; + cs->writeData = WriteDataToArchiveGzip; + cs->end = EndCompressorGzip; + + cs->compression_spec = *compression_spec; + + gzipcs = (GzipCompressorState *) pg_malloc0(sizeof(GzipCompressorState)); + + cs->private_data = gzipcs; +} + + +/*---------------------- + * Compress File API + *---------------------- + */ + +static size_t +Gzip_read(void *ptr, size_t size, CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + size_t ret; + + ret = gzread(gzfp, ptr, size); + if (ret != size && !gzeof(gzfp)) + { + int errnum; + const char *errmsg = gzerror(gzfp, &errnum); + + pg_fatal("could not read from input file: %s", + errnum == Z_ERRNO ? strerror(errno) : errmsg); + } + + return ret; +} + +static size_t +Gzip_write(const void *ptr, size_t size, CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + + return gzwrite(gzfp, ptr, size); +} + +static int +Gzip_getc(CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + int ret; + + errno = 0; + ret = gzgetc(gzfp); + if (ret == EOF) + { + if (!gzeof(gzfp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); + } + + return ret; +} + +static char * +Gzip_gets(char *ptr, int size, CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + + return gzgets(gzfp, ptr, size); +} + +static int +Gzip_close(CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + int save_errno; + int ret; + + CFH->private_data = NULL; + + ret = gzclose(gzfp); + + save_errno = errno; + errno = save_errno; + + return ret; +} + +static int +Gzip_eof(CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + + return gzeof(gzfp); +} + +static const char * +Gzip_get_error(CompressFileHandle *CFH) +{ + gzFile gzfp = (gzFile) CFH->private_data; + const char *errmsg; + int errnum; + + errmsg = gzerror(gzfp, &errnum); + if (errnum == Z_ERRNO) + errmsg = strerror(errno); + + return errmsg; +} + +static int +Gzip_open(const char *path, int fd, const char *mode, CompressFileHandle *CFH) +{ + gzFile gzfp; + char mode_compression[32]; + + if (CFH->compression_spec.level != Z_DEFAULT_COMPRESSION) + { + /* + * user has specified a compression level, so tell zlib to use it + */ + snprintf(mode_compression, sizeof(mode_compression), "%s%d", + mode, CFH->compression_spec.level); + } + else + strcpy(mode_compression, mode); + + if (fd >= 0) + gzfp = gzdopen(dup(fd), mode_compression); + else + gzfp = gzopen(path, mode_compression); + + if (gzfp == NULL) + return 1; + + CFH->private_data = gzfp; + + return 0; +} + +static int +Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH) +{ + char *fname; + int ret; + int save_errno; + + fname = psprintf("%s.gz", path); + ret = CFH->open_func(fname, -1, mode, CFH); + + save_errno = errno; + pg_free(fname); + errno = save_errno; + + return ret; +} + +void +InitCompressGzip(CompressFileHandle *CFH, const pg_compress_specification *compression_spec) +{ + CFH->open_func = Gzip_open; + CFH->open_write_func = Gzip_open_write; + 
CFH->read_func = Gzip_read; + CFH->write_func = Gzip_write; + CFH->gets_func = Gzip_gets; + CFH->getc_func = Gzip_getc; + CFH->close_func = Gzip_close; + CFH->eof_func = Gzip_eof; + CFH->get_error_func = Gzip_get_error; + + CFH->compression_spec = *compression_spec; + + CFH->private_data = NULL; +} +#else /* HAVE_LIBZ */ +void +InitCompressorGzip(CompressorState *cs, const pg_compress_specification *compression_spec) +{ + pg_fatal("this build does not support compression with %s", "gzip"); +} + +void +InitCompressGzip(CompressFileHandle *CFH, const pg_compress_specification *compression_spec) +{ + pg_fatal("this build does not support compression with %s", "gzip"); +} +#endif /* HAVE_LIBZ */ diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h new file mode 100644 index 00000000000..a1fc3595e51 --- /dev/null +++ b/src/bin/pg_dump/compress_gzip.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * compress_gzip.h + * Interface to compress_io.c routines + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_gzip.h + * + *------------------------------------------------------------------------- + */ +#ifndef _COMPRESS_GZIP_H_ +#define _COMPRESS_GZIP_H_ + +#include "compress_io.h" + +extern void InitCompressorGzip(CompressorState *cs, const pg_compress_specification *compression_spec); +extern void InitCompressGzip(CompressFileHandle *CFH, const pg_compress_specification *compression_spec); + +#endif /* _COMPRESS_GZIP_H_ */ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 97b18337578..576a8653193 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -9,42 +9,44 @@ * * This file includes two APIs for dealing with compressed data. The first * provides more flexibility, using callbacks to read/write data from the - * underlying stream. The second API is a wrapper around fopen/gzopen and + * underlying stream. The second API is a wrapper around fopen and * friends, providing an interface similar to those, but abstracts away - * the possible compression. Both APIs use libz for the compression, but - * the second API uses gzip headers, so the resulting files can be easily - * manipulated with the gzip utility. + * the possible compression. The second API is aimed for the resulting + * files can be easily manipulated with an external compression utility + * program. * * Compressor API * -------------- * * The interface for writing to an archive consists of three functions: - * AllocateCompressor, WriteDataToArchive and EndCompressor. First you call - * AllocateCompressor, then write all the data by calling WriteDataToArchive - * as many times as needed, and finally EndCompressor. WriteDataToArchive - * and EndCompressor will call the WriteFunc that was provided to - * AllocateCompressor for each chunk of compressed data. + * AllocateCompressor, writeData, and EndCompressor. First you call + * AllocateCompressor, then write all the data by calling writeData as many + * times as needed, and finally EndCompressor. writeData will call the + * WriteFunc that was provided to AllocateCompressor for each chunk of + * compressed data. * - * The interface for reading an archive consists of just one function: - * ReadDataFromArchive. 
ReadDataFromArchive reads the whole compressed input - * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the - * compressed data chunk at a time, and ReadDataFromArchive decompresses it - * and passes the decompressed data to ahwrite(), until ReadFunc returns 0 - * to signal EOF. - * - * The interface is the same for compressed and uncompressed streams. + * The interface for reading an archive consists of the same three functions: + * AllocateCompressor, readData, and EndCompressor. First you call + * AllocateCompressor, then read all the data by calling readData to read the + * whole compressed stream which repeatedly calls the given ReadFunc. ReadFunc + * returns the compressed data chunk at a time, and readData decompresses it + * and passes the decompressed data to ahwrite(), until ReadFunc returns 0 to + * signal EOF. The interface is the same for compressed and uncompressed + * streams. * * Compressed stream API * ---------------------- * * The compressed stream API is a wrapper around the C standard fopen() and - * libz's gzopen() APIs. It allows you to use the same functions for - * compressed and uncompressed streams. cfopen_read() first tries to open - * the file with given name, and if it fails, it tries to open the same - * file with the .gz suffix. cfopen_write() opens a file for writing, an - * extra argument specifies if the file should be compressed, and adds the - * .gz suffix to the filename if so. This allows you to easily handle both - * compressed and uncompressed files. + * libz's gzopen() APIs and custom LZ4 calls which provide similar + * functionality. It allows you to use the same functions for compressed and + * uncompressed streams. cfopen_read() first tries to open the file with given + * name, and if it fails, it tries to open the same file with the .gz suffix, + * failing that it tries to open the same file with the .lz4 suffix. + * cfopen_write() opens a file for writing, an extra argument specifies the + * method to use should the file be compressed, and adds the appropriate + * suffix, .gz or .lz4, to the filename if so. This allows you to easily handle + * both compressed and uncompressed files. 
* * IDENTIFICATION * src/bin/pg_dump/compress_io.c @@ -53,7 +55,11 @@ */ #include "postgres_fe.h" +#include <sys/stat.h> +#include <unistd.h> + #include "compress_io.h" +#include "compress_gzip.h" #include "pg_backup_utils.h" #ifdef HAVE_LIBZ @@ -65,84 +71,70 @@ *---------------------- */ -/* typedef appears in compress_io.h */ -struct CompressorState +static void +ReadDataFromArchiveNone(ArchiveHandle *AH, CompressorState *cs) { - pg_compress_specification compression_spec; - WriteFunc writeF; + size_t cnt; + char *buf; + size_t buflen; -#ifdef HAVE_LIBZ - z_streamp zp; - char *zlibOut; - size_t zlibOutSize; -#endif -}; + buf = pg_malloc(ZLIB_OUT_SIZE); + buflen = ZLIB_OUT_SIZE; + + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + ahwrite(buf, 1, cnt, AH); + } + + free(buf); +} -/* Routines that support zlib compressed data I/O */ -#ifdef HAVE_LIBZ -static void InitCompressorZlib(CompressorState *cs, int level); -static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, - bool flush); -static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF); -static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen); -static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs); -#endif -/* Routines that support uncompressed data I/O */ -static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF); -static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen); +static void +WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + cs->writeF(AH, data, dLen); +} + +static void +EndCompressorNone(ArchiveHandle *AH, CompressorState *cs) +{ + /* no op */ +} + +static void +InitCompressorNone(CompressorState *cs, + const pg_compress_specification *compression_spec) +{ + cs->readData = ReadDataFromArchiveNone; + cs->writeData = WriteDataToArchiveNone; + cs->end = EndCompressorNone; + + cs->compression_spec = *compression_spec; +} /* Public interface routines */ /* Allocate a new compressor */ CompressorState * AllocateCompressor(const pg_compress_specification *compression_spec, - WriteFunc writeF) + ReadFunc readF, WriteFunc writeF) { CompressorState *cs; -#ifndef HAVE_LIBZ - if (compression_spec->algorithm == PG_COMPRESSION_GZIP) - pg_fatal("this build does not support compression with %s", "gzip"); -#endif - cs = (CompressorState *) pg_malloc0(sizeof(CompressorState)); + cs->readF = readF; cs->writeF = writeF; cs->compression_spec = *compression_spec; // XXX - /* - * Perform compression algorithm specific initialization. - */ -#ifdef HAVE_LIBZ - if (cs->compression_spec.algorithm == PG_COMPRESSION_GZIP) - InitCompressorZlib(cs, cs->compression_spec.level); -#endif - - return cs; -} - -/* - * Read all compressed data from the input stream (via readF) and print it - * out with ahwrite(). 
- */ -void -ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification *compression_spec, - ReadFunc readF) -{ switch (compression_spec->algorithm) { case PG_COMPRESSION_NONE: - ReadDataFromArchiveNone(AH, readF); + InitCompressorNone(cs, compression_spec); break; case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ReadDataFromArchiveZlib(AH, readF); -#else - pg_fatal("this build does not support compression with %s", "gzip"); -#endif + InitCompressorGzip(cs, compression_spec); break; case PG_COMPRESSION_LZ4: pg_fatal("compression with %s is not yet supported", "LZ4"); @@ -151,34 +143,8 @@ ReadDataFromArchive(ArchiveHandle *AH, pg_fatal("compression with %s is not yet supported", "ZSTD"); break; } -} -/* - * Compress and write data to the output stream (via writeF). - */ -void -WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, - const void *data, size_t dLen) -{ - switch (cs->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - WriteDataToArchiveNone(AH, cs, data, dLen); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - WriteDataToArchiveZlib(AH, cs, data, dLen); -#else - pg_fatal("this build does not support compression with %s", "gzip"); -#endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } + return cs; } /* @@ -187,403 +153,177 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, void EndCompressor(ArchiveHandle *AH, CompressorState *cs) { - switch (cs->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - EndCompressorZlib(AH, cs); -#else - pg_fatal("this build does not support compression with %s", - "gzip"); -#endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } - - free(cs); + cs->end(AH, cs); + pg_free(cs); } -/* Private routines, specific to each compression method. */ - -#ifdef HAVE_LIBZ -/* - * Functions for zlib compressed output. +/*---------------------- + * Compressed stream API + *---------------------- */ -static void -InitCompressorZlib(CompressorState *cs, int level) +static int +hasSuffix(const char *filename, const char *suffix) { - z_streamp zp; - - zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream)); - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - /* - * zlibOutSize is the buffer size we tell zlib it can output to. We - * actually allocate one extra byte because some routines want to append a - * trailing zero byte to the zlib output. 
- */ - cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); - cs->zlibOutSize = ZLIB_OUT_SIZE; - - if (deflateInit(zp, level) != Z_OK) - pg_fatal("could not initialize compression library: %s", - zp->msg); - - /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) cs->zlibOut; - zp->avail_out = cs->zlibOutSize; + int filenamelen = strlen(filename); + int suffixlen = strlen(suffix); + + if (filenamelen < suffixlen) + return 0; + + return memcmp(&filename[filenamelen - suffixlen], + suffix, + suffixlen) == 0; } +/* free() without changing errno; useful in several places below */ static void -EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs) +free_keep_errno(void *p) { - z_streamp zp = cs->zp; - - zp->next_in = NULL; - zp->avail_in = 0; - - /* Flush any remaining data from zlib buffer */ - DeflateCompressorZlib(AH, cs, true); - - if (deflateEnd(zp) != Z_OK) - pg_fatal("could not close compression stream: %s", zp->msg); + int save_errno = errno; - free(cs->zlibOut); - free(cs->zp); + free(p); + errno = save_errno; } -static void -DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush) +/* + * Compression None implementation + */ +static size_t +read_none(void *ptr, size_t size, CompressFileHandle *CFH) { - z_streamp zp = cs->zp; - char *out = cs->zlibOut; - int res = Z_OK; + FILE *fp = (FILE *) CFH->private_data; + size_t ret; - while (cs->zp->avail_in != 0 || flush) - { - res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH); - if (res == Z_STREAM_ERROR) - pg_fatal("could not compress data: %s", zp->msg); - if ((flush && (zp->avail_out < cs->zlibOutSize)) - || (zp->avail_out == 0) - || (zp->avail_in != 0) - ) - { - /* - * Extra paranoia: avoid zero-length chunks, since a zero length - * chunk is the EOF marker in the custom format. This should never - * happen but... - */ - if (zp->avail_out < cs->zlibOutSize) - { - /* - * Any write function should do its own error checking but to - * make sure we do a check here as well... 
- */ - size_t len = cs->zlibOutSize - zp->avail_out; - - cs->writeF(AH, out, len); - } - zp->next_out = (void *) out; - zp->avail_out = cs->zlibOutSize; - } + if (size == 0) + return 0; - if (res == Z_STREAM_END) - break; - } + ret = fread(ptr, 1, size, fp); + if (ret != size && !feof(fp)) + pg_fatal("could not read from input file: %s", + strerror(errno)); + + return ret; } -static void -WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen) +static size_t +write_none(const void *ptr, size_t size, CompressFileHandle *CFH) { - cs->zp->next_in = (void *) unconstify(char *, data); - cs->zp->avail_in = dLen; - DeflateCompressorZlib(AH, cs, false); + return fwrite(ptr, 1, size, (FILE *) CFH->private_data); } -static void -ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF) +static const char * +get_error_none(CompressFileHandle *CFH) { - z_streamp zp; - char *out; - int res = Z_OK; - size_t cnt; - char *buf; - size_t buflen; - - zp = (z_streamp) pg_malloc(sizeof(z_stream)); - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - buf = pg_malloc(ZLIB_IN_SIZE); - buflen = ZLIB_IN_SIZE; - - out = pg_malloc(ZLIB_OUT_SIZE + 1); - - if (inflateInit(zp) != Z_OK) - pg_fatal("could not initialize compression library: %s", - zp->msg); - - /* no minimal chunk size for zlib */ - while ((cnt = readF(AH, &buf, &buflen))) - { - zp->next_in = (void *) buf; - zp->avail_in = cnt; - - while (zp->avail_in > 0) - { - zp->next_out = (void *) out; - zp->avail_out = ZLIB_OUT_SIZE; + return strerror(errno); +} - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - pg_fatal("could not uncompress data: %s", zp->msg); +static char * +gets_none(char *ptr, int size, CompressFileHandle *CFH) +{ + return fgets(ptr, size, (FILE *) CFH->private_data); +} - out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; - ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); - } - } +static int +getc_none(CompressFileHandle *CFH) +{ + FILE *fp = (FILE *) CFH->private_data; + int ret; - zp->next_in = NULL; - zp->avail_in = 0; - while (res != Z_STREAM_END) + ret = fgetc(fp); + if (ret == EOF) { - zp->next_out = (void *) out; - zp->avail_out = ZLIB_OUT_SIZE; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - pg_fatal("could not uncompress data: %s", zp->msg); - - out[ZLIB_OUT_SIZE - zp->avail_out] = '\0'; - ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH); + if (!feof(fp)) + pg_fatal("could not read from input file: %s", strerror(errno)); + else + pg_fatal("could not read from input file: end of file"); } - if (inflateEnd(zp) != Z_OK) - pg_fatal("could not close compression library: %s", zp->msg); - - free(buf); - free(out); - free(zp); + return ret; } -#endif /* HAVE_LIBZ */ - - -/* - * Functions for uncompressed output. 
- */ -static void -ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF) +static int +close_none(CompressFileHandle *CFH) { - size_t cnt; - char *buf; - size_t buflen; + FILE *fp = (FILE *) CFH->private_data; + int ret = 0; - buf = pg_malloc(ZLIB_OUT_SIZE); - buflen = ZLIB_OUT_SIZE; + CFH->private_data = NULL; - while ((cnt = readF(AH, &buf, &buflen))) - { - ahwrite(buf, 1, cnt, AH); - } + if (fp) + ret = fclose(fp); - free(buf); + return ret; } -static void -WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, - const char *data, size_t dLen) + +static int +eof_none(CompressFileHandle *CFH) { - cs->writeF(AH, data, dLen); + return feof((FILE *) CFH->private_data); } - -/*---------------------- - * Compressed stream API - *---------------------- - */ - -/* - * cfp represents an open stream, wrapping the underlying FILE or gzFile - * pointer. This is opaque to the callers. - */ -struct cfp +static int +open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH) { - pg_compress_specification compression_spec; - void *fp; -}; + Assert(CFH->private_data == NULL); -#ifdef HAVE_LIBZ -static int hasSuffix(const char *filename, const char *suffix); -#endif + if (fd >= 0) + CFH->private_data = fdopen(dup(fd), mode); + else + CFH->private_data = fopen(path, mode); -/* free() without changing errno; useful in several places below */ -static void -free_keep_errno(void *p) -{ - int save_errno = errno; + if (CFH->private_data == NULL) + return 1; - free(p); - errno = save_errno; + return 0; } -/* - * Open a file for reading. 'path' is the file to open, and 'mode' should - * be either "r" or "rb". - * - * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path' - * doesn't already have it) and try again. So if you pass "foo" as 'path', - * this will open either "foo" or "foo.gz". - * - * On failure, return NULL with an error code in errno. - */ -cfp * -cfopen_read(const char *path, const char *mode) +static int +open_write_none(const char *path, const char *mode, CompressFileHandle *CFH) { - cfp *fp; + Assert(CFH->private_data == NULL); + CFH->private_data = fopen(path, mode); + if (CFH->private_data == NULL) + return 1; - pg_compress_specification compression_spec = {0}; - -#ifdef HAVE_LIBZ - if (hasSuffix(path, ".gz")) - { - compression_spec.algorithm = PG_COMPRESSION_GZIP; - fp = cfopen(path, mode, &compression_spec); - } - else -#endif - { - compression_spec.algorithm = PG_COMPRESSION_NONE; - fp = cfopen(path, mode, &compression_spec); -#ifdef HAVE_LIBZ - if (fp == NULL) - { - char *fname; - - fname = psprintf("%s.gz", path); - compression_spec.algorithm = PG_COMPRESSION_GZIP; - fp = cfopen(fname, mode, &compression_spec); - free_keep_errno(fname); - } -#endif - } - return fp; + return 0; } -/* - * Open a file for writing. 'path' indicates the path name, and 'mode' must - * be a filemode as accepted by fopen() and gzopen() that indicates writing - * ("w", "wb", "a", or "ab"). - * - * If 'compression_spec.algorithm' is GZIP, a gzip compressed stream is opened, - * and 'compression_spec.level' used. The ".gz" suffix is automatically added to - * 'path' in that case. - * - * On failure, return NULL with an error code in errno. 
- */ -cfp * -cfopen_write(const char *path, const char *mode, - const pg_compress_specification *compression_spec) +static void +InitCompressNone(CompressFileHandle *CFH, + const pg_compress_specification *compression_spec) { - cfp *fp; - - if (compression_spec->algorithm == PG_COMPRESSION_NONE) - fp = cfopen(path, mode, compression_spec); - else - { -#ifdef HAVE_LIBZ - char *fname; - - fname = psprintf("%s.gz", path); - fp = cfopen(fname, mode, compression_spec); - free_keep_errno(fname); -#else - pg_fatal("this build does not support compression with %s", "gzip"); - fp = NULL; /* keep compiler quiet */ -#endif - } - return fp; + CFH->open_func = open_none; + CFH->open_write_func = open_write_none; + CFH->read_func = read_none; + CFH->write_func = write_none; + CFH->gets_func = gets_none; + CFH->getc_func = getc_none; + CFH->close_func = close_none; + CFH->eof_func = eof_none; + CFH->get_error_func = get_error_none; + + CFH->private_data = NULL; } /* - * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or - * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The - * descriptor is not dup'ed and it is the caller's responsibility to do so. - * The caller must verify that the 'compress_algorithm' is supported by the - * current build. - * - * On failure, return NULL with an error code in errno. + * Public interface */ -static cfp * -cfopen_internal(const char *path, int fd, const char *mode, - const pg_compress_specification *compression_spec) +CompressFileHandle * +InitCompressFileHandle(const pg_compress_specification *compression_spec) { - cfp *fp = pg_malloc(sizeof(cfp)); + CompressFileHandle *CFH; - fp->compression_spec = *compression_spec; + CFH = pg_malloc0(sizeof(CompressFileHandle)); switch (compression_spec->algorithm) { case PG_COMPRESSION_NONE: - if (fd >= 0) - fp->fp = fdopen(fd, mode); - else - fp->fp = fopen(path, mode); - if (fp->fp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } - + InitCompressNone(CFH, compression_spec); break; case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - if (compression_spec->level != Z_DEFAULT_COMPRESSION) - { - /* - * user has specified a compression level, so tell zlib to use - * it - */ - char mode_compression[32]; - - snprintf(mode_compression, sizeof(mode_compression), "%s%d", - mode, compression_spec->level); - if (fd >= 0) - fp->fp = gzdopen(fd, mode_compression); - // fp->compressedfp = gzopen(path, mode_compression); - else - fp->fp = gzopen(path, mode_compression); - } - else - { - /* don't specify a level, just use the zlib default */ - if (fd >= 0) - fp->fp = gzdopen(fd, mode); - else - fp->fp = gzopen(path, mode); - // fp->compressedfp = gzopen(path, mode); - } - - if (fp->fp == NULL) - { - free_keep_errno(fp); - fp = NULL; - } -#else - pg_fatal("this build does not support compression with %s", "gzip"); -#endif + InitCompressGzip(CFH, compression_spec); break; case PG_COMPRESSION_LZ4: pg_fatal("compression with %s is not yet supported", "LZ4"); @@ -593,266 +333,88 @@ cfopen_internal(const char *path, int fd, const char *mode, break; } - return fp; + return CFH; } -cfp * -cfopen(const char *path, const char *mode, - const pg_compress_specification *compression_spec) +/* + * Open a file for reading. 'path' is the file to open, and 'mode' should + * be either "r" or "rb". + * + * If the file at 'path' does not exist, we append the "{.gz,.lz4}" suffix (i + * 'path' doesn't already have it) and try again. 
So if you pass "foo" as + * 'path', this will open either "foo" or "foo.gz" or "foo.lz4", trying in that + * order. + * + * On failure, return NULL with an error code in errno. + */ +CompressFileHandle * +InitDiscoverCompressFileHandle(const char *path, const char *mode) { - return cfopen_internal(path, -1, mode, compression_spec); -} + CompressFileHandle *CFH = NULL; + struct stat st; + char *fname; + pg_compress_specification compression_spec = {0}; -cfp * -cfdopen(int fd, const char *mode, - const pg_compress_specification *compression_spec) -{ - return cfopen_internal(NULL, fd, mode, compression_spec); -} + // compression_spec.algorithm = PG_COMPRESSION_NONE; -int -cfread(void *ptr, int size, cfp *fp) -{ - int ret = 0; + Assert(strcmp(mode, "r") == 0 || strcmp(mode, "rb") == 0); - if (size == 0) - return 0; + fname = strdup(path); - switch (fp->compression_spec.algorithm) + if (hasSuffix(fname, ".gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else { - case PG_COMPRESSION_NONE: - ret = fread(ptr, 1, size, (FILE *) fp->fp); - if (ret != size && !feof((FILE *) fp->fp)) - READ_ERROR_EXIT((FILE *) fp->fp); + bool exists; - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzread((gzFile) fp->fp, ptr, size); - if (ret != size && !gzeof((gzFile) fp->fp)) - { - int errnum; - const char *errmsg = gzerror((gzFile) fp->fp, &errnum); - - pg_fatal("could not read from input file: %s", - errnum == Z_ERRNO ? strerror(errno) : errmsg); - } -#else - pg_fatal("this build does not support compression with %s", - "gzip"); -#endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } - - return ret; -} - -int -cfwrite(const void *ptr, int size, cfp *fp) -{ - int ret = 0; - - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fwrite(ptr, 1, size, (FILE *) fp->fp); - break; - case PG_COMPRESSION_GZIP: + exists = (stat(path, &st) == 0); + /* avoid unused warning if it is not build with compression */ + if (exists) + compression_spec.algorithm = PG_COMPRESSION_NONE; #ifdef HAVE_LIBZ - ret = gzwrite((gzFile) fp->fp, ptr, size); -#else - pg_fatal("this build does not support compression with %s", - "gzip"); -#endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } - - return ret; -} - -int -cfgetc(cfp *fp) -{ - int ret = 0; - - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fgetc((FILE *) fp->fp); - if (ret == EOF) - READ_ERROR_EXIT((FILE *) fp->fp); + if (!exists) + { + free_keep_errno(fname); + fname = psprintf("%s.gz", path); + exists = (stat(fname, &st) == 0); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzgetc((gzFile) fp->fp); - if (ret == EOF) - { - if (!gzeof((gzFile) fp->fp)) - pg_fatal("could not read from input file: %s", strerror(errno)); - else - pg_fatal("could not read from input file: end of file"); - } -#else - pg_fatal("this build does not support compression with %s", - "gzip"); + if (exists) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + } #endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } - - return 
ret; -} - -char * -cfgets(cfp *fp, char *buf, int len) -{ - char *ret = NULL; - - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fgets(buf, len, (FILE *) fp->fp); +#ifdef USE_LZ4 + if (!exists) + { + free_keep_errno(fname); + fname = psprintf("%s.lz4", path); + exists = (stat(fname, &st) == 0); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzgets((gzFile) fp->fp, buf, len); -#else - pg_fatal("this build does not support compression with %s", - "gzip"); + if (exists) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + } #endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; } - return ret; -} - -int -cfclose(cfp *fp) -{ - int ret = 0; - - if (fp == NULL) + CFH = InitCompressFileHandle(&compression_spec); + if (CFH->open_func(fname, -1, mode, CFH)) { - errno = EBADF; - return EOF; + free_keep_errno(CFH); + CFH = NULL; } + free_keep_errno(fname); - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = fclose((FILE *) fp->fp); - fp->fp = NULL; - - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzclose((gzFile) fp->fp); - fp->fp = NULL; -#else - pg_fatal("this build does not support compression with %s", - "gzip"); -#endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } - - free_keep_errno(fp); - - return ret; + return CFH; } int -cfeof(cfp *fp) +DestroyCompressFileHandle(CompressFileHandle *CFH) { int ret = 0; - switch (fp->compression_spec.algorithm) - { - case PG_COMPRESSION_NONE: - ret = feof((FILE *) fp->fp); + if (CFH->private_data) + ret = CFH->close_func(CFH); - break; - case PG_COMPRESSION_GZIP: -#ifdef HAVE_LIBZ - ret = gzeof((gzFile) fp->fp); -#else - pg_fatal("this build does not support compression with %s", - "gzip"); -#endif - break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } + free_keep_errno(CFH); return ret; } - -const char * -get_cfp_error(cfp *fp) -{ - if (fp->compression_spec.algorithm == PG_COMPRESSION_GZIP) - { -#ifdef HAVE_LIBZ - int errnum; - const char *errmsg = gzerror((gzFile) fp->fp, &errnum); - - if (errnum != Z_ERRNO) - return errmsg; -#else - pg_fatal("this build does not support compression with %s", "gzip"); -#endif - } - - return strerror(errno); -} - -#ifdef HAVE_LIBZ -static int -hasSuffix(const char *filename, const char *suffix) -{ - int filenamelen = strlen(filename); - int suffixlen = strlen(suffix); - - if (filenamelen < suffixlen) - return 0; - - return memcmp(&filename[filenamelen - suffixlen], - suffix, - suffixlen) == 0; -} - -#endif diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h index 768096c820d..afe6b22efaf 100644 --- a/src/bin/pg_dump/compress_io.h +++ b/src/bin/pg_dump/compress_io.h @@ -37,34 +37,63 @@ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len); */ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen); -/* struct definition appears in compress_io.c */ typedef struct CompressorState CompressorState; +struct CompressorState +{ + /* + * Read all compressed data from the input stream (via readF) and 
print it + * out with ahwrite(). + */ + void (*readData) (ArchiveHandle *AH, CompressorState *cs); + + /* + * Compress and write data to the output stream (via writeF). + */ + void (*writeData) (ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); + void (*end) (ArchiveHandle *AH, CompressorState *cs); + + ReadFunc readF; + WriteFunc writeF; + + pg_compress_specification compression_spec; + void *private_data; +}; extern CompressorState *AllocateCompressor(const pg_compress_specification *compression_spec, + ReadFunc readF, WriteFunc writeF); -extern void ReadDataFromArchive(ArchiveHandle *AH, - const pg_compress_specification *compression_spec, - ReadFunc readF); -extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, - const void *data, size_t dLen); extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs); +/* + * Compress File Handle + */ +typedef struct CompressFileHandle CompressFileHandle; + +struct CompressFileHandle +{ + int (*open_func) (const char *path, int fd, const char *mode, + CompressFileHandle *CFH); + int (*open_write_func) (const char *path, const char *mode, + CompressFileHandle *cxt); + size_t (*read_func) (void *ptr, size_t size, CompressFileHandle *CFH); + size_t (*write_func) (const void *ptr, size_t size, + struct CompressFileHandle *CFH); + char *(*gets_func) (char *s, int size, CompressFileHandle *CFH); + int (*getc_func) (CompressFileHandle *CFH); + int (*eof_func) (CompressFileHandle *CFH); + int (*close_func) (CompressFileHandle *CFH); + const char *(*get_error_func) (CompressFileHandle *CFH); + + pg_compress_specification compression_spec; + void *private_data; +}; -typedef struct cfp cfp; +extern CompressFileHandle *InitCompressFileHandle( + const pg_compress_specification *compression_spec); -extern cfp *cfopen(const char *path, const char *mode, - const pg_compress_specification *compression_spec); -extern cfp *cfdopen(int fd, const char *mode, - const pg_compress_specification *compression_spec); -extern cfp *cfopen_read(const char *path, const char *mode); -extern cfp *cfopen_write(const char *path, const char *mode, - const pg_compress_specification *compression_spec); -extern int cfread(void *ptr, int size, cfp *fp); -extern int cfwrite(const void *ptr, int size, cfp *fp); -extern int cfgetc(cfp *fp); -extern char *cfgets(cfp *fp, char *buf, int len); -extern int cfclose(cfp *fp); -extern int cfeof(cfp *fp); -extern const char *get_cfp_error(cfp *fp); +extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path, + const char *mode); +extern int DestroyCompressFileHandle(CompressFileHandle *CFH); #endif diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build index ca62f9a3740..aa2c91829c0 100644 --- a/src/bin/pg_dump/meson.build +++ b/src/bin/pg_dump/meson.build @@ -2,6 +2,7 @@ pg_dump_common_sources = files( 'compress_io.c', + 'compress_gzip.c', 'dumputils.c', 'parallel.c', 'pg_backup_archiver.c', diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index cbe110c917a..06f0b46cbfc 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -95,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH); static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification *compression_spec); -static cfp *SaveOutput(ArchiveHandle *AH); -static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput); +static 
CompressFileHandle *SaveOutput(ArchiveHandle *AH); +static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel); static void restore_toc_entries_prefork(ArchiveHandle *AH, @@ -272,7 +272,7 @@ CloseArchive(Archive *AHX) /* Close the output */ errno = 0; - res = cfclose(AH->OF); + res = DestroyCompressFileHandle(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -355,7 +355,7 @@ RestoreArchive(Archive *AHX) bool parallel_mode; bool supports_compression; TocEntry *te; - cfp *sav; + CompressFileHandle *sav; AH->stage = STAGE_INITIALIZING; @@ -1127,7 +1127,7 @@ PrintTOCSummary(Archive *AHX) TocEntry *te; pg_compress_specification out_compression_spec = {0}; teSection curSection; - cfp *sav; + CompressFileHandle *sav; const char *fmtName; char stamp_str[64]; @@ -1143,9 +1143,10 @@ PrintTOCSummary(Archive *AHX) strcpy(stamp_str, "[unknown]"); ahprintf(AH, ";\n; Archive created at %s\n", stamp_str); - ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n", + ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n", sanitize_line(AH->archdbname, false), - AH->tocCount, AH->compression_spec.level); + AH->tocCount, + get_compress_algorithm_name(AH->compression_spec.algorithm)); switch (AH->format) { @@ -1502,6 +1503,7 @@ static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification *compression_spec) { + CompressFileHandle *CFH; const char *mode; int fn = -1; @@ -1524,33 +1526,32 @@ SetOutput(ArchiveHandle *AH, const char *filename, else mode = PG_BINARY_W; - if (fn >= 0) - AH->OF = cfdopen(dup(fn), mode, compression_spec); - else - AH->OF = cfopen(filename, mode, compression_spec); + CFH = InitCompressFileHandle(compression_spec); - if (!AH->OF) + if (CFH->open_func(filename, fn, mode, CFH)) { if (filename) pg_fatal("could not open output file \"%s\": %m", filename); else pg_fatal("could not open output file: %m"); } + + AH->OF = CFH; } -static cfp * +static CompressFileHandle * SaveOutput(ArchiveHandle *AH) { - return (cfp *) AH->OF; + return (CompressFileHandle *) AH->OF; } static void -RestoreOutput(ArchiveHandle *AH, cfp *savedOutput) +RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput) { int res; errno = 0; - res = cfclose(AH->OF); + res = DestroyCompressFileHandle(AH->OF); if (res != 0) pg_fatal("could not close output file: %m"); @@ -1689,7 +1690,11 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) else if (RestoringToDB(AH)) bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb); else - bytes_written = cfwrite(ptr, size * nmemb, AH->OF); + { + CompressFileHandle *CFH = (CompressFileHandle *) AH->OF; + + bytes_written = CFH->write_func(ptr, size * nmemb, CFH); + } if (bytes_written != size * nmemb) WRITE_ERROR_EXIT; @@ -2031,6 +2036,18 @@ ReadStr(ArchiveHandle *AH) return buf; } +static bool +_fileExistsInDirectory(const char *dir, const char *filename) +{ + struct stat st; + char buf[MAXPGPATH]; + + if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH) + pg_fatal("directory name too long: \"%s\"", dir); + + return (stat(buf, &st) == 0 && S_ISREG(st.st_mode)); +} + static int _discoverArchiveFormat(ArchiveHandle *AH) { @@ -2061,26 +2078,12 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) { - char buf[MAXPGPATH]; - - if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH) - 
pg_fatal("directory name too long: \"%s\"", - AH->fSpec); - if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) - { - AH->format = archDirectory; + AH->format = archDirectory; + if (_fileExistsInDirectory(AH->fSpec, "toc.dat")) return AH->format; - } - #ifdef HAVE_LIBZ - if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH) - pg_fatal("directory name too long: \"%s\"", - AH->fSpec); - if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) - { - AH->format = archDirectory; + if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz")) return AH->format; - } #endif pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", AH->fSpec); @@ -2178,6 +2181,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, SetupWorkerPtrType setupWorkerPtr) { ArchiveHandle *AH; + CompressFileHandle *CFH; pg_compress_specification out_compress_spec = {0}; pg_log_debug("allocating AH for %s, format %d", @@ -2233,7 +2237,10 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, /* Open stdout with no compression for AH output handle */ out_compress_spec.algorithm = PG_COMPRESSION_NONE; - AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, &out_compress_spec); + CFH = InitCompressFileHandle(&out_compress_spec); + if (CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH)) + pg_fatal("could not open stdout for appending: %m"); + AH->OF = CFH; /* * On Windows, we need to use binary mode to read/write non-text files, @@ -3646,12 +3653,7 @@ WriteHead(ArchiveHandle *AH) AH->WriteBytePtr(AH, AH->intSize); AH->WriteBytePtr(AH, AH->offSize); AH->WriteBytePtr(AH, AH->format); - /* - * For now the compression type is implied by the level. This will need - * to change once support for more compression algorithms is added, - * requiring a format bump. 
- */ - WriteInt(AH, AH->compression_spec.level); + AH->WriteBytePtr(AH, AH->compression_spec.algorithm); crtm = *localtime(&AH->createDate); WriteInt(AH, crtm.tm_sec); WriteInt(AH, crtm.tm_min); @@ -3722,10 +3724,11 @@ ReadHead(ArchiveHandle *AH) pg_fatal("expected format (%d) differs from format found in file (%d)", AH->format, fmt); - /* Guess the compression method based on the level */ - AH->compression_spec.algorithm = PG_COMPRESSION_NONE; - if (AH->version >= K_VERS_1_2) + if (AH->version >= K_VERS_1_15) + AH->compression_spec.algorithm = AH->ReadBytePtr(AH); + else if (AH->version >= K_VERS_1_2) { + /* Guess the compression method based on the level */ if (AH->version < K_VERS_1_4) AH->compression_spec.level = AH->ReadBytePtr(AH); else @@ -3737,10 +3740,17 @@ ReadHead(ArchiveHandle *AH) else AH->compression_spec.algorithm = PG_COMPRESSION_GZIP; + if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE) + { + bool unsupported = false; + #ifndef HAVE_LIBZ - if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) - pg_fatal("archive is compressed, but this installation does not support compression"); + if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) + unsupported = true; #endif + if (unsupported) + pg_fatal("archive is compressed, but this installation does not support compression"); + } if (AH->version >= K_VERS_1_4) { diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 4725e49747b..18b38c17abc 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -65,10 +65,13 @@ #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0) /* change search_path * behavior */ #define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0) /* add tableam */ +#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0) /* add + * compression_algorithm + * in header */ /* Current archive version number (the format we can output) */ #define K_VERS_MAJOR 1 -#define K_VERS_MINOR 14 +#define K_VERS_MINOR 15 #define K_VERS_REV 0 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV) diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 0e87444de85..40cd90b7325 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - ctx->cs = AllocateCompressor(&AH->compression_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression_spec, + NULL, + _CustomWriteFunc); } /* @@ -317,15 +319,15 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) CompressorState *cs = ctx->cs; if (dLen > 0) - /* WriteDataToArchive() internally throws write errors */ - WriteDataToArchive(AH, cs, data, dLen); + /* writeData() internally throws write errors */ + cs->writeData(AH, cs, data, dLen); } /* * Called by the archiver when a dumper's 'DataDumper' routine has * finished. * - * Optional. + * Mandatory. 
*/ static void _EndData(ArchiveHandle *AH, TocEntry *te) @@ -333,6 +335,8 @@ _EndData(ArchiveHandle *AH, TocEntry *te) lclContext *ctx = (lclContext *) AH->formatData; EndCompressor(AH, ctx->cs); + ctx->cs = NULL; + /* Send the end marker */ WriteInt(AH, 0); } @@ -377,7 +381,9 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) WriteInt(AH, oid); - ctx->cs = AllocateCompressor(&AH->compression_spec, _CustomWriteFunc); + ctx->cs = AllocateCompressor(&AH->compression_spec, + NULL, + _CustomWriteFunc); } /* @@ -566,7 +572,12 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te) static void _PrintData(ArchiveHandle *AH) { - ReadDataFromArchive(AH, &AH->compression_spec, _CustomReadFunc); + CompressorState *cs; + + cs = AllocateCompressor(&AH->compression_spec, + _CustomReadFunc, NULL); + cs->readData(AH, cs); + EndCompressor(AH, cs); } static void diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index ffb8a0e4dd7..2d4baf58c22 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -50,9 +50,8 @@ typedef struct */ char *directory; - cfp *dataFH; /* currently open data file */ - - cfp *LOsTocFH; /* file handle for blobs.toc */ + CompressFileHandle *dataFH; /* currently open data file */ + CompressFileHandle *LOsTocFH; /* file handle for blobs.toc */ ParallelState *pstate; /* for parallel backup / restore */ } lclContext; @@ -198,11 +197,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) else { /* Read Mode */ char fname[MAXPGPATH]; - cfp *tocFH; + CompressFileHandle *tocFH; setFilePath(AH, fname, "toc.dat"); - tocFH = cfopen_read(fname, PG_BINARY_R); + tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R); if (tocFH == NULL) pg_fatal("could not open input file \"%s\": %m", fname); @@ -218,7 +217,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) ReadToc(AH); /* Nothing else in the file, so close it again... 
*/ - if (cfclose(tocFH) != 0) + if (DestroyCompressFileHandle(tocFH) != 0) pg_fatal("could not close TOC file: %m"); ctx->dataFH = NULL; } @@ -327,9 +326,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te) setFilePath(AH, fname, tctx->filename); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, - &AH->compression_spec); - if (ctx->dataFH == NULL) + ctx->dataFH = InitCompressFileHandle(&AH->compression_spec); + + if (ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -346,15 +345,16 @@ static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen) + if (dLen > 0 && CFH->write_func(data, dLen, CFH) != dLen) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error_func(CFH)); } } @@ -370,7 +370,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te) lclContext *ctx = (lclContext *) AH->formatData; /* Close the file */ - if (cfclose(ctx->dataFH) != 0) + if (DestroyCompressFileHandle(ctx->dataFH) != 0) pg_fatal("could not close data file: %m"); ctx->dataFH = NULL; @@ -385,26 +385,25 @@ _PrintFileData(ArchiveHandle *AH, char *filename) size_t cnt; char *buf; size_t buflen; - cfp *cfp; + CompressFileHandle *CFH; if (!filename) return; - cfp = cfopen_read(filename, PG_BINARY_R); - - if (!cfp) + CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R); + if (!CFH) pg_fatal("could not open input file \"%s\": %m", filename); buf = pg_malloc(ZLIB_OUT_SIZE); buflen = ZLIB_OUT_SIZE; - while ((cnt = cfread(buf, buflen, cfp))) + while ((cnt = CFH->read_func(buf, buflen, CFH))) { ahwrite(buf, 1, cnt, AH); } free(buf); - if (cfclose(cfp) != 0) + if (DestroyCompressFileHandle(CFH) != 0) pg_fatal("could not close data file \"%s\": %m", filename); } @@ -435,6 +434,7 @@ _LoadLOs(ArchiveHandle *AH) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH; char tocfname[MAXPGPATH]; char line[MAXPGPATH]; @@ -442,14 +442,14 @@ _LoadLOs(ArchiveHandle *AH) setFilePath(AH, tocfname, "blobs.toc"); - ctx->LOsTocFH = cfopen_read(tocfname, PG_BINARY_R); + CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R); if (ctx->LOsTocFH == NULL) pg_fatal("could not open large object TOC file \"%s\" for input: %m", tocfname); /* Read the LOs TOC file line-by-line, and process each LO */ - while ((cfgets(ctx->LOsTocFH, line, MAXPGPATH)) != NULL) + while ((CFH->gets_func(line, MAXPGPATH, CFH)) != NULL) { char lofname[MAXPGPATH + 1]; char path[MAXPGPATH]; @@ -464,11 +464,11 @@ _LoadLOs(ArchiveHandle *AH) _PrintFileData(AH, path); EndRestoreLO(AH, oid); } - if (!cfeof(ctx->LOsTocFH)) + if (!CFH->eof_func(CFH)) pg_fatal("error reading large object TOC file \"%s\"", tocfname); - if (cfclose(ctx->LOsTocFH) != 0) + if (DestroyCompressFileHandle(ctx->LOsTocFH) != 0) pg_fatal("could not close large object TOC file \"%s\": %m", tocfname); @@ -488,15 +488,16 @@ _WriteByte(ArchiveHandle *AH, const int i) { unsigned char c = (unsigned char) i; lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (cfwrite(&c, 1, ctx->dataFH) != 1) + if (CFH->write_func(&c, 1, CFH) != 1) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; 
pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error_func(CFH)); } return 1; @@ -512,8 +513,9 @@ static int _ReadByte(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; - return cfgetc(ctx->dataFH); + return CFH->getc_func(CFH); } /* @@ -524,15 +526,16 @@ static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; errno = 0; - if (cfwrite(buf, len, ctx->dataFH) != len) + if (CFH->write_func(buf, len, CFH) != len) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) errno = ENOSPC; pg_fatal("could not write to output file: %s", - get_cfp_error(ctx->dataFH)); + CFH->get_error_func(CFH)); } } @@ -545,12 +548,13 @@ static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->dataFH; /* - * If there was an I/O error, we already exited in cfread(), so here we + * If there was an I/O error, we already exited in readF(), so here we * exit on short reads. */ - if (cfread(buf, len, ctx->dataFH) != len) + if (CFH->read_func(buf, len, CFH) != len) pg_fatal("could not read from input file: end of file"); } @@ -573,7 +577,7 @@ _CloseArchive(ArchiveHandle *AH) if (AH->mode == archModeWrite) { - cfp *tocFH; + CompressFileHandle *tocFH; pg_compress_specification compression_spec = {0}; char fname[MAXPGPATH]; @@ -584,8 +588,8 @@ _CloseArchive(ArchiveHandle *AH) /* The TOC is always created uncompressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - tocFH = cfopen_write(fname, PG_BINARY_W, &compression_spec); - if (tocFH == NULL) + tocFH = InitCompressFileHandle(&compression_spec); + if (tocFH->open_write_func(fname, PG_BINARY_W, tocFH)) pg_fatal("could not open output file \"%s\": %m", fname); ctx->dataFH = tocFH; @@ -598,7 +602,7 @@ _CloseArchive(ArchiveHandle *AH) WriteHead(AH); AH->format = archDirectory; WriteToc(AH); - if (cfclose(tocFH) != 0) + if (DestroyCompressFileHandle(tocFH) != 0) pg_fatal("could not close TOC file: %m"); WriteDataChunks(AH, ctx->pstate); @@ -649,8 +653,8 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te) /* The LO TOC file is never compressed */ compression_spec.algorithm = PG_COMPRESSION_NONE; - ctx->LOsTocFH = cfopen_write(fname, "ab", &compression_spec); - if (ctx->LOsTocFH == NULL) + ctx->LOsTocFH = InitCompressFileHandle(&compression_spec); + if (ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -667,9 +671,8 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid) snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid); - ctx->dataFH = cfopen_write(fname, PG_BINARY_W, &AH->compression_spec); - - if (ctx->dataFH == NULL) + ctx->dataFH = InitCompressFileHandle(&AH->compression_spec); + if (ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH)) pg_fatal("could not open output file \"%s\": %m", fname); } @@ -682,18 +685,19 @@ static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid) { lclContext *ctx = (lclContext *) AH->formatData; + CompressFileHandle *CFH = ctx->LOsTocFH; char buf[50]; int len; - /* Close the LO data file itself */ - if (cfclose(ctx->dataFH) != 0) - pg_fatal("could not close LO data file: %m"); + /* Close the BLOB data file itself */ + if (DestroyCompressFileHandle(ctx->dataFH) != 0) + pg_fatal("could not close blob data file: %m"); ctx->dataFH = 
NULL; /* register the LO in blobs.toc */ len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid); - if (cfwrite(buf, len, ctx->LOsTocFH) != len) - pg_fatal("could not write to LOs TOC file"); + if (CFH->write_func(buf, len, CFH) != len) + pg_fatal("could not write to blobs TOC file"); } /* @@ -706,8 +710,8 @@ _EndLOs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; - if (cfclose(ctx->LOsTocFH) != 0) - pg_fatal("could not close LOs TOC file: %m"); + if (DestroyCompressFileHandle(ctx->LOsTocFH) != 0) + pg_fatal("could not close blobs TOC file: %m"); ctx->LOsTocFH = NULL; } diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 2eeef2a4783..f3ba9263213 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -94,7 +94,7 @@ my %pgdump_runs = ( command => [ 'pg_restore', '-l', "$tempdir/compression_gzip_custom.dump", ], - expected => qr/Compression: 1/, + expected => qr/Compression: gzip/, name => 'data content is gzip-compressed' }, }, @@ -239,8 +239,8 @@ my %pgdump_runs = ( command => [ 'pg_restore', '-l', "$tempdir/defaults_custom_format.dump", ], expected => $supports_gzip ? - qr/Compression: -1/ : - qr/Compression: 0/, + qr/Compression: gzip/ : + qr/Compression: none/, name => 'data content is gzip-compressed by default if available', }, }, @@ -264,8 +264,8 @@ my %pgdump_runs = ( command => [ 'pg_restore', '-l', "$tempdir/defaults_dir_format", ], expected => $supports_gzip ? - qr/Compression: -1/ : - qr/Compression: 0/, + qr/Compression: gzip/ : + qr/Compression: none/, name => 'data content is gzip-compressed by default', }, glob_patterns => [ diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck index fe432e2cccc..62f3e9a81d4 100755 --- a/src/tools/pginclude/cpluspluscheck +++ b/src/tools/pginclude/cpluspluscheck @@ -150,6 +150,7 @@ do # pg_dump is not C++-clean because it uses "public" and "namespace" # as field names, which is unfortunate but we won't change it now. + test "$f" = src/bin/pg_dump/compress_gzip.h && continue test "$f" = src/bin/pg_dump/compress_io.h && continue test "$f" = src/bin/pg_dump/parallel.h && continue test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 23bafec5f79..840191d680b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -428,6 +428,7 @@ CompiledExprState CompositeIOData CompositeTypeStmt CompoundAffixFlag +CompressFileHandle CompressionLocation CompressorState ComputeXidHorizonsResult @@ -1034,6 +1035,7 @@ GucStack GucStackState GucStringAssignHook GucStringCheckHook +GzipCompressorState HANDLE HASHACTION HASHBUCKET -- 2.25.1
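For anyone skimming the refactoring above: after this patch the format code no longer calls cfopen_write()/cfwrite()/cfclose() directly, it drives the function pointers on CompressFileHandle instead. A minimal sketch of the calling pattern, modeled on the pg_backup_directory.c hunks (the wrapper name write_member is made up for illustration; the calls are the ones introduced here):

    /* Sketch only: write one chunk of data through the CompressFileHandle API */
    static void
    write_member(const char *fname, const void *data, size_t len,
                 const pg_compress_specification *compression_spec)
    {
        CompressFileHandle *CFH = InitCompressFileHandle(compression_spec);

        if (CFH->open_write_func(fname, PG_BINARY_W, CFH))
            pg_fatal("could not open output file \"%s\": %m", fname);

        if (CFH->write_func(data, len, CFH) != len)
            pg_fatal("could not write to output file: %s",
                     CFH->get_error_func(CFH));

        /* closes the underlying stream and frees the handle */
        if (DestroyCompressFileHandle(CFH) != 0)
            pg_fatal("could not close output file: %m");
    }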
>From 100ff6665ddc4965d9cc4c0f2cd03d9b17a46099 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Sat, 14 Jan 2023 10:31:47 -0600 Subject: [PATCH 4/7] f! --- src/bin/pg_dump/compress_gzip.c | 9 +-------- src/bin/pg_dump/pg_backup_archiver.c | 1 - 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c index 37c841c5a9b..b00be32f2e9 100644 --- a/src/bin/pg_dump/compress_gzip.c +++ b/src/bin/pg_dump/compress_gzip.c @@ -291,17 +291,10 @@ static int Gzip_close(CompressFileHandle *CFH) { gzFile gzfp = (gzFile) CFH->private_data; - int save_errno; - int ret; CFH->private_data = NULL; - ret = gzclose(gzfp); - - save_errno = errno; - errno = save_errno; - - return ret; + return gzclose(gzfp); } static int diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 06f0b46cbfc..7f06beff61c 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -385,7 +385,6 @@ RestoreArchive(Archive *AHX) */ supports_compression = true; if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE && - AH->compression_spec.algorithm == PG_COMPRESSION_GZIP && AH->PrintTocDataPtr != NULL) { for (te = AH->toc->next; te != AH->toc; te = te->next) -- 2.25.1
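The same applies to the block-level compressor used by the custom format: WriteDataToArchive()/ReadDataFromArchive() are gone, and callers go through the callbacks on CompressorState. Roughly, the write path in pg_backup_custom.c now boils down to this (dump_chunk is a made-up name; AllocateCompressor, writeData and EndCompressor are the real calls):

    /* Sketch only: compress one data chunk via the CompressorState callbacks */
    static void
    dump_chunk(ArchiveHandle *AH, const void *data, size_t dLen)
    {
        /* readF is NULL because this state is only ever used for writing */
        CompressorState *cs = AllocateCompressor(&AH->compression_spec,
                                                 NULL, _CustomWriteFunc);

        cs->writeData(AH, cs, data, dLen);  /* writeData() reports write errors itself */
        EndCompressor(AH, cs);              /* flushes pending output and frees cs */
    }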
>From efba6161c6d6849e5ff7cd922b1572a0e27e76b7 Mon Sep 17 00:00:00 2001 From: Georgios Kokolatos <gkokola...@pm.me> Date: Wed, 21 Dec 2022 09:49:36 +0000 Subject: [PATCH 5/7] Add LZ4 compression in pg_{dump|restore} Within compress_lz4.{c,h} both a streaming API and a file API for compression are implemented. The first one is aimed at inlined use cases and thus simple lz4.h calls can be used directly. The second one generates output, or parses input, that can be read/generated via the lz4 utility. Wherever the LZ4F API does not provide all the functionality corresponding to fread(), fwrite(), fgets(), fgetc(), feof(), and fclose(), it has been implemented locally. --- doc/src/sgml/ref/pg_dump.sgml | 13 +- src/bin/pg_dump/Makefile | 2 + src/bin/pg_dump/compress_io.c | 11 +- src/bin/pg_dump/compress_lz4.c | 618 +++++++++++++++++++++++++++ src/bin/pg_dump/compress_lz4.h | 22 + src/bin/pg_dump/meson.build | 8 +- src/bin/pg_dump/pg_backup_archiver.c | 14 +- src/bin/pg_dump/pg_dump.c | 5 +- src/bin/pg_dump/t/002_pg_dump.pl | 82 +++- src/tools/pginclude/cpluspluscheck | 1 + src/tools/pgindent/typedefs.list | 1 + 11 files changed, 756 insertions(+), 21 deletions(-) create mode 100644 src/bin/pg_dump/compress_lz4.c create mode 100644 src/bin/pg_dump/compress_lz4.h diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index 2c938cd7e14..49d218905fb 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -330,9 +330,10 @@ PostgreSQL documentation machine-readable format that <application>pg_restore</application> can read. A directory format archive can be manipulated with standard Unix tools; for example, files in an uncompressed archive - can be compressed with the <application>gzip</application> tool. - This format is compressed by default and also supports parallel - dumps. + can be compressed with the <application>gzip</application> or + <application>lz4</application> tool. + This format is compressed by default using <literal>gzip</literal> + and also supports parallel dumps. </para> </listitem> </varlistentry> @@ -654,7 +655,7 @@ PostgreSQL documentation <para> Specify the compression method and/or the compression level to use. The compression method can be set to <literal>gzip</literal> or - <literal>none</literal> for no compression. + <literal>lz4</literal> or <literal>none</literal> for no compression. A compression detail string can optionally be specified. If the detail string is an integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the @@ -675,8 +676,8 @@ PostgreSQL documentation individual table-data segments, and the default is to compress using <literal>gzip</literal> at a moderate level. For plain text output, setting a nonzero compression level causes the entire output file to be compressed, - as though it had been fed through <application>gzip</application>; but the default - is not to compress. + as though it had been fed through <application>gzip</application> or + <application>lz4</application>; but the default is not to compress. </para> <para> The tar archive format currently does not support compression at all. diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index 7a19f5d6172..a1401377ab9 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -17,6 +17,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global export GZIP_PROGRAM=$(GZIP) +export LZ4 export with_icu override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) @@ -25,6 +26,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport) OBJS = \ $(WIN32RES) \ compress_gzip.o \ + compress_lz4.o \ compress_io.o \ dumputils.o \ parallel.o \ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 576a8653193..8ebefd1ed13 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -60,6 +60,7 @@ #include "compress_io.h" #include "compress_gzip.h" +#include "compress_lz4.h" #include "pg_backup_utils.h" #ifdef HAVE_LIBZ @@ -137,7 +138,7 @@ AllocateCompressor(const pg_compress_specification *compression_spec, InitCompressorGzip(cs, compression_spec); break; case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); + InitCompressorLZ4(cs, compression_spec); break; case PG_COMPRESSION_ZSTD: pg_fatal("compression with %s is not yet supported", "ZSTD"); @@ -326,7 +327,7 @@ InitCompressFileHandle(const pg_compress_specification *compression_spec) InitCompressGzip(CFH, compression_spec); break; case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); + InitCompressLZ4(CFH, compression_spec); break; case PG_COMPRESSION_ZSTD: pg_fatal("compression with %s is not yet supported", "ZSTD"); @@ -340,12 +341,12 @@ InitCompressFileHandle(const pg_compress_specification *compression_spec) * Open a file for reading. 'path' is the file to open, and 'mode' should * be either "r" or "rb". * - * If the file at 'path' does not exist, we append the "{.gz,.lz4}" suffix (i + * If the file at 'path' does not exist, we append the ".gz" suffix (if * 'path' doesn't already have it) and try again. So if you pass "foo" as - * 'path', this will open either "foo" or "foo.gz" or "foo.lz4", trying in that - * order. + * 'path', this will open either "foo" or "foo.gz", trying in that order. * * On failure, return NULL with an error code in errno. + * */ CompressFileHandle * InitDiscoverCompressFileHandle(const char *path, const char *mode) diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c new file mode 100644 index 00000000000..c97e16187a0 --- /dev/null +++ b/src/bin/pg_dump/compress_lz4.c @@ -0,0 +1,618 @@ +/*------------------------------------------------------------------------- + * + * compress_lz4.c + * Routines for archivers to write an uncompressed or compressed data + * stream. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_lz4.c + * + *------------------------------------------------------------------------- + */ +#include "postgres_fe.h" +#include "pg_backup_utils.h" + +#include "compress_lz4.h" + +#ifdef USE_LZ4 +#include <lz4.h> +#include <lz4frame.h> + +#define LZ4_OUT_SIZE (4 * 1024) +#define LZ4_IN_SIZE (16 * 1024) + +/* + * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library. + * Redefine it for installations with a lesser version. 
+ */ +#ifndef LZ4F_HEADER_SIZE_MAX +#define LZ4F_HEADER_SIZE_MAX 32 +#endif + +/*---------------------- + * Compressor API + *---------------------- + */ + +typedef struct LZ4CompressorState +{ + char *outbuf; + size_t outsize; +} LZ4CompressorState; + +/* Private routines that support LZ4 compressed data I/O */ +static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs); +static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); +static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs); + +static void +ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs) +{ + LZ4_streamDecode_t lz4StreamDecode; + char *buf; + char *decbuf; + size_t buflen; + size_t cnt; + + buflen = LZ4_IN_SIZE; + buf = pg_malloc(buflen); + decbuf = pg_malloc(buflen); + + LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0); + + while ((cnt = cs->readF(AH, &buf, &buflen))) + { + int decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode, + buf, decbuf, + cnt, buflen); + + ahwrite(decbuf, 1, decBytes, AH); + } + + pg_free(buf); + pg_free(decbuf); +} + +static void +WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data; + size_t compressed; + size_t requiredsize = LZ4_compressBound(dLen); + + if (requiredsize > LZ4cs->outsize) + { + LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize); + LZ4cs->outsize = requiredsize; + } + + compressed = LZ4_compress_default(data, LZ4cs->outbuf, + dLen, LZ4cs->outsize); + + if (compressed <= 0) + pg_fatal("failed to LZ4 compress data"); + + cs->writeF(AH, LZ4cs->outbuf, compressed); +} + +static void +EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs) +{ + LZ4CompressorState *LZ4cs; + + LZ4cs = (LZ4CompressorState *) cs->private_data; + if (LZ4cs) + { + pg_free(LZ4cs->outbuf); + pg_free(LZ4cs); + cs->private_data = NULL; + } +} + + +/* Public routines that support LZ4 compressed data I/O */ +void +InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec) +{ + cs->readData = ReadDataFromArchiveLZ4; + cs->writeData = WriteDataToArchiveLZ4; + cs->end = EndCompressorLZ4; + + cs->compression_spec = compression_spec; + + /* Will be lazy init'd */ + cs->private_data = pg_malloc0(sizeof(LZ4CompressorState)); +} + +/*---------------------- + * Compress File API + *---------------------- + */ + +/* + * State needed for LZ4 (de)compression using the CompressFileHandle API. + */ +typedef struct LZ4File +{ + FILE *fp; + + LZ4F_preferences_t prefs; + + LZ4F_compressionContext_t ctx; + LZ4F_decompressionContext_t dtx; + + bool inited; + bool compressing; + + size_t buflen; + char *buffer; + + size_t overflowalloclen; + size_t overflowlen; + char *overflowbuf; + + size_t errcode; +} LZ4File; + +/* + * LZ4 equivalent to feof() or gzeof(). The end of file + * is reached if there is no decompressed output in the + * overflow buffer and the end of the file is reached. + */ +static int +LZ4File_eof(CompressFileHandle *CFH) +{ + LZ4File *fs = (LZ4File *) CFH->private_data; + + return fs->overflowlen == 0 && feof(fs->fp); +} + +static const char * +LZ4File_get_error(CompressFileHandle *CFH) +{ + LZ4File *fs = (LZ4File *) CFH->private_data; + const char *errmsg; + + if (LZ4F_isError(fs->errcode)) + errmsg = LZ4F_getErrorName(fs->errcode); + else + errmsg = strerror(errno); + + return errmsg; +} + +/* + * Prepare an already alloc'ed LZ4File struct for subsequent calls. 
+ * + * It creates the necessary contexts for the operations. When compressing, + * it additionally writes the LZ4 header in the output stream. + */ +static int +LZ4File_init(LZ4File * fs, int size, bool compressing) +{ + size_t status; + + if (fs->inited) + return 0; + + fs->compressing = compressing; + fs->inited = true; + + if (fs->compressing) + { + fs->buflen = LZ4F_compressBound(LZ4_IN_SIZE, &fs->prefs); + if (fs->buflen < LZ4F_HEADER_SIZE_MAX) + fs->buflen = LZ4F_HEADER_SIZE_MAX; + + status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION); + if (LZ4F_isError(status)) + { + fs->errcode = status; + return 1; + } + + fs->buffer = pg_malloc(fs->buflen); + status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen, + &fs->prefs); + + if (LZ4F_isError(status)) + { + fs->errcode = status; + return 1; + } + + if (fwrite(fs->buffer, 1, status, fs->fp) != status) + { + errno = errno ? : ENOSPC; + return 1; + } + } + else + { + status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION); + if (LZ4F_isError(status)) + { + fs->errcode = status; + return 1; + } + + fs->buflen = size > LZ4_OUT_SIZE ? size : LZ4_OUT_SIZE; + fs->buffer = pg_malloc(fs->buflen); + + fs->overflowalloclen = fs->buflen; + fs->overflowbuf = pg_malloc(fs->overflowalloclen); + fs->overflowlen = 0; + } + + return 0; +} + +/* + * Read already decompressed content from the overflow buffer into 'ptr' up to + * 'size' bytes, if available. If the eol_flag is set, then stop at the first + * occurrence of the newline char prior to 'size' bytes. + * + * Any unread content in the overflow buffer is moved to the beginning. + */ +static int +LZ4File_read_overflow(LZ4File * fs, void *ptr, int size, bool eol_flag) +{ + char *p; + int readlen = 0; + + if (fs->overflowlen == 0) + return 0; + + if (fs->overflowlen >= size) + readlen = size; + else + readlen = fs->overflowlen; + + if (eol_flag && (p = memchr(fs->overflowbuf, '\n', readlen))) + /* Include the line terminating char */ + readlen = p - fs->overflowbuf + 1; + + memcpy(ptr, fs->overflowbuf, readlen); + fs->overflowlen -= readlen; + + if (fs->overflowlen > 0) + memmove(fs->overflowbuf, fs->overflowbuf + readlen, fs->overflowlen); + + return readlen; +} + +/* + * The workhorse for reading decompressed content out of an LZ4 compressed + * stream. + * + * It will read up to 'ptrsize' decompressed content, or up to the newline char + * if found first when the eol_flag is set. It is possible that the decompressed + * output generated by reading any compressed input via the LZ4F API exceeds + * 'ptrsize'. Any exceeding decompressed content is stored in an overflow + * buffer within LZ4File. Of course, when the function is called, it will first + * try to consume any decompressed content already present in the overflow + * buffer, before decompressing new content. + */ +static int +LZ4File_read_internal(LZ4File * fs, void *ptr, int ptrsize, bool eol_flag) +{ + size_t dsize = 0; + size_t rsize; + size_t size = ptrsize; + bool eol_found = false; + + void *readbuf; + + /* Lazy init */ + if (!fs->inited && LZ4File_init(fs, size, false /* decompressing */ )) + return -1; + + /* Verify that there is enough space in the outbuf */ + if (size > fs->buflen) + { + fs->buflen = size; + fs->buffer = pg_realloc(fs->buffer, size); + } + + /* use already decompressed content if available */ + dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag); + if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize))) + return dsize; + + readbuf = pg_malloc(size); + + do + { + char *rp; + char *rend; + + rsize = fread(readbuf, 1, size, fs->fp); + if (rsize < size && !feof(fs->fp)) + return -1; + + rp = (char *) readbuf; + rend = (char *) readbuf + rsize; + + while (rp < rend) + { + size_t status; + size_t outlen = fs->buflen; + size_t read_remain = rend - rp; + + memset(fs->buffer, 0, outlen); + status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen, + rp, &read_remain, NULL); + if (LZ4F_isError(status)) + { + fs->errcode = status; + return -1; + } + + rp += read_remain; + + /* + * fill in what space is available in ptr if the eol flag is set, + * either skip if one already found or fill up to EOL if present + * in the outbuf + */ + if (outlen > 0 && dsize < size && eol_found == false) + { + char *p; + size_t lib = (eol_flag == 0) ? size - dsize : size - 1 - dsize; + size_t len = outlen < lib ? outlen : lib; + + if (eol_flag == true && + (p = memchr(fs->buffer, '\n', outlen)) && + (size_t) (p - fs->buffer + 1) <= len) + { + len = p - fs->buffer + 1; + eol_found = true; + } + + memcpy((char *) ptr + dsize, fs->buffer, len); + dsize += len; + + /* move what did not fit, if any, to the beginning of the buf */ + if (len < outlen) + memmove(fs->buffer, fs->buffer + len, outlen - len); + outlen -= len; + } + + /* if there is available output, save it */ + if (outlen > 0) + { + while (fs->overflowlen + outlen > fs->overflowalloclen) + { + fs->overflowalloclen *= 2; + fs->overflowbuf = pg_realloc(fs->overflowbuf, + fs->overflowalloclen); + } + + memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen); + fs->overflowlen += outlen; + } + } + } while (rsize == size && dsize < size && eol_found == 0); + + pg_free(readbuf); + + return (int) dsize; +} + +/* + * Compress size bytes from ptr and write them to the stream. + */ +static size_t +LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH) +{ + LZ4File *fs = (LZ4File *) CFH->private_data; + size_t status; + int remaining = size; + + if (!fs->inited && LZ4File_init(fs, size, true)) + return -1; + + while (remaining > 0) + { + int chunk = remaining < LZ4_IN_SIZE ? remaining : LZ4_IN_SIZE; + + remaining -= chunk; + + status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen, + ptr, chunk, NULL); + if (LZ4F_isError(status)) + { + fs->errcode = status; + return -1; + } + + if (fwrite(fs->buffer, 1, status, fs->fp) != status) + { + errno = errno ? : ENOSPC; + return 1; + } + } + + return size; +} + +/* + * fread() equivalent implementation for LZ4 compressed files.
+ */ +static size_t +LZ4File_read(void *ptr, size_t size, CompressFileHandle *CFH) +{ + LZ4File *fs = (LZ4File *) CFH->private_data; + int ret; + + ret = LZ4File_read_internal(fs, ptr, size, false); + if (ret != size && !LZ4File_eof(CFH)) + pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH)); + + return ret; +} + +/* + * fgetc() equivalent implementation for LZ4 compressed files. + */ +static int +LZ4File_getc(CompressFileHandle *CFH) +{ + LZ4File *fs = (LZ4File *) CFH->private_data; + unsigned char c; + + if (LZ4File_read_internal(fs, &c, 1, false) != 1) + { + if (!LZ4File_eof(CFH)) + pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH)); + else + pg_fatal("could not read from input file: end of file"); + } + + return c; +} + +/* + * fgets() equivalent implementation for LZ4 compressed files. + */ +static char * +LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH) +{ + LZ4File *fs = (LZ4File *) CFH->private_data; + size_t dsize; + + dsize = LZ4File_read_internal(fs, ptr, size, true); + if (dsize < 0) + pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH)); + + /* Done reading */ + if (dsize == 0) + return NULL; + + return ptr; +} + +/* + * Finalize (de)compression of a stream. When compressing it will write any + * remaining content and/or generated footer from the LZ4 API. + */ +static int +LZ4File_close(CompressFileHandle *CFH) +{ + FILE *fp; + LZ4File *fs = (LZ4File *) CFH->private_data; + size_t status; + int ret; + + fp = fs->fp; + if (fs->inited) + { + if (fs->compressing) + { + status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL); + if (LZ4F_isError(status)) + pg_fatal("failed to end compression: %s", + LZ4F_getErrorName(status)); + else if ((ret = fwrite(fs->buffer, 1, status, fs->fp)) != status) + { + errno = errno ? 
: ENOSPC; + WRITE_ERROR_EXIT; + } + + status = LZ4F_freeCompressionContext(fs->ctx); + if (LZ4F_isError(status)) + pg_fatal("failed to end compression: %s", + LZ4F_getErrorName(status)); + } + else + { + status = LZ4F_freeDecompressionContext(fs->dtx); + if (LZ4F_isError(status)) + pg_fatal("failed to end decompression: %s", + LZ4F_getErrorName(status)); + pg_free(fs->overflowbuf); + } + + pg_free(fs->buffer); + } + + pg_free(fs); + + return fclose(fp); +} + +static int +LZ4File_open(const char *path, int fd, const char *mode, + CompressFileHandle *CFH) +{ + FILE *fp; + LZ4File *lz4fp = (LZ4File *) CFH->private_data; + + if (fd >= 0) + fp = fdopen(fd, mode); + else + fp = fopen(path, mode); + if (fp == NULL) + { + lz4fp->errcode = errno; + return 1; + } + + lz4fp->fp = fp; + + return 0; +} + +static int +LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH) +{ + char *fname; + int ret; + + fname = psprintf("%s.lz4", path); + ret = CFH->open_func(fname, -1, mode, CFH); + pg_free(fname); + + return ret; +} + +void +InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +{ + LZ4File *lz4fp; + + CFH->open_func = LZ4File_open; + CFH->open_write_func = LZ4File_open_write; + CFH->read_func = LZ4File_read; + CFH->write_func = LZ4File_write; + CFH->gets_func = LZ4File_gets; + CFH->getc_func = LZ4File_getc; + CFH->eof_func = LZ4File_eof; + CFH->close_func = LZ4File_close; + CFH->get_error_func = LZ4File_get_error; + + CFH->compression_spec = compression_spec; + lz4fp = pg_malloc0(sizeof(*lz4fp)); + if (CFH->compression_spec.level >= 0) + lz4fp->prefs.compressionLevel = CFH->compression_spec.level; + + CFH->private_data = lz4fp; +} +#else /* USE_LZ4 */ +void +InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec) +{ + pg_fatal("this build does not support compression with %s", "LZ4"); +} + +void +InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +{ + pg_fatal("this build does not support compression with %s", "LZ4"); +} +#endif /* USE_LZ4 */ diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h new file mode 100644 index 00000000000..74595db1b98 --- /dev/null +++ b/src/bin/pg_dump/compress_lz4.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * compress_lz4.h + * Interface to compress_io.c routines + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_lz4.h + * + *------------------------------------------------------------------------- + */ +#ifndef _COMPRESS_LZ4_H_ +#define _COMPRESS_LZ4_H_ + +#include "compress_io.h" + +extern void InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec); +extern void InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec); + +#endif /* _COMPRESS_LZ4_H_ */ diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build index aa2c91829c0..473d40d456f 100644 --- a/src/bin/pg_dump/meson.build +++ b/src/bin/pg_dump/meson.build @@ -3,6 +3,7 @@ pg_dump_common_sources = files( 'compress_io.c', 'compress_gzip.c', + 'compress_lz4.c', 'dumputils.c', 'parallel.c', 'pg_backup_archiver.c', @@ -17,7 +18,7 @@ pg_dump_common_sources = files( pg_dump_common = static_library('libpgdump_common', pg_dump_common_sources, c_pch: pch_postgres_fe_h, - 
dependencies: [frontend_code, libpq, zlib], + dependencies: [frontend_code, libpq, lz4, zlib], kwargs: internal_lib_args, ) @@ -85,7 +86,10 @@ tests += { 'sd': meson.current_source_dir(), 'bd': meson.current_build_dir(), 'tap': { - 'env': {'GZIP_PROGRAM': gzip.path()}, + 'env': { + 'GZIP_PROGRAM': gzip.path(), + 'LZ4': program_lz4.found() ? program_lz4.path() : '', + }, 'tests': [ 't/001_basic.pl', 't/002_pg_dump.pl', diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 7f06beff61c..2d406a5f0e3 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -394,6 +394,10 @@ RestoreArchive(Archive *AHX) #ifndef HAVE_LIBZ if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) supports_compression = false; +#endif +#ifndef USE_LZ4 + if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4) + supports_compression = false; #endif if (supports_compression == false) pg_fatal("cannot restore from compressed archive (compression not supported in this installation)"); @@ -2073,7 +2077,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) /* * Check if the specified archive is a directory. If so, check if - * there's a "toc.dat" (or "toc.dat.gz") file in it. + * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it. */ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) { @@ -2083,6 +2087,10 @@ _discoverArchiveFormat(ArchiveHandle *AH) #ifdef HAVE_LIBZ if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz")) return AH->format; +#endif +#ifdef USE_LZ4 + if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4")) + return AH->format; #endif pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", AH->fSpec); @@ -3746,6 +3754,10 @@ ReadHead(ArchiveHandle *AH) #ifndef HAVE_LIBZ if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP) unsupported = true; +#endif +#ifndef USE_LZ4 + if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4) + unsupported = true; #endif if (unsupported) pg_fatal("archive is compressed, but this installation does not support compression"); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 20f73729fac..224d2c900ce 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -715,13 +715,12 @@ main(int argc, char **argv) case PG_COMPRESSION_NONE: /* fallthrough */ case PG_COMPRESSION_GZIP: + /* fallthrough */ + case PG_COMPRESSION_LZ4: break; case PG_COMPRESSION_ZSTD: pg_fatal("compression with %s is not yet supported", "ZSTD"); break; - case PG_COMPRESSION_LZ4: - pg_fatal("compression with %s is not yet supported", "LZ4"); - break; } /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index f3ba9263213..f497ec60407 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -139,6 +139,80 @@ my %pgdump_runs = ( args => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ], }, }, + + # Do not use --no-sync to give test coverage for data sync. 
+ compression_lz4_custom => { + test_key => 'compression', + compile_option => 'lz4', + dump_cmd => [ + 'pg_dump', '--format=custom', + '--compress=lz4', "--file=$tempdir/compression_lz4_custom.dump", + 'postgres', + ], + restore_cmd => [ + 'pg_restore', + "--file=$tempdir/compression_lz4_custom.sql", + "$tempdir/compression_lz4_custom.dump", + ], + command_like => { + command => [ + 'pg_restore', + '-l', "$tempdir/compression_lz4_custom.dump", + ], + expected => qr/Compression: lz4/, + name => 'data content is lz4 compressed' + }, + }, + + # Do not use --no-sync to give test coverage for data sync. + compression_lz4_dir => { + test_key => 'compression', + compile_option => 'lz4', + dump_cmd => [ + 'pg_dump', '--jobs=2', + '--format=directory', '--compress=lz4:1', + "--file=$tempdir/compression_lz4_dir", 'postgres', + ], + # Give coverage for manually compressed blob.toc files during + # restore. + compress_cmd => { + program => $ENV{'LZ4'}, + args => [ + '-z', '-f', '--rm', + "$tempdir/compression_lz4_dir/blobs.toc", + "$tempdir/compression_lz4_dir/blobs.toc.lz4", + ], + }, + # Verify that data files where compressed + glob_patterns => [ + "$tempdir/compression_lz4_dir/toc.dat", + "$tempdir/compression_lz4_dir/*.dat.lz4", + ], + restore_cmd => [ + 'pg_restore', '--jobs=2', + "--file=$tempdir/compression_lz4_dir.sql", + "$tempdir/compression_lz4_dir", + ], + }, + + compression_lz4_plain => { + test_key => 'compression', + compile_option => 'lz4', + dump_cmd => [ + 'pg_dump', '--format=plain', '--compress=lz4', + "--file=$tempdir/compression_lz4_plain.sql.lz4", 'postgres', + ], + # Decompress the generated file to run through the tests. + compress_cmd => { + program => $ENV{'LZ4'}, + args => [ + '-d', '-f', + "$tempdir/compression_lz4_plain.sql.lz4", + "$tempdir/compression_lz4_plain.sql", + ], + }, + }, + clean => { dump_cmd => [ 'pg_dump', @@ -4175,11 +4249,11 @@ foreach my $run (sort keys %pgdump_runs) my $run_db = 'postgres'; # Skip command-level tests for gzip if there is no support for it. - if ( defined($pgdump_runs{$run}->{compile_option}) - && $pgdump_runs{$run}->{compile_option} eq 'gzip' - && !$supports_gzip) + if ($pgdump_runs{$run}->{compile_option} && + ($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) || + ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)) { - note "$run: skipped due to no gzip support"; + note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support"; next; } diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck index 62f3e9a81d4..2b461e797c6 100755 --- a/src/tools/pginclude/cpluspluscheck +++ b/src/tools/pginclude/cpluspluscheck @@ -152,6 +152,7 @@ do # as field names, which is unfortunate but we won't change it now. test "$f" = src/bin/pg_dump/compress_gzip.h && continue test "$f" = src/bin/pg_dump/compress_io.h && continue + test "$f" = src/bin/pg_dump/compress_lz4.h && continue test "$f" = src/bin/pg_dump/parallel.h && continue test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue test "$f" = src/bin/pg_dump/pg_dump.h && continue diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 840191d680b..232228d427c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1381,6 +1381,7 @@ LWLock LWLockHandle LWLockMode LWLockPadded +LZ4CompressorState LZ4F_compressionContext_t LZ4F_decompressOptions_t LZ4F_decompressionContext_t -- 2.25.1
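To make the CompressFileHandle side of this a bit more concrete, here is
roughly what a caller looks like once InitCompressLZ4() has installed the
LZ4File_* callbacks.  Illustrative sketch only: the file name, the payload,
and the free-standing wrapper function are invented, and the real call
sites are the CompressFileHandle users elsewhere in pg_dump, not a function
like this.

	#include "postgres_fe.h"

	#include "common/logging.h"
	#include "compress_lz4.h"

	/* sketch: round-trip some data through the LZ4File callbacks */
	static void
	lz4_roundtrip_sketch(void)
	{
		pg_compress_specification spec = {0};
		CompressFileHandle *CFH = pg_malloc0(sizeof(CompressFileHandle));
		char		buf[128];
		size_t		nread;

		spec.algorithm = PG_COMPRESSION_LZ4;
		spec.level = 1;

		/* write side: open_write_func() appends the ".lz4" suffix itself */
		InitCompressLZ4(CFH, spec);
		if (CFH->open_write_func("example.dat", "wb", CFH) != 0)
			pg_fatal("could not open output: %s", CFH->get_error_func(CFH));
		CFH->write_func("some line of data\n", 18, CFH);
		CFH->close_func(CFH);	/* flushes and writes the LZ4 frame footer */

		/* read side: the decompression context is set up lazily on first read */
		InitCompressLZ4(CFH, spec);
		if (CFH->open_func("example.dat.lz4", -1, "rb", CFH) != 0)
			pg_fatal("could not open input: %s", CFH->get_error_func(CFH));
		while ((nread = CFH->read_func(buf, sizeof(buf), CFH)) > 0)
			fwrite(buf, 1, nread, stdout);
		CFH->close_func(CFH);
	}
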
>From cf1efb67c49c5e31d77049f5469967dd750ee8c9 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Sat, 14 Jan 2023 10:45:02 -0600 Subject: [PATCH 6/7] f! --- src/bin/pg_dump/compress_lz4.c | 34 ++++++++++++++++---------------- src/bin/pg_dump/compress_lz4.h | 4 ++-- src/bin/pg_dump/t/002_pg_dump.pl | 2 +- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c index c97e16187a0..0e259a6251a 100644 --- a/src/bin/pg_dump/compress_lz4.c +++ b/src/bin/pg_dump/compress_lz4.c @@ -117,13 +117,13 @@ EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs) /* Public routines that support LZ4 compressed data I/O */ void -InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec) +InitCompressorLZ4(CompressorState *cs, const pg_compress_specification *compression_spec) { cs->readData = ReadDataFromArchiveLZ4; cs->writeData = WriteDataToArchiveLZ4; cs->end = EndCompressorLZ4; - cs->compression_spec = compression_spec; + cs->compression_spec = *compression_spec; /* Will be lazy init'd */ cs->private_data = pg_malloc0(sizeof(LZ4CompressorState)); @@ -189,7 +189,7 @@ LZ4File_get_error(CompressFileHandle *CFH) /* * Prepare an already alloc'ed LZ4File struct for subsequent calls. * - * It creates the nessary contexts for the operations. When compressing, + * It creates the necessary contexts for the operations. When compressing, * it additionally writes the LZ4 header in the output stream. */ static int @@ -228,7 +228,7 @@ LZ4File_init(LZ4File * fs, int size, bool compressing) if (fwrite(fs->buffer, 1, status, fs->fp) != status) { - errno = errno ? : ENOSPC; + errno = errno ? errno : ENOSPC; return 1; } } @@ -255,7 +255,7 @@ LZ4File_init(LZ4File * fs, int size, bool compressing) /* * Read already decompressed content from the overflow buffer into 'ptr' up to * 'size' bytes, if available. If the eol_flag is set, then stop at the first - * occurance of the new line char prior to 'size' bytes. + * occurrence of the new line char prior to 'size' bytes. * * Any unread content in the overflow buffer, is moved to the beginning. */ @@ -309,10 +309,10 @@ LZ4File_read_internal(LZ4File * fs, void *ptr, int ptrsize, bool eol_flag) void *readbuf; /* Lazy init */ - if (!fs->inited && LZ4File_init(fs, size, false /* decompressing */ )) + if (LZ4File_init(fs, size, false /* decompressing */ )) return -1; - /* Verfiy that there is enough space in the outbuf */ + /* Verify that there is enough space in the outbuf */ if (size > fs->buflen) { fs->buflen = size; @@ -363,10 +363,10 @@ LZ4File_read_internal(LZ4File * fs, void *ptr, int ptrsize, bool eol_flag) if (outlen > 0 && dsize < size && eol_found == false) { char *p; - size_t lib = (eol_flag == 0) ? size - dsize : size - 1 - dsize; + size_t lib = eol_flag ? size - 1 - dsize : size - dsize ; size_t len = outlen < lib ? 
outlen : lib; - if (eol_flag == true && + if (eol_flag && (p = memchr(fs->buffer, '\n', outlen)) && (size_t) (p - fs->buffer + 1) <= len) { @@ -377,7 +377,7 @@ LZ4File_read_internal(LZ4File * fs, void *ptr, int ptrsize, bool eol_flag) memcpy((char *) ptr + dsize, fs->buffer, len); dsize += len; - /* move what did not fit, if any, at the begining of the buf */ + /* move what did not fit, if any, at the beginning of the buf */ if (len < outlen) memmove(fs->buffer, fs->buffer + len, outlen - len); outlen -= len; @@ -414,7 +414,7 @@ LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH) size_t status; int remaining = size; - if (!fs->inited && LZ4File_init(fs, size, true)) + if (LZ4File_init(fs, size, true)) return -1; while (remaining > 0) @@ -433,7 +433,7 @@ LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH) if (fwrite(fs->buffer, 1, status, fs->fp) != status) { - errno = errno ? : ENOSPC; + errno = errno ? errno : ENOSPC; return 1; } } @@ -520,7 +520,7 @@ LZ4File_close(CompressFileHandle *CFH) LZ4F_getErrorName(status)); else if ((ret = fwrite(fs->buffer, 1, status, fs->fp)) != status) { - errno = errno ? : ENOSPC; + errno = errno ? errno : ENOSPC; WRITE_ERROR_EXIT; } @@ -582,7 +582,7 @@ LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH) } void -InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification *compression_spec) { LZ4File *lz4fp; @@ -596,7 +596,7 @@ InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compres CFH->close_func = LZ4File_close; CFH->get_error_func = LZ4File_get_error; - CFH->compression_spec = compression_spec; + CFH->compression_spec = *compression_spec; lz4fp = pg_malloc0(sizeof(*lz4fp)); if (CFH->compression_spec.level >= 0) lz4fp->prefs.compressionLevel = CFH->compression_spec.level; @@ -605,13 +605,13 @@ InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compres } #else /* USE_LZ4 */ void -InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec) +InitCompressorLZ4(CompressorState *cs, const pg_compress_specification *compression_spec) { pg_fatal("this build does not support compression with %s", "LZ4"); } void -InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification *compression_spec) { pg_fatal("this build does not support compression with %s", "LZ4"); } diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h index 74595db1b98..69a3d9c171f 100644 --- a/src/bin/pg_dump/compress_lz4.h +++ b/src/bin/pg_dump/compress_lz4.h @@ -16,7 +16,7 @@ #include "compress_io.h" -extern void InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec); -extern void InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification compression_spec); +extern void InitCompressorLZ4(CompressorState *cs, const pg_compress_specification *compression_spec); +extern void InitCompressLZ4(CompressFileHandle *CFH, const pg_compress_specification *compression_spec); #endif /* _COMPRESS_LZ4_H_ */ diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index f497ec60407..263995a2b7a 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -183,7 +183,7 @@ my %pgdump_runs = ( "$tempdir/compression_lz4_dir/blobs.toc.lz4", ], }, - # Verify 
that data files where compressed + # Verify that data files were compressed glob_patterns => [ "$tempdir/compression_lz4_dir/toc.dat", "$tempdir/compression_lz4_dir/*.dat.lz4", -- 2.25.1
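With the 6/7 fixup applied, the sketch above changes only at the init
calls, since the compression specification is then passed by reference:

	InitCompressLZ4(CFH, &spec);	/* by pointer once 6/7 is applied */
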
>From d2fe3c9c5bb1fdce5c58af2773c603358085a0bb Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Wed, 4 Jan 2023 21:21:53 -0600 Subject: [PATCH 7/7] TMP: pg_dump: use lz4 by default, for CI only --- src/bin/pg_dump/pg_dump.c | 7 +++++-- src/bin/pg_dump/t/002_pg_dump.pl | 8 ++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 224d2c900ce..cf5083c432f 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -733,8 +733,11 @@ main(int argc, char **argv) #ifdef HAVE_LIBZ parse_compress_specification(PG_COMPRESSION_GZIP, NULL, &compression_spec); -#else - /* Nothing to do in the default case */ +#endif + +#ifdef USE_LZ4 + parse_compress_specification(PG_COMPRESSION_LZ4, NULL, + &compression_spec); #endif } diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 263995a2b7a..3485ebca57d 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -313,9 +313,9 @@ my %pgdump_runs = ( command => [ 'pg_restore', '-l', "$tempdir/defaults_custom_format.dump", ], expected => $supports_gzip ? - qr/Compression: gzip/ : + qr/Compression: lz4/ : qr/Compression: none/, - name => 'data content is gzip-compressed by default if available', + name => 'data content is lz4-compressed by default if available', }, }, @@ -338,7 +338,7 @@ my %pgdump_runs = ( command => [ 'pg_restore', '-l', "$tempdir/defaults_dir_format", ], expected => $supports_gzip ? - qr/Compression: gzip/ : + qr/Compression: lz4/ : qr/Compression: none/, name => 'data content is gzip-compressed by default', }, @@ -346,7 +346,7 @@ my %pgdump_runs = ( "$tempdir/defaults_dir_format/toc.dat", "$tempdir/defaults_dir_format/blobs.toc", $supports_gzip ? - "$tempdir/defaults_dir_format/*.dat.gz" : + "$tempdir/defaults_dir_format/*.dat.lz4" : "$tempdir/defaults_dir_format/*.dat", ], }, -- 2.25.1
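For clarity, the default-compression hunk in 7/7 just means "prefer LZ4
when the build has it, else fall back to gzip, else stay uncompressed".
The patch spells that as two parse_compress_specification() calls, with the
LZ4 call winning when both libraries are available; an equivalent way to
write it, shown here only to make the precedence obvious, would be:

	#ifdef USE_LZ4
		parse_compress_specification(PG_COMPRESSION_LZ4, NULL,
									 &compression_spec);
	#elif defined(HAVE_LIBZ)
		parse_compress_specification(PG_COMPRESSION_GZIP, NULL,
									 &compression_spec);
	#else
		/* no compression library available: leave the spec as "none" */
	#endif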