Patchwork [v3,13/15] nbd: switch to asynchronous operation

login
register
mail settings
Submitter Paolo Bonzini
Date Oct. 5, 2011, 7:17 a.m.
Message ID <1317799065-29668-14-git-send-email-pbonzini@redhat.com>
Download mbox | patch
Permalink /patch/117765/
State New
Headers show

Comments

Paolo Bonzini - Oct. 5, 2011, 7:17 a.m.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c |  218 +++++++++++++++++++++++++++++++++++++++--------------------
 nbd.c       |    8 ++
 2 files changed, 152 insertions(+), 74 deletions(-)

Patch

diff --git a/block/nbd.c b/block/nbd.c
index 35c15c8..90c4f2f 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -53,6 +53,11 @@  typedef struct BDRVNBDState {
     size_t blocksize;
     char *export_name; /* An NBD server may export several devices */
 
+    CoMutex mutex;
+    Coroutine *coroutine;
+
+    struct nbd_reply reply;
+
     /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
      * it's a string of the form <hostname|ip4|\[ip6\]>:port
      */
@@ -105,6 +110,95 @@  out:
     return err;
 }
 
+static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
+{
+    qemu_co_mutex_lock(&s->mutex);
+    s->coroutine = qemu_coroutine_self();
+    request->handle = (uint64_t)(intptr_t)s;
+}
+
+static int nbd_have_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+
+    return !!s->coroutine;
+}
+
+static void nbd_reply_ready(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+
+    if (s->reply.handle == 0) {
+        /* No reply already in flight.  Fetch a header.  */
+        if (nbd_receive_reply(s->sock, &s->reply) < 0) {
+            s->reply.handle = 0;
+        }
+    }
+
+    /* There's no need for a mutex on the receive side, because the
+     * handler acts as a synchronization point and ensures that only
+     * one coroutine is called until the reply finishes.  */
+    if (s->coroutine) {
+        qemu_coroutine_enter(s->coroutine, NULL);
+    }
+}
+
+static void nbd_restart_write(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    qemu_coroutine_enter(s->coroutine, NULL);
+}
+
+static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
+                               struct iovec *iov, int offset)
+{
+    int rc, ret;
+
+    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
+                            nbd_have_request, NULL, s);
+    rc = nbd_send_request(s->sock, request);
+    if (rc != -1 && iov) {
+        ret = qemu_co_sendv(s->sock, iov, request->len, offset);
+        if (ret != request->len) {
+            errno = -EIO;
+            rc = -1;
+        }
+    }
+    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+                            nbd_have_request, NULL, s);
+    return rc;
+}
+
+static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
+                                 struct nbd_reply *reply,
+                                 struct iovec *iov, int offset)
+{
+    int ret;
+
+    /* Wait until we're woken up by the read handler.  */
+    qemu_coroutine_yield();
+    *reply = s->reply;
+    if (reply->handle != request->handle) {
+        reply->error = EIO;
+    } else {
+        if (iov && reply->error == 0) {
+            ret = qemu_co_recvv(s->sock, iov, request->len, offset);
+            if (ret != request->len) {
+                reply->error = EIO;
+            }
+        }
+
+        /* Tell the read handler to read another header.  */
+        s->reply.handle = 0;
+    }
+}
+
+static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
+{
+    s->coroutine = NULL;
+    qemu_co_mutex_unlock(&s->mutex);
+}
+
 static int nbd_establish_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
@@ -134,8 +228,11 @@  static int nbd_establish_connection(BlockDriverState *bs)
         return -errno;
     }
 
-    /* Now that we're connected, set the socket to be non-blocking */
+    /* Now that we're connected, set the socket to be non-blocking and
+     * kick the reply mechanism.  */
     socket_set_nonblock(sock);
+    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+                            nbd_have_request, NULL, s);
 
     s->sock = sock;
     s->size = size;
@@ -151,11 +248,11 @@  static void nbd_teardown_connection(BlockDriverState *bs)
     struct nbd_request request;
 
     request.type = NBD_CMD_DISC;
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = 0;
     request.len = 0;
     nbd_send_request(s->sock, &request);
 
+    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
     closesocket(s->sock);
 }
 
@@ -164,6 +261,8 @@  static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
     BDRVNBDState *s = bs->opaque;
     int result;
 
+    qemu_co_mutex_init(&s->mutex);
+
     /* Pop the config into our state object. Exit if invalid. */
     result = nbd_config(s, filename, flags);
     if (result != 0) {
@@ -178,38 +277,30 @@  static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
     return result;
 }
 
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
     struct nbd_reply reply;
 
     request.type = NBD_CMD_READ;
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
-
-    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
-        return -EIO;
+    nbd_coroutine_start(s, &request);
+    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+        reply.error = errno;
+    } else {
+        nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0);
+    }
+    nbd_coroutine_end(s, &request);
+    return -reply.error;
 
-    return 0;
 }
 
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                         int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
@@ -220,29 +311,20 @@  static int nbd_write(BlockDriverState *bs, int64_t sector_num,
         request.type |= NBD_CMD_FLAG_FUA;
     }
 
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
-        return -EIO;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
-
-    return 0;
+    nbd_coroutine_start(s, &request);
+    if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) {
+        reply.error = errno;
+    } else {
+        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+    }
+    nbd_coroutine_end(s, &request);
+    return -reply.error;
 }
 
-static int nbd_flush(BlockDriverState *bs)
+static int nbd_co_flush(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
@@ -257,27 +339,21 @@  static int nbd_flush(BlockDriverState *bs)
         request.type |= NBD_CMD_FLAG_FUA;
     }
 
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = 0;
     request.len = 0;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error != 0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
-
-    return 0;
+    nbd_coroutine_start(s, &request);
+    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+        reply.error = errno;
+    } else {
+        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+    }
+    nbd_coroutine_end(s, &request);
+    return -reply.error;
 }
 
-static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
-                       int nb_sectors)
+static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num,
+                          int nb_sectors)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
@@ -287,23 +363,17 @@  static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
         return 0;
     }
     request.type = NBD_CMD_TRIM;
-    request.handle = (uint64_t)(intptr_t)bs;
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
-
-    return 0;
+    nbd_coroutine_start(s, &request);
+    if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+        reply.error = errno;
+    } else {
+        nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+    }
+    nbd_coroutine_end(s, &request);
+    return -reply.error;
 }
 
 static void nbd_close(BlockDriverState *bs)
@@ -326,11 +396,11 @@  static BlockDriver bdrv_nbd = {
     .format_name	= "nbd",
     .instance_size	= sizeof(BDRVNBDState),
     .bdrv_file_open	= nbd_open,
-    .bdrv_read		= nbd_read,
-    .bdrv_write		= nbd_write,
+    .bdrv_co_readv	= nbd_co_readv,
+    .bdrv_co_writev	= nbd_co_writev,
     .bdrv_close		= nbd_close,
-    .bdrv_flush		= nbd_flush,
-    .bdrv_discard	= nbd_discard,
+    .bdrv_co_flush	= nbd_co_flush,
+    .bdrv_co_discard	= nbd_co_discard,
     .bdrv_getlength	= nbd_getlength,
     .protocol_name	= "nbd",
 };
diff --git a/nbd.c b/nbd.c
index 418b020..6a96878 100644
--- a/nbd.c
+++ b/nbd.c
@@ -81,6 +81,14 @@  size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
 {
     size_t offset = 0;
 
+    if (qemu_in_coroutine()) {
+        if (do_read) {
+            return qemu_co_recv(fd, buffer, size);
+        } else {
+            return qemu_co_send(fd, buffer, size);
+        }
+    }
+
     while (offset < size) {
         ssize_t len;