On Fri, Sep 17, 2021 at 08:12:42AM +0000, gkokola...@pm.me wrote:
> Yeah, I was considering it to split them over to a separate commit,
> then decided against it. Will do so in the future.

I have been digging into the issue I saw in the TAP tests when closing
a segment, and found the problem.  The way you manipulate
frameInfo.contentSize by just setting it to WalSegSz when *opening*
a segment causes problems in LZ4F_compressEnd(), which then fails
with LZ4F_ERROR_frameSize_wrong.  In lz4frame.c, the end of
LZ4F_compressEnd() triggers this check and the error:
    if (cctxPtr->prefs.frameInfo.contentSize) {
        if (cctxPtr->prefs.frameInfo.contentSize != cctxPtr->totalInSize)
            return err0r(LZ4F_ERROR_frameSize_wrong);
    }
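To make the failure mode concrete, here is a minimal model of that check. It is not the lz4 API. frame_model and model_compress_end() are hypothetical names that only mimic the quoted logic: a nonzero contentSize must equal the total input fed to the compressor, and a contentSize of 0 skips the check.

```c
#include <stdbool.h>

/*
 * Hypothetical stand-in for the two fields of the lz4 compression
 * context that take part in the check quoted from LZ4F_compressEnd().
 */
typedef struct
{
    unsigned long long contentSize;  /* declared in frame header, 0 = unknown */
    unsigned long long totalInSize;  /* bytes actually fed to the compressor */
} frame_model;

/*
 * Returns true where lz4 would accept the end of the frame, and false
 * where it would return LZ4F_ERROR_frameSize_wrong.
 */
static bool
model_compress_end(const frame_model *f)
{
    if (f->contentSize != 0 && f->contentSize != f->totalInSize)
        return false;
    return true;
}
```

Suppose contentSize is set to WalSegSz (16MB by default) when the segment is opened. A segment closed after only 8kB of input then fails the model check. The same segment with contentSize left at 0 passes.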

We don't really care about contentSize as long as a segment is not
completed.  Rather than filling contentSize all the time we write
something, we'd better update frameInfo once the segment is
completed and closed.  That would also take care of the error, as
this check is skipped when contentSize is 0.  It seems to me that we should
fill in the information when doing a CLOSE_NORMAL.
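The bookkeeping part of that idea could look roughly like the sketch below. The names are hypothetical; they only mirror CLOSE_NORMAL and the other close modes. The sketch does not cover how the size would reach the frame header on disk. That header is emitted by LZ4F_compressBegin() when the segment is opened, so that step is left out here.

```c
#include <stddef.h>

/* Hypothetical mirror of the close modes a WAL method may be asked for */
typedef enum
{
    MODEL_CLOSE_NORMAL,     /* segment completed, renamed to its final name */
    MODEL_CLOSE_UNLINK,     /* file is discarded */
    MODEL_CLOSE_NO_RENAME   /* file kept under its temporary (.partial) name */
} model_close_method;

typedef struct
{
    unsigned long long content_size;   /* 0 = unknown, size check skipped */
    unsigned long long bytes_written;  /* uncompressed bytes written so far */
} model_segment;

/* Writes only count bytes; content_size stays 0 while the segment is open. */
static void
model_write(model_segment *seg, size_t count)
{
    seg->bytes_written += count;
}

/* Only a normal close of a full segment records its uncompressed size. */
static void
model_close(model_segment *seg, model_close_method method,
            unsigned long long wal_seg_sz)
{
    if (method == MODEL_CLOSE_NORMAL && seg->bytes_written == wal_seg_sz)
        seg->content_size = seg->bytes_written;
}
```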

-       if (stream->walmethod->compression() == 0 &&
+       if (stream->walmethod->compression() == COMPRESSION_NONE &&
                stream->walmethod->existsfile(fn))

This one was a more serious issue, as the compression() callback would
return an integer for the compression level but v5 compared it to a
WalCompressionMethod.  In order to take care of this issue, mainly for
pg_basebackup, I think that we have to update the compression()
callback to compression_method(), and it is cleaner to save the
compression method as well as the compression level for the tar data.
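The small model below shows why the v5 check misfires. It rests on two assumptions. First, COMPRESSION_NONE is the zero value of WalCompressionMethod. Second, an LZ4 stream reports a level of 0, since --compress is only accepted with gzip. The model_* names are hypothetical.

```c
#include <stdbool.h>

/* Hypothetical mirror of WalCompressionMethod; NONE is assumed to be 0 */
typedef enum
{
    MODEL_COMPRESSION_NONE,
    MODEL_COMPRESSION_ZLIB,
    MODEL_COMPRESSION_LZ4
} model_compression_method;

/* Hypothetical WAL method state keeping the method and the level apart */
typedef struct
{
    model_compression_method method;
    int         level;          /* only meaningful for gzip */
} model_wal_method;

/* v5-style test: the integer level is compared against the enum */
static bool
uncompressed_by_level(const model_wal_method *m)
{
    return m->level == (int) MODEL_COMPRESSION_NONE;
}

/* Test based on the method, as with a compression_method() callback */
static bool
uncompressed_by_method(const model_wal_method *m)
{
    return m->method == MODEL_COMPRESSION_NONE;
}
```

In this model, the level-based test classifies an LZ4 stream as uncompressed. The branch meant for uncompressed segments in open_walfile() would therefore be taken for LZ4. The method-based test does not have this problem.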

I am attaching a new patch, on which I have done many tweaks and
adjustments while reviewing it.  The attached patch fixes the second
issue, and I have done nothing about the first issue yet, but that
should be simple enough to address as this needs an update of the
frame info when closing a completed segment.  Could you look at it?

Thanks,
--
Michael
From 37e3800d279566445864ed82f29e8d650c72d8cd Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Sat, 18 Sep 2021 15:11:49 +0900
Subject: [PATCH v6] Teach pg_receivewal to use LZ4 compression

The program pg_receivewal can use gzip compression to store the received WAL.
This commit teaches it to also be able to use LZ4 compression. This requires
the binary to be built with LZ4 support (linked with -llz4), which is enabled
with the --with-lz4 configure option.

Previously, the user had to use the option --compress with a value between [0-9]
to denote that gzip compression was required. This specific behaviour has not
been maintained. A newly introduced option --compression-method=[none|gzip|lz4]
can be used to ask for the logs to be compressed. Compression levels can be
selected only when the compression method is gzip. A compression level of 0 now
returns an error.

Under the hood there is nothing exceptional to be noted. Tar-based archives have
not yet been taught to use LZ4 compression; if that proves useful, it can easily
be added in the future.

Tests have been added to verify the creation and correctness of the generated
LZ4 files. The latter is checked with the lz4 program, if present in the
installation.
---
 src/bin/pg_basebackup/Makefile               |   1 +
 src/bin/pg_basebackup/pg_basebackup.c        |   7 +-
 src/bin/pg_basebackup/pg_receivewal.c        | 278 +++++++++++++++----
 src/bin/pg_basebackup/receivelog.c           |   2 +-
 src/bin/pg_basebackup/t/020_pg_receivewal.pl |  81 +++++-
 src/bin/pg_basebackup/walmethods.c           | 216 ++++++++++++--
 src/bin/pg_basebackup/walmethods.h           |  16 +-
 doc/src/sgml/ref/pg_receivewal.sgml          |  28 +-
 src/Makefile.global.in                       |   1 +
 src/tools/pgindent/typedefs.list             |   1 +
 10 files changed, 546 insertions(+), 85 deletions(-)

diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 459d514183..387d728345 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -24,6 +24,7 @@ export TAR
 # used by the command "gzip" to pass down options, so stick with a different
 # name.
 export GZIP_PROGRAM=$(GZIP)
+export LZ4
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 669aa207a3..18c6a93cec 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -555,10 +555,13 @@ LogStreamerMain(logstreamer_param *param)
        stream.replication_slot = replication_slot;
 
        if (format == 'p')
-               stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0,
+               stream.walmethod = CreateWalDirectoryMethod(param->xlog,
+                                                           COMPRESSION_NONE, 0,
                                                            stream.do_sync);
        else
-               stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel,
+               stream.walmethod = CreateWalTarMethod(param->xlog,
+                                                     COMPRESSION_NONE, /* ignored */
+                                                     compresslevel,
                                                      stream.do_sync);
 
        if (!ReceiveXlogStream(param->bgconn, &stream))
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fe..48fd9491c9 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -29,9 +29,16 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
+#ifdef HAVE_LIBLZ4
+#include "lz4frame.h"
+#endif
+
 /* Time to sleep between reconnection attempts */
 #define RECONNECT_SLEEP_TIME 5
 
+/* Default compression level for gzip compression method */
+#define DEFAULT_ZLIB_COMPRESSLEVEL 5
+
 /* Global options */
 static char *basedir = NULL;
 static int     verbose = 0;
@@ -45,6 +52,7 @@ static bool do_drop_slot = false;
 static bool do_sync = true;
 static bool synchronous = false;
 static char *replication_slot = NULL;
+static WalCompressionMethod compression_method = COMPRESSION_NONE;
 static XLogRecPtr endpos = InvalidXLogRecPtr;
 
 
@@ -63,16 +71,6 @@ disconnect_atexit(void)
                PQfinish(conn);
 }
 
-/* Routines to evaluate segment file format */
-#define IsCompressXLogFileName(fname)   \
-       (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
-        strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&         \
-        strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
-#define IsPartialCompressXLogFileName(fname)   \
-       (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
-        strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&         \
-        strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
-
 static void
 usage(void)
 {
@@ -92,7 +90,10 @@ usage(void)
        printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
        printf(_("  -v, --verbose          output verbose messages\n"));
        printf(_("  -V, --version          output version information, then exit\n"));
-       printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
+       printf(_("      --compression-method=METHOD\n"
+                        "                         method to compress logs\n"));
+       printf(_("  -Z, --compress=1-9     compress logs with given compression level (default: %d)\n"),
+                  DEFAULT_ZLIB_COMPRESSLEVEL);
        printf(_("  -?, --help             show this help, then exit\n"));
        printf(_("\nConnection options:\n"));
        printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -108,6 +109,79 @@ usage(void)
        printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 }
 
+
+/*
+ * Check if the filename looks like an xlog file. Also note if it is partial
+ * and/or compressed file.
+ */
+static bool
+is_xlogfilename(const char *filename, bool *ispartial,
+                               WalCompressionMethod *wal_compression_method)
+{
+       size_t          fname_len = strlen(filename);
+       size_t          xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
+
+       /* File does not look like a XLOG file */
+       if (xlog_pattern_len != XLOG_FNAME_LEN)
+               return false;
+
+       /* File looks like a complete uncompressed XLOG file */
+       if (fname_len == XLOG_FNAME_LEN)
+       {
+               *ispartial = false;
+               *wal_compression_method = COMPRESSION_NONE;
+               return true;
+       }
+
+       /* File looks like a complete zlib compressed XLOG file */
+       if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
+               strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
+       {
+               *ispartial = false;
+               *wal_compression_method = COMPRESSION_ZLIB;
+               return true;
+       }
+
+       /* File looks like a complete LZ4 compressed XLOG file */
+       if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
+               strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
+       {
+               *ispartial = false;
+               *wal_compression_method = COMPRESSION_LZ4;
+               return true;
+       }
+
+       /* File looks like a partial uncompressed XLOG file */
+       if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
+               strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
+       {
+               *ispartial = true;
+               *wal_compression_method = COMPRESSION_NONE;
+               return true;
+       }
+
+       /* File looks like a partial zlib compressed XLOG file */
+       if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
+               strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
+       {
+               *ispartial = true;
+               *wal_compression_method = COMPRESSION_ZLIB;
+               return true;
+       }
+
+       /* File looks like a partial LZ4 compressed XLOG file */
+       if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
+               strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
+       {
+               *ispartial = true;
+               *wal_compression_method = COMPRESSION_LZ4;
+               return true;
+       }
+
+       /* File does not look like something we recognise */
+       return false;
+}
+
 static bool
 stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
 {
@@ -213,33 +287,11 @@ FindStreamingStart(uint32 *tli)
        {
                uint32          tli;
                XLogSegNo       segno;
+               WalCompressionMethod wal_compression_method;
                bool            ispartial;
-               bool            iscompress;
 
-               /*
-                * Check if the filename looks like an xlog file, or a .partial file.
-                */
-               if (IsXLogFileName(dirent->d_name))
-               {
-                       ispartial = false;
-                       iscompress = false;
-               }
-               else if (IsPartialXLogFileName(dirent->d_name))
-               {
-                       ispartial = true;
-                       iscompress = false;
-               }
-               else if (IsCompressXLogFileName(dirent->d_name))
-               {
-                       ispartial = false;
-                       iscompress = true;
-               }
-               else if (IsPartialCompressXLogFileName(dirent->d_name))
-               {
-                       ispartial = true;
-                       iscompress = true;
-               }
-               else
+               if (!is_xlogfilename(dirent->d_name,
+                                                        &ispartial, &wal_compression_method))
                        continue;
 
                /*
@@ -250,14 +302,18 @@ FindStreamingStart(uint32 *tli)
                /*
                 * Check that the segment has the right size, if it's supposed to be
                 * completed.  For non-compressed segments just check the on-disk size
-                * and see if it matches a completed segment. For compressed segments,
-                * look at the last 4 bytes of the compressed file, which is where the
-                * uncompressed size is located for gz files with a size lower than
-                * 4GB, and then compare it to the size of a completed segment. The 4
-                * last bytes correspond to the ISIZE member according to
-                * http://www.zlib.org/rfc-gzip.html.
+                * and see if it matches a completed segment. For zlib compressed
+                * segments, look at the last 4 bytes of the compressed file, which is
+                * where the uncompressed size is located for gz files with a size
+                * lower than 4GB, and then compare it to the size of a completed
+                * segment. The 4 last bytes correspond to the ISIZE member according
+                * to http://www.zlib.org/rfc-gzip.html.
+                *
+                * For LZ4 compressed segments read the header using the exposed API
+                * and compare the uncompressed file size, stored in
+                * LZ4F_frameInfo_t{.contentSize}, to that of a completed segment.
                 */
-               if (!ispartial && !iscompress)
+               if (!ispartial && wal_compression_method == COMPRESSION_NONE)
                {
                        struct stat statbuf;
                        char            fullpath[MAXPGPATH * 2];
@@ -276,7 +332,7 @@ FindStreamingStart(uint32 *tli)
                                continue;
                        }
                }
-               else if (!ispartial && iscompress)
+               else if (!ispartial && wal_compression_method == COMPRESSION_ZLIB)
                {
                        int                     fd;
                        char            buf[4];
@@ -322,6 +378,80 @@ FindStreamingStart(uint32 *tli)
                                continue;
                        }
                }
+               else if (!ispartial && wal_compression_method == COMPRESSION_LZ4)
+               {
+#ifdef HAVE_LIBLZ4
+                       int                     fd;
+                       int                     r;
+                       size_t          consumed_len = LZ4F_HEADER_SIZE_MAX;
+                       char            buf[LZ4F_HEADER_SIZE_MAX];
+                       char            fullpath[MAXPGPATH * 2];
+                       LZ4F_frameInfo_t frame_info = {0};
+                       LZ4F_decompressionContext_t ctx = NULL;
+                       LZ4F_errorCode_t status;
+
+                       snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+                       fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
+                       if (fd < 0)
+                       {
+                               pg_log_error("could not open file \"%s\": %m", fullpath);
+                               exit(1);
+                       }
+
+                       r = read(fd, buf, sizeof(buf));
+                       if (r != sizeof(buf))
+                       {
+                               if (r < 0)
+                                       pg_log_error("could not read file \"%s\": %m", fullpath);
+                               else
+                                       pg_log_error("could not read file \"%s\": read %d of %zu",
+                                                                fullpath, r, sizeof(buf));
+                               exit(1);
+                       }
+                       close(fd);
+
+                       status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
+                       if (LZ4F_isError(status))
+                       {
+                               pg_log_error("could not create LZ4 decompression context: %s",
+                                                        LZ4F_getErrorName(status));
+                               exit(1);
+                       }
+
+                       LZ4F_getFrameInfo(ctx, &frame_info, (void *) buf, &consumed_len);
+                       if (consumed_len <= LZ4F_HEADER_SIZE_MIN ||
+                               consumed_len >= LZ4F_HEADER_SIZE_MAX)
+                       {
+                               pg_log_warning("compressed segment file \"%s\" has incorrect header size %zu, skipping",
+                                                          dirent->d_name, consumed_len);
+                               (void) LZ4F_freeDecompressionContext(ctx);
+                               continue;
+                       }
+
+                       if (frame_info.contentSize != WalSegSz)
+                       {
+                               pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %llu, skipping",
+                                                          dirent->d_name, frame_info.contentSize);
+                               (void) LZ4F_freeDecompressionContext(ctx);
+                               continue;
+                       }
+
+                       status = LZ4F_freeDecompressionContext(ctx);
+                       if (LZ4F_isError(status))
+                       {
+                               pg_log_error("could not free LZ4 decompression context: %s",
+                                                        LZ4F_getErrorName(status));
+                               exit(1);
+                       }
+#else
+                       pg_log_error("could not check segment file \"%s\" compressed with LZ4",
+                                                dirent->d_name);
+                       pg_log_error("this build does not support compression with %s",
+                                                "LZ4");
+                       exit(1);
+#endif
+               }
 
                /* Looks like a valid segment. Remember that we saw it. */
                if ((segno > high_segno) ||
@@ -432,7 +562,9 @@ StreamLog(void)
        stream.synchronous = synchronous;
        stream.do_sync = do_sync;
        stream.mark_done = false;
-       stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+       stream.walmethod = CreateWalDirectoryMethod(basedir,
+                                                   compression_method,
+                                                   compresslevel,
                                                    stream.do_sync);
        stream.partial_suffix = ".partial";
        stream.replication_slot = replication_slot;
@@ -485,6 +617,7 @@ main(int argc, char **argv)
                {"status-interval", required_argument, NULL, 's'},
                {"slot", required_argument, NULL, 'S'},
                {"verbose", no_argument, NULL, 'v'},
+               {"compression-method", required_argument, NULL, 'I'},
                {"compress", required_argument, NULL, 'Z'},
 /* action */
                {"create-slot", no_argument, NULL, 1},
@@ -570,8 +703,22 @@ main(int argc, char **argv)
                        case 'v':
                                verbose++;
                                break;
+                       case 'I':
+                               if (pg_strcasecmp(optarg, "gzip") == 0)
+                                       compression_method = COMPRESSION_ZLIB;
+                               else if (pg_strcasecmp(optarg, "lz4") == 0)
+                                       compression_method = COMPRESSION_LZ4;
+                               else if (pg_strcasecmp(optarg, "none") == 0)
+                                       compression_method = COMPRESSION_NONE;
+                               else
+                               {
+                                       pg_log_error("invalid value \"%s\" for option %s",
+                                                                optarg, "--compression-method");
+                                       exit(1);
+                               }
+                               break;
                        case 'Z':
-                               if (!option_parse_int(optarg, "-Z/--compress", 0, 9,
+                               if (!option_parse_int(optarg, "-Z/--compress", 1, 9,
                                                      &compresslevel))
                                        exit(1);
                                break;
@@ -651,13 +798,44 @@ main(int argc, char **argv)
                exit(1);
        }
 
-#ifndef HAVE_LIBZ
-       if (compresslevel != 0)
+
+       /*
+        * Compression related arguments
+        */
+       if (compression_method != COMPRESSION_NONE)
        {
-               pg_log_error("this build does not support compression");
+#ifndef HAVE_LIBZ
+               if (compression_method == COMPRESSION_ZLIB)
+               {
+                       pg_log_error("this build does not support compression with %s",
+                                                "gzip");
+                       exit(1);
+               }
+#endif
+#ifndef HAVE_LIBLZ4
+               if (compression_method == COMPRESSION_LZ4)
+               {
+                       pg_log_error("this build does not support compression with %s",
+                                                "LZ4");
+                       exit(1);
+               }
+#endif
+       }
+
+       if (compression_method != COMPRESSION_ZLIB && compresslevel != 0)
+       {
+               pg_log_error("can only use --compress with --compression-method=gzip");
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                               progname);
                exit(1);
        }
-#endif
+
+       if (compression_method == COMPRESSION_ZLIB && compresslevel == 0)
+       {
+               pg_log_info("no --compress specified, will be using %d",
+                                       DEFAULT_ZLIB_COMPRESSLEVEL);
+               compresslevel = DEFAULT_ZLIB_COMPRESSLEVEL;
+       }
 
        /*
         * Check existence of destination folder.
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 72b8d9e315..2d4f660daa 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -109,7 +109,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
         * When streaming to tar, no file with this name will exist before, so we
         * never have to verify a size.
         */
-       if (stream->walmethod->compression() == 0 &&
+       if (stream->walmethod->compression_method() == COMPRESSION_NONE &&
                stream->walmethod->existsfile(fn))
        {
                size = stream->walmethod->get_file_size(fn);
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..1bd8fd0d3e 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 34;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -33,6 +33,13 @@ $primary->command_fails(
 $primary->command_fails(
        [ 'pg_receivewal', '-D', $stream_dir, '--synchronous', '--no-sync' ],
        'failure if --synchronous specified with --no-sync');
+$primary->command_fails_like(
+       [
+               'pg_receivewal', '-D', $stream_dir, '--compression-method', 'none',
+               '--compress',    '1'
+       ],
+       qr/\Qpg_receivewal: error: can only use --compress with --compression-method=gzip/,
+       'failure if --compression-method=none specified with --compress');
 
 # Slot creation and drop
 my $slot_name = 'test';
@@ -41,7 +48,7 @@ $primary->command_ok(
        'creating a replication slot');
 my $slot = $primary->slot($slot_name);
 is($slot->{'slot_type'}, 'physical', 'physical replication slot was created');
-is($slot->{'restart_lsn'}, '', 'restart LSN of new slot is null');
+is($slot->{'restart_lsn'}, '',       'restart LSN of new slot is null');
 $primary->command_ok([ 'pg_receivewal', '--slot', $slot_name, '--drop-slot' ],
        'dropping a replication slot');
 is($primary->slot($slot_name)->{'slot_type'},
@@ -90,8 +97,11 @@ SKIP:
        # a valid value.
        $primary->command_ok(
                [
-                       'pg_receivewal', '-D',     $stream_dir,  '--verbose',
-                       '--endpos',      $nextlsn, '--compress', '1 ',
+                       'pg_receivewal',        '-D',
+                       $stream_dir,            '--verbose',
+                       '--endpos',             $nextlsn,
+                       '--compression-method', 'gzip',
+                       '--compress',           '1 ',
                        '--no-loop'
                ],
                "streaming some WAL using ZLIB compression");
@@ -128,14 +138,71 @@ SKIP:
                "gzip verified the integrity of compressed WAL segments");
 }
 
+# Check LZ4 compression if available
+SKIP:
+{
+       skip "postgres was not built with LZ4 support", 5
+         if (!check_pg_config("#define HAVE_LIBLZ4 1"));
+
+       # Generate more WAL including one completed, compressed segment.
+       $primary->psql('postgres', 'SELECT pg_switch_wal();');
+       $nextlsn =
+         $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+       chomp($nextlsn);
+       $primary->psql('postgres',
+               'INSERT INTO test_table VALUES (generate_series(201,300));');
+
+       # Stream up to the given position
+       $primary->command_ok(
+               [
+                       'pg_receivewal', '-D',
+                       $stream_dir,     '--verbose',
+                       '--endpos',      $nextlsn,
+                       '--no-loop',     '--compression-method',
+                       'lz4'
+               ],
+               'streaming some WAL using --compression-method=lz4');
+
+       # Verify that the stored files are generated with their expected
+       # names.
+       my @lz4_wals = glob "$stream_dir/*.lz4";
+       is(scalar(@lz4_wals), 1,
+               "one WAL segment compressed with LZ4 was created");
+       my @lz4_partial_wals = glob "$stream_dir/*.lz4.partial";
+       is(scalar(@lz4_partial_wals),
+               1, "one partial WAL segment compressed with LZ4 was created");
+
+       # Verify that the start streaming position is computed correctly by
+       # comparing it with the partial file generated previously.  The name
+       # of the previous partial, now-completed WAL segment is updated, keeping
+       # its base number.
+       $partial_wals[0] =~ s/(\.gz)?\.partial$/.lz4/;
+       is($lz4_wals[0] eq $partial_wals[0],
+               1, "one partial WAL segment is now completed");
+       # Update the list of partial wals with the current one.
+       @partial_wals = @lz4_partial_wals;
+
+       # Check the integrity of the completed segment, if LZ4 is an available
+       # command.
+       my $lz4 = $ENV{LZ4};
+       skip "program lz4 is not found in your system", 1
+         if ( !defined $lz4
+               || $lz4 eq ''
+               || system_log($lz4, '--version') != 0);
+
+       my $lz4_is_valid = system_log($lz4, '-t', @lz4_wals);
+       is($lz4_is_valid, 0,
+               "lz4 verified the integrity of compressed WAL segments");
+}
+
 # Verify that the start streaming position is computed and that the value is
-# correct regardless of whether ZLIB is available.
+# correct regardless of whether any compression is available.
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 $nextlsn =
   $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
 chomp($nextlsn);
 $primary->psql('postgres',
-       'INSERT INTO test_table VALUES (generate_series(200,300));');
+       'INSERT INTO test_table VALUES (generate_series(301,400));');
 $primary->command_ok(
        [
                'pg_receivewal', '-D',     $stream_dir, '--verbose',
@@ -143,7 +210,7 @@ $primary->command_ok(
        ],
        "streaming some WAL");
 
-$partial_wals[0] =~ s/(\.gz)?.partial//;
+$partial_wals[0] =~ s/(\.gz|\.lz4)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
 # Permissions on WAL files should be default
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 8695647db4..6c43164e38 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -17,6 +17,10 @@
 #include <sys/stat.h>
 #include <time.h>
 #include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
 #ifdef HAVE_LIBZ
 #include <zlib.h>
 #endif
@@ -30,6 +34,9 @@
 /* Size of zlib buffer for .tar.gz */
 #define ZLIB_OUT_SIZE 4096
 
+/* Size of lz4 input chunk for .lz4 */
+#define LZ4_IN_SIZE  4096
+
 /*-------------------------------------------------------------------------
  * WalDirectoryMethod - write wal to a directory looking like pg_wal
  *-------------------------------------------------------------------------
@@ -41,6 +48,7 @@
 typedef struct DirectoryMethodData
 {
        char       *basedir;
+       WalCompressionMethod compression_method;
        int                     compression;
        bool            sync;
 } DirectoryMethodData;
@@ -59,6 +67,11 @@ typedef struct DirectoryMethodFile
 #ifdef HAVE_LIBZ
        gzFile          gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+       LZ4F_compressionContext_t ctx;
+       size_t          lz4bufsize;
+       void       *lz4buf;
+#endif
 } DirectoryMethodFile;
 
 static const char *
@@ -74,7 +87,9 @@ dir_get_file_name(const char *pathname, const char *temp_suffix)
        char       *filename = pg_malloc0(MAXPGPATH * sizeof(char));
 
        snprintf(filename, MAXPGPATH, "%s%s%s",
-                        pathname, dir_data->compression > 0 ? ".gz" : "",
+                        pathname,
+                        dir_data->compression_method == COMPRESSION_ZLIB ? ".gz" :
+                        dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
                         temp_suffix ? temp_suffix : "");
 
        return filename;
@@ -90,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 #ifdef HAVE_LIBZ
        gzFile          gzfp = NULL;
 #endif
+#ifdef HAVE_LIBLZ4
+       LZ4F_compressionContext_t ctx = NULL;
+       size_t          lz4bufsize = 0;
+       void       *lz4buf = NULL;
+#endif
 
        filename = dir_get_file_name(pathname, temp_suffix);
        snprintf(tmppath, sizeof(tmppath), "%s/%s",
@@ -107,7 +127,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                return NULL;
 
 #ifdef HAVE_LIBZ
-       if (dir_data->compression > 0)
+       if (dir_data->compression_method == COMPRESSION_ZLIB)
        {
                gzfp = gzdopen(fd, "wb");
                if (gzfp == NULL)
@@ -124,9 +144,59 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                }
        }
 #endif
+#ifdef HAVE_LIBLZ4
+       if (dir_data->compression_method == COMPRESSION_LZ4)
+       {
+               LZ4F_preferences_t lz4preferences = {0};
+               size_t          ctx_out;
+               size_t          header_size;
+
+               /*
+                * Set all the preferences to their defaults except contentSize,
+                * which will be needed in FindStreamingStart.
+                */
+               memset(&lz4preferences, 0, sizeof(lz4preferences));
+               lz4preferences.frameInfo.contentSize = (unsigned long long) WalSegSz;
+               ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+               lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, &lz4preferences);
+               if (LZ4F_isError(ctx_out))
+               {
+                       close(fd);
+                       return NULL;
+               }
+
+               lz4buf = pg_malloc0(lz4bufsize);
+
+               /* add the header */
+               header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &lz4preferences);
+               if (LZ4F_isError(header_size))
+               {
+                       pg_free(lz4buf);
+                       close(fd);
+                       return NULL;
+               }
+
+               errno = 0;
+               if (write(fd, lz4buf, header_size) != header_size)
+               {
+                       int                     save_errno = errno;
+
+                       (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+                       (void) LZ4F_freeCompressionContext(ctx);
+                       pg_free(lz4buf);
+                       close(fd);
+
+                       /*
+                        * If write didn't set errno, assume problem is no disk space.
+                        */
+                       errno = save_errno ? save_errno : ENOSPC;
+                       return NULL;
+               }
+       }
+#endif
 
        /* Do pre-padding on non-compressed files */
-       if (pad_to_size && dir_data->compression == 0)
+       if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
        {
                PGAlignedXLogBlock zerobuf;
                int                     bytes;
@@ -171,9 +241,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                        fsync_parent_path(tmppath) != 0)
                {
 #ifdef HAVE_LIBZ
-                       if (dir_data->compression > 0)
+                       if (dir_data->compression_method == COMPRESSION_ZLIB)
                                gzclose(gzfp);
                        else
+#endif
+#ifdef HAVE_LIBLZ4
+                       if (dir_data->compression_method == COMPRESSION_LZ4)
+                       {
+                               (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+                               (void) LZ4F_freeCompressionContext(ctx);
+                               pg_free(lz4buf);
+                               close(fd);
+                       }
+                       else
 #endif
                                close(fd);
                        return NULL;
@@ -182,9 +262,18 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
        f = pg_malloc0(sizeof(DirectoryMethodFile));
 #ifdef HAVE_LIBZ
-       if (dir_data->compression > 0)
+       if (dir_data->compression_method == COMPRESSION_ZLIB)
                f->gzfp = gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+       if (dir_data->compression_method == COMPRESSION_LZ4)
+       {
+               f->ctx = ctx;
+               f->lz4buf = lz4buf;
+               f->lz4bufsize = lz4bufsize;
+       }
+#endif
+
        f->fd = fd;
        f->currpos = 0;
        f->pathname = pg_strdup(pathname);
@@ -204,9 +293,46 @@ dir_write(Walfile f, const void *buf, size_t count)
        Assert(f != NULL);
 
 #ifdef HAVE_LIBZ
-       if (dir_data->compression > 0)
+       if (dir_data->compression_method == COMPRESSION_ZLIB)
                r = (ssize_t) gzwrite(df->gzfp, buf, count);
        else
+#endif
+#ifdef HAVE_LIBLZ4
+       if (dir_data->compression_method == COMPRESSION_LZ4)
+       {
+               size_t          chunk;
+               size_t          remaining;
+               const void *inbuf = buf;
+
+               remaining = count;
+               while (remaining > 0)
+               {
+                       size_t          compressed;
+
+                       if (remaining > LZ4_IN_SIZE)
+                               chunk = LZ4_IN_SIZE;
+                       else
+                               chunk = remaining;
+
+                       remaining -= chunk;
+                       compressed = LZ4F_compressUpdate(df->ctx,
+                                                        df->lz4buf, df->lz4bufsize,
+                                                        inbuf, chunk,
+                                                        NULL);
+
+                       if (LZ4F_isError(compressed))
+                               return -1;
+
+                       if (write(df->fd, df->lz4buf, compressed) != compressed)
+                               return -1;
+
+                       inbuf = ((char *) inbuf) + chunk;
+               }
+
+               /* Our caller keeps track of the uncompressed size. */
+               r = (ssize_t) count;
+       }
+       else
 #endif
                r = write(df->fd, buf, count);
        if (r > 0)
@@ -234,9 +360,29 @@ dir_close(Walfile f, WalCloseMethod method)
        Assert(f != NULL);
 
 #ifdef HAVE_LIBZ
-       if (dir_data->compression > 0)
+       if (dir_data->compression_method == COMPRESSION_ZLIB)
                r = gzclose(df->gzfp);
        else
+#endif
+#ifdef HAVE_LIBLZ4
+       if (dir_data->compression_method == COMPRESSION_LZ4)
+       {
+               /* Flush any buffered data and finish the frame */
+               size_t          compressed;
+
+               compressed = LZ4F_compressEnd(df->ctx,
+                                             df->lz4buf, df->lz4bufsize,
+                                             NULL);
+
+               if (LZ4F_isError(compressed))
+                       return -1;
+
+               if (write(df->fd, df->lz4buf, compressed) != compressed)
+                       return -1;
+
+               r = close(df->fd);
+       }
+       else
 #endif
                r = close(df->fd);
 
@@ -291,6 +437,12 @@ dir_close(Walfile f, WalCloseMethod method)
                }
        }
 
+#ifdef HAVE_LIBLZ4
+       pg_free(df->lz4buf);
+       /* supports free on NULL */
+       LZ4F_freeCompressionContext(df->ctx);
+#endif
+
        pg_free(df->pathname);
        pg_free(df->fullpath);
        if (df->temp_suffix)
@@ -309,12 +461,27 @@ dir_sync(Walfile f)
                return 0;
 
 #ifdef HAVE_LIBZ
-       if (dir_data->compression > 0)
+       if (dir_data->compression_method == COMPRESSION_ZLIB)
        {
                if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
                        return -1;
        }
 #endif
+#ifdef HAVE_LIBLZ4
+       if (dir_data->compression_method == COMPRESSION_LZ4)
+       {
+               DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+               size_t          compressed;
+
+               /* Flush any internal buffers */
+               compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
+               if (LZ4F_isError(compressed))
+                       return -1;
+
+               if (write(df->fd, df->lz4buf, compressed) != compressed)
+                       return -1;
+       }
+#endif
 
        return fsync(((DirectoryMethodFile *) f)->fd);
 }
@@ -334,10 +501,10 @@ dir_get_file_size(const char *pathname)
        return statbuf.st_size;
 }
 
-static int
-dir_compression(void)
+static WalCompressionMethod
+dir_compression_method(void)
 {
-       return dir_data->compression;
+       return dir_data->compression_method;
 }
 
 static bool
@@ -373,7 +540,9 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
+CreateWalDirectoryMethod(const char *basedir,
+                                                WalCompressionMethod compression_method,
+                                                int compression, bool sync)
 {
        WalWriteMethod *method;
 
@@ -383,7 +552,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
        method->get_current_pos = dir_get_current_pos;
        method->get_file_size = dir_get_file_size;
        method->get_file_name = dir_get_file_name;
-       method->compression = dir_compression;
+       method->compression_method = dir_compression_method;
        method->close = dir_close;
        method->sync = dir_sync;
        method->existsfile = dir_existsfile;
@@ -391,6 +560,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
        method->getlasterror = dir_getlasterror;
 
        dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+       dir_data->compression_method = compression_method;
        dir_data->compression = compression;
        dir_data->basedir = pg_strdup(basedir);
        dir_data->sync = sync;
@@ -424,6 +594,7 @@ typedef struct TarMethodData
 {
        char       *tarfilename;
        int                     fd;
+       WalCompressionMethod compression_method;
        int                     compression;
        bool            sync;
        TarMethodFile *currentfile;
@@ -731,10 +902,10 @@ tar_get_file_size(const char *pathname)
        return -1;
 }
 
-static int
-tar_compression(void)
+static WalCompressionMethod
+tar_compression_method(void)
 {
-       return tar_data->compression;
+       return tar_data->compression_method;
 }
 
 static off_t
@@ -1031,8 +1202,16 @@ tar_finish(void)
        return true;
 }
 
+/*
+ * The argument compression_method is only stored and reported back; it is
+ * in place for symmetry with CreateWalDirectoryMethod, which uses it to
+ * distinguish between the compression methods. CreateWalTarMethod and its
+ * family of functions handle only zlib compression.
+ */
 WalWriteMethod *
-CreateWalTarMethod(const char *tarbase, int compression, bool sync)
+CreateWalTarMethod(const char *tarbase,
+                                  WalCompressionMethod compression_method,
+                                  int compression, bool sync)
 {
        WalWriteMethod *method;
        const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
@@ -1043,7 +1222,7 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync)
        method->get_current_pos = tar_get_current_pos;
        method->get_file_size = tar_get_file_size;
        method->get_file_name = tar_get_file_name;
-       method->compression = tar_compression;
+       method->compression_method = tar_compression_method;
        method->close = tar_close;
        method->sync = tar_sync;
        method->existsfile = tar_existsfile;
@@ -1054,6 +1233,7 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync)
        tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
        sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
        tar_data->fd = -1;
+       tar_data->compression_method = compression_method;
        tar_data->compression = compression;
        tar_data->sync = sync;
 #ifdef HAVE_LIBZ
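A note on the LZ4 branch of dir_write() above: it feeds the caller's buffer to LZ4F_compressUpdate() in pieces of at most LZ4_IN_SIZE bytes, writes each compressed piece to the file descriptor, and returns the uncompressed count, since the caller keeps track of the uncompressed size. Here is a minimal stand-alone sketch of only that control flow, so it can be compiled without liblz4. It assumes a hypothetical fake_compress() that copies its input instead of compressing, and a hypothetical IN_CHUNK_SIZE of 8 in place of LZ4_IN_SIZE. It also borrows the errno-to-ENOSPC fallback that dir_open_for_write() uses for short writes; dir_write() does not do that in this version. This is an illustration, not the patch's code.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical stand-in for LZ4_IN_SIZE, kept tiny so the chunking shows. */
#define IN_CHUNK_SIZE 8

/* How many times the "compressor" ran; only here to make the loop visible. */
static int	compress_calls = 0;

/*
 * Hypothetical stand-in for LZ4F_compressUpdate(): copies the input to the
 * output buffer unchanged and returns the number of bytes produced.
 */
static size_t
fake_compress(void *out, size_t outsize, const void *in, size_t insize)
{
	assert(insize <= outsize);
	memcpy(out, in, insize);
	compress_calls++;
	return insize;
}

/*
 * Compress "count" bytes of "buf" in pieces of at most IN_CHUNK_SIZE bytes
 * and write each compressed piece to fd.  Returns the uncompressed byte
 * count, or -1 with errno set (ENOSPC if a short write left errno at 0).
 */
static ssize_t
chunked_write(int fd, const void *buf, size_t count)
{
	/*
	 * The patch sizes its output buffer with LZ4F_compressBound(); an
	 * identity copy never grows, so IN_CHUNK_SIZE is enough here.
	 */
	char		outbuf[IN_CHUNK_SIZE];
	const char *inbuf = buf;
	size_t		remaining = count;

	while (remaining > 0)
	{
		size_t		chunk = remaining > IN_CHUNK_SIZE ? IN_CHUNK_SIZE : remaining;
		size_t		produced = fake_compress(outbuf, sizeof(outbuf), inbuf, chunk);

		errno = 0;
		if (write(fd, outbuf, produced) != (ssize_t) produced)
		{
			/* If write didn't set errno, assume the disk is full. */
			if (errno == 0)
				errno = ENOSPC;
			return -1;
		}

		inbuf += chunk;
		remaining -= chunk;
	}

	/* Like dir_write(), report the uncompressed size to the caller. */
	return (ssize_t) count;
}
```

Writing 20 bytes through chunked_write() runs the stand-in compressor three times (8 + 8 + 4 bytes) and returns 20, the uncompressed count, no matter how many bytes the compressor produced.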
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index 4abdfd8333..3e378c87b6 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -19,6 +19,13 @@ typedef enum
        CLOSE_NO_RENAME
 } WalCloseMethod;
 
+typedef enum
+{
+       COMPRESSION_LZ4,
+       COMPRESSION_ZLIB,
+       COMPRESSION_NONE
+} WalCompressionMethod;
+
 /*
  * A WalWriteMethod structure represents the different methods used
  * to write the streaming WAL as it's received.
@@ -58,8 +65,8 @@ struct WalWriteMethod
         */
        char       *(*get_file_name) (const char *pathname, const char *temp_suffix);
 
-       /* Return the level of compression */
-       int                     (*compression) (void);
+       /* Returns the compression method */
+       WalCompressionMethod (*compression_method) (void);
 
        /*
         * Write count number of bytes to the file, and return the number of bytes
@@ -95,8 +102,11 @@ struct WalWriteMethod
  *                                                not all those required for pg_receivewal)
  */
 WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+                                         WalCompressionMethod compression_method,
                                          int compression, bool sync);
-WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase,
+                                   WalCompressionMethod compression_method,
+                                   int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
 void           FreeWalDirectoryMethod(void);
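To make the compression()/compression_method() mix-up concrete: with the enum ordering above, COMPRESSION_LZ4 is 0 and COMPRESSION_NONE is 2, so comparing a compression *level* against an enumerator gives the wrong answer in exactly the cases that matter. The sketch below copies the enum and mirrors the suffix selection of dir_get_file_name(). The helper names wal_suffix(), uncompressed_buggy() and uncompressed_fixed() are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Same enumerator order as the patch's walmethods.h. */
typedef enum
{
	COMPRESSION_LZ4,
	COMPRESSION_ZLIB,
	COMPRESSION_NONE
} WalCompressionMethod;

/* Mirrors the suffix selection in dir_get_file_name(). */
static const char *
wal_suffix(WalCompressionMethod method)
{
	switch (method)
	{
		case COMPRESSION_ZLIB:
			return ".gz";
		case COMPRESSION_LZ4:
			return ".lz4";
		case COMPRESSION_NONE:
			break;
	}
	return "";
}

/*
 * Buggy pattern: compares a compression *level* (0 meaning "off") with an
 * enumerator.  COMPRESSION_NONE is 2, so level 0 is not recognized as
 * uncompressed, while level 2 wrongly is.
 */
static bool
uncompressed_buggy(int level)
{
	return level == COMPRESSION_NONE;
}

/* Fixed pattern: ask for the method and compare an enum with an enum. */
static bool
uncompressed_fixed(WalCompressionMethod method)
{
	return method == COMPRESSION_NONE;
}
```

This is why the callback returns a WalCompressionMethod and the level is kept separately: callers such as the existsfile() check can then compare like with like.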
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..f6c710bfe3 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -229,15 +229,35 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--compression-method=<replaceable class="parameter">method</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables compression of write-ahead logs using the specified method.
+        Supported values are <literal>lz4</literal>, <literal>gzip</literal>
+        and <literal>none</literal>.
+        For the <productname>LZ4</productname> method to be available,
+        <productname>PostgreSQL</productname> must have been compiled
+        with <option>--with-lz4</option>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
       <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
       <listitem>
        <para>
-        Enables gzip compression of write-ahead logs, and specifies the
-        compression level (0 through 9, 0 being no compression and 9 being best
-        compression).  The suffix <filename>.gz</filename> will
-        automatically be added to all filenames.
+        Specifies the compression level (<literal>1</literal> through
+        <literal>9</literal>, <literal>1</literal> being worst compression
+        and <literal>9</literal> being best compression) for
+        <application>gzip</application> compressed WAL segments. The
+        default value is <literal>5</literal>.
+       </para>
+
+       <para>
+        This option requires <option>--compression-method</option> to be
+        specified with <literal>gzip</literal>.
        </para>
       </listitem>
      </varlistentry>
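The option semantics documented above can be summarized as a small validation routine. This is a hypothetical sketch, not pg_receivewal's actual option handling. parse_compression_method() and check_compress_level() are made-up names. The sketch assumes, as the new documentation states, that -Z/--compress accepts only levels 1 through 9 and only together with gzip. It does not model the --with-lz4 build requirement.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef enum
{
	COMPRESSION_LZ4,
	COMPRESSION_ZLIB,
	COMPRESSION_NONE
} WalCompressionMethod;

/*
 * Hypothetical parser for the --compression-method value; accepts only the
 * three values the documentation lists.
 */
static bool
parse_compression_method(const char *arg, WalCompressionMethod *method)
{
	if (strcmp(arg, "lz4") == 0)
		*method = COMPRESSION_LZ4;
	else if (strcmp(arg, "gzip") == 0)
		*method = COMPRESSION_ZLIB;
	else if (strcmp(arg, "none") == 0)
		*method = COMPRESSION_NONE;
	else
		return false;
	return true;
}

/*
 * Hypothetical check of the documented -Z/--compress rules.  level_given
 * says whether -Z appeared on the command line at all.
 */
static bool
check_compress_level(WalCompressionMethod method, bool level_given, int level)
{
	if (!level_given)
		return true;			/* nothing to check; gzip uses the default */
	if (method != COMPRESSION_ZLIB)
		return false;			/* -Z is only meaningful with gzip */
	return level >= 1 && level <= 9;
}
```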
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index e4fd7b5290..c03b646727 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -350,6 +350,7 @@ XGETTEXT = @XGETTEXT@
 
 GZIP   = gzip
 BZIP2  = bzip2
+LZ4    = lz4
 
 DOWNLOAD = wget -O $@ --no-use-server-timestamps
 #DOWNLOAD = curl -o $@
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 402a6617a9..434db61fdf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2851,6 +2851,7 @@ WaitEventTimeout
 WaitPMResult
 WalCloseMethod
 WalCompression
+WalCompressionMethod
 WalLevel
 WalRcvData
 WalRcvExecResult
-- 
2.33.0
