Hi, I have added support for decompressing a gzip-compressed tar file on the client. With this change, pg_basebackup can use server-side compression for plain-format backups.
The patch adds a gzip extractor that decompresses the compressed archive and forwards the decompressed data to the next streamer. I have done initial testing and am working on updating the test coverage. Note: Before applying this patch, please apply v11 of Robert's patches 0001 and 0002. Thanks, Dipesh
From 737badce26ed05b5cdb64d9ffd1735fef9acbbf8 Mon Sep 17 00:00:00 2001 From: Dipesh Pandit <dipesh.pan...@enterprisedb.com> Date: Wed, 19 Jan 2022 17:11:45 +0530 Subject: [PATCH] Support for extracting gzip compressed archive pg_basebackup can support server side compression using gzip. In order to support plain format backup with option '-Fp' we need to add support for decompressing the compressed blocks at client. This patch addresses the extraction of gzip compressed blocks at client. --- src/bin/pg_basebackup/bbstreamer.h | 1 + src/bin/pg_basebackup/bbstreamer_file.c | 175 ++++++++++++++++++++++++++++++++ src/bin/pg_basebackup/pg_basebackup.c | 58 +++++++++-- 3 files changed, 225 insertions(+), 9 deletions(-) diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h index fc88b50..270b0df 100644 --- a/src/bin/pg_basebackup/bbstreamer.h +++ b/src/bin/pg_basebackup/bbstreamer.h @@ -205,6 +205,7 @@ extern bbstreamer *bbstreamer_extractor_new(const char *basepath, const char *(*link_map) (const char *), void (*report_output_file) (const char *)); +extern bbstreamer *bbstreamer_gzip_extractor_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/bbstreamer_file.c index 77ca222..350af1d 100644 --- a/src/bin/pg_basebackup/bbstreamer_file.c +++ b/src/bin/pg_basebackup/bbstreamer_file.c @@ -37,6 +37,13 @@ typedef struct bbstreamer_gzip_writer char *pathname; gzFile gzfile; } bbstreamer_gzip_writer; + +typedef struct bbstreamer_gzip_extractor +{ + bbstreamer base; + z_stream zstream; + size_t bytes_written; +} bbstreamer_gzip_extractor; #endif typedef struct bbstreamer_extractor @@ -76,6 +83,21 @@ const bbstreamer_ops bbstreamer_gzip_writer_ops = { .finalize = bbstreamer_gzip_writer_finalize, .free = 
bbstreamer_gzip_writer_free }; + +static void bbstreamer_gzip_extractor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_gzip_extractor_finalize(bbstreamer *streamer); +static void bbstreamer_gzip_extractor_free(bbstreamer *streamer); +static void *gzip_palloc(void *opaque, unsigned items, unsigned size); +static void gzip_pfree(void *opaque, void *address); + +const bbstreamer_ops bbstreamer_gzip_extractor_ops = { + .content = bbstreamer_gzip_extractor_content, + .finalize = bbstreamer_gzip_extractor_finalize, + .free = bbstreamer_gzip_extractor_free +}; #endif static void bbstreamer_extractor_content(bbstreamer *streamer, @@ -349,6 +371,159 @@ get_gz_error(gzFile gzf) #endif /* + * Create a new base backup streamer that performs decompression of gzip + * compressed blocks. + */ +bbstreamer * +bbstreamer_gzip_extractor_new(bbstreamer *next) +{ +#ifdef HAVE_LIBZ + bbstreamer_gzip_extractor *streamer; + z_stream *zs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(bbstreamer_gzip_extractor)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_gzip_extractor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + zs = &streamer->zstream; + zs->zalloc = gzip_palloc; + zs->zfree = gzip_pfree; + zs->next_out = (uint8 *) streamer->base.bbs_buffer.data; + zs->avail_out = streamer->base.bbs_buffer.maxlen; + + /* + * Data compression was initialized using deflateInit2 to request a gzip + * header. Similarly, we are using inflateInit2 to initialize data + * decompression. + * "windowBits" must be greater than or equal to "windowBits" value + * provided to deflateInit2 while compressing. 
+ */ + if (inflateInit2(zs, 15 + 16) != Z_OK) + { + pg_log_error("could not initialize compression library"); + exit(1); + + } + + return &streamer->base; +#else + pg_log_error("this build does not support compression"); + exit(1); +#endif +} + +#ifdef HAVE_LIBZ +/* + * Decompress the input data to output buffer until we ran out of the input + * data. Each time the output buffer is full invoke bbstreamer_content to pass + * on the decompressed data to next streamer. + */ +static void +bbstreamer_gzip_extractor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_gzip_extractor *mystreamer = (bbstreamer_gzip_extractor *) streamer; + z_stream *zs = &mystreamer->zstream; + int res; + + + zs->next_in = (uint8 *) data; + zs->avail_in = len; + + /* Process the current chunk */ + while (zs->avail_in > 0) + { + Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen); + + zs->next_out = (uint8 *) + mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + zs->avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * Decompresses data starting at zs->next_in and update zs->next_in + * and zs->avail_in, generate output data starting at zs->next_out + * and update zs->next_out and zs->avail_out accordingly. 
+ */ + res = inflate(zs, Z_NO_FLUSH); + + if (res == Z_STREAM_ERROR) + pg_log_error("could not decompress data: %s", zs->msg); + + mystreamer->bytes_written = mystreamer->base.bbs_buffer.maxlen - zs->avail_out; + + /* If output buffer is full then pass on the content to next streamer */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + bbstreamer_content(mystreamer->base.bbs_next, member, mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, context); + mystreamer->bytes_written = 0; + } + } + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + if (res == Z_STREAM_END) { + bbstreamer_content(mystreamer->base.bbs_next, member, mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, context); + } +} + +/* + * End-of-stream processing. + */ +static void +bbstreamer_gzip_extractor_finalize(bbstreamer *streamer) +{ + bbstreamer_gzip_extractor *mystreamer = (bbstreamer_gzip_extractor *) streamer; + + bbstreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +bbstreamer_gzip_extractor_free(bbstreamer *streamer) +{ + bbstreamer_gzip_extractor *mystreamer = (bbstreamer_gzip_extractor *) streamer; + + bbstreamer_free(mystreamer->base.bbs_next); + pfree(mystreamer->base.bbs_buffer.data); + pfree(streamer); +} + +/* + * Wrapper function to adjust the signature of palloc to match what libz + * expects. + */ +static void * +gzip_palloc(void *opaque, unsigned items, unsigned size) +{ + return palloc(items * size); +} + +/* + * Wrapper function to adjust the signature of pfree to match what libz + * expects. + */ +static void +gzip_pfree(void *opaque, void *address) +{ + pfree(address); +} +#endif + +/* * Create a bbstreamer that extracts an archive. * * All pathnames in the archive are interpreted relative to basepath. 
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 6ee49a5..b5e31aa 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -111,6 +111,12 @@ typedef enum STREAM_WAL } IncludeWal; +typedef enum +{ + BACKUP_COMPRESSION_NONE, + BACKUP_COMPRESSION_GZIP +} compression_type; + /* Global options */ static char *basedir = NULL; static TablespaceList tablespace_dirs = {NULL, NULL}; @@ -173,6 +179,10 @@ static int has_xlogendptr = 0; static volatile LONG has_xlogendptr = 0; #endif +/* Server side compression method and compression level */ +static compression_type server_compression_type = BACKUP_COMPRESSION_NONE; +static int server_compression_level = 0; + /* Contents of configuration file to be generated */ static PQExpBuffer recoveryconfcontents = NULL; @@ -1028,15 +1038,23 @@ CreateBackupStreamer(char *archive_name, char *spclocation, /* At present, we only know how to parse tar archives. */ if (must_parse_archive && !is_tar) { - pg_log_error("unable to parse archive: %s", archive_name); - pg_log_info("only tar archives can be parsed"); - if (format == 'p') - pg_log_info("plain format requires pg_basebackup to parse the archive"); - if (inject_manifest) - pg_log_info("using - as the output directory requires pg_basebackup to parse the archive"); - if (writerecoveryconf) - pg_log_info("the -R option requires pg_basebackup to parse the archive"); - exit(1); + /* + * If the archived is compressed using a compression method other than + * gzip then we don't know how to extract it. 
+ */ + if (server_compression != NULL && + server_compression_type != BACKUP_COMPRESSION_GZIP) + { + pg_log_error("unable to parse archive: %s", archive_name); + pg_log_info("only tar archives can be parsed"); + if (format == 'p') + pg_log_info("plain format requires pg_basebackup to parse the archive"); + if (inject_manifest) + pg_log_info("using - as the output directory requires pg_basebackup to parse the archive"); + if (writerecoveryconf) + pg_log_info("the -R option requires pg_basebackup to parse the archive"); + exit(1); + } } if (format == 'p') @@ -1136,6 +1154,13 @@ CreateBackupStreamer(char *archive_name, char *spclocation, else if (expect_unterminated_tarfile) streamer = bbstreamer_tar_terminator_new(streamer); + /* + * Extract the gzip compressed archive using a gzip extractor and then + * forward it to next streamer. + */ + if (format == 'p' && server_compression_type == BACKUP_COMPRESSION_GZIP) + streamer = bbstreamer_gzip_extractor_new(streamer); + /* Return the results. */ *manifest_inject_streamer_p = manifest_inject_streamer; return streamer; @@ -2448,6 +2473,21 @@ main(int argc, char **argv) exit(1); } + if (server_compression != NULL) + { + if (strcmp(server_compression, "gzip") == 0) + server_compression_type = BACKUP_COMPRESSION_GZIP; + else if (strlen(server_compression) == 5 && + strncmp(server_compression, "gzip", 4) == 0 && + server_compression[4] >= '1' && server_compression[4] <= '9') + { + server_compression_type = BACKUP_COMPRESSION_GZIP; + server_compression_level = server_compression[4] - '0'; + } + } + else + server_compression_type = BACKUP_COMPRESSION_NONE; + /* * Compression doesn't make sense unless tar format is in use. */ -- 1.8.3.1