[11/12] nbd: switch to asynchronous operation

Message ID 1315495505-28906-12-git-send-email-pbonzini@redhat.com
State New

Commit Message

Paolo Bonzini Sept. 8, 2011, 3:25 p.m. UTC
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c |  167 ++++++++++++++++++++++++++++++++++++++--------------------
 nbd.c       |    8 +++
 2 files changed, 117 insertions(+), 58 deletions(-)

Comments

Nicholas Thomas Sept. 9, 2011, 2:52 p.m. UTC | #1
On 08/09/11 16:25, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c |  167 ++++++++++++++++++++++++++++++++++++++--------------------
>  nbd.c       |    8 +++
>  2 files changed, 117 insertions(+), 58 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 964caa8..5a75263 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -52,6 +52,9 @@ typedef struct BDRVNBDState {
>      size_t blocksize;
>      char *export_name; /* An NBD server may export several devices */
>  
> +    CoMutex mutex;
> +    Coroutine *coroutine;
> +
>      /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
>       * it's a string of the form <hostname|ip4|\[ip6\]>:port
>       */
> @@ -104,6 +107,37 @@ out:
>      return err;
>  }
>  
> +static void nbd_coroutine_start(BDRVNBDState *s)
> +{
> +    qemu_co_mutex_lock(&s->mutex);
> +    s->coroutine = qemu_coroutine_self();
> +}
> +
> +static void nbd_coroutine_enter(void *opaque)
> +{
> +    BDRVNBDState *s = opaque;
> +    qemu_coroutine_enter(s->coroutine, NULL);
> +}
> +
> +static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request)
> +{
> +    qemu_aio_set_fd_handler(s->sock, NULL, nbd_coroutine_enter, NULL, NULL, s);
> +    return nbd_send_request(s->sock, request);
> +}
> +
> +static int nbd_co_receive_reply(BDRVNBDState *s, struct nbd_reply *reply)
> +{
> +    qemu_aio_set_fd_handler(s->sock, nbd_coroutine_enter, NULL, NULL, NULL, s);
> +    return nbd_receive_reply(s->sock, reply);
> +}
> +
> +static void nbd_coroutine_end(BDRVNBDState *s)
> +{
> +    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, s);
> +    s->coroutine = NULL;
> +    qemu_co_mutex_unlock(&s->mutex);
> +}
> +
>  static int nbd_establish_connection(BlockDriverState *bs)
>  {
>      BDRVNBDState *s = bs->opaque;
> @@ -163,6 +197,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
>      BDRVNBDState *s = bs->opaque;
>      int result;
>  
> +    qemu_co_mutex_init(&s->mutex);
> +
>      /* Pop the config into our state object. Exit if invalid. */
>      result = nbd_config(s, filename, flags);
>      if (result != 0) {
> @@ -177,8 +213,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
>      return result;
>  }
>  
> -static int nbd_read(BlockDriverState *bs, int64_t sector_num,
> -                    uint8_t *buf, int nb_sectors)
> +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
> +                        int nb_sectors, QEMUIOVector *qiov)
>  {
>      BDRVNBDState *s = bs->opaque;
>      struct nbd_request request;
> @@ -189,30 +225,39 @@ static int nbd_read(BlockDriverState *bs, int64_t sector_num,
>      request.from = sector_num * 512;;
>      request.len = nb_sectors * 512;
>  
> -    if (nbd_send_request(s->sock, &request) == -1)
> -        return -errno;
> -
> -    if (nbd_receive_reply(s->sock, &reply) == -1)
> -        return -errno;
> -
> -    if (reply.error !=0)
> -        return -reply.error;
> -
> -    if (reply.handle != request.handle)
> -        return -EIO;
> +    nbd_coroutine_start(s);
> +    if (nbd_co_send_request(s, &request) == -1) {
> +        reply.error = errno;
> +        goto done;
> +    }
> +    if (nbd_co_receive_reply(s, &reply) == -1) {
> +        reply.error = errno;
> +        goto done;
> +    }
> +    if (reply.error != 0) {
> +        goto done;
> +    }
> +    if (reply.handle != request.handle) {
> +        reply.error = EIO;
> +        goto done;
> +    }
> +    if (qemu_co_recvv(s->sock, qiov->iov, request.len, 0) != request.len) {
> +        reply.error = EIO;
> +    }
>  
> -    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
> -        return -EIO;
> +done:
> +    nbd_coroutine_end(s);
> +    return -reply.error;
>  
> -    return 0;
>  }

I'm a bit unsure here, actually. So you lock a mutex, send a request,
wait for a response, then unlock the mutex. Surely this code doesn't
allow more than one request to be in flight at a time?

My approach was to write the request to the socket as soon as possible.
IIRC, QEMU can have up to 16 IOs outstanding, so this gave us "some"
speedup, although I don't have formal before/after benchmarks. For
testing, speed isn't too important, but we're using NBD in production to
run VMs on a 10GigE network with SAS and SSD storage, so we're somewhat
interested in performance :).

If this *is* letting > 1 request be on the wire at a time, then
request.handle needs to be unique to the request. I don't think:

    request.handle = (uint64_t)(intptr_t)bs;

is.
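
Something unique per request would be needed instead, e.g. a counter
in BDRVNBDState (just a sketch; handle_counter and nbd_next_handle
are invented names, not in this patch):

    /* in BDRVNBDState: uint64_t handle_counter; */

    static uint64_t nbd_next_handle(BDRVNBDState *s)
    {
        /* The NBD protocol only requires the server to echo the
         * handle back, so any value unique among the requests in
         * flight on this connection will do. */
        return ++s->handle_counter;
    }

and then, per request:

    request.handle = nbd_next_handle(s);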


> -static int nbd_write(BlockDriverState *bs, int64_t sector_num,
> -                     const uint8_t *buf, int nb_sectors)
> +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
> +                         int nb_sectors, QEMUIOVector *qiov)
>  {
>      BDRVNBDState *s = bs->opaque;
>      struct nbd_request request;
>      struct nbd_reply reply;
> +    int ret;
>  
>      request.type = NBD_CMD_WRITE;
>      if (!bdrv_enable_write_cache(bs) && (s->nbdflags & NBD_FLAG_SEND_FUA)) {
> @@ -223,25 +268,30 @@ static int nbd_write(BlockDriverState *bs, int64_t sector_num,
>      request.from = sector_num * 512;;
>      request.len = nb_sectors * 512;
>  
> -    if (nbd_send_request(s->sock, &request) == -1)
> -        return -errno;
> -
> -    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
> -        return -EIO;
> -
> -    if (nbd_receive_reply(s->sock, &reply) == -1)
> -        return -errno;
> -
> -    if (reply.error !=0)
> -        return -reply.error;
> -
> -    if (reply.handle != request.handle)
> -        return -EIO;
> +    nbd_coroutine_start(s);
> +    if (nbd_co_send_request(s, &request) == -1) {
> +        reply.error = errno;
> +        goto done;
> +    }
> +    ret = qemu_co_sendv(s->sock, qiov->iov, request.len, 0);
> +    if (ret != request.len) {
> +        reply.error = EIO;
> +        goto done;
> +    }
> +    if (nbd_co_receive_reply(s, &reply) == -1) {
> +        reply.error = errno;
> +        goto done;
> +    }
> +    if (reply.handle != request.handle) {
> +        reply.error = EIO;
> +    }
>  
> -    return 0;
> +done:
> +    nbd_coroutine_end(s);
> +    return -reply.error;
>  }
>  
> -static int nbd_flush(BlockDriverState *bs)
> +static int nbd_co_flush(BlockDriverState *bs)
>  {
>      BDRVNBDState *s = bs->opaque;
>      struct nbd_request request;
> @@ -260,19 +310,22 @@ static int nbd_flush(BlockDriverState *bs)
>      request.from = 0;
>      request.len = 0;
>  
> -    if (nbd_send_request(s->sock, &request) == -1)
> -        return -errno;
> -
> -    if (nbd_receive_reply(s->sock, &reply) == -1)
> -        return -errno;
> -
> -    if (reply.error !=0)
> -        return -reply.error;
> -
> -    if (reply.handle != request.handle)
> -        return -EIO;
> +    nbd_coroutine_start(s);
> +    if (nbd_co_send_request(s, &request) == -1) {
> +        reply.error = errno;
> +        goto done;
> +    }
> +    if (nbd_co_receive_reply(s, &reply) == -1) {
> +        reply.error = errno;
> +        goto done;
> +    }
> +    if (reply.error == 0 && reply.handle != request.handle) {
> +        reply.error = EIO;
> +    }
>  
> -    return 0;
> +done:
> +    nbd_coroutine_end(s);
> +    return -reply.error;
>  }
>  
>  static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
> @@ -290,19 +343,17 @@ static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
>      request.from = sector_num * 512;;
>      request.len = nb_sectors * 512;
>  
> -    if (nbd_send_request(s->sock, &request) == -1)
> +    if (nbd_send_request(s->sock, &request) == -1) {
>          return -errno;
> -
> -    if (nbd_receive_reply(s->sock, &reply) == -1)
> +    }
> +    if (nbd_receive_reply(s->sock, &reply) == -1) {
>          return -errno;
> -
> -    if (reply.error !=0)
> -        return -reply.error;
> -
> -    if (reply.handle != request.handle)
> +    }
> +    if (reply.error == 0 && reply.handle != request.handle) {
>          return -EIO;
> +    }
>  
> -    return 0;
> +    return -reply.error;
>  }
>  
>  static void nbd_close(BlockDriverState *bs)
> @@ -325,10 +376,10 @@ static BlockDriver bdrv_nbd = {
>      .format_name	= "nbd",
>      .instance_size	= sizeof(BDRVNBDState),
>      .bdrv_file_open	= nbd_open,
> -    .bdrv_read		= nbd_read,
> -    .bdrv_write		= nbd_write,
> +    .bdrv_co_readv	= nbd_co_readv,
> +    .bdrv_co_writev	= nbd_co_writev,
>      .bdrv_close		= nbd_close,
> -    .bdrv_flush		= nbd_flush,
> +    .bdrv_co_flush	= nbd_co_flush,
>      .bdrv_discard	= nbd_discard,
>      .bdrv_getlength	= nbd_getlength,
>      .protocol_name	= "nbd",
> diff --git a/nbd.c b/nbd.c
> index f089904..2f4c6b3 100644
> --- a/nbd.c
> +++ b/nbd.c
> @@ -80,6 +80,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
>  {
>      size_t offset = 0;
>  
> +    if (qemu_in_coroutine()) {
> +        if (do_read) {
> +            return qemu_co_recv(fd, buffer, size);
> +        } else {
> +            return qemu_co_send(fd, buffer, size);
> +        }
> +    }
> +
>      while (offset < size) {
>          ssize_t len;
>
Paolo Bonzini Sept. 9, 2011, 3:03 p.m. UTC | #2
On 09/09/2011 04:52 PM, Nicholas Thomas wrote:
> I'm a bit unsure here, actually. So you lock a mutex, send a request,
> wait for a response, then unlock the mutex. Surely this code doesn't
> allow more than one request to be in flight at a time?

No, it doesn't.  It shouldn't be hard to do, though.  You could have 
two mutexes, one for sending and one for receiving.  You yield after 
sending, and let nbd_coroutine_restart read the reply.  It can then 
reenter that reply's coroutine based on the handle in the reply.  I 
still prefer to do it in a separate patch.
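
Roughly, and untested (send_mutex, recv_coroutine, MAX_NBD_REQUESTS
and the shared s->reply are invented names, not part of this series):

    /* in BDRVNBDState:
     *    CoMutex send_mutex;      serializes senders
     *    struct nbd_reply reply;  header of the reply being read
     *    Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
     */

    /* registered as the read handler with qemu_aio_set_fd_handler() */
    static void nbd_reply_ready(void *opaque)
    {
        BDRVNBDState *s = opaque;
        uint64_t i;

        /* Read the reply header on behalf of whichever request it
         * belongs to, then reenter the coroutine that sent that
         * request; the handle doubles as an index into
         * recv_coroutine. */
        if (nbd_receive_reply(s->sock, &s->reply) == -1) {
            return;
        }
        i = s->reply.handle;    /* assumes handles are small indices */
        if (i < MAX_NBD_REQUESTS && s->recv_coroutine[i]) {
            qemu_coroutine_enter(s->recv_coroutine[i], NULL);
        }
    }

The sending side would take send_mutex around nbd_send_request() plus
the write payload, store itself in recv_coroutine[i], yield, and
finish reading its own payload once reentered.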

Paolo
Paolo Bonzini Sept. 9, 2011, 3:34 p.m. UTC | #3
On 09/09/2011 05:03 PM, Paolo Bonzini wrote:
>
>> I'm a bit unsure here, actually. So you lock a mutex, send a request,
>> wait for a response, then unlock the mutex. Surely this code doesn't
>> allow more than one request to be in flight at a time?
>
> No, it doesn't.  It shouldn't be hard to do, though.  You could have
> two mutexes, one for sending and one for receiving.  You yield after
> sending, and let nbd_coroutine_restart read the reply.  It can then
> reenter that reply's coroutine based on the handle in the reply.  I
> still prefer to do it in a separate patch.

There is a problem with discard requests because they are not 
coroutine-based (yet).  But it would be the same even in your AIO 
implementation, unfortunately.
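
Once a bdrv_co_discard callback exists, the conversion would be
mechanical, following the same pattern as nbd_co_flush above.  A
sketch only, assuming the existing TRIM flag check is kept:

    static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num,
                              int nb_sectors)
    {
        BDRVNBDState *s = bs->opaque;
        struct nbd_request request;
        struct nbd_reply reply;

        if (!(s->nbdflags & NBD_FLAG_SEND_TRIM)) {
            return 0;
        }
        request.type = NBD_CMD_TRIM;
        request.handle = (uint64_t)(intptr_t)bs;
        request.from = sector_num * 512;
        request.len = nb_sectors * 512;

        nbd_coroutine_start(s);
        if (nbd_co_send_request(s, &request) == -1) {
            reply.error = errno;
            goto done;
        }
        if (nbd_co_receive_reply(s, &reply) == -1) {
            reply.error = errno;
            goto done;
        }
        if (reply.error == 0 && reply.handle != request.handle) {
            reply.error = EIO;
        }

    done:
        nbd_coroutine_end(s);
        return -reply.error;
    }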

Paolo

Patch

diff --git a/block/nbd.c b/block/nbd.c
index 964caa8..5a75263 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -52,6 +52,9 @@ typedef struct BDRVNBDState {
     size_t blocksize;
     char *export_name; /* An NBD server may export several devices */
 
+    CoMutex mutex;
+    Coroutine *coroutine;
+
     /* If it begins with  '/', this is a UNIX domain socket. Otherwise,
      * it's a string of the form <hostname|ip4|\[ip6\]>:port
      */
@@ -104,6 +107,37 @@ out:
     return err;
 }
 
+static void nbd_coroutine_start(BDRVNBDState *s)
+{
+    qemu_co_mutex_lock(&s->mutex);
+    s->coroutine = qemu_coroutine_self();
+}
+
+static void nbd_coroutine_enter(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    qemu_coroutine_enter(s->coroutine, NULL);
+}
+
+static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request)
+{
+    qemu_aio_set_fd_handler(s->sock, NULL, nbd_coroutine_enter, NULL, NULL, s);
+    return nbd_send_request(s->sock, request);
+}
+
+static int nbd_co_receive_reply(BDRVNBDState *s, struct nbd_reply *reply)
+{
+    qemu_aio_set_fd_handler(s->sock, nbd_coroutine_enter, NULL, NULL, NULL, s);
+    return nbd_receive_reply(s->sock, reply);
+}
+
+static void nbd_coroutine_end(BDRVNBDState *s)
+{
+    qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, s);
+    s->coroutine = NULL;
+    qemu_co_mutex_unlock(&s->mutex);
+}
+
 static int nbd_establish_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
@@ -163,6 +197,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
     BDRVNBDState *s = bs->opaque;
     int result;
 
+    qemu_co_mutex_init(&s->mutex);
+
     /* Pop the config into our state object. Exit if invalid. */
     result = nbd_config(s, filename, flags);
     if (result != 0) {
@@ -177,8 +213,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
     return result;
 }
 
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
-                    uint8_t *buf, int nb_sectors)
+static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
@@ -189,30 +225,39 @@ static int nbd_read(BlockDriverState *bs, int64_t sector_num,
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
+    nbd_coroutine_start(s);
+    if (nbd_co_send_request(s, &request) == -1) {
+        reply.error = errno;
+        goto done;
+    }
+    if (nbd_co_receive_reply(s, &reply) == -1) {
+        reply.error = errno;
+        goto done;
+    }
+    if (reply.error != 0) {
+        goto done;
+    }
+    if (reply.handle != request.handle) {
+        reply.error = EIO;
+        goto done;
+    }
+    if (qemu_co_recvv(s->sock, qiov->iov, request.len, 0) != request.len) {
+        reply.error = EIO;
+    }
 
-    if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
-        return -EIO;
+done:
+    nbd_coroutine_end(s);
+    return -reply.error;
 
-    return 0;
 }
 
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
-                     const uint8_t *buf, int nb_sectors)
+static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                         int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
     struct nbd_reply reply;
+    int ret;
 
     request.type = NBD_CMD_WRITE;
     if (!bdrv_enable_write_cache(bs) && (s->nbdflags & NBD_FLAG_SEND_FUA)) {
@@ -223,25 +268,30 @@ static int nbd_write(BlockDriverState *bs, int64_t sector_num,
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
-        return -EIO;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
+    nbd_coroutine_start(s);
+    if (nbd_co_send_request(s, &request) == -1) {
+        reply.error = errno;
+        goto done;
+    }
+    ret = qemu_co_sendv(s->sock, qiov->iov, request.len, 0);
+    if (ret != request.len) {
+        reply.error = EIO;
+        goto done;
+    }
+    if (nbd_co_receive_reply(s, &reply) == -1) {
+        reply.error = errno;
+        goto done;
+    }
+    if (reply.handle != request.handle) {
+        reply.error = EIO;
+    }
 
-    return 0;
+done:
+    nbd_coroutine_end(s);
+    return -reply.error;
 }
 
-static int nbd_flush(BlockDriverState *bs)
+static int nbd_co_flush(BlockDriverState *bs)
 {
     BDRVNBDState *s = bs->opaque;
     struct nbd_request request;
@@ -260,19 +310,22 @@ static int nbd_flush(BlockDriverState *bs)
     request.from = 0;
     request.len = 0;
 
-    if (nbd_send_request(s->sock, &request) == -1)
-        return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
-        return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
-        return -EIO;
+    nbd_coroutine_start(s);
+    if (nbd_co_send_request(s, &request) == -1) {
+        reply.error = errno;
+        goto done;
+    }
+    if (nbd_co_receive_reply(s, &reply) == -1) {
+        reply.error = errno;
+        goto done;
+    }
+    if (reply.error == 0 && reply.handle != request.handle) {
+        reply.error = EIO;
+    }
 
-    return 0;
+done:
+    nbd_coroutine_end(s);
+    return -reply.error;
 }
 
 static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
@@ -290,19 +343,17 @@ static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
     request.from = sector_num * 512;;
     request.len = nb_sectors * 512;
 
-    if (nbd_send_request(s->sock, &request) == -1)
+    if (nbd_send_request(s->sock, &request) == -1) {
         return -errno;
-
-    if (nbd_receive_reply(s->sock, &reply) == -1)
+    }
+    if (nbd_receive_reply(s->sock, &reply) == -1) {
         return -errno;
-
-    if (reply.error !=0)
-        return -reply.error;
-
-    if (reply.handle != request.handle)
+    }
+    if (reply.error == 0 && reply.handle != request.handle) {
         return -EIO;
+    }
 
-    return 0;
+    return -reply.error;
 }
 
 static void nbd_close(BlockDriverState *bs)
@@ -325,10 +376,10 @@ static BlockDriver bdrv_nbd = {
     .format_name	= "nbd",
     .instance_size	= sizeof(BDRVNBDState),
     .bdrv_file_open	= nbd_open,
-    .bdrv_read		= nbd_read,
-    .bdrv_write		= nbd_write,
+    .bdrv_co_readv	= nbd_co_readv,
+    .bdrv_co_writev	= nbd_co_writev,
     .bdrv_close		= nbd_close,
-    .bdrv_flush		= nbd_flush,
+    .bdrv_co_flush	= nbd_co_flush,
     .bdrv_discard	= nbd_discard,
     .bdrv_getlength	= nbd_getlength,
     .protocol_name	= "nbd",
diff --git a/nbd.c b/nbd.c
index f089904..2f4c6b3 100644
--- a/nbd.c
+++ b/nbd.c
@@ -80,6 +80,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
 {
     size_t offset = 0;
 
+    if (qemu_in_coroutine()) {
+        if (do_read) {
+            return qemu_co_recv(fd, buffer, size);
+        } else {
+            return qemu_co_send(fd, buffer, size);
+        }
+    }
+
     while (offset < size) {
         ssize_t len;