Patchwork [v2,03/15] sheepdog: move coroutine send/recv function to generic code

login
register
mail settings
Submitter Paolo Bonzini
Date Sept. 16, 2011, 2:25 p.m.
Message ID <1316183152-5481-4-git-send-email-pbonzini@redhat.com>
Download mbox | patch
Permalink /patch/114931/
State New
Headers show

Comments

Paolo Bonzini - Sept. 16, 2011, 2:25 p.m.
Outside coroutines, avoid busy waiting on EAGAIN by temporarily
making the socket blocking.

The API of qemu_recvv/qemu_sendv is slightly different from
do_readv/do_writev because they do not handle coroutines.  It
returns the number of bytes written before encountering an
EAGAIN.  The specificity of yielding on EAGAIN is entirely in
qemu-coroutine.c.

Reviewed-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/sheepdog.c |  225 ++++++------------------------------------------------
 cutils.c         |  177 ++++++++++++++++++++++++++++++++++++++++++
 qemu-common.h    |   30 +++++++
 3 files changed, 230 insertions(+), 202 deletions(-)
MORITA Kazutaka - Sept. 17, 2011, 6:29 a.m.
At Fri, 16 Sep 2011 16:25:40 +0200,
Paolo Bonzini wrote:
> 
> Outside coroutines, avoid busy waiting on EAGAIN by temporarily
> making the socket blocking.
> 
> The API of qemu_recvv/qemu_sendv is slightly different from
> do_readv/do_writev because they do not handle coroutines.  It
> returns the number of bytes written before encountering an
> EAGAIN.  The specificity of yielding on EAGAIN is entirely in
> qemu-coroutine.c.
> 
> Reviewed-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/sheepdog.c |  225 ++++++------------------------------------------------
>  cutils.c         |  177 ++++++++++++++++++++++++++++++++++++++++++
>  qemu-common.h    |   30 +++++++
>  3 files changed, 230 insertions(+), 202 deletions(-)

It seems this patch causes a compile error of qemu-ga.

Other things I noticed:

>  static int send_req(int sockfd, SheepdogReq *hdr, void *data,
>                      unsigned int *wlen)
>  {
> @@ -691,10 +509,9 @@ static int send_req(int sockfd, SheepdogReq *hdr, void *data,
>          iov[1].iov_len = *wlen;
>      }
>  
> -    ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
> -    if (ret) {
> +    ret = qemu_sendv(sockfd, iov, sizeof(*hdr) + *wlen, 0);

This is wrong because qemu_sendv() may return a smaller value than
(sizeof(*hdr) + *wlen).  We need to do things like qemu_write_full()
here.

> +    if (ret < 0) {
>          error_report("failed to send a req, %s", strerror(errno));
> -        ret = -1;
>      }
>  
>      return ret;
> @@ -704,17 +521,19 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
>                    unsigned int *wlen, unsigned int *rlen)
>  {
>      int ret;
> +    struct iovec iov;
>  
> +    socket_set_block(sockfd);
>      ret = send_req(sockfd, hdr, data, wlen);
> -    if (ret) {
> -        ret = -1;
> +    if (ret < 0) {
>          goto out;
>      }
>  
> -    ret = do_read(sockfd, hdr, sizeof(*hdr));
> -    if (ret) {
> +    iov.iov_base = hdr;
> +    iov.iov_len = sizeof(*hdr);
> +    ret = qemu_recvv(sockfd, &iov, sizeof(*hdr), 0);

qemu_recvv() may also return a smaller value than sizeof(*hdr) here.

> +    if (ret < 0) {
>          error_report("failed to get a rsp, %s", strerror(errno));
> -        ret = -1;
>          goto out;
>      }
>  
> @@ -723,15 +542,17 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
>      }
>  
>      if (*rlen) {
> -        ret = do_read(sockfd, data, *rlen);
> -        if (ret) {
> +        iov.iov_base = data;
> +        iov.iov_len = *rlen;
> +        ret = qemu_recvv(sockfd, &iov, *rlen, 0);

Same here.

> +        if (ret < 0) {
>              error_report("failed to get the data, %s", strerror(errno));
> -            ret = -1;
>              goto out;
>          }
>      }
>      ret = 0;
>  out:
> +    socket_set_nonblock(sockfd);
>      return ret;
>  }
>  

[snip]

> +
> +/*
> + * Send/recv data with iovec buffers
> + *
> + * This function send/recv data from/to the iovec buffer directly.
> + * The first `offset' bytes in the iovec buffer are skipped and next
> + * `len' bytes are used.
> + *
> + * For example,
> + *
> + *   do_sendv_recvv(sockfd, iov, len, offset, 1);
> + *
> + * is equal to
> + *
> + *   char *buf = malloc(size);
> + *   iov_to_buf(iov, iovcnt, buf, offset, size);
> + *   send(sockfd, buf, size, 0);
> + *   free(buf);
> + */
> +static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset,
> +                          int do_sendv)
> +{
> +    int ret, diff, iovlen;
> +    struct iovec *last_iov;
> +
> +    /* last_iov is inclusive, so count from one.  */
> +    iovlen = 1;
> +    last_iov = iov;
> +    len += offset;
> +
> +    while (last_iov->iov_len < len) {
> +        len -= last_iov->iov_len;
> +
> +        last_iov++;
> +        iovlen++;
> +    }
> +
> +    diff = last_iov->iov_len - len;
> +    last_iov->iov_len -= diff;
> +
> +    while (iov->iov_len <= offset) {
> +        offset -= iov->iov_len;
> +
> +        iov++;
> +        iovlen--;
> +    }
> +
> +    iov->iov_base = (char *) iov->iov_base + offset;
> +    iov->iov_len -= offset;
> +
> +    {
> +#ifdef CONFIG_IOVEC
> +        struct msghdr msg;
> +        memset(&msg, 0, sizeof(msg));
> +        msg.msg_iov = iov;
> +        msg.msg_iovlen = iovlen;
> +
> +        do {
> +            if (do_sendv) {
> +                ret = sendmsg(sockfd, &msg, 0);
> +            } else {
> +                ret = recvmsg(sockfd, &msg, 0);
> +            }
> +        } while (ret == -1 && errno == EINTR);
> +#else
> +        struct iovec *p = iov;
> +        ret = 0;
> +        while (iovlen > 0) {
> +            int rc;
> +            if (do_sendv) {
> +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
> +            } else {
> +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
> +            }
> +            if (rc == -1) {
> +                if (errno == EINTR) {
> +                    continue;
> +                }
> +                if (ret == 0) {
> +                    ret = -1;
> +                }
> +                break;
> +            }
> +            iovlen--, p++;
> +            ret += rc;
> +        }

This code can be called inside coroutines with a non-blocking fd, so
should we avoid busy waiting?


Thanks,

Kazutaka
Paolo Bonzini - Sept. 17, 2011, 2:49 p.m.
On 09/17/2011 08:29 AM, MORITA Kazutaka wrote:
>> >  +#else
>> >  +        struct iovec *p = iov;
>> >  +        ret = 0;
>> >  +        while (iovlen>  0) {
>> >  +            int rc;
>> >  +            if (do_sendv) {
>> >  +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
>> >  +            } else {
>> >  +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
>> >  +            }
>> >  +            if (rc == -1) {
>> >  +                if (errno == EINTR) {
>> >  +                    continue;
>> >  +                }
>> >  +                if (ret == 0) {
>> >  +                    ret = -1;
>> >  +                }
>> >  +                break;
>> >  +            }
>> >  +            iovlen--, p++;
>> >  +            ret += rc;
>> >  +        }
> This code can be called inside coroutines with a non-blocking fd, so
> should we avoid busy waiting?

It doesn't busy wait, it exits with EAGAIN.  I'll squash in here the 
first hunk of patch 4, which is needed.

qemu_co_recvv already handles reads that return zero, unlike sheepdog's 
do_readv_writev.  I probably moved it there inadvertently while moving 
code around to cutils.c, but in order to fix qemu-ga I need to create a 
new file qemu-coroutine-io.c.

Kevin, do you want me to resubmit everything, or are you going to apply 
some more patches to the block branch (5 to 12 should be fine)?

Paolo
MORITA Kazutaka - Sept. 17, 2011, 5:16 p.m.
At Sat, 17 Sep 2011 16:49:22 +0200,
Paolo Bonzini wrote:
> 
> On 09/17/2011 08:29 AM, MORITA Kazutaka wrote:
> >> >  +#else
> >> >  +        struct iovec *p = iov;
> >> >  +        ret = 0;
> >> >  +        while (iovlen>  0) {
> >> >  +            int rc;
> >> >  +            if (do_sendv) {
> >> >  +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
> >> >  +            } else {
> >> >  +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
> >> >  +            }
> >> >  +            if (rc == -1) {
> >> >  +                if (errno == EINTR) {
> >> >  +                    continue;
> >> >  +                }
> >> >  +                if (ret == 0) {
> >> >  +                    ret = -1;
> >> >  +                }
> >> >  +                break;
> >> >  +            }
> >> >  +            iovlen--, p++;
> >> >  +            ret += rc;
> >> >  +        }
> > This code can be called inside coroutines with a non-blocking fd, so
> > should we avoid busy waiting?
> 
> It doesn't busy wait, it exits with EAGAIN.  I'll squash in here the 

Oops, you're right.  Sorry for the noise.

Thanks,

Kazutaka


> first hunk of patch 4, which is needed.
> 
> qemu_co_recvv already handles reads that return zero, unlike sheepdog's 
> do_readv_writev.  I probably moved it there inadvertently while moving 
> code around to cutils.c, but in order to fix qemu-ga I need to create a 
> new file qemu-coroutine-io.c.
> 
> Kevin, do you want me to resubmit everything, or are you going to apply 
> some more patches to the block branch (5 to 12 should be fine)?
> 
> Paolo
>
Kevin Wolf - Sept. 19, 2011, 7:47 a.m.
Am 17.09.2011 16:49, schrieb Paolo Bonzini:
> On 09/17/2011 08:29 AM, MORITA Kazutaka wrote:
>>>>  +#else
>>>>  +        struct iovec *p = iov;
>>>>  +        ret = 0;
>>>>  +        while (iovlen>  0) {
>>>>  +            int rc;
>>>>  +            if (do_sendv) {
>>>>  +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
>>>>  +            } else {
>>>>  +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
>>>>  +            }
>>>>  +            if (rc == -1) {
>>>>  +                if (errno == EINTR) {
>>>>  +                    continue;
>>>>  +                }
>>>>  +                if (ret == 0) {
>>>>  +                    ret = -1;
>>>>  +                }
>>>>  +                break;
>>>>  +            }
>>>>  +            iovlen--, p++;
>>>>  +            ret += rc;
>>>>  +        }
>> This code can be called inside coroutines with a non-blocking fd, so
>> should we avoid busy waiting?
> 
> It doesn't busy wait, it exits with EAGAIN.  I'll squash in here the 
> first hunk of patch 4, which is needed.
> 
> qemu_co_recvv already handles reads that return zero, unlike sheepdog's 
> do_readv_writev.  I probably moved it there inadvertently while moving 
> code around to cutils.c, but in order to fix qemu-ga I need to create a 
> new file qemu-coroutine-io.c.
> 
> Kevin, do you want me to resubmit everything, or are you going to apply 
> some more patches to the block branch (5 to 12 should be fine)?

As long as it's clear what the current version is, I don't mind. Do I
understand right that there will be a v3 for patches 3 and 4?

Kevin
Paolo Bonzini - Sept. 19, 2011, 9:34 a.m.
On 09/19/2011 09:47 AM, Kevin Wolf wrote:
> As long as it's clear what the current version is, I don't mind. Do I
> understand right that there will be a v3 for patches 3 and 4?

Yes.

Paolo

Patch

diff --git a/block/sheepdog.c b/block/sheepdog.c
index af696a5..94e62a3 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -443,129 +443,6 @@  static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
     return acb;
 }
 
-#ifdef _WIN32
-
-struct msghdr {
-    struct iovec *msg_iov;
-    size_t        msg_iovlen;
-};
-
-static ssize_t sendmsg(int s, const struct msghdr *msg, int flags)
-{
-    size_t size = 0;
-    char *buf, *p;
-    int i, ret;
-
-    /* count the msg size */
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        size += msg->msg_iov[i].iov_len;
-    }
-    buf = g_malloc(size);
-
-    p = buf;
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
-        p += msg->msg_iov[i].iov_len;
-    }
-
-    ret = send(s, buf, size, flags);
-
-    g_free(buf);
-    return ret;
-}
-
-static ssize_t recvmsg(int s, struct msghdr *msg, int flags)
-{
-    size_t size = 0;
-    char *buf, *p;
-    int i, ret;
-
-    /* count the msg size */
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        size += msg->msg_iov[i].iov_len;
-    }
-    buf = g_malloc(size);
-
-    ret = qemu_recv(s, buf, size, flags);
-    if (ret < 0) {
-        goto out;
-    }
-
-    p = buf;
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len);
-        p += msg->msg_iov[i].iov_len;
-    }
-out:
-    g_free(buf);
-    return ret;
-}
-
-#endif
-
-/*
- * Send/recv data with iovec buffers
- *
- * This function send/recv data from/to the iovec buffer directly.
- * The first `offset' bytes in the iovec buffer are skipped and next
- * `len' bytes are used.
- *
- * For example,
- *
- *   do_send_recv(sockfd, iov, len, offset, 1);
- *
- * is equals to
- *
- *   char *buf = malloc(size);
- *   iov_to_buf(iov, iovcnt, buf, offset, size);
- *   send(sockfd, buf, size, 0);
- *   free(buf);
- */
-static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
-                        int write)
-{
-    struct msghdr msg;
-    int ret, diff;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_iov = iov;
-    msg.msg_iovlen = 1;
-
-    len += offset;
-
-    while (iov->iov_len < len) {
-        len -= iov->iov_len;
-
-        iov++;
-        msg.msg_iovlen++;
-    }
-
-    diff = iov->iov_len - len;
-    iov->iov_len -= diff;
-
-    while (msg.msg_iov->iov_len <= offset) {
-        offset -= msg.msg_iov->iov_len;
-
-        msg.msg_iov++;
-        msg.msg_iovlen--;
-    }
-
-    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
-    msg.msg_iov->iov_len -= offset;
-
-    if (write) {
-        ret = sendmsg(sockfd, &msg, 0);
-    } else {
-        ret = recvmsg(sockfd, &msg, 0);
-    }
-
-    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
-    msg.msg_iov->iov_len += offset;
-
-    iov->iov_len += diff;
-    return ret;
-}
-
 static int connect_to_sdog(const char *addr, const char *port)
 {
     char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
@@ -618,65 +495,6 @@  success:
     return fd;
 }
 
-static int do_readv_writev(int sockfd, struct iovec *iov, int len,
-                           int iov_offset, int write)
-{
-    int ret;
-again:
-    ret = do_send_recv(sockfd, iov, len, iov_offset, write);
-    if (ret < 0) {
-        if (errno == EINTR) {
-            goto again;
-        }
-        if (errno == EAGAIN) {
-            if (qemu_in_coroutine()) {
-                qemu_coroutine_yield();
-            }
-            goto again;
-        }
-        error_report("failed to recv a rsp, %s", strerror(errno));
-        return 1;
-    }
-
-    iov_offset += ret;
-    len -= ret;
-    if (len) {
-        goto again;
-    }
-
-    return 0;
-}
-
-static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
-    return do_readv_writev(sockfd, iov, len, iov_offset, 0);
-}
-
-static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
-    return do_readv_writev(sockfd, iov, len, iov_offset, 1);
-}
-
-static int do_read_write(int sockfd, void *buf, int len, int write)
-{
-    struct iovec iov;
-
-    iov.iov_base = buf;
-    iov.iov_len = len;
-
-    return do_readv_writev(sockfd, &iov, len, 0, write);
-}
-
-static int do_read(int sockfd, void *buf, int len)
-{
-    return do_read_write(sockfd, buf, len, 0);
-}
-
-static int do_write(int sockfd, void *buf, int len)
-{
-    return do_read_write(sockfd, buf, len, 1);
-}
-
 static int send_req(int sockfd, SheepdogReq *hdr, void *data,
                     unsigned int *wlen)
 {
@@ -691,10 +509,9 @@  static int send_req(int sockfd, SheepdogReq *hdr, void *data,
         iov[1].iov_len = *wlen;
     }
 
-    ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
-    if (ret) {
+    ret = qemu_sendv(sockfd, iov, sizeof(*hdr) + *wlen, 0);
+    if (ret < 0) {
         error_report("failed to send a req, %s", strerror(errno));
-        ret = -1;
     }
 
     return ret;
@@ -704,17 +521,19 @@  static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                   unsigned int *wlen, unsigned int *rlen)
 {
     int ret;
+    struct iovec iov;
 
+    socket_set_block(sockfd);
     ret = send_req(sockfd, hdr, data, wlen);
-    if (ret) {
-        ret = -1;
+    if (ret < 0) {
         goto out;
     }
 
-    ret = do_read(sockfd, hdr, sizeof(*hdr));
-    if (ret) {
+    iov.iov_base = hdr;
+    iov.iov_len = sizeof(*hdr);
+    ret = qemu_recvv(sockfd, &iov, sizeof(*hdr), 0);
+    if (ret < 0) {
         error_report("failed to get a rsp, %s", strerror(errno));
-        ret = -1;
         goto out;
     }
 
@@ -723,15 +542,17 @@  static int do_req(int sockfd, SheepdogReq *hdr, void *data,
     }
 
     if (*rlen) {
-        ret = do_read(sockfd, data, *rlen);
-        if (ret) {
+        iov.iov_base = data;
+        iov.iov_len = *rlen;
+        ret = qemu_recvv(sockfd, &iov, *rlen, 0);
+        if (ret < 0) {
             error_report("failed to get the data, %s", strerror(errno));
-            ret = -1;
             goto out;
         }
     }
     ret = 0;
 out:
+    socket_set_nonblock(sockfd);
     return ret;
 }
 
@@ -793,8 +614,8 @@  static void coroutine_fn aio_read_response(void *opaque)
     }
 
     /* read a header */
-    ret = do_read(fd, &rsp, sizeof(rsp));
-    if (ret) {
+    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
+    if (ret < 0) {
         error_report("failed to get the header, %s", strerror(errno));
         goto out;
     }
@@ -839,9 +660,9 @@  static void coroutine_fn aio_read_response(void *opaque)
         }
         break;
     case AIOCB_READ_UDATA:
-        ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
-                       aio_req->iov_offset);
-        if (ret) {
+        ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
+                            aio_req->iov_offset);
+        if (ret < 0) {
             error_report("failed to get the data, %s", strerror(errno));
             goto out;
         }
@@ -1114,15 +935,15 @@  static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     set_cork(s->fd, 1);
 
     /* send a header */
-    ret = do_write(s->fd, &hdr, sizeof(hdr));
-    if (ret) {
+    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
+    if (ret < 0) {
         error_report("failed to send a req, %s", strerror(errno));
         return -EIO;
     }
 
     if (wlen) {
-        ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
-        if (ret) {
+        ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
+        if (ret < 0) {
             error_report("failed to send a data, %s", strerror(errno));
             return -EIO;
         }
diff --git a/cutils.c b/cutils.c
index c91f887..b302020 100644
--- a/cutils.c
+++ b/cutils.c
@@ -25,6 +25,9 @@ 
 #include "host-utils.h"
 #include <math.h>
 
+#include "qemu_socket.h"
+#include "qemu-coroutine.h"
+
 void pstrcpy(char *buf, int buf_size, const char *str)
 {
     int c;
@@ -415,3 +418,177 @@  int64_t strtosz(const char *nptr, char **end)
 {
     return strtosz_suffix(nptr, end, STRTOSZ_DEFSUFFIX_MB);
 }
+
+/*
+ * Send/recv data with iovec buffers
+ *
+ * This function send/recv data from/to the iovec buffer directly.
+ * The first `offset' bytes in the iovec buffer are skipped and next
+ * `len' bytes are used.
+ *
+ * For example,
+ *
+ *   do_sendv_recvv(sockfd, iov, len, offset, 1);
+ *
+ * is equal to
+ *
+ *   char *buf = malloc(size);
+ *   iov_to_buf(iov, iovcnt, buf, offset, size);
+ *   send(sockfd, buf, size, 0);
+ *   free(buf);
+ */
+static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset,
+                          int do_sendv)
+{
+    int ret, diff, iovlen;
+    struct iovec *last_iov;
+
+    /* last_iov is inclusive, so count from one.  */
+    iovlen = 1;
+    last_iov = iov;
+    len += offset;
+
+    while (last_iov->iov_len < len) {
+        len -= last_iov->iov_len;
+
+        last_iov++;
+        iovlen++;
+    }
+
+    diff = last_iov->iov_len - len;
+    last_iov->iov_len -= diff;
+
+    while (iov->iov_len <= offset) {
+        offset -= iov->iov_len;
+
+        iov++;
+        iovlen--;
+    }
+
+    iov->iov_base = (char *) iov->iov_base + offset;
+    iov->iov_len -= offset;
+
+    {
+#ifdef CONFIG_IOVEC
+        struct msghdr msg;
+        memset(&msg, 0, sizeof(msg));
+        msg.msg_iov = iov;
+        msg.msg_iovlen = iovlen;
+
+        do {
+            if (do_sendv) {
+                ret = sendmsg(sockfd, &msg, 0);
+            } else {
+                ret = recvmsg(sockfd, &msg, 0);
+            }
+        } while (ret == -1 && errno == EINTR);
+#else
+        struct iovec *p = iov;
+        ret = 0;
+        while (iovlen > 0) {
+            int rc;
+            if (do_sendv) {
+                rc = send(sockfd, p->iov_base, p->iov_len, 0);
+            } else {
+                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
+            }
+            if (rc == -1) {
+                if (errno == EINTR) {
+                    continue;
+                }
+                if (ret == 0) {
+                    ret = -1;
+                }
+                break;
+            }
+            iovlen--, p++;
+            ret += rc;
+        }
+#endif
+    }
+
+    /* Undo the changes above */
+    iov->iov_base = (char *) iov->iov_base - offset;
+    iov->iov_len += offset;
+    last_iov->iov_len += diff;
+    return ret;
+}
+
+int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+    return do_sendv_recvv(sockfd, iov, len, iov_offset, 0);
+}
+
+int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+    return do_sendv_recvv(sockfd, iov, len, iov_offset, 1);
+}
+
+int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov,
+                               int len, int iov_offset)
+{
+    int total = 0;
+    int ret;
+    while (len) {
+        ret = qemu_recvv(sockfd, iov, len, iov_offset + total);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                qemu_coroutine_yield();
+                continue;
+            }
+            if (total == 0) {
+                total = -1;
+            }
+            break;
+        }
+        if (ret == 0) {
+            break;
+        }
+        total += ret, len -= ret;
+    }
+
+    return total;
+}
+
+int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov,
+                               int len, int iov_offset)
+{
+    int total = 0;
+    int ret;
+    while (len) {
+        ret = qemu_sendv(sockfd, iov, len, iov_offset + total);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                qemu_coroutine_yield();
+                continue;
+            }
+            if (total == 0) {
+                total = -1;
+            }
+            break;
+        }
+        total += ret, len -= ret;
+    }
+
+    return total;
+}
+
+int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len)
+{
+    struct iovec iov;
+
+    iov.iov_base = buf;
+    iov.iov_len = len;
+
+    return qemu_co_recvv(sockfd, &iov, len, 0);
+}
+
+int coroutine_fn qemu_co_send(int sockfd, void *buf, int len)
+{
+    struct iovec iov;
+
+    iov.iov_base = buf;
+    iov.iov_len = len;
+
+    return qemu_co_sendv(sockfd, &iov, len, 0);
+}
diff --git a/qemu-common.h b/qemu-common.h
index 404c421..d7ee4a8 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -203,6 +203,9 @@  int qemu_pipe(int pipefd[2]);
 #define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags)
 #endif
 
+int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset);
+int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset);
+
 /* Error handling.  */
 
 void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2);
@@ -302,6 +305,33 @@  struct qemu_work_item {
 void qemu_init_vcpu(void *env);
 #endif
 
+/**
+ * Sends an iovec (or optionally a part of it) down a socket, yielding
+ * when the socket is full.
+ */
+int qemu_co_sendv(int sockfd, struct iovec *iov,
+                  int len, int iov_offset);
+
+/**
+ * Receives data into an iovec (or optionally into a part of it) from
+ * a socket, yielding when there is no data in the socket.
+ */
+int qemu_co_recvv(int sockfd, struct iovec *iov,
+                  int len, int iov_offset);
+
+
+/**
+ * Sends a buffer down a socket, yielding when the socket is full.
+ */
+int qemu_co_send(int sockfd, void *buf, int len);
+
+/**
+ * Receives data into a buffer from a socket, yielding when there
+ * is no data in the socket.
+ */
+int qemu_co_recv(int sockfd, void *buf, int len);
+
+
 typedef struct QEMUIOVector {
     struct iovec *iov;
     int niov;