diff mbox series

[RFC,v1,20/26] io: Add a pwritev/preadv version that takes a discontiguous iovec

Message ID 20230330180336.2791-21-farosas@suse.de
State New
Headers show
Series migration: File based migration with multifd and fixed-ram | expand

Commit Message

Fabiano Rosas March 30, 2023, 6:03 p.m. UTC
For the upcoming support to fixed-ram migration with multifd, we need
to be able to accept an iovec array with non-contiguous data.

Add a pwritev and preadv version that splits the array into contiguous
segments before writing. With that we can have the ram code continue
to add pages in any order and the multifd code continue to send large
arrays for reading and writing.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
Since iovs can be non contiguous, we'd need a separate array on the
side to carry an extra file offset for each of them, so I'm relying on
the fact that iovs are all within a same host page and passing in an
encoded offset that takes the host page into account.
---
 include/io/channel.h | 50 +++++++++++++++++++++++++++
 io/channel.c         | 82 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 132 insertions(+)
diff mbox series

Patch

diff --git a/include/io/channel.h b/include/io/channel.h
index 28bce7ef17..7c4d2432cf 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -33,8 +33,10 @@  OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
 #define QIO_CHANNEL_ERR_BLOCK -2
 
 #define QIO_CHANNEL_WRITE_FLAG_ZERO_COPY 0x1
+#define QIO_CHANNEL_WRITE_FLAG_WITH_OFFSET 0x2
 
 #define QIO_CHANNEL_READ_FLAG_MSG_PEEK 0x1
+#define QIO_CHANNEL_READ_FLAG_WITH_OFFSET 0x2
 
 typedef enum QIOChannelFeature QIOChannelFeature;
 
@@ -541,6 +543,30 @@  int qio_channel_close(QIOChannel *ioc,
 ssize_t qio_channel_pwritev_full(QIOChannel *ioc, const struct iovec *iov,
                                  size_t niov, off_t offset, Error **errp);
 
+/**
+ * qio_channel_write_full_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @offset: the iovec offset in the file where to write the data
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
+ * @errp: pointer to a NULL-initialized error object
+ *
+ *
+ * Selects between a writev or pwritev channel writer function.
+ *
+ * If QIO_CHANNEL_WRITE_FLAG_OFFSET is passed in flags, pwritev is
+ * used and @offset is expected to be a meaningful value, @fds and
+ * @nfds are ignored; otherwise uses writev and @offset is ignored.
+ *
+ * Returns: 0 if all bytes were written, or -1 on error
+ */
+int qio_channel_write_full_all(QIOChannel *ioc, const struct iovec *iov,
+                               size_t niov, off_t offset, int *fds, size_t nfds,
+                               int flags, Error **errp);
+
 /**
  * qio_channel_pwritev
  * @ioc: the channel object
@@ -577,6 +603,30 @@  ssize_t qio_channel_pwritev(QIOChannel *ioc, char *buf, size_t buflen,
 ssize_t qio_channel_preadv_full(QIOChannel *ioc, const struct iovec *iov,
                                 size_t niov, off_t offset, Error **errp);
 
+/**
+ * qio_channel_read_full_all:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to read data to
+ * @niov: the length of the @iov array
+ * @offset: the iovec offset in the file from where to read the data
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @flags: read flags (QIO_CHANNEL_READ_FLAG_*)
+ * @errp: pointer to a NULL-initialized error object
+ *
+ *
+ * Selects between a readv or preadv channel reader function.
+ *
+ * If QIO_CHANNEL_READ_FLAG_OFFSET is passed in flags, preadv is
+ * used and @offset is expected to be a meaningful value, @fds and
+ * @nfds are ignored; otherwise uses readv and @offset is ignored.
+ *
+ * Returns: 0 if all bytes were read, or -1 on error
+ */
+int qio_channel_read_full_all(QIOChannel *ioc, const struct iovec *iov,
+                              size_t niov, off_t offset,
+                              int flags, Error **errp);
+
 /**
  * qio_channel_preadv
  * @ioc: the channel object
diff --git a/io/channel.c b/io/channel.c
index 312445b3aa..64b26040c2 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -463,6 +463,76 @@  ssize_t qio_channel_pwritev_full(QIOChannel *ioc, const struct iovec *iov,
     return klass->io_pwritev(ioc, iov, niov, offset, errp);
 }
 
+static int qio_channel_preadv_pwritev_contiguous(QIOChannel *ioc,
+                                                 const struct iovec *iov,
+                                                 size_t niov, off_t offset,
+                                                 bool is_write, Error **errp)
+{
+    ssize_t ret;
+    int i, slice_idx, slice_num;
+    uint64_t base, next, file_offset;
+    size_t len;
+
+    slice_idx = 0;
+    slice_num = 1;
+
+    /*
+     * If the iov array doesn't have contiguous elements, we need to
+     * split it in slices because we only have one (file) 'offset' for
+     * the whole iov. Do this here so callers don't need to break the
+     * iov array themselves.
+     */
+    for (i = 0; i < niov; i++, slice_num++) {
+        base = (uint64_t) iov[i].iov_base;
+
+        if (i != niov - 1) {
+            len = iov[i].iov_len;
+            next = (uint64_t) iov[i + 1].iov_base;
+
+            if (base + len == next) {
+                continue;
+            }
+        }
+
+        /*
+         * Use the offset of the first element of the segment that
+         * we're sending.
+         */
+        file_offset = offset + (uint64_t) iov[slice_idx].iov_base;
+
+        if (is_write) {
+            ret = qio_channel_pwritev_full(ioc, &iov[slice_idx], slice_num,
+                                           file_offset, errp);
+        } else {
+            ret = qio_channel_preadv_full(ioc, &iov[slice_idx], slice_num,
+                                          file_offset, errp);
+        }
+
+        if (ret < 0) {
+            break;
+        }
+
+        slice_idx += slice_num;
+        slice_num = 0;
+    }
+
+    return (ret < 0) ? -1 : 0;
+}
+
+int qio_channel_write_full_all(QIOChannel *ioc,
+                                const struct iovec *iov,
+                                size_t niov, off_t offset,
+                                int *fds, size_t nfds,
+                                int flags, Error **errp)
+{
+    if (flags & QIO_CHANNEL_WRITE_FLAG_WITH_OFFSET) {
+        return qio_channel_preadv_pwritev_contiguous(ioc, iov, niov,
+                                                     offset, true, errp);
+    }
+
+    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, flags, errp);
+}
+
 ssize_t qio_channel_pwritev(QIOChannel *ioc, char *buf, size_t buflen,
                             off_t offset, Error **errp)
 {
@@ -492,6 +562,18 @@  ssize_t qio_channel_preadv_full(QIOChannel *ioc, const struct iovec *iov,
     return klass->io_preadv(ioc, iov, niov, offset, errp);
 }
 
+int qio_channel_read_full_all(QIOChannel *ioc, const struct iovec *iov,
+                              size_t niov, off_t offset,
+                              int flags, Error **errp)
+{
+    if (flags & QIO_CHANNEL_READ_FLAG_WITH_OFFSET) {
+        return qio_channel_preadv_pwritev_contiguous(ioc, iov, niov,
+                                                     offset, false, errp);
+    }
+
+    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
+}
+
 ssize_t qio_channel_preadv(QIOChannel *ioc, char *buf, size_t buflen,
                            off_t offset, Error **errp)
 {