On Wed, Mar 23, 2022 at 06:57:04PM -0400, Robert Haas wrote:
> On Wed, Mar 23, 2022 at 5:52 PM Justin Pryzby <pry...@telsasoft.com> wrote:
> > Also because the library may not be compiled with threading.  A few days ago, I
> > tried to rebase the original "parallel workers" patch over the COMPRESS DETAIL
> > patch but then couldn't test it, even after trying various versions of the zstd
> > package and trying to compile it locally.  I'll try again soon...
>
> Ah. Right, I can update the comment to mention that.

Actually, I suggest removing those comments:
| "We check for failure here because..."

That should be the rule rather than the exception, so it shouldn't require
justifying why one might check the return value of library and system calls.

In bbsink_zstd_new(), I think you need to check to see if workers were
requested (same as the issue you found with "level").  If someone builds
against a version of zstd which doesn't support some parameter, you'll
currently call SetParameter with that flag anyway, with a default value.
That's not currently breaking anything for me (even though workers=N doesn't
work) but I think it's fragile and could break, maybe when compiled against an
old zstd, or with future options.  SetParameter should only be called when the
user requested to set the parameter.  I handled that for workers in 003, but
didn't touch "level", which is probably fine, but maybe should change for
consistency.

src/backend/replication/basebackup_zstd.c:      elog(ERROR, "could not set zstd compression level to %d: %s",
src/bin/pg_basebackup/bbstreamer_gzip.c:        pg_log_error("could not set compression level %d: %s",
src/bin/pg_basebackup/bbstreamer_zstd.c:        pg_log_error("could not set compression level to: %d: %s",

I'm not sure why these messages sometimes mention the current compression
method and sometimes don't.  I suggest that they shouldn't - errcontext will
have the algorithm, and the user already specified it anyway.  It'd also allow
the compiler to merge strings.

Here's a patch for zstd --long mode.  (I don't actually use pg_basebackup, but
I will want to use long mode with pg_dump).  The "strategy" params may also be
interesting, but I haven't played with them.  rsyncable is certainly
interesting, but currently an experimental, nonpublic interface - and a good
example of why not to call SetParameter for params which the user didn't
specify: PGDG might eventually compile postgres against a zstd which supports
the rsyncable flag.  Someone might then install it somewhere whose zstd doesn't
support rsyncable, but the server would still try to call
SetParameter(rsyncable, 0), and the rsyncable ID number would've changed, so
zstd would probably reject it, and basebackup would be unusable...

$ time src/bin/pg_basebackup/pg_basebackup -h /tmp -Ft -D- --wal-method=none --no-manifest -Z zstd:long=1 --checkpoint fast |wc -c
4625935
real    0m1,334s

$ time src/bin/pg_basebackup/pg_basebackup -h /tmp -Ft -D- --wal-method=none --no-manifest -Z zstd:long=0 --checkpoint fast |wc -c
8426516
real    0m0,880s

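
To make the "only call SetParameter for what the user asked for" point concrete, here is a minimal sketch. It does not use libzstd at all: `demo_lib`, `demo_set_parameter`, `demo_spec`, and the `DEMO_*` names are all hypothetical stand-ins (the option bits are loosely modeled on `BACKUP_COMPRESSION_OPTION_*`), and the mock "library" simply rejects a parameter it was not built to support.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical option bits, loosely modeled on BACKUP_COMPRESSION_OPTION_*. */
#define DEMO_OPT_LEVEL   (1u << 0)
#define DEMO_OPT_WORKERS (1u << 1)
#define DEMO_OPT_LONG    (1u << 2)

/* Hypothetical parsed specification: which options were given, and values. */
typedef struct demo_spec
{
	unsigned	options;
	int			level;
	int			workers;
	int			long_mode;
} demo_spec;

/* Hypothetical parameter IDs understood by the mock library. */
typedef enum demo_param
{
	DEMO_P_LEVEL,
	DEMO_P_WORKERS,
	DEMO_P_LONG
} demo_param;

/* Mock compression library: may or may not be built with worker support. */
typedef struct demo_lib
{
	bool		supports_workers;
	int			calls;			/* number of setter calls made so far */
} demo_lib;

/* Stand-in for a real setter: returns 0 on success, -1 if unsupported. */
static int
demo_set_parameter(demo_lib *lib, demo_param param, int value)
{
	(void) value;
	lib->calls++;
	if (param == DEMO_P_WORKERS && !lib->supports_workers)
		return -1;
	return 0;
}

/* Robust variant: touch only the parameters the user actually requested. */
static int
demo_apply_requested(demo_lib *lib, const demo_spec *spec)
{
	if ((spec->options & DEMO_OPT_LEVEL) != 0 &&
		demo_set_parameter(lib, DEMO_P_LEVEL, spec->level) != 0)
		return -1;
	if ((spec->options & DEMO_OPT_WORKERS) != 0 &&
		demo_set_parameter(lib, DEMO_P_WORKERS, spec->workers) != 0)
		return -1;
	if ((spec->options & DEMO_OPT_LONG) != 0 &&
		demo_set_parameter(lib, DEMO_P_LONG, spec->long_mode) != 0)
		return -1;
	return 0;
}

/* Fragile variant: set every parameter, passing defaults for unset ones. */
static int
demo_apply_everything(demo_lib *lib, const demo_spec *spec)
{
	if (demo_set_parameter(lib, DEMO_P_LEVEL, spec->level) != 0 ||
		demo_set_parameter(lib, DEMO_P_WORKERS, spec->workers) != 0 ||
		demo_set_parameter(lib, DEMO_P_LONG, spec->long_mode) != 0)
		return -1;
	return 0;
}
```

With a mock library that lacks worker support and a spec that only sets the level, `demo_apply_requested()` succeeds after a single setter call, while `demo_apply_everything()` fails on a parameter the user never mentioned. That is the failure mode described above for a zstd build without the requested feature.
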
From e73af18e791f784b3853511f10fe9e573984bcf4 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Thu, 24 Mar 2022 17:21:11 -0400
Subject: [PATCH 1/5] Fix a few goofs in new backup compression code.

When we try to set the zstd compression level either on the client
or on the server, check for errors.

For any algorithm, on the client side, don't try to set the compression
level unless the user specified one. This was visibly broken for
zstd, which managed to set -1 rather than 0 in this case, but tidy
up the code for the other methods, too.

On the client side, if we fail to create a ZSTD_CCtx, exit after
reporting the error. Otherwise we'll dereference a null pointer.
---
 src/backend/replication/basebackup_zstd.c |  8 ++++++--
 src/bin/pg_basebackup/bbstreamer_gzip.c   |  3 ++-
 src/bin/pg_basebackup/bbstreamer_lz4.c    |  3 ++-
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 19 +++++++++++++++++--
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index bb5b668c2ab..5496eaa72b7 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -98,13 +98,17 @@ bbsink_zstd_begin_backup(bbsink *sink)
 {
 	bbsink_zstd *mysink = (bbsink_zstd *) sink;
 	size_t		output_buffer_bound;
+	size_t		ret;

 	mysink->cctx = ZSTD_createCCtx();
 	if (!mysink->cctx)
 		elog(ERROR, "could not create zstd compression context");

-	ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
-						   mysink->compresslevel);
+	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
+								 mysink->compresslevel);
+	if (ZSTD_isError(ret))
+		elog(ERROR, "could not set zstd compression level to %d: %s",
+			 mysink->compresslevel, ZSTD_getErrorName(ret));

 	/*
 	 * We need our own buffer, because we're going to pass different data to
diff --git a/src/bin/pg_basebackup/bbstreamer_gzip.c b/src/bin/pg_basebackup/bbstreamer_gzip.c
index 1979e956399..760619fcd74 100644
--- a/src/bin/pg_basebackup/bbstreamer_gzip.c
+++ b/src/bin/pg_basebackup/bbstreamer_gzip.c
@@ -116,7 +116,8 @@ bbstreamer_gzip_writer_new(char *pathname, FILE *file,
 		}
 	}

-	if (gzsetparams(streamer->gzfile, compress->level,
+	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0 &&
+		gzsetparams(streamer->gzfile, compress->level,
 					Z_DEFAULT_STRATEGY) != Z_OK)
 	{
 		pg_log_error("could not set compression level %d: %s",
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c
index a6ec317e2bd..67f841d96a9 100644
--- a/src/bin/pg_basebackup/bbstreamer_lz4.c
+++ b/src/bin/pg_basebackup/bbstreamer_lz4.c
@@ -89,7 +89,8 @@ bbstreamer_lz4_compressor_new(bbstreamer *next, bc_specification *compress)
 	prefs = &streamer->prefs;
 	memset(prefs, 0, sizeof(LZ4F_preferences_t));
 	prefs->frameInfo.blockSizeID = LZ4F_max256KB;
-	prefs->compressionLevel = compress->level;
+	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
+		prefs->compressionLevel = compress->level;

 	/*
 	 * Find out the compression bound, it specifies the minimum destination
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index caa5edcaf12..7946b6350b6 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -67,6 +67,8 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 {
 #ifdef USE_ZSTD
 	bbstreamer_zstd_frame *streamer;
+	int			compresslevel;
+	size_t		ret;

 	Assert(next != NULL);

@@ -81,11 +83,24 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)

 	streamer->cctx = ZSTD_createCCtx();
 	if (!streamer->cctx)
+	{
 		pg_log_error("could not create zstd compression context");
+		exit(1);
+	}

 	/* Initialize stream compression preferences */
-	ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
-						   compress->level);
+	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
+		compresslevel = 0;
+	else
+		compresslevel = compress->level;
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+								 compresslevel);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set zstd compression level to %d: %s",
+					 compresslevel, ZSTD_getErrorName(ret));
+		exit(1);
+	}

 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
--
2.17.1

From e59d9c1cdcf3f109267c12d4a28525f121c69720 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Wed, 23 Mar 2022 11:00:33 -0400
Subject: [PATCH 2/5] Allow parallel zstd compression when taking a base backup.

libzstd allows transparent parallel compression just by setting
an option when creating the compression context, so permit that
for both client and server-side backup compression. To use this,
use something like pg_basebackup --compress WHERE-zstd:workers=N
where WHERE is "client" or "server" and N is an integer.

When compression is performed on the server side, this will spawn
threads inside the PostgreSQL backend. While there is almost no
PostgreSQL server code which is thread-safe, the threads here are used
internally by libzstd and touch only data structures controlled by
libzstd.

Patch by me, based in part on earlier work by Dipesh Pandit
and Jeevan Ladhe.
---
 doc/src/sgml/protocol.sgml                    | 12 +++++--
 doc/src/sgml/ref/pg_basebackup.sgml           |  4 +--
 src/backend/replication/basebackup_zstd.c     | 18 ++++++++++
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 15 +++++++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |  5 +++
 src/bin/pg_verifybackup/t/009_extract.pl      | 29 ++++++++++++++--
 src/bin/pg_verifybackup/t/010_client_untar.pl | 33 +++++++++++++++++--
 src/common/backup_compression.c               | 16 +++++++++
 src/include/common/backup_compression.h       |  2 ++
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  3 +-
 10 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 2fa3cedfe9e..98f0bc3cc34 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
           option.  If the value is an integer, it specifies the compression
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <literal>keyword</literal> or
-          <literal>keyword=value</literal>. Currently, the only supported
-          keyword is <literal>level</literal>, which sets the compression
-          level.
+          <literal>keyword=value</literal>. Currently, the supported keywords
+          are <literal>level</literal> and <literal>workers</literal>.
          </para>

          <para>
+          The <literal>level</literal> keyword sets the compression level.
           For <literal>gzip</literal> the compression level should be an
           integer between 1 and 9, for <literal>lz4</literal> an integer
           between 1 and 12, and for <literal>zstd</literal> an integer
           between 1 and 22.
          </para>
+
+         <para>
+          The <literal>workers</literal> keyword sets the number of threads
+          that should be used for parallel compression. Parallel compression
+          is supported only for <literal>zstd</literal>.
+         </para>
         </listitem>
        </varlistentry>
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index d9233beb8e1..82f5f606250 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         integer, it specifies the compression level. Otherwise, it should be
         a comma-separated list of items, each of the form
         <literal>keyword</literal> or <literal>keyword=value</literal>.
-        Currently, the only supported keyword is <literal>level</literal>,
-        which sets the compression level.
+        Currently, the supported keywords are <literal>level</literal>
+        and <literal>workers</literal>.
        </para>
        <para>
         If no compression level is specified, the default compression level
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index 5496eaa72b7..d6eb0617d8a 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -28,6 +28,9 @@ typedef struct bbsink_zstd
 	/* Compression level */
 	int			compresslevel;

+	/* Number of parallel workers. */
+	int			workers;
+
 	ZSTD_CCtx  *cctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbsink_zstd;
@@ -83,6 +86,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
+	sink->workers = compress->workers;

 	return &sink->base;
 #endif
@@ -110,6 +114,20 @@ bbsink_zstd_begin_backup(bbsink *sink)
 		elog(ERROR, "could not set zstd compression level to %d: %s",
 			 mysink->compresslevel, ZSTD_getErrorName(ret));

+	/*
+	 * We check for failure here because (1) older versions of the library
+	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
+	 * reject an unreasonable values (though in practice it does not seem to do
+	 * so).
+	 */
+	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+								 mysink->workers);
+	if (ZSTD_isError(ret))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("could not set compression worker count to %d: %s",
+					   mysink->workers, ZSTD_getErrorName(ret)));
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 7946b6350b6..20393da595b 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -102,6 +102,21 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		exit(1);
 	}

+	/*
+	 * We check for failure here because (1) older versions of the library
+	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
+	 * reject unreasonable values (though in practice it does not seem to do
+	 * so).
+	 */
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+								 compress->workers);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set compression worker count to %d: %s",
+					 compress->workers, ZSTD_getErrorName(ret));
+		exit(1);
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index 47f3d00ac45..5ba84c22509 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -130,6 +130,11 @@ my @compression_failure_tests = (
 		'invalid compression specification: found empty string where a compression option was expected',
 		'failure on extra, empty compression option'
 	],
+	[
+		'gzip:workers=3',
+		'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
+		'failure on worker count for gzip'
+	],
 );

 for my $cft (@compression_failure_tests)
diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl
index 41a5b370cc5..d6f11b95535 100644
--- a/src/bin/pg_verifybackup/t/009_extract.pl
+++ b/src/bin/pg_verifybackup/t/009_extract.pl
@@ -34,6 +34,12 @@ my @test_configuration = (
 		'compression_method' => 'zstd',
 		'backup_flags' => ['--compress', 'server-zstd:5'],
 		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'parallel zstd',
+		'backup_flags' => ['--compress', 'server-zstd:workers=3'],
+		'enabled' => check_pg_config("#define USE_ZSTD 1"),
+		'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
 	}
 );

@@ -55,8 +61,27 @@ for my $tc (@test_configuration)
 		my @verify = ('pg_verifybackup', '-e', $backup_path);

 		# A backup with a valid compression method should work.
-		$primary->command_ok(\@backup,
-							 "backup done, compression method \"$method\"");
+		my $backup_stdout = '';
+		my $backup_stderr = '';
+		my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+											  '2>', \$backup_stderr);
+		if ($backup_stdout ne '')
+		{
+			print "# standard output was:\n$backup_stdout";
+		}
+		if ($backup_stderr ne '')
+		{
+			print "# standard error was:\n$backup_stderr";
+		}
+		if (! $backup_result && $tc->{'possibly_unsupported'} &&
+			$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+		{
+			skip "compression with $method not supported by this build", 2;
+		}
+		else
+		{
+			ok($backup_result, "backup done, compression $method");
+		}

 		# Make sure that it verifies OK.
 		$primary->command_ok(\@verify,
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index 488a6d1edee..c1cd12cb065 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -49,6 +49,15 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags' => [ '-d' ],
 		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'parallel zstd',
+		'backup_flags' => ['--compress', 'client-zstd:workers=3'],
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define USE_ZSTD 1"),
+		'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
 	}
 );

@@ -69,9 +78,27 @@ for my $tc (@test_configuration)
 			'pg_basebackup', '-D', $backup_path,
 			'-Xfetch', '--no-sync', '-cfast', '-Ft');
 		push @backup, @{$tc->{'backup_flags'}};
-		$primary->command_ok(\@backup,
-							 "client side backup, compression $method");
-
+		my $backup_stdout = '';
+		my $backup_stderr = '';
+		my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+											  '2>', \$backup_stderr);
+		if ($backup_stdout ne '')
+		{
+			print "# standard output was:\n$backup_stdout";
+		}
+		if ($backup_stderr ne '')
+		{
+			print "# standard error was:\n$backup_stderr";
+		}
+		if (! $backup_result && $tc->{'possibly_unsupported'} &&
+			$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+		{
+			skip "compression with $method not supported by this build", 3;
+		}
+		else
+		{
+			ok($backup_result, "client side backup, compression $method");
+		}

 		# Verify that the we got the files we expected.
 		my $backup_files = join(',',
diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index 0650f975c44..969e08cca20 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
 			result->level = expect_integer_value(keyword, value, result);
 			result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
 		}
+		else if (strcmp(keyword, "workers") == 0)
+		{
+			result->workers = expect_integer_value(keyword, value, result);
+			result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unknown compression option \"%s\""), keyword);
@@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
 							min_level, max_level);
 	}

+	/*
+	 * Of the compression algorithms that we currently support, only zstd
+	 * allows parallel workers.
+	 */
+	if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
+		(spec->algorithm != BACKUP_COMPRESSION_ZSTD))
+	{
+		return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
+						get_bc_algorithm_name(spec->algorithm));
+	}
+
 	return NULL;
 }
diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h
index 0565cbc657d..6a0ecaa99c9 100644
--- a/src/include/common/backup_compression.h
+++ b/src/include/common/backup_compression.h
@@ -23,12 +23,14 @@ typedef enum bc_algorithm
 } bc_algorithm;

 #define BACKUP_COMPRESSION_OPTION_LEVEL			(1 << 0)
+#define BACKUP_COMPRESSION_OPTION_WORKERS		(1 << 1)

 typedef struct bc_specification
 {
 	bc_algorithm algorithm;
 	unsigned	options;		/* OR of BACKUP_COMPRESSION_OPTION constants */
 	int			level;
+	int			workers;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } bc_specification;

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index bee6aacf47c..b6e33516110 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2502,8 +2502,7 @@ sub run_log

 	local %ENV = $self->_get_env();

-	PostgreSQL::Test::Utils::run_log(@_);
-	return;
+	return PostgreSQL::Test::Utils::run_log(@_);
 }

 =pod
--
2.17.1

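
For readers who want to see the `keyword=value` handling from the patch above in isolation, the following is a simplified, standalone sketch of parsing a specification such as `level=5,workers=3` into a struct plus a bitmask of "was this given?" flags. It is not the patch's parse_bc_specification(): `demo_spec`, `demo_parse_item`, `demo_parse_spec`, and the `DEMO_OPT_*` bits are hypothetical names, only integer-valued keywords are handled, and integer overflow is not checked.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical option bits recording which keywords were present. */
#define DEMO_OPT_LEVEL   (1u << 0)
#define DEMO_OPT_WORKERS (1u << 1)

typedef struct demo_spec
{
	unsigned	options;		/* OR of DEMO_OPT_* bits */
	int			level;
	int			workers;
} demo_spec;

/* Parse one "keyword=value" item of the given length; 0 on success. */
static int
demo_parse_item(const char *item, size_t len, demo_spec *out)
{
	const char *eq = memchr(item, '=', len);
	char		buf[32];
	char	   *end;
	size_t		klen;
	size_t		vlen;
	long		v;

	if (eq == NULL)
		return -1;				/* this sketch requires a value */
	klen = (size_t) (eq - item);
	vlen = len - klen - 1;
	if (vlen == 0 || vlen >= sizeof(buf))
		return -1;
	memcpy(buf, eq + 1, vlen);
	buf[vlen] = '\0';
	v = strtol(buf, &end, 10);
	if (*end != '\0')
		return -1;				/* trailing junk after the integer */

	if (klen == 5 && memcmp(item, "level", 5) == 0)
	{
		out->level = (int) v;	/* range is not checked in this sketch */
		out->options |= DEMO_OPT_LEVEL;
	}
	else if (klen == 7 && memcmp(item, "workers", 7) == 0)
	{
		out->workers = (int) v;
		out->options |= DEMO_OPT_WORKERS;
	}
	else
		return -1;				/* unknown keyword */
	return 0;
}

/* Parse a comma-separated list of items; 0 on success, -1 on any error. */
static int
demo_parse_spec(const char *spec, demo_spec *out)
{
	memset(out, 0, sizeof(*out));
	while (*spec != '\0')
	{
		const char *comma = strchr(spec, ',');
		size_t		len = comma ? (size_t) (comma - spec) : strlen(spec);

		if (demo_parse_item(spec, len, out) != 0)
			return -1;
		spec += len;
		if (*spec == ',')
		{
			spec++;
			if (*spec == '\0')
				return -1;		/* empty item after a trailing comma */
		}
	}
	return 0;
}
```

The point of keeping `options` separate from the values is that later code can tell "workers=0 was requested" apart from "workers was never mentioned", which is what the follow-up patch relies on.
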
From b977f6ba8e491145165b9ab9f2f1bd407b4e2d26 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 12:28:32 -0500
Subject: [PATCH 3/5] f!workers

---
 src/backend/replication/basebackup_zstd.c | 31 +++++++++++++----------
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 21 +++++++--------
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index d6eb0617d8a..a112d6e181e 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -71,6 +71,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 #else
 	bbsink_zstd *sink;
 	int			compresslevel;
+	int			workers;

 	Assert(next != NULL);

@@ -82,11 +83,16 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 		Assert(compresslevel >= 1 && compresslevel <= 22);
 	}

+	if (compress->options & BACKUP_COMPRESSION_OPTION_WORKERS)
+		workers = compress->workers;
+	else
+		workers = 0;
+
 	sink = palloc0(sizeof(bbsink_zstd));
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
-	sink->workers = compress->workers;
+	sink->workers = workers;

 	return &sink->base;
 #endif
@@ -114,19 +120,16 @@ bbsink_zstd_begin_backup(bbsink *sink)
 		elog(ERROR, "could not set zstd compression level to %d: %s",
 			 mysink->compresslevel, ZSTD_getErrorName(ret));

-	/*
-	 * We check for failure here because (1) older versions of the library
-	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
-	 * reject an unreasonable values (though in practice it does not seem to do
-	 * so).
-	 */
-	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
-								 mysink->workers);
-	if (ZSTD_isError(ret))
-		ereport(ERROR,
-				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				errmsg("could not set compression worker count to %d: %s",
-					   mysink->workers, ZSTD_getErrorName(ret)));
+	if (mysink->workers > 0)
+	{
+		ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+									 mysink->workers);
+		if (ZSTD_isError(ret))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("could not set compression worker count to %d: %s",
+						   mysink->workers, ZSTD_getErrorName(ret)));
+	}

 	/*
 	 * We need our own buffer, because we're going to pass different data to
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 20393da595b..678af73e6f0 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -102,19 +102,16 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		exit(1);
 	}

-	/*
-	 * We check for failure here because (1) older versions of the library
-	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
-	 * reject unreasonable values (though in practice it does not seem to do
-	 * so).
-	 */
-	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
-								 compress->workers);
-	if (ZSTD_isError(ret))
+	if (compress->workers > 0)
 	{
-		pg_log_error("could not set compression worker count to %d: %s",
-					 compress->workers, ZSTD_getErrorName(ret));
-		exit(1);
+		ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+									 compress->workers);
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not set compression worker count to %d: %s",
+						 compress->workers, ZSTD_getErrorName(ret));
+			exit(1);
+		}
 	}

 	/* Initialize the ZSTD output buffer. */
--
2.17.1

From 74124b8d69e5fbe632fd51bff0effec81ebdc806 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 11:55:01 -0500
Subject: [PATCH 4/5] basebackup: support -Z zstd:long

---
 doc/src/sgml/protocol.sgml                | 10 +++++++++-
 doc/src/sgml/ref/pg_basebackup.sgml       |  4 ++--
 src/backend/replication/basebackup_zstd.c | 21 +++++++++++++++++++++
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 13 +++++++++++++
 src/common/backup_compression.c           |  5 +++++
 src/include/common/backup_compression.h   |  2 ++
 6 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 98f0bc3cc34..80f1a1f9a04 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2740,7 +2740,8 @@ The commands accepted in replication mode are:
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <literal>keyword</literal> or
           <literal>keyword=value</literal>. Currently, the supported keywords
-          are <literal>level</literal> and <literal>workers</literal>.
+          are <literal>level</literal>, <literal>long</literal>, and
+          <literal>workers</literal>.
          </para>

          <para>
@@ -2751,6 +2752,13 @@ The commands accepted in replication mode are:
           between 1 and 22.
          </para>

+         <para>
+          The <literal>long</literal> keyword enables long-distance matching
+          mode, for improved compression ratio, at the expense of higher memory
+          use.  Long-distance mode is supported only for
+          <literal>zstd</literal>.
+         </para>
+
          <para>
           The <literal>workers</literal> keyword sets the number of threads
           that should be used for parallel compression. Parallel compression
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 82f5f606250..014c454bfab 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         integer, it specifies the compression level. Otherwise, it should be
         a comma-separated list of items, each of the form
         <literal>keyword</literal> or <literal>keyword=value</literal>.
-        Currently, the supported keywords are <literal>level</literal>
-        and <literal>workers</literal>.
+        Currently, the supported keywords are <literal>level</literal>,
+        <literal>long</literal>, and <literal>workers</literal>.
        </para>
        <para>
         If no compression level is specified, the default compression level
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index a112d6e181e..b900604f59f 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -31,6 +31,9 @@ typedef struct bbsink_zstd
 	/* Number of parallel workers. */
 	int			workers;

+	/* Flags */
+	bool		zstd_long;
+
 	ZSTD_CCtx  *cctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbsink_zstd;
@@ -72,6 +75,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	bbsink_zstd *sink;
 	int			compresslevel;
 	int			workers;
+	bool		zstd_long;

 	Assert(next != NULL);

@@ -88,11 +92,15 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	else
 		workers = 0;

+	zstd_long = (compress->options & BACKUP_COMPRESSION_OPTION_ZSTD_LONG) ?
+		compress->zstd_long : false;
+
 	sink = palloc0(sizeof(bbsink_zstd));
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
 	sink->workers = workers;
+	sink->zstd_long = zstd_long;

 	return &sink->base;
 #endif
@@ -131,6 +139,19 @@ bbsink_zstd_begin_backup(bbsink *sink)
 						   mysink->workers, ZSTD_getErrorName(ret)));
 	}

+	if (mysink->zstd_long)
+	{
+		ret = ZSTD_CCtx_setParameter(mysink->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 mysink->zstd_long);
+		fprintf(stderr, "setting LDM %d\n", ret);
+		if (ZSTD_isError(ret))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("could not set compression flag for %s: %s",
+						   "long", ZSTD_getErrorName(ret)));
+	}
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 678af73e6f0..3c7396a1373 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -114,6 +114,19 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		}
 	}

+	if (compress->zstd_long)
+	{
+		ret = ZSTD_CCtx_setParameter(streamer->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 compress->zstd_long);
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not set compression flag for %s: %s",
+						 "long", ZSTD_getErrorName(ret));
+			exit(1);
+		}
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index 969e08cca20..f43a5608e65 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -182,6 +182,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
 			result->workers = expect_integer_value(keyword, value, result);
 			result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
 		}
+		else if (strcmp(keyword, "long") == 0)
+		{
+			result->zstd_long = expect_integer_value(keyword, value, result); // XXX: expect_bool?
+			result->options |= BACKUP_COMPRESSION_OPTION_ZSTD_LONG;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unknown compression option \"%s\""), keyword);
diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h
index 6a0ecaa99c9..a378631a8da 100644
--- a/src/include/common/backup_compression.h
+++ b/src/include/common/backup_compression.h
@@ -24,6 +24,7 @@ typedef enum bc_algorithm

 #define BACKUP_COMPRESSION_OPTION_LEVEL			(1 << 0)
 #define BACKUP_COMPRESSION_OPTION_WORKERS		(1 << 1)
+#define BACKUP_COMPRESSION_OPTION_ZSTD_LONG		(1 << 2)

 typedef struct bc_specification
 {
@@ -31,6 +32,7 @@ typedef struct bc_specification
 	unsigned	options;		/* OR of BACKUP_COMPRESSION_OPTION constants */
 	int			level;
 	int			workers;
+	int			zstd_long;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } bc_specification;

--
2.17.1

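
The "XXX: expect_bool?" note in the patch above suggests the `long` keyword might be better parsed as a boolean than as an integer. As a rough sketch of what such a helper could look like (this is an assumption, not something in the patch): `demo_expect_bool` is a hypothetical name, the accepted spellings are my own choice, matching is case-sensitive for simplicity, and a bare keyword with no value is treated as "true".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical boolean-option parser.  Returns true if "value" was
 * understood and stores the parsed flag in *result; returns false for
 * anything unrecognized.  A NULL value models a bare keyword ("long").
 */
static bool
demo_expect_bool(const char *value, bool *result)
{
	static const char *const truthy[] = {"1", "on", "true", "yes"};
	static const char *const falsy[] = {"0", "off", "false", "no"};
	size_t		i;

	if (value == NULL)
	{
		*result = true;			/* bare keyword enables the flag */
		return true;
	}
	for (i = 0; i < sizeof(truthy) / sizeof(truthy[0]); i++)
	{
		if (strcmp(value, truthy[i]) == 0)
		{
			*result = true;
			return true;
		}
	}
	for (i = 0; i < sizeof(falsy) / sizeof(falsy[0]); i++)
	{
		if (strcmp(value, falsy[i]) == 0)
		{
			*result = false;
			return true;
		}
	}
	return false;				/* e.g. "2" or "maybe" */
}
```

Under these assumptions `zstd:long`, `zstd:long=1`, and `zstd:long=on` would all enable the flag, `zstd:long=0` would disable it, and `zstd:long=2` would be reported as a parse error rather than silently accepted.
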
From 28c7236534634498265c3e4d6544c836052f009f Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Thu, 10 Mar 2022 20:16:19 -0600
Subject: [PATCH 5/5] pg_basebackup: support Zstd negative compression levels

"higher than maximum" is bogus

TODO: each compression methods should enforce its own levels
---
 src/backend/replication/basebackup_zstd.c |  2 +-
 src/bin/pg_basebackup/bbstreamer_zstd.c   | 16 +++++++---------
 src/common/backup_compression.c           |  6 +++++-
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index b900604f59f..e18535bcc13 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -84,7 +84,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	else
 	{
 		compresslevel = compress->level;
-		Assert(compresslevel >= 1 && compresslevel <= 22);
+		Assert(compresslevel >= -7 && compresslevel <= 22 && compresslevel != 0);
 	}

 	if (compress->options & BACKUP_COMPRESSION_OPTION_WORKERS)
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 3c7396a1373..31fbf2d0bc3 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -89,16 +89,14 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 	}

 	/* Initialize stream compression preferences */
-	if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
-		compresslevel = 0;
-	else
-		compresslevel = compress->level;
-	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
-								 compresslevel);
-	if (ZSTD_isError(ret))
+
+	if (compress->options & BACKUP_COMPRESSION_OPTION_LEVEL)
 	{
-		pg_log_error("could not set zstd compression level to %d: %s",
-					 compresslevel, ZSTD_getErrorName(ret));
+		ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+									 compress->level);
+		if (ZSTD_isError(ret))
+			pg_log_error("could not set compression level to: %d: %s",
+						 compress->level, ZSTD_getErrorName(ret));
 		exit(1);
 	}

diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index f43a5608e65..b568eccd65f 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -265,13 +265,17 @@ validate_bc_specification(bc_specification *spec)
 		else if (spec->algorithm == BACKUP_COMPRESSION_LZ4)
 			max_level = 12;
 		else if (spec->algorithm == BACKUP_COMPRESSION_ZSTD)
+		{
 			max_level = 22;
+			/* The minimum level depends on the version.. */
+			min_level = -7;
+		}
 		else
 			return psprintf(_("compression algorithm \"%s\" does not accept a compression level"),
 							get_bc_algorithm_name(spec->algorithm));

 		if (spec->level < min_level || spec->level > max_level)
-			return psprintf(_("compression algorithm \"%s\" expects a compression level between %d and %d"),
+			return psprintf(_("compression algorithm \"%s\" expects a nonzero compression level between %d and %d"),
 							get_bc_algorithm_name(spec->algorithm),
 							min_level, max_level);
 	}
--
2.17.1
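
The TODO in the last patch ("each compression methods should enforce its own levels") could be approached with a small per-algorithm table. The sketch below is one possible shape under stated assumptions, not code from the thread: `demo_algorithm`, `demo_level_range`, and `demo_level_ok` are hypothetical names, the gzip and lz4 ranges are the ones quoted in the documentation hunks above, and the zstd minimum of -7 is the value used in the patch, which itself notes that the real minimum depends on the library version.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical algorithm identifiers for this sketch only. */
typedef enum demo_algorithm
{
	DEMO_GZIP,
	DEMO_LZ4,
	DEMO_ZSTD
} demo_algorithm;

/* Accepted level range for one algorithm. */
typedef struct demo_level_range
{
	int			min_level;
	int			max_level;
	bool		zero_allowed;	/* is an explicit level of 0 acceptable? */
} demo_level_range;

/*
 * Ranges as described in the thread: gzip 1..9, lz4 1..12, and zstd
 * -7..22 excluding 0 (the -7 is the patch's value; the true minimum
 * depends on the zstd version).
 */
static const demo_level_range demo_ranges[] = {
	[DEMO_GZIP] = {1, 9, false},
	[DEMO_LZ4] = {1, 12, false},
	[DEMO_ZSTD] = {-7, 22, false},
};

/* Return true if "level" is acceptable for the given algorithm. */
static bool
demo_level_ok(demo_algorithm alg, int level)
{
	const demo_level_range *r = &demo_ranges[alg];

	if (level == 0 && !r->zero_allowed)
		return false;
	return level >= r->min_level && level <= r->max_level;
}
```

Keeping the "nonzero" rule in the table rather than only in the error message means the check and the message cannot drift apart, which is the gap between the reworded message and the unchanged range test in the patch above.
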