Hi,

I have added support for decompressing a gzip-compressed tar file
on the client. With this change, pg_basebackup can use server-side
compression for plain-format backups.

The patch adds a gzip extractor, which decompresses the compressed archive
and forwards it to the next streamer. I have done initial testing and am
working on updating the test coverage.

Note: Before applying the patch, please apply Robert's v11 version
of the patches 0001 and 0002.

Thanks,
Dipesh
From 737badce26ed05b5cdb64d9ffd1735fef9acbbf8 Mon Sep 17 00:00:00 2001
From: Dipesh Pandit <dipesh.pan...@enterprisedb.com>
Date: Wed, 19 Jan 2022 17:11:45 +0530
Subject: [PATCH] Support for extracting gzip compressed archive

pg_basebackup can support server-side compression using gzip. In
order to support plain-format backups with option '-Fp', we need to
add support for decompressing the compressed blocks on the client.
This patch adds extraction of gzip-compressed blocks on the client.
---
 src/bin/pg_basebackup/bbstreamer.h      |   1 +
 src/bin/pg_basebackup/bbstreamer_file.c | 175 ++++++++++++++++++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c   |  58 +++++++++--
 3 files changed, 225 insertions(+), 9 deletions(-)

diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
index fc88b50..270b0df 100644
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -205,6 +205,7 @@ extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
 											const char *(*link_map) (const char *),
 											void (*report_output_file) (const char *));
 
+extern bbstreamer *bbstreamer_gzip_extractor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/bbstreamer_file.c
index 77ca222..350af1d 100644
--- a/src/bin/pg_basebackup/bbstreamer_file.c
+++ b/src/bin/pg_basebackup/bbstreamer_file.c
@@ -37,6 +37,13 @@ typedef struct bbstreamer_gzip_writer
 	char	   *pathname;
 	gzFile		gzfile;
 }			bbstreamer_gzip_writer;
+
+typedef struct bbstreamer_gzip_extractor
+{
+	bbstreamer	base;
+	z_stream	zstream;		/* zlib inflate state */
+	size_t		bytes_written;	/* decompressed bytes pending in base.bbs_buffer */
+} bbstreamer_gzip_extractor;
 #endif
 
 typedef struct bbstreamer_extractor
@@ -76,6 +83,21 @@ const bbstreamer_ops bbstreamer_gzip_writer_ops = {
 	.finalize = bbstreamer_gzip_writer_finalize,
 	.free = bbstreamer_gzip_writer_free
 };
+
+static void bbstreamer_gzip_extractor_content(bbstreamer *streamer,
+											  bbstreamer_member *member,
+											  const char *data, int len,
+											  bbstreamer_archive_context context);
+static void bbstreamer_gzip_extractor_finalize(bbstreamer *streamer);
+static void bbstreamer_gzip_extractor_free(bbstreamer *streamer);
+static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
+static void gzip_pfree(void *opaque, void *address);
+
+const bbstreamer_ops bbstreamer_gzip_extractor_ops = {	/* gzip extractor callbacks */
+	.content = bbstreamer_gzip_extractor_content,
+	.finalize = bbstreamer_gzip_extractor_finalize,
+	.free = bbstreamer_gzip_extractor_free
+};
 #endif
 
 static void bbstreamer_extractor_content(bbstreamer *streamer,
@@ -349,6 +371,159 @@ get_gz_error(gzFile gzf)
 #endif
 
 /*
+ * Create a new base backup streamer that decompresses gzip-compressed data
+ * for 'next'. Exits if inflateInit2 fails or this build lacks zlib.
+ */
+bbstreamer *
+bbstreamer_gzip_extractor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBZ
+	bbstreamer_gzip_extractor	*streamer;
+	z_stream *zs;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(bbstreamer_gzip_extractor));
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_gzip_extractor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+
+	/* Initialize internal stream state for decompression */
+	zs = &streamer->zstream;
+	zs->zalloc = gzip_palloc;
+	zs->zfree = gzip_pfree;
+	zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
+	zs->avail_out = streamer->base.bbs_buffer.maxlen;
+
+	/*
+	 * Data compression was initialized using deflateInit2 to request a gzip
+	 * header. Similarly, we use inflateInit2 to initialize decompression.
+	 * Per the zlib manual, 15 is the largest window size and adding 16
+	 * requests gzip decoding; "windowBits" must be greater than or equal to
+	 * the "windowBits" value provided to deflateInit2 while compressing.
+	 */
+	if (inflateInit2(zs, 15 + 16) != Z_OK)
+	{
+		pg_log_error("could not initialize compression library");
+		exit(1);
+
+	}
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Decompress the input data into the output buffer until we run out of input
+ * data. Each time the output buffer fills up, invoke bbstreamer_content to pass
+ * the decompressed data on to the next streamer. Any inflate() failure is fatal.
+ */
+static void
+bbstreamer_gzip_extractor_content(bbstreamer *streamer,
+								  bbstreamer_member *member,
+								  const char *data, int len,
+								  bbstreamer_archive_context context)
+{
+	bbstreamer_gzip_extractor *mystreamer = (bbstreamer_gzip_extractor *) streamer;
+	z_stream *zs = &mystreamer->zstream;
+	int res = Z_OK;		/* defined even when len == 0 skips the loop */
+
+	zs->next_in = (uint8 *) data;
+	zs->avail_in = len;
+
+	/* Process the current chunk; stop once the end of the stream is seen */
+	while (zs->avail_in > 0 && res != Z_STREAM_END)
+	{
+		Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
+
+		zs->next_out = (uint8 *)
+			mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+		zs->avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+		/*
+		 * inflate() advances zs->next_in/avail_in and zs->next_out/avail_out.
+		 * Any failure is fatal; retrying corrupt input would never progress.
+		 */
+		res = inflate(zs, Z_NO_FLUSH);
+
+		if (res != Z_OK && res != Z_STREAM_END)
+		{
+			pg_log_error("could not decompress data: %s", zs->msg ? zs->msg : "unknown error");
+			exit(1);
+		}
+
+		mystreamer->bytes_written = mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
+
+		/* If output buffer is full then pass on the content to next streamer */
+		if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+		{
+			bbstreamer_content(mystreamer->base.bbs_next, member, mystreamer->base.bbs_buffer.data,
+							   mystreamer->base.bbs_buffer.maxlen, context);
+			mystreamer->bytes_written = 0;
+		}
+	}
+
+	/* End of stream: forward pending output once, then mark the buffer empty */
+	if (res == Z_STREAM_END && mystreamer->bytes_written > 0)
+	{
+		bbstreamer_content(mystreamer->base.bbs_next, member, mystreamer->base.bbs_buffer.data,
+						   mystreamer->bytes_written, context);
+		mystreamer->bytes_written = 0;
+	}
+}
+
+/*
+ * End-of-stream processing; this only passes the call on to the next streamer.
+ */
+static void
+bbstreamer_gzip_extractor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_gzip_extractor *mystreamer = (bbstreamer_gzip_extractor *) streamer;
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/* Free zlib's inflate state, the next streamer, and this streamer's memory. */
+static void
+bbstreamer_gzip_extractor_free(bbstreamer *streamer)
+{
+	bbstreamer_gzip_extractor *mystreamer = (bbstreamer_gzip_extractor *) streamer;
+
+	/* inflateEnd releases what inflateInit2 allocated through zalloc */
+	inflateEnd(&mystreamer->zstream);
+	bbstreamer_free(mystreamer->base.bbs_next);
+	pfree(mystreamer->base.bbs_buffer.data);
+	pfree(streamer);
+}
+
+/*
+ * Wrapper function to adjust the signature of palloc to match what libz
+ * expects. Multiply in size_t so the product is not truncated to 'unsigned'.
+ */
+static void *
+gzip_palloc(void *opaque, unsigned items, unsigned size)
+{
+	return palloc((size_t) items * size);
+}
+
+/*
+ * Wrapper function giving pfree the (opaque, address) signature that libz
+ * expects of its zfree callback; 'opaque' is ignored.
+ */
+static void
+gzip_pfree(void *opaque, void *address)
+{
+	pfree(address);
+}
+#endif
+
+/*
  * Create a bbstreamer that extracts an archive.
  *
  * All pathnames in the archive are interpreted relative to basepath.
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 6ee49a5..b5e31aa 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -111,6 +111,12 @@ typedef enum
 	STREAM_WAL
 } IncludeWal;
 
+typedef enum
+{
+	BACKUP_COMPRESSION_NONE,	/* no recognized server-side compression */
+	BACKUP_COMPRESSION_GZIP		/* server compresses the archive with gzip */
+} compression_type;
+
 /* Global options */
 static char *basedir = NULL;
 static TablespaceList tablespace_dirs = {NULL, NULL};
@@ -173,6 +179,10 @@ static int	has_xlogendptr = 0;
 static volatile LONG has_xlogendptr = 0;
 #endif
 
+/* Server side compression method and compression level */
+static compression_type	server_compression_type = BACKUP_COMPRESSION_NONE;
+static int				server_compression_level = 0;	/* 1-9 when given as "gzipN" */
+
 /* Contents of configuration file to be generated */
 static PQExpBuffer recoveryconfcontents = NULL;
 
@@ -1028,15 +1038,23 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	/* At present, we only know how to parse tar archives. */
 	if (must_parse_archive && !is_tar)
 	{
-		pg_log_error("unable to parse archive: %s", archive_name);
-		pg_log_info("only tar archives can be parsed");
-		if (format == 'p')
-			pg_log_info("plain format requires pg_basebackup to parse the archive");
-		if (inject_manifest)
-			pg_log_info("using - as the output directory requires pg_basebackup to parse the archive");
-		if (writerecoveryconf)
-			pg_log_info("the -R option requires pg_basebackup to parse the archive");
-		exit(1);
+		/*
+		 * We can only cope with a non-tar archive when the server compressed
+		 * it with gzip, since that is the only method we can decompress here.
+		 * Anything else, including no server compression, must be rejected.
+		 */
+		if (server_compression_type != BACKUP_COMPRESSION_GZIP)
+		{
+			pg_log_error("unable to parse archive: %s", archive_name);
+			pg_log_info("only tar archives can be parsed");
+			if (format == 'p')
+				pg_log_info("plain format requires pg_basebackup to parse the archive");
+			if (inject_manifest)
+				pg_log_info("using - as the output directory requires pg_basebackup to parse the archive");
+			if (writerecoveryconf)
+				pg_log_info("the -R option requires pg_basebackup to parse the archive");
+			exit(1);
+		}
 	}
 
 	if (format == 'p')
@@ -1136,6 +1154,13 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	else if (expect_unterminated_tarfile)
 		streamer = bbstreamer_tar_terminator_new(streamer);
 
+	/*
+	 * Extract the gzip compressed archive using a gzip extractor and then
+	 * forward it to next streamer.
+	 */
+	if (format == 'p' && server_compression_type == BACKUP_COMPRESSION_GZIP)
+		streamer = bbstreamer_gzip_extractor_new(streamer);
+
 	/* Return the results. */
 	*manifest_inject_streamer_p = manifest_inject_streamer;
 	return streamer;
@@ -2448,6 +2473,21 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (server_compression != NULL)
+	{
+		if (strcmp(server_compression, "gzip") == 0)
+			server_compression_type = BACKUP_COMPRESSION_GZIP;
+		else if (strlen(server_compression) == 5 &&
+				strncmp(server_compression, "gzip", 4) == 0 &&
+				server_compression[4] >= '1' && server_compression[4] <= '9')
+		{
+			server_compression_type = BACKUP_COMPRESSION_GZIP;
+			server_compression_level = server_compression[4] - '0';
+		}
+	}
+	else
+		server_compression_type = BACKUP_COMPRESSION_NONE;
+
 	/*
 	 * Compression doesn't make sense unless tar format is in use.
 	 */
-- 
1.8.3.1

Reply via email to