[PULL,104/111] virtiofsd: process requests in a thread pool
diff mbox series

Message ID 20200123115841.138849-105-dgilbert@redhat.com
State New
Headers show
Series
  • virtiofs queue
Related show

Commit Message

Dr. David Alan Gilbert Jan. 23, 2020, 11:58 a.m. UTC
From: Stefan Hajnoczi <stefanha@redhat.com>

Introduce a thread pool so that fv_queue_thread() just pops
VuVirtqElements and hands them to the thread pool.  For the time being
only one worker thread is allowed since passthrough_ll.c is not
thread-safe yet.  Future patches will lift this restriction so that
multiple FUSE requests can be processed in parallel.

The main new concept is struct FVRequest, which contains both
VuVirtqElement and struct fuse_chan.  We now have fv_VuDev for a device,
fv_QueueInfo for a virtqueue, and FVRequest for a request.  Some of
fv_QueueInfo's fields are moved into FVRequest because they are
per-request.  The name FVRequest conforms to QEMU coding style and I
expect the struct fv_* types will be renamed in a future refactoring.

This patch series is not optimal.  fbuf reuse is dropped so each request
does malloc(se->bufsize), but there is no clean and cheap way to keep
this with a thread pool.  The vq_lock mutex is held for longer than
necessary, especially during the eventfd_write() syscall.  Performance
can be improved in the future.

prctl(2) had to be added to the seccomp whitelist because glib invokes
it.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
---
 tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++---------------
 1 file changed, 201 insertions(+), 158 deletions(-)

Patch
diff mbox series

diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index f6242f9338..0dcf2ef57a 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -22,6 +22,7 @@ 
 
 #include <assert.h>
 #include <errno.h>
+#include <glib.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -37,17 +38,28 @@ 
 struct fv_VuDev;
 struct fv_QueueInfo {
     pthread_t thread;
+    /*
+     * This lock protects the VuVirtq preventing races between
+     * fv_queue_thread() and fv_queue_worker().
+     */
+    pthread_mutex_t vq_lock;
+
     struct fv_VuDev *virtio_dev;
 
     /* Our queue index, corresponds to array position */
     int qidx;
     int kick_fd;
     int kill_fd; /* For killing the thread */
+};
 
-    /* The element for the command currently being processed */
-    VuVirtqElement *qe;
+/* A FUSE request */
+typedef struct {
+    VuVirtqElement elem;
+    struct fuse_chan ch;
+
+    /* Used to complete requests that involve no reply */
     bool reply_sent;
-};
+} FVRequest;
 
 /*
  * We pass the dev element into libvhost-user
@@ -191,8 +203,11 @@  static void copy_iov(struct iovec *src_iov, int src_count,
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                     struct iovec *iov, int count)
 {
-    VuVirtqElement *elem;
-    VuVirtq *q;
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
 
     assert(count >= 1);
@@ -205,11 +220,7 @@  int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
 
     /* unique == 0 is notification, which we don't support */
     assert(out->unique);
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -236,9 +247,15 @@  int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
     }
 
     copy_iov(iov, count, in_sg, in_num, tosend_len);
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
-    ch->qi->reply_sent = true;
+
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+    req->reply_sent = true;
 
 err:
     return ret;
@@ -254,9 +271,12 @@  int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
                          size_t len)
 {
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
-    VuVirtqElement *elem;
-    VuVirtq *q;
 
     assert(count >= 1);
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
@@ -275,11 +295,7 @@  int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
     /* unique == 0 is notification which we don't support */
     assert(out->unique);
 
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -395,33 +411,175 @@  int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
 
     ret = 0;
 
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
 
 err:
     if (ret == 0) {
-        ch->qi->reply_sent = true;
+        req->reply_sent = true;
     }
 
     return ret;
 }
 
+/* Process one FVRequest in a thread pool */
+static void fv_queue_worker(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    struct VuDev *dev = &qi->virtio_dev->dev;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    bool allocated_bufv = false;
+    struct fuse_bufvec bufv;
+    struct fuse_bufvec *pbufv;
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+
+    /*
+     * An element contains one request and the space to send our response
+     * They're spread over multiple descriptors in a scatter/gather set
+     * and we can't trust the guest to keep them still; so copy in/out.
+     */
+    fbuf.mem = malloc(se->bufsize);
+    assert(fbuf.mem);
+
+    fuse_mutex_init(&req->ch.lock);
+    req->ch.fd = -1;
+    req->ch.qi = qi;
+
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    fuse_log(FUSE_LOG_DEBUG,
+             "%s: elem %d: with %d out desc of length %zd\n",
+             __func__, elem->index, out_num, out_len);
+
+    /*
+     * The elem should contain a 'fuse_in_header' (in to fuse)
+     * plus the data based on the len in the header.
+     */
+    if (out_len < sizeof(struct fuse_in_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+                 __func__, elem->index);
+        assert(0); /* TODO */
+    }
+    if (out_len > se->bufsize) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+                 elem->index);
+        assert(0); /* TODO */
+    }
+    /* Copy just the first element and look at it */
+    copy_from_iov(&fbuf, 1, out_sg);
+
+    pbufv = NULL; /* Compiler thinks an unitialised path */
+    if (out_num > 2 &&
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
+        /*
+         * For a write we don't actually need to copy the
+         * data, we can just do it straight out of guest memory
+         * but we must still copy the headers in case the guest
+         * was nasty and changed them while we were using them.
+         */
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
+
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+        /* Allocate the bufv, with space for the rest of the iov */
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
+                       sizeof(struct fuse_buf) * (out_num - 2));
+        if (!pbufv) {
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
+                    __func__);
+            goto out;
+        }
+
+        allocated_bufv = true;
+        pbufv->count = 1;
+        pbufv->buf[0] = fbuf;
+
+        size_t iovindex, pbufvindex;
+        iovindex = 2; /* 2 headers, separate iovs */
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
+
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
+            pbufv->count++;
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
+            pbufv->buf[pbufvindex].flags = 0;
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
+        }
+    } else {
+        /* Normal (non fast write) path */
+
+        /* Copy the rest of the buffer */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_len;
+
+        /* TODO! Endianness of header */
+
+        /* TODO: Add checks for fuse_session_exited */
+        bufv.buf[0] = fbuf;
+        bufv.count = 1;
+        pbufv = &bufv;
+    }
+    pbufv->idx = 0;
+    pbufv->off = 0;
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
+
+out:
+    if (allocated_bufv) {
+        free(pbufv);
+    }
+
+    /* If the request has no reply, still recycle the virtqueue element */
+    if (!req->reply_sent) {
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
+
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
+                 elem->index);
+
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        pthread_mutex_lock(&qi->vq_lock);
+        vu_queue_push(dev, q, elem, 0);
+        vu_queue_notify(dev, q);
+        pthread_mutex_unlock(&qi->vq_lock);
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    }
+
+    pthread_mutex_destroy(&req->ch.lock);
+    free(fbuf.mem);
+    free(req);
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
     struct fv_QueueInfo *qi = opaque;
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
-    struct fuse_session *se = qi->virtio_dev->se;
-    struct fuse_chan ch;
-    struct fuse_buf fbuf;
+    GThreadPool *pool;
 
-    fbuf.mem = NULL;
-    fbuf.flags = 0;
-
-    fuse_mutex_init(&ch.lock);
-    ch.fd = (int)0xdaff0d111;
-    ch.qi = qi;
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
+                             TRUE, NULL);
+    if (!pool) {
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
+        return NULL;
+    }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
              qi->qidx, qi->kick_fd);
@@ -478,6 +636,7 @@  static void *fv_queue_thread(void *opaque)
         /* Mutual exclusion with virtio_loop() */
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
         assert(ret == 0); /* there is no possible error case */
+        pthread_mutex_lock(&qi->vq_lock);
         /* out is from guest, in is too guest */
         unsigned int in_bytes, out_bytes;
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
@@ -486,141 +645,22 @@  static void *fv_queue_thread(void *opaque)
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
 
-
         while (1) {
-            bool allocated_bufv = false;
-            struct fuse_bufvec bufv;
-            struct fuse_bufvec *pbufv;
-
-            /*
-             * An element contains one request and the space to send our
-             * response They're spread over multiple descriptors in a
-             * scatter/gather set and we can't trust the guest to keep them
-             * still; so copy in/out.
-             */
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
-            if (!elem) {
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
+            if (!req) {
                 break;
             }
 
-            qi->qe = elem;
-            qi->reply_sent = false;
+            req->reply_sent = false;
 
-            if (!fbuf.mem) {
-                fbuf.mem = malloc(se->bufsize);
-                assert(fbuf.mem);
-                assert(se->bufsize > sizeof(struct fuse_in_header));
-            }
-            /* The 'out' part of the elem is from qemu */
-            unsigned int out_num = elem->out_num;
-            struct iovec *out_sg = elem->out_sg;
-            size_t out_len = iov_size(out_sg, out_num);
-            fuse_log(FUSE_LOG_DEBUG,
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
-                     elem->index, out_num, out_len);
-
-            /*
-             * The elem should contain a 'fuse_in_header' (in to fuse)
-             * plus the data based on the len in the header.
-             */
-            if (out_len < sizeof(struct fuse_in_header)) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            if (out_len > se->bufsize) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            /* Copy just the first element and look at it */
-            copy_from_iov(&fbuf, 1, out_sg);
-
-            if (out_num > 2 &&
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
-                /*
-                 * For a write we don't actually need to copy the
-                 * data, we can just do it straight out of guest memory
-                 * but we must still copy the headers in case the guest
-                 * was nasty and changed them while we were using them.
-                 */
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
-
-                /* copy the fuse_write_in header after the fuse_in_header */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
-
-                /* Allocate the bufv, with space for the rest of the iov */
-                allocated_bufv = true;
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
-                               sizeof(struct fuse_buf) * (out_num - 2));
-                if (!pbufv) {
-                    vu_queue_unpop(dev, q, elem, 0);
-                    free(elem);
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
-                             __func__);
-                    goto out;
-                }
-
-                pbufv->count = 1;
-                pbufv->buf[0] = fbuf;
-
-                size_t iovindex, pbufvindex;
-                iovindex = 2; /* 2 headers, separate iovs */
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
-
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
-                    pbufv->count++;
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
-                    pbufv->buf[pbufvindex].flags = 0;
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
-                }
-            } else {
-                /* Normal (non fast write) path */
-
-                /* Copy the rest of the buffer */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_len;
-
-                /* TODO! Endianness of header */
-
-                /* TODO: Add checks for fuse_session_exited */
-                bufv.buf[0] = fbuf;
-                bufv.count = 1;
-                pbufv = &bufv;
-            }
-            pbufv->idx = 0;
-            pbufv->off = 0;
-            fuse_session_process_buf_int(se, pbufv, &ch);
-
-            if (allocated_bufv) {
-                free(pbufv);
-            }
-
-            if (!qi->reply_sent) {
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
-                         __func__, elem->index);
-                /* I think we've still got to recycle the element */
-                vu_queue_push(dev, q, elem, 0);
-                vu_queue_notify(dev, q);
-            }
-            qi->qe = NULL;
-            free(elem);
-            elem = NULL;
+            g_thread_pool_push(pool, req, NULL);
         }
 
+        pthread_mutex_unlock(&qi->vq_lock);
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
     }
-out:
-    pthread_mutex_destroy(&ch.lock);
-    free(fbuf.mem);
+
+    g_thread_pool_free(pool, FALSE, TRUE);
 
     return NULL;
 }
@@ -643,6 +683,7 @@  static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                  __func__, qidx, ret);
     }
+    pthread_mutex_destroy(&ourqi->vq_lock);
     close(ourqi->kill_fd);
     ourqi->kick_fd = -1;
     free(vud->qi[qidx]);
@@ -696,6 +737,8 @@  static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
 
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
         assert(ourqi->kill_fd != -1);
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
+
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                      __func__, qidx);