[Sorry, can't use git-send-email at the moment ...] These patches implement a sliding window for the streaming plugin[1] in nbdkit.
I would like to be able to stream a filesystem from tools such as 'virt-make-fs'[2]. This is a fairly frequently requested feature. Unfortunately: (a) The patches make the code significantly more complex and therefore likely to have bugs. (b) They are not practically useful. 'parted' likes to write to the beginning and end of a disk, even when creating a simple MBR, and of course 'mkfs' scribbles the group headers across the whole disk when creating a filesystem. A simple window approach is obviously not sufficient. A better approach might be something like a sparse, size-limited map recording writes at any point in the disk. But that has the problem that you don't know when you can commit a write to the stream -- some heuristic would have to be used. I'm posting them to the mailing list for the record and in case anyone has any better ideas. Rich. [1] http://rwmj.wordpress.com/2014/10/14/streaming-nbd-server/#content [2] http://libguestfs.org/virt-make-fs.1.html -- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com Fedora Windows cross-compiler. Compile Windows programs, test, and build Windows installers. Over 100 libraries supported. http://fedoraproject.org/wiki/MinGW
>From be039f70da0c3ece9075724bf5ff29a45038dce5 Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" <rjo...@redhat.com> Date: Tue, 14 Oct 2014 16:38:50 +0200 Subject: [PATCH 1/2] streaming: Implement sliding window and add window=SIZE parameter. --- plugins/streaming/nbdkit-streaming-plugin.pod | 17 +- plugins/streaming/streaming.c | 290 +++++++++++++++++++++----- 2 files changed, 256 insertions(+), 51 deletions(-) diff --git a/plugins/streaming/nbdkit-streaming-plugin.pod b/plugins/streaming/nbdkit-streaming-plugin.pod index a21ed4f..635af69 100644 --- a/plugins/streaming/nbdkit-streaming-plugin.pod +++ b/plugins/streaming/nbdkit-streaming-plugin.pod @@ -6,7 +6,7 @@ nbdkit-streaming-plugin - nbdkit streaming plugin =head1 SYNOPSIS - nbdkit streaming pipe=FILENAME [size=SIZE] + nbdkit streaming pipe=FILENAME [size=SIZE] [window=SIZE] =head1 DESCRIPTION @@ -50,12 +50,23 @@ Whether you need to specify this parameter depends on the client. Some clients don't check the size and just write/stream, others do checks or calculations based on the apparent size. +=item B<window=SIZE> + +Specify a sliding window of data, allowing limited seeking backwards +and reads. You can use any size specifier permitted by +C<nbdkit_parse_size>, eg. C<window=1M>. + +Note that this is disabled (set to 0) by default, since enabling it +causes writes to be delayed until the client moves the window forward +or until nbdkit exits. + =back =head1 TO DO -This plugin would be much nicer if it supported the concept of a -"window" of data, allowing limited reverse seeks and reads. +Separate read and write windows would make more sense, allowing a +large read window and a small write window. The smaller (or zero) +write window would mean that writes are not delayed. 
=head1 SEE ALSO diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c index f58fa46..2d08803 100644 --- a/plugins/streaming/streaming.c +++ b/plugins/streaming/streaming.c @@ -41,20 +41,28 @@ #include <sys/types.h> #include <sys/stat.h> #include <errno.h> +#include <assert.h> #include <nbdkit-plugin.h> +#define min(a,b) ((a)<(b)?(a):(b)) + static char *filename = NULL; static int fd = -1; /* In theory INT64_MAX, but it breaks qemu's NBD driver. */ static int64_t size = INT64_MAX/2; -/* Flag if we have entered the unrecoverable error state because of - * a seek backwards. +/* Flag if we have entered the unrecoverable error state because of a + * seek backwards beyond the window. */ static int errorstate = 0; +/* Window. */ +static int64_t window_max_size = 0; /* window= parameter */ +static int64_t window_size = 0; /* current size */ +static char *window = NULL; + /* Highest byte (+1) that has been written in the data stream. */ static uint64_t highestwrite = 0; @@ -73,6 +81,11 @@ streaming_config (const char *key, const char *value) if (size == -1) return -1; } + else if (strcmp (key, "window") == 0) { + window_max_size = nbdkit_parse_size (value); + if (window_max_size == -1) + return -1; + } else { nbdkit_error ("unknown parameter '%s'", key); return -1; @@ -110,18 +123,10 @@ streaming_config_complete (void) return 0; } -/* nbdkit is shutting down. */ -static void -streaming_unload (void) -{ - if (fd >= 0) - close (fd); - free (filename); -} - #define streaming_config_help \ "pipe=<FILENAME> (required) The filename to serve.\n" \ - "size=<SIZE> (optional) Stream size." + "size=<SIZE> (optional) Stream size.\n" \ + "window=<SIZE> (optional) Window size." /* Create the per-connection handle.
*/ static void * @@ -160,13 +165,66 @@ streaming_get_size (void *handle) return size; } +/* NOTE(review): unlike the removed write loops, which set errorstate = 1 on a write error, neither xwrite nor xwrite_zeroes (nor their callers in streaming_pwrite) set errorstate. After a failed or partial flush the window and highestwrite no longer match what reached the pipe, so later writes would silently corrupt the stream; consider setting errorstate = 1 on these failure paths. */ static int +xwrite (int fd, const char *buf, size_t n) +{ + ssize_t r; + + while (n > 0) { + r = write (fd, buf, n); + if (r == -1) { + nbdkit_error ("write: %m"); + return -1; + } + buf += r; + n -= r; + } + return 0; +} + +static int +xwrite_zeroes (int fd, size_t n) +{ + ssize_t r; + char buf[4096]; + + memset (buf, 0, sizeof buf); + + while (n > 0) { + r = write (fd, buf, min (n, sizeof buf)); + if (r == -1) { + nbdkit_error ("write: %m"); + return -1; + } + n -= r; + } + return 0; +} + +/* +This diagram should help when trying to understand the pread and +pwrite calls below. + +Note that we recursively split read and write calls to make the cases +tractable. + + |<------- window_max_size ------->| + |<---- window_size ----->| + +------------------------+------------------------+--------+---------- + ^ ^ ^ ^ + 0 windowstart highestwrite maxwindow + + */ + /* Write data to the stream. */ static int streaming_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) { - size_t n; - ssize_t r; + int r; + uint64_t windowstart; + uint64_t maxwindow; + int64_t delta; if (errorstate) { nbdkit_error ("unrecoverable error state"); @@ -174,63 +232,199 @@ streaming_pwrite (void *handle, const void *buf, return -1; } - if (offset < highestwrite) { - nbdkit_error ("client tried to seek backwards and write: the streaming plugin does not currently support this"); + /* This just makes the recursive case easier to reason about. 
*/ + if (count == 0) + return 0; + + windowstart = highestwrite - window_size; + + if (offset < windowstart) { + nbdkit_error ("client seeked backwards > window size: you must increase the window size"); errorstate = 1; errno = EIO; return -1; } - /* Need to write some zeroes. */ - if (offset > highestwrite) { - int64_t size = offset - highestwrite; - char buf[4096]; - - memset (buf, 0, sizeof buf); - - while (size > 0) { - n = size > sizeof buf ? 
sizeof buf : size; - r = write (fd, buf, n); - if (r == -1) { - nbdkit_error ("write: %m"); - errorstate = 1; - return -1; - } - highestwrite += r; - size -= r; - } + /* Split writes across highestwrite and maxwindow boundaries. + * Splitting here means we do not have to deal with writes across + * the boundary in the code below. + */ + if (offset < highestwrite && offset + count > highestwrite) { + uint64_t size = highestwrite - offset; + + r = streaming_pwrite (handle, buf, size, offset); + if (r == -1) + return -1; + /* NOTE(review): buf is a const void *, and arithmetic on void pointers is a GNU C extension rather than ISO C; a const char * local would be portable. The same applies to the splits below and in streaming_pread. */ buf += size; + offset += size; + count -= size; + return streaming_pwrite (handle, buf, count, offset); + } + + maxwindow = windowstart + window_max_size; + + if (offset < maxwindow && offset + count > maxwindow) { + uint64_t size = maxwindow - offset; + + r = streaming_pwrite (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pwrite (handle, buf, count, offset); + } + + /* Handle a write entirely within the current window. */ + if (offset < highestwrite) { + uint64_t windowoffset = window_size - (highestwrite - offset); + memcpy (&window[windowoffset], buf, count); + return 0; } - /* Write the data. */ - while (count > 0) { - r = write (fd, buf, count); - if (r == -1) { - nbdkit_error ("write: %m"); - errorstate = 1; + /* A write after highestwrite but not larger than maxwindow causes + * the window to be extended but not moved. 
+ */ + if (offset < maxwindow) { + uint64_t new_highestwrite = offset + count; + uint64_t new_size = new_highestwrite - windowstart; + char *new_window; + + new_window = realloc (window, new_size); + if (new_window == NULL) { + nbdkit_error ("realloc: %m"); return -1; } - buf += r; - highestwrite += r; - count -= r; + window = new_window; + /* Make sure the extended window is zeroes to start with. */ + memset (&window[window_size], 0, new_size - window_size); + highestwrite = new_highestwrite; /* NOTE(review): window_size is not set to new_size here, so every later windowstart / windowoffset / memset computation uses the old size; patch 2/2 of this series adds window_size = new_size; to this branch. */ + /* Copy the buffer to the new window.
*/ + memcpy (&window[offset - windowstart], buf, count); + return 0; + } + + /* Split writes after maxwindow at highestwrite + window_max_size. */ + if (offset < highestwrite + window_max_size && + offset + count > highestwrite + window_max_size) { + uint64_t size = highestwrite + window_max_size - offset; + + r = streaming_pwrite (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pwrite (handle, buf, count, offset); + } + + /* Any write here is going to cause the window to move. Delta is + * the amount by which the window will move (NB: might be greater + * than the window size). + */ + delta = offset + count - highestwrite; + + if (delta <= window_size) { + /* Write out the oldest part of the window. */ + if (xwrite (fd, window, delta) == -1) + return -1; + + /* Move the data in the window down. */ + memmove (window, window + delta, window_size - delta); /* NOTE(review): when offset > highestwrite (delta > count), window[window_size - delta .. window_size - count) still holds the stale bytes left behind by the memmove instead of zeroes, and they are later flushed as data. Zero that gap first, e.g. memset (window + window_size - delta, 0, delta - count). */ + + /* Copy the buffer to the new window. */ + memcpy (window + window_size - count, buf, count); + highestwrite += delta; + return 0; } - return 0; + /* The window will move by more than a single window size. Write out + * the whole of the old window, then write zeroes, then continue the + * write. + */ + if (xwrite (fd, window, window_size) == -1) + return -1; + memset (window, 0, window_size); + + if (xwrite_zeroes (fd, delta - window_size) == -1) + return -1; + + highestwrite += delta - window_size; /* NOTE(review): the pipe has now received everything up to the new highestwrite, but the zeroed window is still window_size bytes long and ends at highestwrite, so it covers bytes that were already written. Those bytes are written a second time later, which shifts all following data by window_size bytes. Advancing highestwrite by delta instead would keep the pipe position equal to windowstart. With window_size == 0 (the default window=0), this path zero-fills the whole target range, and the recursive call below then fails with the backwards-seek error, so even the first write fails. That case needs a direct write path like the removed code. In that case memset (window, 0, 0) is also called with window == NULL, which ISO C does not permit. */ + + return streaming_pwrite (handle, buf, count, offset); } /* Read data back from the stream. */ static int streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset) { + uint64_t windowstart; + int r; + if (errorstate) { nbdkit_error ("unrecoverable error state"); errno = EIO; return -1; } - nbdkit_error ("client tried to read: the streaming plugin does not currently support this"); - errorstate = 1; - errno = EIO; - return -1; + /* This just makes the recursive case easier to reason about.
*/ + if (count == 0) + return 0; + + windowstart = highestwrite - window_size; + + if (offset < windowstart) { + nbdkit_error ("client seeked backwards > window size: you must increase the window size"); + errorstate = 1; + errno = EIO; + return -1; + } + + /* Split reads across highestwrite boundary. Splitting here means + * we do not have to deal with reads across the boundary in the code + * below. + */ + if (offset < highestwrite && offset + count > highestwrite) { + uint64_t size = highestwrite - offset; + + r = streaming_pread (handle, buf, size, offset); + if (r == -1) + return -1; + buf += size; + offset += size; + count -= size; + return streaming_pread (handle, buf, count, offset); + } + + /* Handle a read entirely within the window by simply reading the + * window contents. + */ + if (offset < highestwrite) { + uint64_t windowoffset = window_size - (highestwrite - offset); + memcpy (buf, &window[windowoffset], count); + return 0; + } + + /* Else any read ahead of the current highest write is returned as + * all zeroes. + */ + memset (buf, 0, count); + return 0; +} + +/* nbdkit is shutting down - the rest of the window should be written out. */ +static void +streaming_unload (void) +{ + if (fd >= 0) { + /* XXX impossible to report an error to the client here */ + xwrite (fd, window, window_size); + + close (fd); + } + + free (window); + free (filename); } static struct nbdkit_plugin plugin = { -- 2.0.4
>From 0c4ffccc7258dcff94cc40abcf470a3c5ad788c3 Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" <rjo...@redhat.com> Date: Tue, 14 Oct 2014 14:31:08 +0200 Subject: [PATCH 2/2] tests: Enable streaming test. --- plugins/streaming/streaming.c | 13 +++++++++++-- tests/Makefile.am | 17 +++++------------ tests/test-streaming.c | 16 +++++++--------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c index 2d08803..da4de63 100644 --- a/plugins/streaming/streaming.c +++ b/plugins/streaming/streaming.c @@ -35,6 +35,8 @@ #include <stdio.h> #include <stdlib.h> +#include <stdint.h> +#include <inttypes.h> #include <string.h> #include <fcntl.h> #include <unistd.h> @@ -232,6 +234,10 @@ streaming_pwrite (void *handle, const void *buf, return -1; } + /* NOTE(review): offset is a uint64_t, so its conversion should be PRIu64; PRIi64 does not match the argument type. */ nbdkit_debug ("pwrite: offset=%" PRIi64 " count=%" PRIu32 + " highestwrite=%" PRIu64, + offset, count, highestwrite); + /* This just makes the recursive case easier to reason about. */ if (count == 0) return 0; @@ -239,7 +245,8 @@ streaming_pwrite (void *handle, const void *buf, windowstart = highestwrite - window_size; if (offset < windowstart) { - nbdkit_error ("client seeked backwards > window size: you must increase the window size"); + nbdkit_error ("pwrite: client backwards seek > window size: you must increase the window size (highestwrite=%" PRIu64 ", window_size=%" PRIi64 ")", + highestwrite, window_size); errorstate = 1; errno = EIO; return -1; @@ -298,6 +305,7 @@ streaming_pwrite (void *handle, const void *buf, window = new_window; /* Make sure the extended window is zeroes to start with. */ memset (&window[window_size], 0, new_size - window_size); + window_size = new_size; highestwrite = new_highestwrite; /* Copy the buffer to the new window.
*/ memcpy (&window[offset - windowstart], buf, count); @@ -374,7 +382,8 @@ streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset) windowstart = highestwrite - window_size; if (offset < windowstart) { - nbdkit_error ("client seeked backwards > window size: you must increase the window size"); + nbdkit_error ("pread: client backwards seek > window size: you must increase the window size (highestwrite=%" PRIu64 ", window_size=%" PRIi64 ")", + highestwrite, window_size); errorstate = 1; errno = EIO; return -1; diff --git a/tests/Makefile.am b/tests/Makefile.am index a50e26b..cccd45b 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -172,18 +172,11 @@ test_python_LDADD = libtest.la $(LIBGUESTFS_LIBS) endif # streaming plugin test. +check_PROGRAMS += test-streaming +TESTS += test-streaming -# This is disabled at the moment because the libguestfs appliance -# kernel tries to read from the device (eg to read the partition -# table) and the current streaming plugin cannot handle this. -# Implementing a sliding window in the plugin would fix this.
(XXX) -EXTRA_DIST += test-streaming.c - -#check_PROGRAMS += test-streaming -#TESTS += test-streaming -# -#test_streaming_SOURCES = test-streaming.c test.h -#test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS) -#test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS) +test_streaming_SOURCES = test-streaming.c test.h +test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS) +test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS) endif diff --git a/tests/test-streaming.c b/tests/test-streaming.c index 1631c19..4610fb9 100644 --- a/tests/test-streaming.c +++ b/tests/test-streaming.c @@ -48,7 +48,7 @@ #include "test.h" -static char data[4096]; +static char data[1024]; int main (int argc, char *argv[]) @@ -69,6 +69,8 @@ main (int argc, char *argv[]) if (test_start_nbdkit (NBDKIT_PLUGIN ("streaming"), "pipe=streaming.fifo", + "size=128k", + "window=128k", /* NOTE(review): with size=128k and window=128k every request presumably stays below windowstart + window_max_size, so only the in-window and window-extend branches of streaming_pwrite run; the window-moving branches and the default window=0 case are not exercised by this test. */ + NULL) == -1) exit (EXIT_FAILURE); @@ -121,14 +123,10 @@ main (int argc, char *argv[]) exit (EXIT_FAILURE); /* Write linearly to the virtual disk. */ - for (i = 0; i < 10; ++i) { - memset (data, i+1, sizeof data); - - /* Note that we deliberately skip forwards, in order to - * exercise seeking code in the streaming plugin. - */ + memset (data, 1, sizeof data); + for (i = 0; i < 32; ++i) { guestfs_pwrite_device (g, "/dev/sda", data, sizeof data, - (2 * i) * sizeof data); + i * sizeof data); } if (guestfs_shutdown (g) == -1) @@ -148,7 +146,7 @@ main (int argc, char *argv[]) } md5[32] = '\0'; - if (strcmp (md5, "0123456789abcdef0123456789abcdef") != 0) { + if (strcmp (md5, "51ae9fa5fb90e9d51c4f1b4260285c99") != 0) { fprintf (stderr, "unexpected hash: %s\n", md5); exit (EXIT_FAILURE); } -- 2.0.4
_______________________________________________ Libguestfs mailing list Libguestfs@redhat.com https://www.redhat.com/mailman/listinfo/libguestfs