Changeset: 25d775cb23fc for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/25d775cb23fc
Added Files:
        sql/test/Tests/hot_snapshot_huge_file.py
        sql/test/Tests/hot_snapshot_huge_file.timeout
Modified Files:
        ChangeLog
        sql/storage/store.c
        sql/test/Tests/All
        tools/merovingian/daemon/snapshot.c
Branch: default
Log Message:

Support hot snapshotting huge bats and log files

The classic tar format does not allow member files of 8 GiB or more
because the size field in the per-member header is 11 octal digits plus
a NUL byte. However, because we and most tar implementations don't
actually require the NUL byte, twelve octal digits could be used, which
raised the effective limit to 64 GiB.

This patch adds support for binary size headers as originally introduced
by GNU tar. The new limit is 8 EiB (2^63 bytes), which is sufficient for
the foreseeable future.
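
For reference, the GNU extension marks a binary size by setting the high
bit of the first byte of the 12-byte size field; the remaining bytes hold
the value in big-endian base-256. A minimal sketch of both encodings in
Python (the helper name is illustrative, not part of this patch):

    import struct

    def encode_tar_size(size: int) -> bytes:
        """Fill the 12-byte tar size field: classic octal or GNU base-256."""
        if size <= 0o77777777777:        # 11 octal digits, i.e. < 8 GiB
            return b'%011o\x00' % size   # digits plus terminating NUL
        # GNU extension: marker bit in the first byte, value big-endian.
        field = bytearray(12)
        field[0] = 0x80
        field[4:12] = struct.pack('>Q', size)  # holds sizes up to 2**63 - 1
        return bytes(field)

This mirrors the loop added to tar_write_header in store.c below.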


diffs (truncated from 347 to 300 lines):

diff --git a/ChangeLog b/ChangeLog
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
 # ChangeLog file for devel
 # This file is updated with Maddlog
 
+* Mon Sep 16 2024 Joeri van Ruth <joeri.van.r...@monetdbsolutions.com>
+- Hot snapshot: allow member files larger than 64 GiB. By member files we mean
+  the files inside the resulting .tar file, not the tar file itself. Huge member
+  files are written using a GNU tar extension to the original tar format, which
+  doesn't support more than 8 GiB.
+
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -2561,13 +2561,8 @@ tar_write_header(stream *tarfile, const 
 {
        char buf[TAR_BLOCK_SIZE] = {0};
        char *cursor = buf;
-       char *chksum;
-
-       if (size > 077777777777) { // 0_777_7777_7777
-               // doesn't fit header field
-               GDKerror("error writing tar file: member %s too large", path);
-               return GDK_FAIL;
-       }
+       char *size_field;
+       char *chksum_field;
 
        // We set the uid/gid fields to 0 and the uname/gname fields to "".
        // When unpacking as a normal user, they are ignored and the files are
@@ -2580,9 +2575,10 @@ tar_write_header(stream *tarfile, const 
        tar_write_header_field(&cursor, 8, "0000644");      // mode[8]
        tar_write_header_field(&cursor, 8, "%07o", 0U);      // uid[8]
        tar_write_header_field(&cursor, 8, "%07o", 0U);      // gid[8]
-       tar_write_header_field(&cursor, 12, "%011"PRIo64, size);      // size[12]
+       size_field = cursor;
+       cursor += 12;                                      // size[12]
        tar_write_header_field(&cursor, 12, "%011lo", (unsigned long)mtime); // 
mtime[12]
-       chksum = cursor; // use this later to set the computed checksum
+       chksum_field = cursor; // use this later to set the computed checksum
        tar_write_header_field(&cursor, 8, "%8s", ""); // chksum[8]
        *cursor++ = '0'; // typeflag REGTYPE
        tar_write_header_field(&cursor, 100, "%s", "");  // linkname[100]
@@ -2593,14 +2589,27 @@ tar_write_header(stream *tarfile, const 
        tar_write_header_field(&cursor, 8, "%07o", 0U); // devmajor[8]
        tar_write_header_field(&cursor, 8, "%07o", 0U); // devminor[8]
        tar_write_header_field(&cursor, 155, "%s", ""); // prefix[155]
-
        assert(cursor - buf == 500);
 
+       int64_t max_oct_size = 077777777777;    // 0_777_7777_7777, 11 octal digits
+       // max_oct_size = 077; // for testing
+       if (size <= max_oct_size) {
+               tar_write_header_field(&size_field, 12, "%011"PRIo64, size);      // size[12]
+       } else {
+               uint8_t *field = (uint8_t *)size_field;
+               field[0] = 0x80;
+               for (int i = 11; i >= 4; i--) {
+                       field[i] = size & 0xFF;
+                       size >>= 8;
+               }
+       }
+
+       // checksum
        unsigned sum = 0;
        for (int i = 0; i < TAR_BLOCK_SIZE; i++)
                sum += (unsigned char) buf[i];
 
-       tar_write_header_field(&chksum, 8, "%06o", sum);
+       tar_write_header_field(&chksum_field, 8, "%06o", sum);
 
        if (mnstr_write(tarfile, buf, TAR_BLOCK_SIZE, 1) != 1) {
                GDKerror("error writing tar header %s: %s", path, 
mnstr_peek_error(tarfile));
@@ -2688,7 +2697,7 @@ tar_copy_stream(stream *tarfile, const c
 }
 
 static gdk_return __attribute__((__warn_unused_result__))
-hot_snapshot_write_tar(stream *out, const char *prefix, char *plan)
+hot_snapshot_write_tar(stream *out, const char *prefix, const char *plan)
 {
        if (plan == NULL)
                return GDK_FAIL;
@@ -2729,6 +2738,11 @@ hot_snapshot_write_tar(stream *out, cons
        src_name = abs_src_path + len - 1; // - 1 because len includes the trailing newline
        *src_name++ = DIR_SEP;
 
+       // When testing it's sometimes useful to include the plan in the snapshot file
+       // strcpy(dest_name, "_snapshot.plan");
+       // if (tar_write_data(out, dest_path, timestamp, plan, strlen(plan)) != GDK_SUCCEED)
+       //      goto end;
+
        char command;
        int64_t size;
        while (sscanf(p, "%c %"SCNi64" %100s\n%n", &command, &size, src_name, &len) == 3) {
@@ -2772,7 +2786,7 @@ hot_snapshot_write_tar(stream *out, cons
                ret = tar_write(out, descr, &a, 1);
 
 end:
-       free(plan);
+       free((char*)plan);
        GDKfree(buffer);
        if (infile)
                close_stream(infile);
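
The reading side must accept both encodings, keyed on the first byte of
the size field (the new test below and the daemon's restore code do
this). A hedged sketch of the corresponding decoder, again in Python
with an illustrative name:

    def decode_tar_size(field: bytes) -> int:
        """Decode a 12-byte tar size field: classic octal or GNU base-256."""
        if field[0] & 0x80:
            # GNU extension: clear the marker bit, read big-endian base-256.
            return int.from_bytes(field, 'big') & ~(0x80 << (8 * 11))
        # Classic format: octal digits, NUL- or space-padded.
        return int(field.rstrip(b'\x00 ') or b'0', 8)
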
diff --git a/sql/test/Tests/All b/sql/test/Tests/All
--- a/sql/test/Tests/All
+++ b/sql/test/Tests/All
@@ -130,6 +130,7 @@ HAVE_LIBBZ2&!NOWAL?hot_snapshot_bz2
 HAVE_LIBLZMA&!NOWAL?hot_snapshot_xz
 HAVE_PYTHON_LZ4&HAVE_LIBLZ4&!NOWAL?hot_snapshot_lz4
 !HAVE_PYTHON_LZ4&HAVE_LIBLZ4&!NOWAL?hot_snapshot_lz4_lite
+HAVE_PYTHON_LZ4&HAVE_LIBLZ4?hot_snapshot_huge_file
 
 # The following tests are some old tests moved from sql/test
 ## FOREIGN KEY reference to the same table
diff --git a/sql/test/Tests/hot_snapshot_huge_file.py b/sql/test/Tests/hot_snapshot_huge_file.py
new file mode 100644
--- /dev/null
+++ b/sql/test/Tests/hot_snapshot_huge_file.py
@@ -0,0 +1,164 @@
+# Check that hot snapshot deals correctly with very large member files, for
+# example huge BATs or a huge WAL.
+#
+# The classical tar format stores the size of the member files as an 11 digit
+# octal number so it cannot represent members >=8GiB.
+#
+# To represent larger files, hot snapshot switches to a GNU extension,
+# recognizable by the first byte of the size field being >= 0x80.
+
+from dataclasses import dataclass
+import logging
+import os
+import struct
+import sys
+import tempfile
+
+import lz4.frame
+import pymonetdb
+
+log_level = logging.DEBUG
+log_format = '%(levelname)s:t=%(relativeCreated)d:func=%(funcName)s:line=%(lineno)d:%(message)s'
+if '-v' in sys.argv:
+    log_level = logging.INFO
+logging.basicConfig(level=log_level, format=log_format)
+logging.getLogger('pymonetdb').setLevel(logging.WARNING)
+
+logging.addLevelName(logging.DEBUG, '#DEBUG')
+logging.addLevelName(logging.INFO, '#INFO')
+
+SCRATCH_PREFIX = os.getenv('TSTTRGDIR', None)
+
+
+def human_size(size):
+    unit = ''
+    if size > 1500:
+        unit = 'k'
+        size /= 1024
+    if size > 1500:
+        unit = 'M'
+        size /= 1024
+    if size > 1500:
+        unit = 'G'
+        size /= 1024
+    return f'{size:.1f}{unit}'
+
+
+def open_compressed_tar(filename):
+    if filename.endswith('.tar'):
+        return open(filename, 'rb')
+    if filename.endswith('.tar.lz4'):
+        return lz4.frame.LZ4FrameFile(filename, 'rb')
+    raise Exception(f"Don't know how to uncompress {filename}")
+
+
+def generate_huge_table(conn, size_in_gib=8.1):
+    size_in_bytes = int(size_in_gib * 1024 * 1024 * 1024)
+    size_in_rows = int(size_in_bytes / 8)
+    with conn.cursor() as c:
+        c.execute("DROP TABLE IF EXISTS foo")
+        conn.commit()
+        logging.info(f'creating table with {size_in_rows} bigint rows ({human_size(8 * size_in_rows)} bytes)')
+
+        c.execute("CREATE TABLE foo(i BIGINT)")
+        c.execute("INSERT INTO foo VALUES (0)")
+        n = 1
+
+        while n < size_in_rows:
+            todo = size_in_rows - n
+            i = c.execute("INSERT INTO foo SELECT i FROM foo LIMIT %s", (todo,))
+            logging.debug(f'inserted {i} rows')
+            n += i
+
+        c.execute("SELECT COUNT(*) FROM foo")
+        row_count = c.fetchone()[0]
+        assert row_count == size_in_rows, f"row count is {row_count}, expected {size_in_rows}"
+        logging.info('inserted all rows, committing')
+        conn.commit()
+        logging.info('committed')
+
+
+def create_tar_file(conn, path):
+    with conn.cursor() as c:
+        logging.info('starting hot_snapshot')
+        c.execute('CALL sys.hot_snapshot(%s, 1)', [path])
+        logging.info('finished hot_snapshot')
+        size = os.stat(path).st_size
+        logging.info(f'compressed tar file {path} has size {size} ({human_size(size)})')
+
+
+@dataclass
+class Member:
+    name: str
+    raw_size: bytes
+
+    @property
+    def size(self):
+        if self.raw_size[0] < 0x80:
+            field = self.raw_size.rstrip(b'\x00').rstrip(b' ')
+            return int(field, 8)
+        else:
+            # GNU base-256: the marker bit sits in the first byte; for any
+            # realistic size the value fits in the last 8 bytes.
+            num = struct.unpack('>Q', self.raw_size[-8:])[0]
+            return num & 0x7FFF_FFFF_FFFF_FFFF  # defensively clear the top bit
+
+
+def parse_tar_file(f):
+    block_size = 512
+    chunk_size = 64 * 1024
+    assert chunk_size >= block_size
+    assert (chunk_size % block_size) == 0
+    chunk = b''
+
+    def extract(block, offset, size):
+        base = block * block_size + offset
+        return chunk[base:base + size]
+
+    while True:
+        chunk = f.read(chunk_size)
+        if not chunk:
+            break
+        assert len(chunk) % block_size == 0
+        for i in range(len(chunk) // block_size):
+            ustar = extract(i, 257, 5)
+            if ustar != b'ustar':
+                continue
+            raw_name = extract(i, 0, 100)
+            name = str(raw_name.rstrip(b'\x00').rstrip(b' '), 'utf-8')
+            size = extract(i, 124, 12)
+            member = Member(name, size)
+            yield member
+
+
+if __name__ == "__main__":
+    dbname = os.getenv('TSTDB', 'demo')
+    port = int(os.getenv('MAPIPORT', '50000'))
+    with pymonetdb.connect(dbname, port=port) as conn, tempfile.TemporaryDirectory(dir=SCRATCH_PREFIX) as dir:
+
+        tar_path = os.path.join(dir, 'dump.tar')
+        tar_path += '.lz4'
+        logging.info(f'tar_path = {tar_path}')
+
+        generate_huge_table(conn, 8.1)    # > 8GiB
+        create_tar_file(conn, tar_path)
+
+        member_count = 0
+        huge_member_count = 0
+        with open_compressed_tar(tar_path) as f:
+            for m in parse_tar_file(f):
+                member_count += 1
+                if m.raw_size[0] >= 0x80:
+                    size = m.size
+                    logging.info(f'found huge member size={size}: {m}')
+                    huge_member_count += 1
+        logging.info(f'found {member_count} members')
+        logging.info(f'found {huge_member_count} huge members')
+
+        with conn.cursor() as c:
+            logging.info('dropping the table')
+            c.execute("DROP TABLE foo")
+        conn.commit()
+
+        assert huge_member_count == 1, f"expected 1 huge member file, not {huge_member_count}"
+
+    logging.info('goodbye')
+
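
As a cross-check, Python's standard tarfile module understands the same
GNU base-256 size encoding, so the snapshot written above can also be
inspected without the hand-rolled parser (a sketch reusing tar_path and
the lz4 import from the test):

    import tarfile

    with lz4.frame.LZ4FrameFile(tar_path, 'rb') as f:
        with tarfile.open(fileobj=f, mode='r|') as tf:
            for member in tf:
                print(member.name, member.size)  # >8 GiB sizes decode fine
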
diff --git a/sql/test/Tests/hot_snapshot_huge_file.timeout b/sql/test/Tests/hot_snapshot_huge_file.timeout
new file mode 100644
--- /dev/null
+++ b/sql/test/Tests/hot_snapshot_huge_file.timeout
@@ -0,0 +1,1 @@
+2
diff --git a/tools/merovingian/daemon/snapshot.c b/tools/merovingian/daemon/snapshot.c
--- a/tools/merovingian/daemon/snapshot.c
+++ b/tools/merovingian/daemon/snapshot.c
@@ -784,16 +784,26 @@ extract_tar_member_filename(const char *
        return buf;
 }
 
-static ssize_t
+static int64_t