Hi,

Thanks for the feedback, I have incorporated the suggestions
and updated a new patch. PFA v2 patch.

> I think similar to bbstreamer_lz4_compressor_content() in
> bbstreamer_lz4_decompressor_content() we can change len to avail_in.

In bbstreamer_lz4_decompressor_content(), we are modifying avail_in
based on the number of bytes decompressed in each iteration. I think
we cannot replace it with "len" here.

Jeevan, Your v12 patch does not apply on HEAD, it requires a
rebase. I have applied it on commit 400fc6b6487ddf16aa82c9d76e5cfbe64d94f660
to validate my v2 patch.

Thanks,
Dipesh
From 47a0ef4348747ffa61eccd7954e00f3cf5fc7222 Mon Sep 17 00:00:00 2001
From: Dipesh Pandit <dipesh.pan...@enterprisedb.com>
Date: Thu, 3 Feb 2022 18:31:03 +0530
Subject: [PATCH] support client side compression and decompression using LZ4

---
 src/bin/pg_basebackup/Makefile                |   1 +
 src/bin/pg_basebackup/bbstreamer.h            |   3 +
 src/bin/pg_basebackup/bbstreamer_lz4.c        | 431 ++++++++++++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c         |  32 +-
 src/bin/pg_verifybackup/t/009_extract.pl      |   7 +-
 src/bin/pg_verifybackup/t/010_client_untar.pl | 111 +++++++
 src/tools/msvc/Mkvcbuild.pm                   |   1 +
 7 files changed, 580 insertions(+), 6 deletions(-)
 create mode 100644 src/bin/pg_basebackup/bbstreamer_lz4.c
 create mode 100644 src/bin/pg_verifybackup/t/010_client_untar.pl

diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index ada3a5a..1d0db4f 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -43,6 +43,7 @@ BBOBJS = \
 	bbstreamer_file.o \
 	bbstreamer_gzip.o \
 	bbstreamer_inject.o \
+	bbstreamer_lz4.o \
 	bbstreamer_tar.o
 
 all: pg_basebackup pg_receivewal pg_recvlogical
diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
index fe49ae3..c2de77b 100644
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -206,6 +206,9 @@ extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
 											void (*report_output_file) (const char *));
 
 extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
+extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
+												 int compresslevel);
+extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c
new file mode 100644
index 0000000..f0bc226
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_lz4.c
@@ -0,0 +1,431 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_lz4.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/bbstreamer_lz4.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+#ifdef HAVE_LIBLZ4
+typedef struct bbstreamer_lz4_frame
+{
+	bbstreamer	base;
+
+	LZ4F_compressionContext_t	cctx;
+	LZ4F_decompressionContext_t	dctx;
+	LZ4F_preferences_t			prefs;
+
+	size_t		bytes_written;
+	bool		header_written;
+} bbstreamer_lz4_frame;
+
+static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+											  bbstreamer_member *member,
+											  const char *data, int len,
+											  bbstreamer_archive_context context);
+static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
+	.content = bbstreamer_lz4_compressor_content,
+	.finalize = bbstreamer_lz4_compressor_finalize,
+	.free = bbstreamer_lz4_compressor_free
+};
+
+static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+												bbstreamer_member *member,
+												const char *data, int len,
+												bbstreamer_archive_context context);
+static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
+	.content = bbstreamer_lz4_decompressor_content,
+	.finalize = bbstreamer_lz4_decompressor_finalize,
+	.free = bbstreamer_lz4_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs lz4 compression of tar
+ * blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel)
+{
+#ifdef HAVE_LIBLZ4
+	bbstreamer_lz4_frame   *streamer;
+	LZ4F_errorCode_t		ctxError;
+	LZ4F_preferences_t	   *prefs;
+	size_t					compressed_bound;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_lz4_compressor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+	streamer->header_written = false;
+
+	/* Initialize stream compression preferences */
+	prefs = &streamer->prefs;
+	memset(prefs, 0, sizeof(LZ4F_preferences_t));
+	prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+	prefs->compressionLevel = compresslevel;
+
+	/*
+	 * Find out the compression bound, it specifies the minimum destination
+	 * capacity required in worst case for the success of compression operation
+	 * (LZ4F_compressUpdate) based on a given source size and preferences.
+	 */
+	compressed_bound = LZ4F_compressBound(streamer->base.bbs_buffer.maxlen, prefs);
+
+	/* Enlarge buffer if it falls short of compression bound. */
+	if (streamer->base.bbs_buffer.maxlen <= compressed_bound)
+		enlargeStringInfo(&streamer->base.bbs_buffer, compressed_bound);
+
+	ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
+	if (LZ4F_isError(ctxError))
+			pg_log_error("could not create lz4 compression context: %s",
+						 LZ4F_getErrorName(ctxError));
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+								  bbstreamer_member *member,
+								  const char *data, int len,
+								  bbstreamer_archive_context context)
+{
+	bbstreamer_lz4_frame   *mystreamer;
+	uint8				   *next_in,
+						   *next_out;
+	size_t					out_bound,
+							compressed_size,
+							avail_out;
+
+	mystreamer = (bbstreamer_lz4_frame *) streamer;
+	next_in = (uint8 *) data;
+
+	/* Write header before processing the first input chunk. */
+	if (!mystreamer->header_written)
+	{
+		compressed_size = LZ4F_compressBegin(mystreamer->cctx,
+											 (uint8 *) mystreamer->base.bbs_buffer.data,
+											 mystreamer->base.bbs_buffer.maxlen,
+											 &mystreamer->prefs);
+
+		if (LZ4F_isError(compressed_size))
+			pg_log_error("could not write lz4 header: %s",
+						 LZ4F_getErrorName(compressed_size));
+
+		mystreamer->bytes_written += compressed_size;
+		mystreamer->header_written = true;
+	}
+
+	/*
+	 * Update the offset and capacity of output buffer based on number of bytes
+	 * written to output buffer.
+	 */
+	next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+	avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+	/*
+	 * Find out the compression bound and make sure that output buffer has the
+	 * required capacity for the success of LZ4F_compressUpdate. If needed
+	 * forward the content to next streamer and empty the buffer.
+	 */
+	out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
+	Assert(mystreamer->base.bbs_buffer.maxlen >= out_bound);
+	if (avail_out <= out_bound)
+	{
+			bbstreamer_content(mystreamer->base.bbs_next, member,
+							   mystreamer->base.bbs_buffer.data,
+							   mystreamer->bytes_written,
+							   context);
+
+			avail_out = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->bytes_written = 0;
+			next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+	}
+
+	/*
+	 * This call compresses the data starting at next_in and generates the
+	 * output starting at next_out. It expects the caller to provide the size
+	 * of input buffer and capacity of output buffer by providing parameters
+	 * len and avail_out.
+	 *
+	 * It returns the number of bytes compressed to output buffer.
+	 */
+	compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
+										  next_out, avail_out,
+										  next_in, len, NULL);
+
+	if (LZ4F_isError(compressed_size))
+		pg_log_error("could not compress data: %s",
+					 LZ4F_getErrorName(compressed_size));
+
+	mystreamer->bytes_written += compressed_size;
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_lz4_frame   *mystreamer;
+	uint8				   *next_out;
+	size_t					footer_bound,
+							compressed_size,
+							avail_out;
+
+	mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+	/* Find out the footer bound and update the output buffer. */
+	footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
+	Assert(mystreamer->base.bbs_buffer.maxlen >= footer_bound);
+	if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <=
+		footer_bound)
+	{
+			bbstreamer_content(mystreamer->base.bbs_next, NULL,
+							   mystreamer->base.bbs_buffer.data,
+							   mystreamer->bytes_written,
+							   BBSTREAMER_UNKNOWN);
+
+			avail_out = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->bytes_written = 0;
+			next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+	}
+	else
+	{
+		next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+		avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+	}
+
+	/*
+	 * Finalize the frame and flush whatever data remaining in compression
+	 * context.
+	 */
+	compressed_size = LZ4F_compressEnd(mystreamer->cctx,
+									   next_out, avail_out, NULL);
+
+	if (LZ4F_isError(compressed_size))
+		pg_log_error("could not end lz4 compression: %s",
+					 LZ4F_getErrorName(compressed_size));
+
+	mystreamer->bytes_written += compressed_size;
+
+	bbstreamer_content(mystreamer->base.bbs_next, NULL,
+					   mystreamer->base.bbs_buffer.data,
+					   mystreamer->bytes_written,
+					   BBSTREAMER_UNKNOWN);
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_compressor_free(bbstreamer *streamer)
+{
+	bbstreamer_lz4_frame	*mystreamer;
+
+	mystreamer = (bbstreamer_lz4_frame *) streamer;
+	bbstreamer_free(streamer->bbs_next);
+	LZ4F_freeCompressionContext(mystreamer->cctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of lz4
+ * compressed blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_decompressor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBLZ4
+	bbstreamer_lz4_frame	*streamer;
+	LZ4F_errorCode_t		ctxError;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_lz4_decompressor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+
+	/* Initialize internal stream state for decompression */
+	ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
+	if (LZ4F_isError(ctxError))
+	{
+		pg_log_error("could not initialize compression library: %s",
+				LZ4F_getErrorName(ctxError));
+		exit(1);
+	}
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+									bbstreamer_member *member,
+									const char *data, int len,
+									bbstreamer_archive_context context)
+{
+	bbstreamer_lz4_frame   *mystreamer;
+	uint8				   *next_in,
+						   *next_out;
+	size_t					avail_in,
+							avail_out;
+
+	mystreamer = (bbstreamer_lz4_frame *) streamer;
+	next_in = (uint8 *) data;
+	next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+	avail_in = len;
+	avail_out = mystreamer->base.bbs_buffer.maxlen;
+
+	while (avail_in > 0)
+	{
+		size_t	ret,
+				read_size,
+				out_size;
+
+		read_size = avail_in;
+		out_size = avail_out;
+
+		/*
+		 * This call decompresses the data starting at next_in and generates
+		 * the output data starting at next_out. It expects the caller to
+		 * provide size of the input buffer and total capacity of the output
+		 * buffer by providing the read_size and out_size parameters
+		 * respectively.
+		 *
+		 * Per the documentation of LZ4, parameters read_size and out_size
+		 * behaves as dual parameters. On return, the number of bytes consumed
+		 * from the input buffer will be written back to read_size and the
+		 * number of bytes decompressed to output buffer will be written back
+		 * to out_size respectively.
+		 */
+		ret = LZ4F_decompress(mystreamer->dctx,
+							  next_out, &out_size,
+							  next_in, &read_size, NULL);
+
+		if (LZ4F_isError(ret))
+			pg_log_error("could not decompress data: %s",
+						 LZ4F_getErrorName(ret));
+
+		/* Update input buffer based on number of bytes consumed */
+		avail_in -= read_size;
+		next_in += read_size;
+
+		mystreamer->bytes_written += out_size;
+
+		/*
+		 * If output buffer is full then forward the content to next streamer and
+		 * update the output buffer.
+		 */
+		if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+		{
+			bbstreamer_content(mystreamer->base.bbs_next, member,
+							   mystreamer->base.bbs_buffer.data,
+							   mystreamer->base.bbs_buffer.maxlen,
+							   context);
+
+			avail_out = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->bytes_written = 0;
+			next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+		}
+		else
+		{
+			avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+			next_out += mystreamer->bytes_written;
+		}
+	}
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_lz4_frame	*mystreamer;
+
+	mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+	/*
+	 * End of the stream, if there is some pending data in output buffers then
+	 * we must forward it to next streamer.
+	 */
+	bbstreamer_content(mystreamer->base.bbs_next, NULL,
+					   mystreamer->base.bbs_buffer.data,
+					   mystreamer->base.bbs_buffer.maxlen,
+					   BBSTREAMER_UNKNOWN);
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
+{
+	bbstreamer_lz4_frame	*mystreamer;
+
+	mystreamer = (bbstreamer_lz4_frame *) streamer;
+	bbstreamer_free(streamer->bbs_next);
+	LZ4F_freeDecompressionContext(mystreamer->dctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 923659d..00b2563 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1003,6 +1003,11 @@ parse_compress_options(char *src, WalCompressionMethod *methodres,
 		*methodres = COMPRESSION_GZIP;
 		*locationres = COMPRESS_LOCATION_SERVER;
 	}
+	else if (pg_strcasecmp(firstpart, "client-lz4") == 0)
+	{
+		*methodres = COMPRESSION_LZ4;
+		*locationres = COMPRESS_LOCATION_CLIENT;
+	}
 	else if (pg_strcasecmp(firstpart, "server-lz4") == 0)
 	{
 		*methodres = COMPRESSION_LZ4;
@@ -1125,7 +1130,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	bbstreamer *manifest_inject_streamer = NULL;
 	bool		inject_manifest;
 	bool		is_tar,
-				is_tar_gz;
+				is_tar_gz,
+				is_tar_lz4;
 	bool		must_parse_archive;
 	int			archive_name_len = strlen(archive_name);
 
@@ -1144,6 +1150,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	is_tar_gz = (archive_name_len > 8 &&
 				 strcmp(archive_name + archive_name_len - 3, ".gz") == 0);
 
+	/* Is this a LZ4 archive? */
+	is_tar_lz4 = (archive_name_len > 8 &&
+				  strcmp(archive_name + archive_name_len - 4, ".lz4") == 0);
+
 	/*
 	 * We have to parse the archive if (1) we're suppose to extract it, or if
 	 * (2) we need to inject backup_manifest or recovery configuration into it.
@@ -1153,7 +1163,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 		(spclocation == NULL && writerecoveryconf));
 
 	/* At present, we only know how to parse tar archives. */
-	if (must_parse_archive && !is_tar && !is_tar_gz)
+	if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4)
 	{
 		pg_log_error("unable to parse archive: %s", archive_name);
 		pg_log_info("only tar archives can be parsed");
@@ -1217,6 +1227,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 												  archive_file,
 												  compresslevel);
 		}
+		else if (compressmethod == COMPRESSION_LZ4)
+		{
+			strlcat(archive_filename, ".lz4", sizeof(archive_filename));
+			streamer = bbstreamer_plain_writer_new(archive_filename,
+												   archive_file);
+			streamer = bbstreamer_lz4_compressor_new(streamer,
+													 compresslevel);
+		}
 		else
 		{
 			Assert(false);		/* not reachable */
@@ -1269,9 +1287,13 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	 * If the user has requested a server compressed archive along with archive
 	 * extraction at client then we need to decompress it.
 	 */
-	if (format == 'p' && compressmethod == COMPRESSION_GZIP &&
-			compressloc == COMPRESS_LOCATION_SERVER)
-		streamer = bbstreamer_gzip_decompressor_new(streamer);
+	if (format == 'p' && compressloc == COMPRESS_LOCATION_SERVER)
+	{
+		if (compressmethod == COMPRESSION_GZIP)
+			streamer = bbstreamer_gzip_decompressor_new(streamer);
+		else if (compressmethod == COMPRESSION_LZ4)
+			streamer = bbstreamer_lz4_decompressor_new(streamer);
+	}
 
 	/* Return the results. */
 	*manifest_inject_streamer_p = manifest_inject_streamer;
diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl
index 51b77e4..9f9a7cc 100644
--- a/src/bin/pg_verifybackup/t/009_extract.pl
+++ b/src/bin/pg_verifybackup/t/009_extract.pl
@@ -11,7 +11,7 @@ use Config;
 use File::Path qw(rmtree);
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 4;
+use Test::More tests => 6;
 
 my $primary = PostgreSQL::Test::Cluster->new('primary');
 $primary->init(allows_streaming => 1);
@@ -27,6 +27,11 @@ my @test_configuration = (
 		'compression_method' => 'gzip',
 		'backup_flags' => ['--compress', 'server-gzip:5'],
 		'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+	},
+	{
+		'compression_method' => 'lz4',
+		'backup_flags' => ['--compress', 'server-lz4:5'],
+		'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
 	}
 );
 
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
new file mode 100644
index 0000000..34c9b90
--- /dev/null
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -0,0 +1,111 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# This test case aims to verify that client-side backup compression work
+# properly, and it also aims to verify that pg_verifybackup can verify a base
+# backup that didn't start out in plain format.
+
+use strict;
+use warnings;
+use Config;
+use File::Path qw(rmtree);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+my $backup_path = $primary->backup_dir . '/client-backup';
+my $extract_path = $primary->backup_dir . '/extracted-backup';
+
+my @test_configuration = (
+	{
+		'compression_method' => 'none',
+		'backup_flags' => [],
+		'backup_archive' => 'base.tar',
+		'enabled' => 1
+	},
+	{
+		'compression_method' => 'gzip',
+		'backup_flags' => ['--compress', 'client-gzip:5'],
+		'backup_archive' => 'base.tar.gz',
+		'decompress_program' => $ENV{'GZIP_PROGRAM'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+	},
+	{
+		'compression_method' => 'lz4',
+		'backup_flags' => ['--compress', 'client-lz4:5'],
+		'backup_archive' => 'base.tar.lz4',
+		'decompress_program' => $ENV{'LZ4'},
+		'decompress_flags' => [ '-d' ],
+		'output_file' => 'base.tar',
+		'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
+	}
+);
+
+for my $tc (@test_configuration)
+{
+	my $method = $tc->{'compression_method'};
+
+	SKIP: {
+		skip "$method compression not supported by this build", 3
+			if ! $tc->{'enabled'};
+		skip "no decompressor available for $method", 3
+			if exists $tc->{'decompress_program'} &&
+			!defined $tc->{'decompress_program'};
+
+		# Take a client-side backup.
+		my @backup      = (
+			'pg_basebackup', '-D', $backup_path,
+			'-Xfetch', '--no-sync', '-cfast', '-Ft');
+		push @backup, @{$tc->{'backup_flags'}};
+		$primary->command_ok(\@backup,
+							 "client side backup, compression $method");
+
+
+		# Verify that the we got the files we expected.
+		my $backup_files = join(',',
+			sort grep { $_ ne '.' && $_ ne '..' } slurp_dir($backup_path));
+		my $expected_backup_files = join(',',
+			sort ('backup_manifest', $tc->{'backup_archive'}));
+		is($backup_files,$expected_backup_files,
+			"found expected backup files, compression $method");
+
+		# Decompress.
+		if (exists $tc->{'decompress_program'})
+		{
+			my @decompress = ($tc->{'decompress_program'});
+			push @decompress, @{$tc->{'decompress_flags'}}
+				if $tc->{'decompress_flags'};
+			push @decompress, $backup_path . '/' . $tc->{'backup_archive'};
+			push @decompress, $backup_path . '/' . $tc->{'output_file'}
+				if $tc->{'output_file'};
+			system_or_bail(@decompress);
+		}
+
+		SKIP: {
+			my $tar = $ENV{TAR};
+			# don't check for a working tar here, to accomodate various odd
+			# cases such as AIX. If tar doesn't work the init_from_backup below
+			# will fail.
+			skip "no tar program available", 1
+				if (!defined $tar || $tar eq '');
+
+			# Untar.
+			mkdir($extract_path);
+			system_or_bail($tar, 'xf', $backup_path . '/base.tar',
+				'-C', $extract_path);
+
+			# Verify.
+			$primary->command_ok([ 'pg_verifybackup', '-n',
+				'-m', "$backup_path/backup_manifest", '-e', $extract_path ],
+				"verify backup, compression $method");
+		}
+
+		# Cleanup.
+		rmtree($extract_path);
+		rmtree($backup_path);
+	}
+}
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index a310bcb..bab81bd 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -379,6 +379,7 @@ sub mkvcbuild
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_file.c');
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c');
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c');
+	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c');
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c');
 	$pgbasebackup->AddLibrary('ws2_32.lib');
 
-- 
1.8.3.1

Reply via email to