diff mbox

[v2,03/15] sheepdog: move coroutine send/recv function to generic code

Message ID 1316183152-5481-4-git-send-email-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Sept. 16, 2011, 2:25 p.m. UTC
Outside coroutines, avoid busy waiting on EAGAIN by temporarily
making the socket blocking.

The API of qemu_recvv/qemu_sendv is slightly different from
do_readv/do_writev because they do not handle coroutines.  It
returns the number of bytes written before encountering an
EAGAIN.  The specificity of yielding on EAGAIN is entirely in
qemu-coroutine.c.

Reviewed-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/sheepdog.c |  225 ++++++------------------------------------------------
 cutils.c         |  177 ++++++++++++++++++++++++++++++++++++++++++
 qemu-common.h    |   30 +++++++
 3 files changed, 230 insertions(+), 202 deletions(-)

Comments

MORITA Kazutaka Sept. 17, 2011, 6:29 a.m. UTC | #1
At Fri, 16 Sep 2011 16:25:40 +0200,
Paolo Bonzini wrote:
> 
> Outside coroutines, avoid busy waiting on EAGAIN by temporarily
> making the socket blocking.
> 
> The API of qemu_recvv/qemu_sendv is slightly different from
> do_readv/do_writev because they do not handle coroutines.  It
> returns the number of bytes written before encountering an
> EAGAIN.  The specificity of yielding on EAGAIN is entirely in
> qemu-coroutine.c.
> 
> Reviewed-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/sheepdog.c |  225 ++++++------------------------------------------------
>  cutils.c         |  177 ++++++++++++++++++++++++++++++++++++++++++
>  qemu-common.h    |   30 +++++++
>  3 files changed, 230 insertions(+), 202 deletions(-)

It seems this patch causes a compile error of qemu-ga.

Other things I noticed:

>  static int send_req(int sockfd, SheepdogReq *hdr, void *data,
>                      unsigned int *wlen)
>  {
> @@ -691,10 +509,9 @@ static int send_req(int sockfd, SheepdogReq *hdr, void *data,
>          iov[1].iov_len = *wlen;
>      }
>  
> -    ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
> -    if (ret) {
> +    ret = qemu_sendv(sockfd, iov, sizeof(*hdr) + *wlen, 0);

This is wrong because qemu_sendv() may return a smaller value than
(sizeof(*hdr) + *wlen).  We need to do things like qemu_write_full()
here.

> +    if (ret < 0) {
>          error_report("failed to send a req, %s", strerror(errno));
> -        ret = -1;
>      }
>  
>      return ret;
> @@ -704,17 +521,19 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
>                    unsigned int *wlen, unsigned int *rlen)
>  {
>      int ret;
> +    struct iovec iov;
>  
> +    socket_set_block(sockfd);
>      ret = send_req(sockfd, hdr, data, wlen);
> -    if (ret) {
> -        ret = -1;
> +    if (ret < 0) {
>          goto out;
>      }
>  
> -    ret = do_read(sockfd, hdr, sizeof(*hdr));
> -    if (ret) {
> +    iov.iov_base = hdr;
> +    iov.iov_len = sizeof(*hdr);
> +    ret = qemu_recvv(sockfd, &iov, sizeof(*hdr), 0);

qemu_recvv() may also return a smaller value than sizeof(*hdr) here.

> +    if (ret < 0) {
>          error_report("failed to get a rsp, %s", strerror(errno));
> -        ret = -1;
>          goto out;
>      }
>  
> @@ -723,15 +542,17 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
>      }
>  
>      if (*rlen) {
> -        ret = do_read(sockfd, data, *rlen);
> -        if (ret) {
> +        iov.iov_base = data;
> +        iov.iov_len = *rlen;
> +        ret = qemu_recvv(sockfd, &iov, *rlen, 0);

Same here.

> +        if (ret < 0) {
>              error_report("failed to get the data, %s", strerror(errno));
> -            ret = -1;
>              goto out;
>          }
>      }
>      ret = 0;
>  out:
> +    socket_set_nonblock(sockfd);
>      return ret;
>  }
>  

[snip]

> +
> +/*
> + * Send/recv data with iovec buffers
> + *
> + * This function send/recv data from/to the iovec buffer directly.
> + * The first `offset' bytes in the iovec buffer are skipped and next
> + * `len' bytes are used.
> + *
> + * For example,
> + *
> + *   do_sendv_recvv(sockfd, iov, len, offset, 1);
> + *
> + * is equal to
> + *
> + *   char *buf = malloc(size);
> + *   iov_to_buf(iov, iovcnt, buf, offset, size);
> + *   send(sockfd, buf, size, 0);
> + *   free(buf);
> + */
> +static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset,
> +                          int do_sendv)
> +{
> +    int ret, diff, iovlen;
> +    struct iovec *last_iov;
> +
> +    /* last_iov is inclusive, so count from one.  */
> +    iovlen = 1;
> +    last_iov = iov;
> +    len += offset;
> +
> +    while (last_iov->iov_len < len) {
> +        len -= last_iov->iov_len;
> +
> +        last_iov++;
> +        iovlen++;
> +    }
> +
> +    diff = last_iov->iov_len - len;
> +    last_iov->iov_len -= diff;
> +
> +    while (iov->iov_len <= offset) {
> +        offset -= iov->iov_len;
> +
> +        iov++;
> +        iovlen--;
> +    }
> +
> +    iov->iov_base = (char *) iov->iov_base + offset;
> +    iov->iov_len -= offset;
> +
> +    {
> +#ifdef CONFIG_IOVEC
> +        struct msghdr msg;
> +        memset(&msg, 0, sizeof(msg));
> +        msg.msg_iov = iov;
> +        msg.msg_iovlen = iovlen;
> +
> +        do {
> +            if (do_sendv) {
> +                ret = sendmsg(sockfd, &msg, 0);
> +            } else {
> +                ret = recvmsg(sockfd, &msg, 0);
> +            }
> +        } while (ret == -1 && errno == EINTR);
> +#else
> +        struct iovec *p = iov;
> +        ret = 0;
> +        while (iovlen > 0) {
> +            int rc;
> +            if (do_sendv) {
> +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
> +            } else {
> +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
> +            }
> +            if (rc == -1) {
> +                if (errno == EINTR) {
> +                    continue;
> +                }
> +                if (ret == 0) {
> +                    ret = -1;
> +                }
> +                break;
> +            }
> +            iovlen--, p++;
> +            ret += rc;
> +        }

This code can be called inside coroutines with a non-blocking fd, so
should we avoid busy waiting?


Thanks,

Kazutaka
Paolo Bonzini Sept. 17, 2011, 2:49 p.m. UTC | #2
On 09/17/2011 08:29 AM, MORITA Kazutaka wrote:
>> >  +#else
>> >  +        struct iovec *p = iov;
>> >  +        ret = 0;
>> >  +        while (iovlen>  0) {
>> >  +            int rc;
>> >  +            if (do_sendv) {
>> >  +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
>> >  +            } else {
>> >  +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
>> >  +            }
>> >  +            if (rc == -1) {
>> >  +                if (errno == EINTR) {
>> >  +                    continue;
>> >  +                }
>> >  +                if (ret == 0) {
>> >  +                    ret = -1;
>> >  +                }
>> >  +                break;
>> >  +            }
>> >  +            iovlen--, p++;
>> >  +            ret += rc;
>> >  +        }
> This code can be called inside coroutines with a non-blocking fd, so
> should we avoid busy waiting?

It doesn't busy wait, it exits with EAGAIN.  I'll squash in here the 
first hunk of patch 4, which is needed.

qemu_co_recvv already handles reads that return zero, unlike sheepdog's 
do_readv_writev.  I probably moved it there inadvertently while moving 
code around to cutils.c, but in order to fix qemu-ga I need to create a 
new file qemu-coroutine-io.c.

Kevin, do you want me to resubmit everything, or are you going to apply 
some more patches to the block branch (5 to 12 should be fine)?

Paolo
MORITA Kazutaka Sept. 17, 2011, 5:16 p.m. UTC | #3
At Sat, 17 Sep 2011 16:49:22 +0200,
Paolo Bonzini wrote:
> 
> On 09/17/2011 08:29 AM, MORITA Kazutaka wrote:
> >> >  +#else
> >> >  +        struct iovec *p = iov;
> >> >  +        ret = 0;
> >> >  +        while (iovlen>  0) {
> >> >  +            int rc;
> >> >  +            if (do_sendv) {
> >> >  +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
> >> >  +            } else {
> >> >  +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
> >> >  +            }
> >> >  +            if (rc == -1) {
> >> >  +                if (errno == EINTR) {
> >> >  +                    continue;
> >> >  +                }
> >> >  +                if (ret == 0) {
> >> >  +                    ret = -1;
> >> >  +                }
> >> >  +                break;
> >> >  +            }
> >> >  +            iovlen--, p++;
> >> >  +            ret += rc;
> >> >  +        }
> > This code can be called inside coroutines with a non-blocking fd, so
> > should we avoid busy waiting?
> 
> It doesn't busy wait, it exits with EAGAIN.  I'll squash in here the 

Oops, you're right.  Sorry for the noise.

Thanks,

Kazutaka


> first hunk of patch 4, which is needed.
> 
> qemu_co_recvv already handles reads that return zero, unlike sheepdog's 
> do_readv_writev.  I probably moved it there inadvertently while moving 
> code around to cutils.c, but in order to fix qemu-ga I need to create a 
> new file qemu-coroutine-io.c.
> 
> Kevin, do you want me to resubmit everything, or are you going to apply 
> some more patches to the block branch (5 to 12 should be fine)?
> 
> Paolo
>
Kevin Wolf Sept. 19, 2011, 7:47 a.m. UTC | #4
Am 17.09.2011 16:49, schrieb Paolo Bonzini:
> On 09/17/2011 08:29 AM, MORITA Kazutaka wrote:
>>>>  +#else
>>>>  +        struct iovec *p = iov;
>>>>  +        ret = 0;
>>>>  +        while (iovlen>  0) {
>>>>  +            int rc;
>>>>  +            if (do_sendv) {
>>>>  +                rc = send(sockfd, p->iov_base, p->iov_len, 0);
>>>>  +            } else {
>>>>  +                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
>>>>  +            }
>>>>  +            if (rc == -1) {
>>>>  +                if (errno == EINTR) {
>>>>  +                    continue;
>>>>  +                }
>>>>  +                if (ret == 0) {
>>>>  +                    ret = -1;
>>>>  +                }
>>>>  +                break;
>>>>  +            }
>>>>  +            iovlen--, p++;
>>>>  +            ret += rc;
>>>>  +        }
>> This code can be called inside coroutines with a non-blocking fd, so
>> should we avoid busy waiting?
> 
> It doesn't busy wait, it exits with EAGAIN.  I'll squash in here the 
> first hunk of patch 4, which is needed.
> 
> qemu_co_recvv already handles reads that return zero, unlike sheepdog's 
> do_readv_writev.  I probably moved it there inadvertently while moving 
> code around to cutils.c, but in order to fix qemu-ga I need to create a 
> new file qemu-coroutine-io.c.
> 
> Kevin, do you want me to resubmit everything, or are you going to apply 
> some more patches to the block branch (5 to 12 should be fine)?

As long as it's clear what the current version is, I don't mind. Do I
understand right that there will be a v3 for patches 3 and 4?

Kevin
Paolo Bonzini Sept. 19, 2011, 9:34 a.m. UTC | #5
On 09/19/2011 09:47 AM, Kevin Wolf wrote:
> As long as it's clear what the current version is, I don't mind. Do I
> understand right that there will be a v3 for patches 3 and 4?

Yes.

Paolo
diff mbox

Patch

diff --git a/block/sheepdog.c b/block/sheepdog.c
index af696a5..94e62a3 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -443,129 +443,6 @@  static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
     return acb;
 }
 
-#ifdef _WIN32
-
-struct msghdr {
-    struct iovec *msg_iov;
-    size_t        msg_iovlen;
-};
-
-static ssize_t sendmsg(int s, const struct msghdr *msg, int flags)
-{
-    size_t size = 0;
-    char *buf, *p;
-    int i, ret;
-
-    /* count the msg size */
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        size += msg->msg_iov[i].iov_len;
-    }
-    buf = g_malloc(size);
-
-    p = buf;
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
-        p += msg->msg_iov[i].iov_len;
-    }
-
-    ret = send(s, buf, size, flags);
-
-    g_free(buf);
-    return ret;
-}
-
-static ssize_t recvmsg(int s, struct msghdr *msg, int flags)
-{
-    size_t size = 0;
-    char *buf, *p;
-    int i, ret;
-
-    /* count the msg size */
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        size += msg->msg_iov[i].iov_len;
-    }
-    buf = g_malloc(size);
-
-    ret = qemu_recv(s, buf, size, flags);
-    if (ret < 0) {
-        goto out;
-    }
-
-    p = buf;
-    for (i = 0; i < msg->msg_iovlen; i++) {
-        memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len);
-        p += msg->msg_iov[i].iov_len;
-    }
-out:
-    g_free(buf);
-    return ret;
-}
-
-#endif
-
-/*
- * Send/recv data with iovec buffers
- *
- * This function send/recv data from/to the iovec buffer directly.
- * The first `offset' bytes in the iovec buffer are skipped and next
- * `len' bytes are used.
- *
- * For example,
- *
- *   do_send_recv(sockfd, iov, len, offset, 1);
- *
- * is equals to
- *
- *   char *buf = malloc(size);
- *   iov_to_buf(iov, iovcnt, buf, offset, size);
- *   send(sockfd, buf, size, 0);
- *   free(buf);
- */
-static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
-                        int write)
-{
-    struct msghdr msg;
-    int ret, diff;
-
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_iov = iov;
-    msg.msg_iovlen = 1;
-
-    len += offset;
-
-    while (iov->iov_len < len) {
-        len -= iov->iov_len;
-
-        iov++;
-        msg.msg_iovlen++;
-    }
-
-    diff = iov->iov_len - len;
-    iov->iov_len -= diff;
-
-    while (msg.msg_iov->iov_len <= offset) {
-        offset -= msg.msg_iov->iov_len;
-
-        msg.msg_iov++;
-        msg.msg_iovlen--;
-    }
-
-    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
-    msg.msg_iov->iov_len -= offset;
-
-    if (write) {
-        ret = sendmsg(sockfd, &msg, 0);
-    } else {
-        ret = recvmsg(sockfd, &msg, 0);
-    }
-
-    msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
-    msg.msg_iov->iov_len += offset;
-
-    iov->iov_len += diff;
-    return ret;
-}
-
 static int connect_to_sdog(const char *addr, const char *port)
 {
     char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
@@ -618,65 +495,6 @@  success:
     return fd;
 }
 
-static int do_readv_writev(int sockfd, struct iovec *iov, int len,
-                           int iov_offset, int write)
-{
-    int ret;
-again:
-    ret = do_send_recv(sockfd, iov, len, iov_offset, write);
-    if (ret < 0) {
-        if (errno == EINTR) {
-            goto again;
-        }
-        if (errno == EAGAIN) {
-            if (qemu_in_coroutine()) {
-                qemu_coroutine_yield();
-            }
-            goto again;
-        }
-        error_report("failed to recv a rsp, %s", strerror(errno));
-        return 1;
-    }
-
-    iov_offset += ret;
-    len -= ret;
-    if (len) {
-        goto again;
-    }
-
-    return 0;
-}
-
-static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
-    return do_readv_writev(sockfd, iov, len, iov_offset, 0);
-}
-
-static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
-    return do_readv_writev(sockfd, iov, len, iov_offset, 1);
-}
-
-static int do_read_write(int sockfd, void *buf, int len, int write)
-{
-    struct iovec iov;
-
-    iov.iov_base = buf;
-    iov.iov_len = len;
-
-    return do_readv_writev(sockfd, &iov, len, 0, write);
-}
-
-static int do_read(int sockfd, void *buf, int len)
-{
-    return do_read_write(sockfd, buf, len, 0);
-}
-
-static int do_write(int sockfd, void *buf, int len)
-{
-    return do_read_write(sockfd, buf, len, 1);
-}
-
 static int send_req(int sockfd, SheepdogReq *hdr, void *data,
                     unsigned int *wlen)
 {
@@ -691,10 +509,9 @@  static int send_req(int sockfd, SheepdogReq *hdr, void *data,
         iov[1].iov_len = *wlen;
     }
 
-    ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
-    if (ret) {
+    ret = qemu_sendv(sockfd, iov, sizeof(*hdr) + *wlen, 0);
+    if (ret < 0) {
         error_report("failed to send a req, %s", strerror(errno));
-        ret = -1;
     }
 
     return ret;
@@ -704,17 +521,19 @@  static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                   unsigned int *wlen, unsigned int *rlen)
 {
     int ret;
+    struct iovec iov;
 
+    socket_set_block(sockfd);
     ret = send_req(sockfd, hdr, data, wlen);
-    if (ret) {
-        ret = -1;
+    if (ret < 0) {
         goto out;
     }
 
-    ret = do_read(sockfd, hdr, sizeof(*hdr));
-    if (ret) {
+    iov.iov_base = hdr;
+    iov.iov_len = sizeof(*hdr);
+    ret = qemu_recvv(sockfd, &iov, sizeof(*hdr), 0);
+    if (ret < 0) {
         error_report("failed to get a rsp, %s", strerror(errno));
-        ret = -1;
         goto out;
     }
 
@@ -723,15 +542,17 @@  static int do_req(int sockfd, SheepdogReq *hdr, void *data,
     }
 
     if (*rlen) {
-        ret = do_read(sockfd, data, *rlen);
-        if (ret) {
+        iov.iov_base = data;
+        iov.iov_len = *rlen;
+        ret = qemu_recvv(sockfd, &iov, *rlen, 0);
+        if (ret < 0) {
             error_report("failed to get the data, %s", strerror(errno));
-            ret = -1;
             goto out;
         }
     }
     ret = 0;
 out:
+    socket_set_nonblock(sockfd);
     return ret;
 }
 
@@ -793,8 +614,8 @@  static void coroutine_fn aio_read_response(void *opaque)
     }
 
     /* read a header */
-    ret = do_read(fd, &rsp, sizeof(rsp));
-    if (ret) {
+    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
+    if (ret < 0) {
         error_report("failed to get the header, %s", strerror(errno));
         goto out;
     }
@@ -839,9 +660,9 @@  static void coroutine_fn aio_read_response(void *opaque)
         }
         break;
     case AIOCB_READ_UDATA:
-        ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
-                       aio_req->iov_offset);
-        if (ret) {
+        ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
+                            aio_req->iov_offset);
+        if (ret < 0) {
             error_report("failed to get the data, %s", strerror(errno));
             goto out;
         }
@@ -1114,15 +935,15 @@  static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     set_cork(s->fd, 1);
 
     /* send a header */
-    ret = do_write(s->fd, &hdr, sizeof(hdr));
-    if (ret) {
+    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
+    if (ret < 0) {
         error_report("failed to send a req, %s", strerror(errno));
         return -EIO;
     }
 
     if (wlen) {
-        ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
-        if (ret) {
+        ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
+        if (ret < 0) {
             error_report("failed to send a data, %s", strerror(errno));
             return -EIO;
         }
diff --git a/cutils.c b/cutils.c
index c91f887..b302020 100644
--- a/cutils.c
+++ b/cutils.c
@@ -25,6 +25,9 @@ 
 #include "host-utils.h"
 #include <math.h>
 
+#include "qemu_socket.h"
+#include "qemu-coroutine.h"
+
 void pstrcpy(char *buf, int buf_size, const char *str)
 {
     int c;
@@ -415,3 +418,177 @@  int64_t strtosz(const char *nptr, char **end)
 {
     return strtosz_suffix(nptr, end, STRTOSZ_DEFSUFFIX_MB);
 }
+
+/*
+ * Send/recv data with iovec buffers
+ *
+ * This function send/recv data from/to the iovec buffer directly.
+ * The first `offset' bytes in the iovec buffer are skipped and next
+ * `len' bytes are used.
+ *
+ * For example,
+ *
+ *   do_sendv_recvv(sockfd, iov, len, offset, 1);
+ *
+ * is equal to
+ *
+ *   char *buf = malloc(size);
+ *   iov_to_buf(iov, iovcnt, buf, offset, size);
+ *   send(sockfd, buf, size, 0);
+ *   free(buf);
+ */
+static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset,
+                          int do_sendv)
+{
+    int ret, diff, iovlen;
+    struct iovec *last_iov;
+
+    /* last_iov is inclusive, so count from one.  */
+    iovlen = 1;
+    last_iov = iov;
+    len += offset;
+
+    while (last_iov->iov_len < len) {
+        len -= last_iov->iov_len;
+
+        last_iov++;
+        iovlen++;
+    }
+
+    diff = last_iov->iov_len - len;
+    last_iov->iov_len -= diff;
+
+    while (iov->iov_len <= offset) {
+        offset -= iov->iov_len;
+
+        iov++;
+        iovlen--;
+    }
+
+    iov->iov_base = (char *) iov->iov_base + offset;
+    iov->iov_len -= offset;
+
+    {
+#ifdef CONFIG_IOVEC
+        struct msghdr msg;
+        memset(&msg, 0, sizeof(msg));
+        msg.msg_iov = iov;
+        msg.msg_iovlen = iovlen;
+
+        do {
+            if (do_sendv) {
+                ret = sendmsg(sockfd, &msg, 0);
+            } else {
+                ret = recvmsg(sockfd, &msg, 0);
+            }
+        } while (ret == -1 && errno == EINTR);
+#else
+        struct iovec *p = iov;
+        ret = 0;
+        while (iovlen > 0) {
+            int rc;
+            if (do_sendv) {
+                rc = send(sockfd, p->iov_base, p->iov_len, 0);
+            } else {
+                rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
+            }
+            if (rc == -1) {
+                if (errno == EINTR) {
+                    continue;
+                }
+                if (ret == 0) {
+                    ret = -1;
+                }
+                break;
+            }
+            iovlen--, p++;
+            ret += rc;
+        }
+#endif
+    }
+
+    /* Undo the changes above */
+    iov->iov_base = (char *) iov->iov_base - offset;
+    iov->iov_len += offset;
+    last_iov->iov_len += diff;
+    return ret;
+}
+
+int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+    return do_sendv_recvv(sockfd, iov, len, iov_offset, 0);
+}
+
+int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+    return do_sendv_recvv(sockfd, iov, len, iov_offset, 1);
+}
+
+int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov,
+                               int len, int iov_offset)
+{
+    int total = 0;
+    int ret;
+    while (len) {
+        ret = qemu_recvv(sockfd, iov, len, iov_offset + total);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                qemu_coroutine_yield();
+                continue;
+            }
+            if (total == 0) {
+                total = -1;
+            }
+            break;
+        }
+        if (ret == 0) {
+            break;
+        }
+        total += ret, len -= ret;
+    }
+
+    return total;
+}
+
+int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov,
+                               int len, int iov_offset)
+{
+    int total = 0;
+    int ret;
+    while (len) {
+        ret = qemu_sendv(sockfd, iov, len, iov_offset + total);
+        if (ret < 0) {
+            if (errno == EAGAIN) {
+                qemu_coroutine_yield();
+                continue;
+            }
+            if (total == 0) {
+                total = -1;
+            }
+            break;
+        }
+        total += ret, len -= ret;
+    }
+
+    return total;
+}
+
+int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len)
+{
+    struct iovec iov;
+
+    iov.iov_base = buf;
+    iov.iov_len = len;
+
+    return qemu_co_recvv(sockfd, &iov, len, 0);
+}
+
+int coroutine_fn qemu_co_send(int sockfd, void *buf, int len)
+{
+    struct iovec iov;
+
+    iov.iov_base = buf;
+    iov.iov_len = len;
+
+    return qemu_co_sendv(sockfd, &iov, len, 0);
+}
diff --git a/qemu-common.h b/qemu-common.h
index 404c421..d7ee4a8 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -203,6 +203,9 @@  int qemu_pipe(int pipefd[2]);
 #define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags)
 #endif
 
+int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset);
+int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset);
+
 /* Error handling.  */
 
 void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2);
@@ -302,6 +305,33 @@  struct qemu_work_item {
 void qemu_init_vcpu(void *env);
 #endif
 
+/**
+ * Sends an iovec (or optionally a part of it) down a socket, yielding
+ * when the socket is full.
+ */
+int qemu_co_sendv(int sockfd, struct iovec *iov,
+                  int len, int iov_offset);
+
+/**
+ * Receives data into an iovec (or optionally into a part of it) from
+ * a socket, yielding when there is no data in the socket.
+ */
+int qemu_co_recvv(int sockfd, struct iovec *iov,
+                  int len, int iov_offset);
+
+
+/**
+ * Sends a buffer down a socket, yielding when the socket is full.
+ */
+int qemu_co_send(int sockfd, void *buf, int len);
+
+/**
+ * Receives data into a buffer from a socket, yielding when there
+ * is no data in the socket.
+ */
+int qemu_co_recv(int sockfd, void *buf, int len);
+
+
 typedef struct QEMUIOVector {
     struct iovec *iov;
     int niov;