On Wed, Mar 23, 2022 at 06:57:04PM -0400, Robert Haas wrote:
> On Wed, Mar 23, 2022 at 5:52 PM Justin Pryzby <pry...@telsasoft.com> wrote:
> > Also because the library may not be compiled with threading.  A few days ago, I
> > tried to rebase the original "parallel workers" patch over the COMPRESS DETAIL
> > patch but then couldn't test it, even after trying various versions of the zstd
> > package and trying to compile it locally.  I'll try again soon...
> 
> Ah. Right, I can update the comment to mention that.

Actually, I suggest removing those comments:
| "We check for failure here because..."

Checking for failure should be the rule rather than the exception, so there's
no need to justify why one checks the return value of library and system calls.

In bbsink_zstd_new(), I think you need to check to see if workers were
requested (same as the issue you found with "level").  If someone builds
against a version of zstd which doesn't support some parameter, you'll
currently call SetParameter with that flag anyway, with a default value.
That's not currently breaking anything for me (even though workers=N doesn't
work) but I think it's fragile and could break, maybe when compiled against an
old zstd, or with future options.  SetParameter should only be called when the
user requested to set the parameter.  I handled that for workers in 003, but
didn't touch "level", which is probably fine, but maybe should change for
consistency.
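
A minimal sketch of that rule, not the patch itself: everything named demo_*
and DEMO_* below is hypothetical, and demo_set_parameter() is only a stand-in
for a library call such as ZSTD_CCtx_setParameter() that may reject a
parameter it wasn't built to support.  The point is that a parameter is
forwarded only when its option bit says the user asked for it.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical option bits, in the spirit of BACKUP_COMPRESSION_OPTION_*. */
#define DEMO_OPTION_LEVEL	(1u << 0)
#define DEMO_OPTION_WORKERS	(1u << 1)

typedef enum demo_param
{
	DEMO_PARAM_LEVEL,
	DEMO_PARAM_WORKERS
} demo_param;

typedef struct demo_spec
{
	unsigned	options;		/* OR of DEMO_OPTION_* bits the user set */
	int			level;
	int			workers;
} demo_spec;

/* Stand-in for a compression library; not a real zstd type. */
typedef struct demo_lib
{
	int			supports_workers;	/* 0 models a build without threading */
	int			calls;				/* how many times a parameter was set */
} demo_lib;

/* Returns 0 on success, -1 if this "library" rejects the parameter. */
int
demo_set_parameter(demo_lib *lib, demo_param param, int value)
{
	assert(lib != NULL);
	(void) value;
	lib->calls++;
	if (param == DEMO_PARAM_WORKERS && !lib->supports_workers)
		return -1;
	return 0;
}

/* Forward only the parameters the user explicitly requested. */
int
demo_apply_spec(demo_lib *lib, const demo_spec *spec)
{
	assert(lib != NULL && spec != NULL);

	if ((spec->options & DEMO_OPTION_LEVEL) != 0 &&
		demo_set_parameter(lib, DEMO_PARAM_LEVEL, spec->level) != 0)
		return -1;

	if ((spec->options & DEMO_OPTION_WORKERS) != 0 &&
		demo_set_parameter(lib, DEMO_PARAM_WORKERS, spec->workers) != 0)
		return -1;

	return 0;
}
```

With a spec that sets no options, demo_apply_spec() never calls into the
library, so a build lacking worker support only fails when workers were
actually requested.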

src/backend/replication/basebackup_zstd.c:              elog(ERROR, "could not set zstd compression level to %d: %s",
src/bin/pg_basebackup/bbstreamer_gzip.c:                pg_log_error("could not set compression level %d: %s",
src/bin/pg_basebackup/bbstreamer_zstd.c:                        pg_log_error("could not set compression level to: %d: %s",

I'm not sure why these messages sometimes mention the current compression
method and sometimes don't.  I suggest that they shouldn't - errcontext will
have the algorithm, and the user already specified it anyway.  It'd allow the
compiler to merge strings.
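
As a rough illustration only (DEMO_LEVEL_ERRFMT and demo_format_level_error()
are hypothetical names, not anything in the tree), every call site could share
one algorithm-neutral format string, so there is a single literal to maintain
instead of three near-duplicates:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* One wording for every compression method; the name is hypothetical. */
#define DEMO_LEVEL_ERRFMT "could not set compression level %d: %s"

/*
 * Format the shared message into buf.  Returns what snprintf() returns:
 * the length the full message needs, excluding the terminating NUL.
 */
int
demo_format_level_error(char *buf, size_t buflen, int level,
						const char *detail)
{
	assert(buf != NULL && detail != NULL);
	return snprintf(buf, buflen, DEMO_LEVEL_ERRFMT, level, detail);
}
```

The gzip and zstd callers would then differ only in the detail string they
pass, for example the text returned by ZSTD_getErrorName().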

Here's a patch for zstd --long mode.  (I don't actually use pg_basebackup, but
I will want to use long mode with pg_dump).  The "strategy" params may also be
interesting, but I haven't played with them.  rsyncable is certainly interesting,
but is currently an experimental, nonpublic interface - and a good example of why
not to call SetParameter for params which the user didn't specify: PGDG might
eventually compile postgres against a zstd which supports the rsyncable flag, and
someone might install that build somewhere with a zstd which doesn't support
rsyncable.  The server would still try to call SetParameter(rsyncable, 0), and
the rsyncable ID number would've changed, so zstd would probably reject it, and
basebackup would be unusable...

$ time src/bin/pg_basebackup/pg_basebackup -h /tmp -Ft -D- --wal-method=none --no-manifest -Z zstd:long=1 --checkpoint fast |wc -c
4625935
real    0m1,334s

$ time src/bin/pg_basebackup/pg_basebackup -h /tmp -Ft -D- --wal-method=none --no-manifest -Z zstd:long=0 --checkpoint fast |wc -c
8426516
real    0m0,880s
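
For a rough sense of the tradeoff in those two runs (one run each, so not a
benchmark), the relative change can be worked out with a small helper;
demo_percent_change() is a hypothetical name used only for this arithmetic:

```c
#include <assert.h>

/*
 * Percent change from baseline to candidate: negative means the candidate
 * is smaller, positive means it is larger.
 */
double
demo_percent_change(double baseline, double candidate)
{
	assert(baseline > 0.0);
	return (candidate - baseline) / baseline * 100.0;
}
```

Taking long=0 as the baseline, demo_percent_change(8426516, 4625935) comes out
at roughly -45 (about 45% fewer bytes written), and
demo_percent_change(0.880, 1.334) at roughly +52 (about 52% more wall-clock
time).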
From e73af18e791f784b3853511f10fe9e573984bcf4 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Thu, 24 Mar 2022 17:21:11 -0400
Subject: [PATCH 1/5] Fix a few goofs in new backup compression code.

When we try to set the zstd compression level either on the client
or on the server, check for errors.

For any algorithm, on the client side, don't try to set the compression
level unless the user specified one. This was visibly broken for
zstd, which managed to set -1 rather than 0 in this case, but tidy
up the code for the other methods, too.

On the client side, if we fail to create a ZSTD_CCtx, exit after
reporting the error. Otherwise we'll dereference a null pointer.
---
 src/backend/replication/basebackup_zstd.c |  8 ++++++--
 src/bin/pg_basebackup/bbstreamer_gzip.c   |  3 ++-
 src/bin/pg_basebackup/bbstreamer_lz4.c    |  3 ++-
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 19 +++++++++++++++++--
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index bb5b668c2ab..5496eaa72b7 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -98,13 +98,17 @@ bbsink_zstd_begin_backup(bbsink *sink)
 {
 	bbsink_zstd *mysink = (bbsink_zstd *) sink;
 	size_t		output_buffer_bound;
+	size_t		ret;
 
 	mysink->cctx = ZSTD_createCCtx();
 	if (!mysink->cctx)
 		elog(ERROR, "could not create zstd compression context");
 
-	ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
-						   mysink->compresslevel);
+	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
+								 mysink->compresslevel);
+	if (ZSTD_isError(ret))
+		elog(ERROR, "could not set zstd compression level to %d: %s",
+			 mysink->compresslevel, ZSTD_getErrorName(ret));
 
 	/*
 	 * We need our own buffer, because we're going to pass different data to
diff --git a/src/bin/pg_basebackup/bbstreamer_gzip.c b/src/bin/pg_basebackup/bbstreamer_gzip.c
index 1979e956399..760619fcd74 100644
--- a/src/bin/pg_basebackup/bbstreamer_gzip.c
+++ b/src/bin/pg_basebackup/bbstreamer_gzip.c
@@ -116,7 +116,8 @@ bbstreamer_gzip_writer_new(char *pathname, FILE *file,
 		}
 	}
 
-	if (gzsetparams(streamer->gzfile, compress->level,
+	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0 &&
+		gzsetparams(streamer->gzfile, compress->level,
 					Z_DEFAULT_STRATEGY) != Z_OK)
 	{
 		pg_log_error("could not set compression level %d: %s",
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c
index a6ec317e2bd..67f841d96a9 100644
--- a/src/bin/pg_basebackup/bbstreamer_lz4.c
+++ b/src/bin/pg_basebackup/bbstreamer_lz4.c
@@ -89,7 +89,8 @@ bbstreamer_lz4_compressor_new(bbstreamer *next, bc_specification *compress)
 	prefs = &streamer->prefs;
 	memset(prefs, 0, sizeof(LZ4F_preferences_t));
 	prefs->frameInfo.blockSizeID = LZ4F_max256KB;
-	prefs->compressionLevel = compress->level;
+	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
+		prefs->compressionLevel = compress->level;
 
 	/*
 	 * Find out the compression bound, it specifies the minimum destination
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index caa5edcaf12..7946b6350b6 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -67,6 +67,8 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 {
 #ifdef USE_ZSTD
 	bbstreamer_zstd_frame *streamer;
+	int			compresslevel;
+	size_t		ret;
 
 	Assert(next != NULL);
 
@@ -81,11 +83,24 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 
 	streamer->cctx = ZSTD_createCCtx();
 	if (!streamer->cctx)
+	{
 		pg_log_error("could not create zstd compression context");
+		exit(1);
+	}
 
 	/* Initialize stream compression preferences */
-	ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
-						   compress->level);
+	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
+		compresslevel = 0;
+	else
+		compresslevel = compress->level;
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+								 compresslevel);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set zstd compression level to %d: %s",
+					 compresslevel, ZSTD_getErrorName(ret));
+		exit(1);
+	}
 
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
-- 
2.17.1

From e59d9c1cdcf3f109267c12d4a28525f121c69720 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Wed, 23 Mar 2022 11:00:33 -0400
Subject: [PATCH 2/5] Allow parallel zstd compression when taking a base
 backup.

libzstd allows transparent parallel compression just by setting
an option when creating the compression context, so permit that
for both client and server-side backup compression. To use this,
use something like pg_basebackup --compress WHERE-zstd:workers=N
where WHERE is "client" or "server" and N is an integer.

When compression is performed on the server side, this will spawn
threads inside the PostgreSQL backend. While there is almost no
PostgreSQL server code which is thread-safe, the threads here are used
internally by libzstd and touch only data structures controlled by
libzstd.

Patch by me, based in part on earlier work by Dipesh Pandit
and Jeevan Ladhe.
---
 doc/src/sgml/protocol.sgml                    | 12 +++++--
 doc/src/sgml/ref/pg_basebackup.sgml           |  4 +--
 src/backend/replication/basebackup_zstd.c     | 18 ++++++++++
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 15 +++++++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |  5 +++
 src/bin/pg_verifybackup/t/009_extract.pl      | 29 ++++++++++++++--
 src/bin/pg_verifybackup/t/010_client_untar.pl | 33 +++++++++++++++++--
 src/common/backup_compression.c               | 16 +++++++++
 src/include/common/backup_compression.h       |  2 ++
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  3 +-
 10 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 2fa3cedfe9e..98f0bc3cc34 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
           option.  If the value is an integer, it specifies the compression
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <literal>keyword</literal> or
-          <literal>keyword=value</literal>. Currently, the only supported
-          keyword is <literal>level</literal>, which sets the compression
-          level.
+          <literal>keyword=value</literal>. Currently, the supported keywords
+          are <literal>level</literal> and <literal>workers</literal>.
         </para>
 
         <para>
+          The <literal>level</literal> keyword sets the compression level.
           For <literal>gzip</literal> the compression level should be an
           integer between 1 and 9, for <literal>lz4</literal> an integer
           between 1 and 12, and for <literal>zstd</literal> an integer
           between 1 and 22.
          </para>
+
+        <para>
+          The <literal>workers</literal> keyword sets the number of threads
+          that should be used for parallel compression. Parallel compression
+          is supported only for <literal>zstd</literal>.
+         </para>
         </listitem>
        </varlistentry>
 
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index d9233beb8e1..82f5f606250 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         integer, it specifies the compression level.  Otherwise, it should be
         a comma-separated list of items, each of the form
         <literal>keyword</literal> or <literal>keyword=value</literal>.
-        Currently, the only supported keyword is <literal>level</literal>,
-        which sets the compression level.
+        Currently, the supported keywords are <literal>level</literal>
+        and <literal>workers</literal>.
        </para>
        <para>
         If no compression level is specified, the default compression level
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index 5496eaa72b7..d6eb0617d8a 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -28,6 +28,9 @@ typedef struct bbsink_zstd
 	/* Compression level */
 	int			compresslevel;
 
+	/* Number of parallel workers. */
+	int			workers;
+
 	ZSTD_CCtx  *cctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbsink_zstd;
@@ -83,6 +86,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
+	sink->workers = compress->workers;
 
 	return &sink->base;
 #endif
@@ -110,6 +114,20 @@ bbsink_zstd_begin_backup(bbsink *sink)
 		elog(ERROR, "could not set zstd compression level to %d: %s",
 			 mysink->compresslevel, ZSTD_getErrorName(ret));
 
+	/*
+	 * We check for failure here because (1) older versions of the library
+	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
+	 * reject an unreasonable values (though in practice it does not seem to do
+	 * so).
+	 */
+	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+								 mysink->workers);
+	if (ZSTD_isError(ret))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("could not set compression worker count to %d: %s",
+					   mysink->workers, ZSTD_getErrorName(ret)));
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 7946b6350b6..20393da595b 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -102,6 +102,21 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		exit(1);
 	}
 
+	/*
+	 * We check for failure here because (1) older versions of the library
+	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
+	 * reject unreasonable values (though in practice it does not seem to do
+	 * so).
+	 */
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+								 compress->workers);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set compression worker count to %d: %s",
+					 compress->workers, ZSTD_getErrorName(ret));
+		exit(1);
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index 47f3d00ac45..5ba84c22509 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -130,6 +130,11 @@ my @compression_failure_tests = (
 		'invalid compression specification: found empty string where a compression option was expected',
 		'failure on extra, empty compression option'
 	],
+	[
+		'gzip:workers=3',
+		'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
+		'failure on worker count for gzip'
+	],
 );
 for my $cft (@compression_failure_tests)
 {
diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl
index 41a5b370cc5..d6f11b95535 100644
--- a/src/bin/pg_verifybackup/t/009_extract.pl
+++ b/src/bin/pg_verifybackup/t/009_extract.pl
@@ -34,6 +34,12 @@ my @test_configuration = (
 		'compression_method' => 'zstd',
 		'backup_flags' => ['--compress', 'server-zstd:5'],
 		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'parallel zstd',
+		'backup_flags' => ['--compress', 'server-zstd:workers=3'],
+		'enabled' => check_pg_config("#define USE_ZSTD 1"),
+		'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
 	}
 );
 
@@ -55,8 +61,27 @@ for my $tc (@test_configuration)
 		my @verify = ('pg_verifybackup', '-e', $backup_path);
 
 		# A backup with a valid compression method should work.
-		$primary->command_ok(\@backup,
-							 "backup done, compression method \"$method\"");
+		my $backup_stdout = '';
+		my $backup_stderr = '';
+		my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+											  '2>', \$backup_stderr);
+		if ($backup_stdout ne '')
+		{
+			print "# standard output was:\n$backup_stdout";
+		}
+		if ($backup_stderr ne '')
+		{
+			print "# standard error was:\n$backup_stderr";
+		}
+		if (! $backup_result && $tc->{'possibly_unsupported'} &&
+			$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+		{
+			skip "compression with $method not supported by this build", 2;
+		}
+		else
+		{
+			ok($backup_result, "backup done, compression $method");
+		}
 
 		# Make sure that it verifies OK.
 		$primary->command_ok(\@verify,
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index 488a6d1edee..c1cd12cb065 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -49,6 +49,15 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags' => [ '-d' ],
 		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'parallel zstd',
+		'backup_flags' => ['--compress', 'client-zstd:workers=3'],
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define USE_ZSTD 1"),
+		'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
 	}
 );
 
@@ -69,9 +78,27 @@ for my $tc (@test_configuration)
 			'pg_basebackup', '-D', $backup_path,
 			'-Xfetch', '--no-sync', '-cfast', '-Ft');
 		push @backup, @{$tc->{'backup_flags'}};
-		$primary->command_ok(\@backup,
-							 "client side backup, compression $method");
-
+		my $backup_stdout = '';
+		my $backup_stderr = '';
+		my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+											  '2>', \$backup_stderr);
+		if ($backup_stdout ne '')
+		{
+			print "# standard output was:\n$backup_stdout";
+		}
+		if ($backup_stderr ne '')
+		{
+			print "# standard error was:\n$backup_stderr";
+		}
+		if (! $backup_result && $tc->{'possibly_unsupported'} &&
+			$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+		{
+			skip "compression with $method not supported by this build", 3;
+		}
+		else
+		{
+			ok($backup_result, "client side backup, compression $method");
+		}
 
 		# Verify that the we got the files we expected.
 		my $backup_files = join(',',
diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index 0650f975c44..969e08cca20 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
 			result->level = expect_integer_value(keyword, value, result);
 			result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
 		}
+		else if (strcmp(keyword, "workers") == 0)
+		{
+			result->workers = expect_integer_value(keyword, value, result);
+			result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unknown compression option \"%s\""), keyword);
@@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
 							min_level, max_level);
 	}
 
+	/*
+	 * Of the compression algorithms that we currently support, only zstd
+	 * allows parallel workers.
+	 */
+	if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
+		(spec->algorithm != BACKUP_COMPRESSION_ZSTD))
+	{
+		return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
+						get_bc_algorithm_name(spec->algorithm));
+	}
+
 	return NULL;
 }
diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h
index 0565cbc657d..6a0ecaa99c9 100644
--- a/src/include/common/backup_compression.h
+++ b/src/include/common/backup_compression.h
@@ -23,12 +23,14 @@ typedef enum bc_algorithm
 } bc_algorithm;
 
 #define	BACKUP_COMPRESSION_OPTION_LEVEL			(1 << 0)
+#define BACKUP_COMPRESSION_OPTION_WORKERS		(1 << 1)
 
 typedef struct bc_specification
 {
 	bc_algorithm algorithm;
 	unsigned	options;		/* OR of BACKUP_COMPRESSION_OPTION constants */
 	int			level;
+	int			workers;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } bc_specification;
 
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index bee6aacf47c..b6e33516110 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2502,8 +2502,7 @@ sub run_log
 
 	local %ENV = $self->_get_env();
 
-	PostgreSQL::Test::Utils::run_log(@_);
-	return;
+	return PostgreSQL::Test::Utils::run_log(@_);
 }
 
 =pod
-- 
2.17.1

From b977f6ba8e491145165b9ab9f2f1bd407b4e2d26 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 12:28:32 -0500
Subject: [PATCH 3/5] f!workers

---
 src/backend/replication/basebackup_zstd.c | 31 +++++++++++++----------
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 21 +++++++--------
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index d6eb0617d8a..a112d6e181e 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -71,6 +71,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 #else
 	bbsink_zstd *sink;
 	int		compresslevel;
+	int		workers;
 
 	Assert(next != NULL);
 
@@ -82,11 +83,16 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 		Assert(compresslevel >= 1 && compresslevel <= 22);
 	}
 
+	if (compress->options & BACKUP_COMPRESSION_OPTION_WORKERS)
+		workers = compress->workers;
+	else
+		workers = 0;
+
 	sink = palloc0(sizeof(bbsink_zstd));
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
-	sink->workers = compress->workers;
+	sink->workers = workers;
 
 	return &sink->base;
 #endif
@@ -114,19 +120,16 @@ bbsink_zstd_begin_backup(bbsink *sink)
 		elog(ERROR, "could not set zstd compression level to %d: %s",
 			 mysink->compresslevel, ZSTD_getErrorName(ret));
 
-	/*
-	 * We check for failure here because (1) older versions of the library
-	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
-	 * reject an unreasonable values (though in practice it does not seem to do
-	 * so).
-	 */
-	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
-								 mysink->workers);
-	if (ZSTD_isError(ret))
-		ereport(ERROR,
-				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				errmsg("could not set compression worker count to %d: %s",
-					   mysink->workers, ZSTD_getErrorName(ret)));
+	if (mysink->workers > 0)
+	{
+		ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+									 mysink->workers);
+		if (ZSTD_isError(ret))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("could not set compression worker count to %d: %s",
+						   mysink->workers, ZSTD_getErrorName(ret)));
+	}
 
 	/*
 	 * We need our own buffer, because we're going to pass different data to
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 20393da595b..678af73e6f0 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -102,19 +102,16 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		exit(1);
 	}
 
-	/*
-	 * We check for failure here because (1) older versions of the library
-	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
-	 * reject unreasonable values (though in practice it does not seem to do
-	 * so).
-	 */
-	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
-								 compress->workers);
-	if (ZSTD_isError(ret))
+	if (compress->workers > 0)
 	{
-		pg_log_error("could not set compression worker count to %d: %s",
-					 compress->workers, ZSTD_getErrorName(ret));
-		exit(1);
+		ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+									 compress->workers);
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not set compression worker count to %d: %s",
+						 compress->workers, ZSTD_getErrorName(ret));
+			exit(1);
+		}
 	}
 
 	/* Initialize the ZSTD output buffer. */
-- 
2.17.1

From 74124b8d69e5fbe632fd51bff0effec81ebdc806 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 11:55:01 -0500
Subject: [PATCH 4/5] basebackup: support -Z zstd:long

---
 doc/src/sgml/protocol.sgml                | 10 +++++++++-
 doc/src/sgml/ref/pg_basebackup.sgml       |  4 ++--
 src/backend/replication/basebackup_zstd.c | 21 +++++++++++++++++++++
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 13 +++++++++++++
 src/common/backup_compression.c           |  5 +++++
 src/include/common/backup_compression.h   |  2 ++
 6 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 98f0bc3cc34..80f1a1f9a04 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2740,7 +2740,8 @@ The commands accepted in replication mode are:
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <literal>keyword</literal> or
           <literal>keyword=value</literal>. Currently, the supported keywords
-          are <literal>level</literal> and <literal>workers</literal>.
+          are <literal>level</literal>, <literal>long</literal>, and
+          <literal>workers</literal>.
         </para>
 
         <para>
@@ -2751,6 +2752,13 @@ The commands accepted in replication mode are:
           between 1 and 22.
          </para>
 
+        <para>
+          The <literal>long</literal> keyword enables long-distance matching
+          mode, for improved compression ratio, at the expense of higher memory
+          use.  Long-distance mode is supported only for
+          <literal>zstd</literal>.
+         </para>
+
         <para>
           The <literal>workers</literal> keyword sets the number of threads
           that should be used for parallel compression. Parallel compression
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 82f5f606250..014c454bfab 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         integer, it specifies the compression level.  Otherwise, it should be
         a comma-separated list of items, each of the form
         <literal>keyword</literal> or <literal>keyword=value</literal>.
-        Currently, the supported keywords are <literal>level</literal>
-        and <literal>workers</literal>.
+        Currently, the supported keywords are <literal>level</literal>,
+        <literal>long</literal>, and <literal>workers</literal>.
        </para>
        <para>
         If no compression level is specified, the default compression level
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index a112d6e181e..b900604f59f 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -31,6 +31,9 @@ typedef struct bbsink_zstd
 	/* Number of parallel workers. */
 	int			workers;
 
+	/* Flags */
+	bool		zstd_long;
+
 	ZSTD_CCtx  *cctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbsink_zstd;
@@ -72,6 +75,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	bbsink_zstd *sink;
 	int		compresslevel;
 	int		workers;
+	bool	zstd_long;
 
 	Assert(next != NULL);
 
@@ -88,11 +92,15 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	else
 		workers = 0;
 
+	zstd_long = (compress->options & BACKUP_COMPRESSION_OPTION_ZSTD_LONG) ?
+				compress->zstd_long : false;
+
 	sink = palloc0(sizeof(bbsink_zstd));
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
 	sink->workers = workers;
+	sink->zstd_long = zstd_long;
 
 	return &sink->base;
 #endif
@@ -131,6 +139,19 @@ bbsink_zstd_begin_backup(bbsink *sink)
 						   mysink->workers, ZSTD_getErrorName(ret)));
 	}
 
+	if (mysink->zstd_long)
+	{
+		ret = ZSTD_CCtx_setParameter(mysink->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 mysink->zstd_long);
+		fprintf(stderr, "setting LDM %d\n", ret);
+		if (ZSTD_isError(ret))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("could not set compression flag for %s: %s",
+						   "long", ZSTD_getErrorName(ret)));
+	}
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 678af73e6f0..3c7396a1373 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -114,6 +114,19 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		}
 	}
 
+	if (compress->zstd_long)
+	{
+		ret = ZSTD_CCtx_setParameter(streamer->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 compress->zstd_long);
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not set compression flag for %s: %s",
+						 "long", ZSTD_getErrorName(ret));
+			exit(1);
+		}
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index 969e08cca20..f43a5608e65 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -182,6 +182,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
 			result->workers = expect_integer_value(keyword, value, result);
 			result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
 		}
+		else if (strcmp(keyword, "long") == 0)
+		{
+			result->zstd_long = expect_integer_value(keyword, value, result); // XXX: expect_bool?
+			result->options |= BACKUP_COMPRESSION_OPTION_ZSTD_LONG;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unknown compression option \"%s\""), keyword);
diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h
index 6a0ecaa99c9..a378631a8da 100644
--- a/src/include/common/backup_compression.h
+++ b/src/include/common/backup_compression.h
@@ -24,6 +24,7 @@ typedef enum bc_algorithm
 
 #define	BACKUP_COMPRESSION_OPTION_LEVEL			(1 << 0)
 #define BACKUP_COMPRESSION_OPTION_WORKERS		(1 << 1)
+#define BACKUP_COMPRESSION_OPTION_ZSTD_LONG		(1 << 2)
 
 typedef struct bc_specification
 {
@@ -31,6 +32,7 @@ typedef struct bc_specification
 	unsigned	options;		/* OR of BACKUP_COMPRESSION_OPTION constants */
 	int			level;
 	int			workers;
+	int			zstd_long;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } bc_specification;
 
-- 
2.17.1

From 28c7236534634498265c3e4d6544c836052f009f Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Thu, 10 Mar 2022 20:16:19 -0600
Subject: [PATCH 5/5] pg_basebackup: support Zstd negative compression levels

"higher than maximum" is bogus

TODO: each compression methods should enforce its own levels
---
 src/backend/replication/basebackup_zstd.c |  2 +-
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 16 +++++++---------
 src/common/backup_compression.c           |  6 +++++-
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index b900604f59f..e18535bcc13 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -84,7 +84,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	else
 	{
 		compresslevel = compress->level;
-		Assert(compresslevel >= 1 && compresslevel <= 22);
+		Assert(compresslevel >= -7 && compresslevel <= 22 && compresslevel != 0);
 	}
 
 	if (compress->options & BACKUP_COMPRESSION_OPTION_WORKERS)
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 3c7396a1373..31fbf2d0bc3 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -89,16 +89,14 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 	}
 
 	/* Initialize stream compression preferences */
-	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
-		compresslevel = 0;
-	else
-		compresslevel = compress->level;
-	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
-								 compresslevel);
-	if (ZSTD_isError(ret))
+
+	if (compress->options & BACKUP_COMPRESSION_OPTION_LEVEL)
 	{
-		pg_log_error("could not set zstd compression level to %d: %s",
-					 compresslevel, ZSTD_getErrorName(ret));
+		ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+							   compress->level);
+		if (ZSTD_isError(ret))
+			pg_log_error("could not set compression level to: %d: %s",
+					compress->level, ZSTD_getErrorName(ret));
 		exit(1);
 	}
 
diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index f43a5608e65..b568eccd65f 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -265,13 +265,17 @@ validate_bc_specification(bc_specification *spec)
 		else if (spec->algorithm == BACKUP_COMPRESSION_LZ4)
 			max_level = 12;
 		else if (spec->algorithm == BACKUP_COMPRESSION_ZSTD)
+		{
 			max_level = 22;
+			/* The minimum level depends on the version.. */
+			min_level = -7;
+		}
 		else
 			return psprintf(_("compression algorithm \"%s\" does not accept a compression level"),
 							get_bc_algorithm_name(spec->algorithm));
 
 		if (spec->level < min_level || spec->level > max_level)
-			return psprintf(_("compression algorithm \"%s\" expects a compression level between %d and %d"),
+			return psprintf(_("compression algorithm \"%s\" expects a nonzero compression level between %d and %d"),
 							get_bc_algorithm_name(spec->algorithm),
 							min_level, max_level);
 	}
-- 
2.17.1
