For the upcoming support for fixed-ram migration with multifd, we need
to be able to accept an iovec array whose data is not contiguous.

Add pwritev and preadv variants that split the array into contiguous
segments before reading or writing. With that, the ram code can continue
to add pages in any order and the multifd code can continue to send large
arrays for reading and writing.

Signed-off-by: Fabiano Rosas <faro...@suse.de>
---
- split the API that was merged into a single function
- use uintptr_t for compatibility with 32-bit
---
 include/io/channel.h | 26 ++++++++++++++++
 io/channel.c         | 70 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 96 insertions(+)

diff --git a/include/io/channel.h b/include/io/channel.h
index 7986c49c71..25383db5aa 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -559,6 +559,19 @@ int qio_channel_close(QIOChannel *ioc,
 ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct iovec *iov,
                             size_t niov, off_t offset, Error **errp);
 
+/**
+ * qio_channel_pwritev_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @offset: the iovec offset in the file where to write the data
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Writes @iov to the channel even when its elements are not contiguous
+ * in memory. The array is split into runs of elements that are adjacent
+ * in memory, and each run is written with its own qio_channel_pwritev()
+ * call at file position @offset plus the address of the run's first
+ * element.
+ *
+ * NOTE(review): only a negative return from qio_channel_pwritev() is
+ * treated as a failure; a short write is neither retried nor reported.
+ *
+ * Returns: 0 if all bytes were written, or -1 on error
+ */
+int qio_channel_pwritev_all(QIOChannel *ioc, const struct iovec *iov,
+                            size_t niov, off_t offset, Error **errp);
+
 /**
  * qio_channel_pwrite
  * @ioc: the channel object
@@ -595,6 +608,19 @@ ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, 
size_t buflen,
 ssize_t qio_channel_preadv(QIOChannel *ioc, const struct iovec *iov,
                            size_t niov, off_t offset, Error **errp);
 
+/**
+ * qio_channel_preadv_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to read data to
+ * @niov: the length of the @iov array
+ * @offset: the iovec offset in the file from where to read the data
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Reads into @iov from the channel even when its elements are not
+ * contiguous in memory. The array is split into runs of elements that
+ * are adjacent in memory, and each run is filled with its own
+ * qio_channel_preadv() call at file position @offset plus the address
+ * of the run's first element.
+ *
+ * NOTE(review): only a negative return from qio_channel_preadv() is
+ * treated as a failure; a short read is neither retried nor reported.
+ *
+ * Returns: 0 if all bytes were read, or -1 on error
+ */
+int qio_channel_preadv_all(QIOChannel *ioc, const struct iovec *iov,
+                           size_t niov, off_t offset, Error **errp);
+
 /**
  * qio_channel_pread
  * @ioc: the channel object
diff --git a/io/channel.c b/io/channel.c
index a1f12f8e90..2f1745d052 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -472,6 +472,69 @@ ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct 
iovec *iov,
     return klass->io_pwritev(ioc, iov, niov, offset, errp);
 }
 
+/*
+ * qio_channel_preadv_pwritev_contiguous:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to read into or write from
+ * @niov: the length of the @iov array
+ * @offset: added to the address of each slice's first element to
+ *          obtain the file offset used for that slice
+ * @is_write: true for qio_channel_pwritev(), false for qio_channel_preadv()
+ * @errp: passed through to the underlying preadv/pwritev call
+ *
+ * Splits @iov into slices of memory-contiguous elements and issues one
+ * preadv/pwritev call per slice. An empty @iov performs no I/O and
+ * succeeds. Short transfers are not retried: only a negative return
+ * from the underlying call is treated as an error.
+ *
+ * Returns: 0 on success, or -1 if any underlying call failed
+ */
+static int qio_channel_preadv_pwritev_contiguous(QIOChannel *ioc,
+                                                 const struct iovec *iov,
+                                                 size_t niov, off_t offset,
+                                                 bool is_write, Error **errp)
+{
+    /*
+     * Start from success: with niov == 0 the loop never runs, and
+     * returning -1 here would report a failure without @errp being set.
+     */
+    ssize_t ret = 0;
+    /* size_t to match @niov and the niov argument of preadv/pwritev */
+    size_t i;
+    size_t slice_idx = 0;
+    size_t slice_num = 1;
+    uintptr_t base, next;
+    /* off_t so a 64-bit file offset survives on hosts with 32-bit pointers */
+    off_t file_offset;
+
+    /*
+     * If the iov array doesn't have contiguous elements, we need to
+     * split it in slices because we only have one (file) 'offset' for
+     * the whole iov. Do this here so callers don't need to break the
+     * iov array themselves.
+     */
+    for (i = 0; i < niov; i++, slice_num++) {
+        base = (uintptr_t) iov[i].iov_base;
+
+        /* Keep growing the slice while the next element is adjacent */
+        if (i != niov - 1) {
+            next = (uintptr_t) iov[i + 1].iov_base;
+
+            if (base + iov[i].iov_len == next) {
+                continue;
+            }
+        }
+
+        /*
+         * Use the offset of the first element of the segment that
+         * we're sending.
+         */
+        file_offset = offset + (uintptr_t) iov[slice_idx].iov_base;
+
+        if (is_write) {
+            ret = qio_channel_pwritev(ioc, &iov[slice_idx], slice_num,
+                                      file_offset, errp);
+        } else {
+            ret = qio_channel_preadv(ioc, &iov[slice_idx], slice_num,
+                                     file_offset, errp);
+        }
+
+        if (ret < 0) {
+            break;
+        }
+
+        /*
+         * Start the next slice after this one; slice_num is reset to 0
+         * because the loop increment bumps it back to 1.
+         */
+        slice_idx += slice_num;
+        slice_num = 0;
+    }
+
+    return (ret < 0) ? -1 : 0;
+}
+
+int qio_channel_pwritev_all(QIOChannel *ioc, const struct iovec *iov,
+                            size_t niov, off_t offset, Error **errp)
+{
+    const bool is_write = true;
+
+    /* Hand off to the shared helper, selecting the write path */
+    return qio_channel_preadv_pwritev_contiguous(ioc, iov, niov, offset,
+                                                 is_write, errp);
+}
+
 ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, size_t buflen,
                            off_t offset, Error **errp)
 {
@@ -501,6 +564,13 @@ ssize_t qio_channel_preadv(QIOChannel *ioc, const struct 
iovec *iov,
     return klass->io_preadv(ioc, iov, niov, offset, errp);
 }
 
+int qio_channel_preadv_all(QIOChannel *ioc, const struct iovec *iov,
+                           size_t niov, off_t offset, Error **errp)
+{
+    const bool is_write = false;
+
+    /* Hand off to the shared helper, selecting the read path */
+    return qio_channel_preadv_pwritev_contiguous(ioc, iov, niov, offset,
+                                                 is_write, errp);
+}
+
 ssize_t qio_channel_pread(QIOChannel *ioc, char *buf, size_t buflen,
                           off_t offset, Error **errp)
 {
-- 
2.35.3


Reply via email to