diff mbox

[v3,04/13] nbd: convert to using I/O channels for actual socket I/O

Message ID 1453208963-16834-5-git-send-email-berrange@redhat.com
State New
Headers show

Commit Message

Daniel P. Berrangé Jan. 19, 2016, 1:09 p.m. UTC
Now that all callers are converted to use I/O channels for
initial connection setup, it is possible to switch the core
NBD protocol handling core over to use QIOChannel APIs for
actual sockets I/O.

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
---
 block/nbd-client.c  |  19 +++----
 blockdev-nbd.c      |   6 +--
 include/block/nbd.h |  20 ++++---
 nbd/client.c        |  40 +++++++-------
 nbd/common.c        | 112 ++++++++++++++++++++++++++++++---------
 nbd/nbd-internal.h  |  18 +++----
 nbd/server.c        | 150 +++++++++++++++++++++++++++++-----------------------
 qemu-nbd.c          |  10 ++--
 8 files changed, 226 insertions(+), 149 deletions(-)

Comments

Paolo Bonzini Jan. 19, 2016, 4:08 p.m. UTC | #1
On 19/01/2016 14:09, Daniel P. Berrange wrote:
> + * This will update @iov so that its head is advanced
> + * by @skip bytes. To do this, zero or more complete
> + * elements of @iov will be skipped over. The new head
> + * of @iov will then have its base & len updated to
> + * skip the remaining number of bytes. @oldiov will
> + * hold the original data from the new head of @iov.
> + */
> +static void nbd_skip_iov(size_t skip,
> +                         struct iovec **iov,
> +                         int *niov,
> +                         struct iovec *oldiov)
>  {
> -    size_t offset = 0;
> -    int err;
> -
> -    if (qemu_in_coroutine()) {
> -        if (do_read) {
> -            return qemu_co_recv(fd, buffer, size);
> +    ssize_t done = 0;
> +    size_t i;
> +    for (i = 0; i < *niov; i++) {
> +        if ((*iov)[i].iov_len <= skip) {
> +            done += (*iov)[i].iov_len;
> +            skip -= (*iov)[i].iov_len;
>          } else {
> -            return qemu_co_send(fd, buffer, size);
> +            done += skip;
> +            oldiov->iov_base = (*iov)[i].iov_base;
> +            oldiov->iov_len = (*iov)[i].iov_len;
> +            (*iov)[i].iov_len -= skip;
> +            (*iov)[i].iov_base += skip;
> +            *iov = *iov + i;
> +            *niov = *niov - i;
> +            break;
>          }
>      }

Can you use iov_discard_front instead?

> 
> +                /* XXX see about using qio_channel_yield() in
> +                 * future if we can use normal watches instead
> +                 * of the AIO handlers */
> +                qemu_coroutine_yield();

This breaks dataplane.  The simplest solution is to change
qio_channel_yield() to just

    qio_channel_aio_yield(ioc, qemu_get_aio_context(), condition);

where qio_channel_aio_yield(QIOChannel*, AioContext *, int) is
implemented by the backends.  The implementation could then use
aio_set_fd_handler, and it would be almost trivial because it would
reuse common code just like the one in io/channel-watch.c.

Websock is the only complicated case.  I think it's fine if you simply
must not use qio_channel_{,aio_}yield() for websock channels.

That said, I'm not sure you can even rely on nbd_negotiate_continue to
run in the main I/O thread.  So, another possibility is to add an
alternative watch API similar to AioContext's:

    qio_channel_set_handler(QIOChannel*, AioContext*, void(*in)(void*),
                            void(*out)(void*), void *opaque)

whose implementation would again be simple for backends other than
websock, and probably trivial for backends other than buffer and
websock.  Again, it wouldn't be a problem to skip buffer and websock for
the time being (read, until someone actually needs it).

Converting the block layer to use GMainLoop, unfortunately, would lose
too much performance.

Paolo
Daniel P. Berrangé Jan. 19, 2016, 4:52 p.m. UTC | #2
On Tue, Jan 19, 2016 at 05:08:12PM +0100, Paolo Bonzini wrote:
> 
> 
> On 19/01/2016 14:09, Daniel P. Berrange wrote:
> > + * This will update @iov so that its head is advanced
> > + * by @skip bytes. To do this, zero or more complete
> > + * elements of @iov will be skipped over. The new head
> > + * of @iov will then have its base & len updated to
> > + * skip the remaining number of bytes. @oldiov will
> > + * hold the original data from the new head of @iov.
> > + */
> > +static void nbd_skip_iov(size_t skip,
> > +                         struct iovec **iov,
> > +                         int *niov,
> > +                         struct iovec *oldiov)
> >  {
> > -    size_t offset = 0;
> > -    int err;
> > -
> > -    if (qemu_in_coroutine()) {
> > -        if (do_read) {
> > -            return qemu_co_recv(fd, buffer, size);
> > +    ssize_t done = 0;
> > +    size_t i;
> > +    for (i = 0; i < *niov; i++) {
> > +        if ((*iov)[i].iov_len <= skip) {
> > +            done += (*iov)[i].iov_len;
> > +            skip -= (*iov)[i].iov_len;
> >          } else {
> > -            return qemu_co_send(fd, buffer, size);
> > +            done += skip;
> > +            oldiov->iov_base = (*iov)[i].iov_base;
> > +            oldiov->iov_len = (*iov)[i].iov_len;
> > +            (*iov)[i].iov_len -= skip;
> > +            (*iov)[i].iov_base += skip;
> > +            *iov = *iov + i;
> > +            *niov = *niov - i;
> > +            break;
> >          }
> >      }
> 
> Can you use iov_discard_front instead?

I didn't know that existed - i'll investigate whether it is
possible to use it.

> > +                /* XXX see about using qio_channel_yield() in
> > +                 * future if we can use normal watches instead
> > +                 * of the AIO handlers */
> > +                qemu_coroutine_yield();
> 
> This breaks dataplane.  The simplest solution is to change
> qio_channel_yield() to just
> 
>     qio_channel_aio_yield(ioc, qemu_get_aio_context(), condition);
> 
> where qio_channel_aio_yield(QIOChannel*, AioContext *, int) is
> implemented by the backends.  The implementation could then use
> aio_set_fd_handler, and it would be almost trivial because it would
> reuse common code just like the one in io/channel-watch.c.
> 
> Websock is the only complicated case.  I think it's fine if you simply
> must not use qio_channel_{,aio_}yield() for websock channels.
> 
> That said, I'm not sure you can even rely on nbd_negotiate_continue to
> run in the main I/O thread.  So, another possibility is to add an
> alternative watch API similar to AioContext's:
> 
>     qio_channel_set_handler(QIOChannel*, AioContext*, void(*in)(void*),
>                             void(*out)(void*), void *opaque)
> 
> whose implementation would again be simple for backends other than
> websock, and probably trivial for backends other than buffer and
> websock.  Again, it wouldn't be a problem to skip buffer and websock for
> the time being (read, until someone actually needs it).

I'll have a look at this and see what I can do. It shouldn't block
merging of this patch I assume, since the current call to
qemu_coroutine_yield() should be fine - it just my comment suggestion
that would not work. So I'll investigate what you suggest here wrt
to AioContext & QIOChannel as a later followup patch.

Regards,
Daniel
Paolo Bonzini Jan. 19, 2016, 6 p.m. UTC | #3
On 19/01/2016 17:52, Daniel P. Berrange wrote:
> I'll have a look at this and see what I can do. It shouldn't block
> merging of this patch I assume, since the current call to
> qemu_coroutine_yield() should be fine - it just my comment suggestion
> that would not work.

Yes, that is tied to negotiations being always in the main loop.  I'm
not entirely sure that is the case (e.g. a call bdrv_reopen could
perhaps be from somewhere else) but it's anyway no worse than the
current state.

Paolo
diff mbox

Patch

diff --git a/block/nbd-client.c b/block/nbd-client.c
index bf5b0a3..ed69d4b 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -71,7 +71,7 @@  static void nbd_reply_ready(void *opaque)
          * that another thread has done the same thing in parallel, so
          * the socket is not readable anymore.
          */
-        ret = nbd_receive_reply(s->sioc->fd, &s->reply);
+        ret = nbd_receive_reply(s->ioc, &s->reply);
         if (ret == -EAGAIN) {
             return;
         }
@@ -122,6 +122,7 @@  static int nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
+    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
     request->handle = INDEX_TO_HANDLE(s, i);
     s->send_coroutine = qemu_coroutine_self();
@@ -131,17 +132,17 @@  static int nbd_co_send_request(BlockDriverState *bs,
                        nbd_reply_ready, nbd_restart_write, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
-        rc = nbd_send_request(s->sioc->fd, request);
+        rc = nbd_send_request(s->ioc, request);
         if (rc >= 0) {
-            ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov,
-                                offset, request->len);
+            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+                               offset, request->len, 0);
             if (ret != request->len) {
                 rc = -EIO;
             }
         }
         qio_channel_set_cork(s->ioc, false);
     } else {
-        rc = nbd_send_request(s->sioc->fd, request);
+        rc = nbd_send_request(s->ioc, request);
     }
     aio_set_fd_handler(aio_context, s->sioc->fd, false,
                        nbd_reply_ready, NULL, bs);
@@ -164,8 +165,8 @@  static void nbd_co_receive_reply(NbdClientSession *s,
         reply->error = EIO;
     } else {
         if (qiov && reply->error == 0) {
-            ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov,
-                                offset, request->len);
+            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+                               offset, request->len, 1);
             if (ret != request->len) {
                 reply->error = EIO;
             }
@@ -372,7 +373,7 @@  void nbd_client_close(BlockDriverState *bs)
         return;
     }
 
-    nbd_send_request(client->sioc->fd, &request);
+    nbd_send_request(client->ioc, &request);
 
     nbd_teardown_connection(bs);
 }
@@ -387,7 +388,7 @@  int nbd_client_init(BlockDriverState *bs, QIOChannelSocket *sioc,
     logout("session init %s\n", export);
     qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 
-    ret = nbd_receive_negotiate(sioc->fd, export,
+    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
                                 &client->nbdflags, &client->size, errp);
     if (ret < 0) {
         logout("Failed to negotiate with the NBD server\n");
diff --git a/blockdev-nbd.c b/blockdev-nbd.c
index 94b653d..7d4c415 100644
--- a/blockdev-nbd.c
+++ b/blockdev-nbd.c
@@ -26,7 +26,6 @@  static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
                            gpointer opaque)
 {
     QIOChannelSocket *cioc;
-    int fd;
 
     cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      NULL);
@@ -34,10 +33,7 @@  static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
         return TRUE;
     }
 
-    fd = dup(cioc->fd);
-    if (fd >= 0) {
-        nbd_client_new(NULL, fd, nbd_client_put);
-    }
+    nbd_client_new(NULL, cioc, nbd_client_put);
     object_unref(OBJECT(cioc));
     return TRUE;
 }
diff --git a/include/block/nbd.h b/include/block/nbd.h
index 7eccb41..1080ef8 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -23,6 +23,7 @@ 
 
 #include "qemu-common.h"
 #include "qemu/option.h"
+#include "io/channel-socket.h"
 
 struct nbd_request {
     uint32_t magic;
@@ -73,12 +74,17 @@  enum {
 /* Maximum size of a single READ/WRITE data buffer */
 #define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024)
 
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+                     struct iovec *iov,
+                     size_t niov,
+                     size_t offset,
+                     size_t length,
+                     bool do_read);
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
                           off_t *size, Error **errp);
-int nbd_init(int fd, int csock, uint32_t flags, off_t size);
-ssize_t nbd_send_request(int csock, struct nbd_request *request);
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size);
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request);
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply);
 int nbd_client(int fd);
 int nbd_disconnect(int fd);
 
@@ -98,7 +104,9 @@  NBDExport *nbd_export_find(const char *name);
 void nbd_export_set_name(NBDExport *exp, const char *name);
 void nbd_export_close_all(void);
 
-void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *));
+void nbd_client_new(NBDExport *exp,
+                    QIOChannelSocket *sioc,
+                    void (*close)(NBDClient *));
 void nbd_client_get(NBDClient *client);
 void nbd_client_put(NBDClient *client);
 
diff --git a/nbd/client.c b/nbd/client.c
index 83df7ba..07e1fed 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -70,7 +70,7 @@  static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
 
 */
 
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
                           off_t *size, Error **errp)
 {
     char buf[256];
@@ -82,7 +82,7 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
 
     rc = -EINVAL;
 
-    if (read_sync(csock, buf, 8) != 8) {
+    if (read_sync(ioc, buf, 8) != 8) {
         error_setg(errp, "Failed to read data");
         goto fail;
     }
@@ -108,7 +108,7 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
         goto fail;
     }
 
-    if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         error_setg(errp, "Failed to read magic");
         goto fail;
     }
@@ -129,35 +129,35 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
             }
             goto fail;
         }
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             error_setg(errp, "Failed to read server flags");
             goto fail;
         }
         *flags = be16_to_cpu(tmp) << 16;
         /* reserved for future use */
-        if (write_sync(csock, &reserved, sizeof(reserved)) !=
+        if (write_sync(ioc, &reserved, sizeof(reserved)) !=
             sizeof(reserved)) {
             error_setg(errp, "Failed to read reserved field");
             goto fail;
         }
         /* write the export name */
         magic = cpu_to_be64(magic);
-        if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+        if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
             error_setg(errp, "Failed to send export name magic");
             goto fail;
         }
         opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
-        if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+        if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
             error_setg(errp, "Failed to send export name option number");
             goto fail;
         }
         namesize = cpu_to_be32(strlen(name));
-        if (write_sync(csock, &namesize, sizeof(namesize)) !=
+        if (write_sync(ioc, &namesize, sizeof(namesize)) !=
             sizeof(namesize)) {
             error_setg(errp, "Failed to send export name length");
             goto fail;
         }
-        if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
+        if (write_sync(ioc, (char *)name, strlen(name)) != strlen(name)) {
             error_setg(errp, "Failed to send export name");
             goto fail;
         }
@@ -174,7 +174,7 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
         }
     }
 
-    if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
+    if (read_sync(ioc, &s, sizeof(s)) != sizeof(s)) {
         error_setg(errp, "Failed to read export length");
         goto fail;
     }
@@ -182,19 +182,19 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
     TRACE("Size is %" PRIu64, *size);
 
     if (!name) {
-        if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
+        if (read_sync(ioc, flags, sizeof(*flags)) != sizeof(*flags)) {
             error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags = be32_to_cpup(flags);
     } else {
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags |= be16_to_cpu(tmp);
     }
-    if (read_sync(csock, &buf, 124) != 124) {
+    if (read_sync(ioc, &buf, 124) != 124) {
         error_setg(errp, "Failed to read reserved block");
         goto fail;
     }
@@ -205,11 +205,11 @@  fail:
 }
 
 #ifdef __linux__
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size)
 {
     TRACE("Setting NBD socket");
 
-    if (ioctl(fd, NBD_SET_SOCK, csock) < 0) {
+    if (ioctl(fd, NBD_SET_SOCK, sioc->fd) < 0) {
         int serrno = errno;
         LOG("Failed to set NBD socket");
         return -serrno;
@@ -282,7 +282,7 @@  int nbd_client(int fd)
     return ret;
 }
 #else
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *ioc, uint32_t flags, off_t size)
 {
     return -ENOTSUP;
 }
@@ -293,7 +293,7 @@  int nbd_client(int fd)
 }
 #endif
 
-ssize_t nbd_send_request(int csock, struct nbd_request *request)
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     ssize_t ret;
@@ -308,7 +308,7 @@  ssize_t nbd_send_request(int csock, struct nbd_request *request)
           "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
           request->from, request->len, request->handle, request->type);
 
-    ret = write_sync(csock, buf, sizeof(buf));
+    ret = write_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -320,13 +320,13 @@  ssize_t nbd_send_request(int csock, struct nbd_request *request)
     return 0;
 }
 
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply)
 {
     uint8_t buf[NBD_REPLY_SIZE];
     uint32_t magic;
     ssize_t ret;
 
-    ret = read_sync(csock, buf, sizeof(buf));
+    ret = read_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
diff --git a/nbd/common.c b/nbd/common.c
index 7b089b0..9f89372 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -18,47 +18,107 @@ 
 
 #include "nbd-internal.h"
 
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
+/**
+ * @skip: number of bytes to advance head of @iov
+ * @iov: pointer to iov array, updated on success
+ * @niov: number of elements in @iov, updated on success
+ * @oldiov: pointer single element to hold old info from @iov
+ *
+ * This will update @iov so that its head is advanced
+ * by @skip bytes. To do this, zero or more complete
+ * elements of @iov will be skipped over. The new head
+ * of @iov will then have its base & len updated to
+ * skip the remaining number of bytes. @oldiov will
+ * hold the original data from the new head of @iov.
+ */
+static void nbd_skip_iov(size_t skip,
+                         struct iovec **iov,
+                         int *niov,
+                         struct iovec *oldiov)
 {
-    size_t offset = 0;
-    int err;
-
-    if (qemu_in_coroutine()) {
-        if (do_read) {
-            return qemu_co_recv(fd, buffer, size);
+    ssize_t done = 0;
+    size_t i;
+    for (i = 0; i < *niov; i++) {
+        if ((*iov)[i].iov_len <= skip) {
+            done += (*iov)[i].iov_len;
+            skip -= (*iov)[i].iov_len;
         } else {
-            return qemu_co_send(fd, buffer, size);
+            done += skip;
+            oldiov->iov_base = (*iov)[i].iov_base;
+            oldiov->iov_len = (*iov)[i].iov_len;
+            (*iov)[i].iov_len -= skip;
+            (*iov)[i].iov_base += skip;
+            *iov = *iov + i;
+            *niov = *niov - i;
+            break;
         }
     }
+}
+
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+                     struct iovec *iov,
+                     size_t niov,
+                     size_t offset,
+                     size_t length,
+                     bool do_read)
+{
+    ssize_t done = 0;
+    size_t iovlen = iov_size(iov, niov);
+    struct iovec oldiov = { NULL, 0 };
+    Error *local_err = NULL;
+
+    /* Shouldn't happen but just to be sure */
+    if (offset > iovlen) {
+        return -EINVAL;
+    }
+    iovlen -= offset;
+    if (length > iovlen) {
+        return -EINVAL;
+    }
 
-    while (offset < size) {
+    while (done < length) {
         ssize_t len;
+        struct iovec *cur = iov;
+        int curcnt = niov;
+
+        nbd_skip_iov(offset + done, &cur, &curcnt, &oldiov);
 
         if (do_read) {
-            len = qemu_recv(fd, buffer + offset, size - offset, 0);
+            len = qio_channel_readv(ioc, cur, curcnt, &local_err);
         } else {
-            len = send(fd, buffer + offset, size - offset, 0);
+            len = qio_channel_writev(ioc, cur, curcnt, &local_err);
         }
-
-        if (len < 0) {
-            err = socket_error();
-
-            /* recoverable error */
-            if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
-                continue;
+        if (oldiov.iov_base) {
+            /* Restore original caller's info in @iov */
+            cur[0].iov_base = oldiov.iov_base;
+            cur[0].iov_len = oldiov.iov_len;
+            oldiov.iov_base = NULL;
+            oldiov.iov_len = 0;
+        }
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            if (qemu_in_coroutine()) {
+                /* XXX see about using qio_channel_yield() in
+                 * future if we can use normal watches instead
+                 * of the AIO handlers */
+                qemu_coroutine_yield();
+            } else {
+                qio_channel_wait(ioc,
+                                 do_read ? G_IO_IN : G_IO_OUT);
             }
-
-            /* unrecoverable error */
-            return -err;
+            continue;
+        }
+        if (len < 0) {
+            TRACE("I/O error: %s", error_get_pretty(local_err));
+            error_free(local_err);
+            /* XXX handle Error objects */
+            return -EIO;
         }
 
-        /* eof */
-        if (len == 0) {
+        if (do_read && len == 0) {
             break;
         }
 
-        offset += len;
+        done += len;
     }
-
-    return offset;
+    return done;
 }
diff --git a/nbd/nbd-internal.h b/nbd/nbd-internal.h
index c0a6575..9960034 100644
--- a/nbd/nbd-internal.h
+++ b/nbd/nbd-internal.h
@@ -13,6 +13,7 @@ 
 #include "sysemu/block-backend.h"
 
 #include "qemu/coroutine.h"
+#include "qemu/iov.h"
 
 #include <errno.h>
 #include <string.h>
@@ -29,7 +30,6 @@ 
 #include <linux/fs.h>
 #endif
 
-#include "qemu/sockets.h"
 #include "qemu/queue.h"
 #include "qemu/main-loop.h"
 
@@ -90,24 +90,22 @@ 
 #define NBD_EINVAL     22
 #define NBD_ENOSPC     28
 
-static inline ssize_t read_sync(int fd, void *buffer, size_t size)
+static inline ssize_t read_sync(QIOChannel *ioc, void *buffer, size_t size)
 {
+    struct iovec iov = { .iov_base = buffer, .iov_len = size };
     /* Sockets are kept in blocking mode in the negotiation phase.  After
      * that, a non-readable socket simply means that another thread stole
      * our request/reply.  Synchronization is done with recv_coroutine, so
      * that this is coroutine-safe.
      */
-    return nbd_wr_sync(fd, buffer, size, true);
+    return nbd_wr_syncv(ioc, &iov, 1, 0, size, true);
 }
 
-static inline ssize_t write_sync(int fd, void *buffer, size_t size)
+static inline ssize_t write_sync(QIOChannel *ioc, void *buffer, size_t size)
 {
-    int ret;
-    do {
-        /* For writes, we do expect the socket to be writable.  */
-        ret = nbd_wr_sync(fd, buffer, size, false);
-    } while (ret == -EAGAIN);
-    return ret;
+    struct iovec iov = { .iov_base = buffer, .iov_len = size };
+
+    return nbd_wr_syncv(ioc, &iov, 1, 0, size, false);
 }
 
 #endif
diff --git a/nbd/server.c b/nbd/server.c
index c29ba5f..929db9c 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -73,7 +73,8 @@  struct NBDClient {
     void (*close)(NBDClient *client);
 
     NBDExport *exp;
-    int sock;
+    QIOChannelSocket *sioc; /* The underlying data channel */
+    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
 
     Coroutine *recv_coroutine;
 
@@ -93,45 +94,56 @@  static void nbd_set_handlers(NBDClient *client);
 static void nbd_unset_handlers(NBDClient *client);
 static void nbd_update_can_read(NBDClient *client);
 
-static void nbd_negotiate_continue(void *opaque)
+static gboolean nbd_negotiate_continue(QIOChannel *ioc,
+                                       GIOCondition condition,
+                                       void *opaque)
 {
     qemu_coroutine_enter(opaque, NULL);
+    return TRUE;
 }
 
-static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
+static ssize_t nbd_negotiate_read(QIOChannel *ioc, void *buffer, size_t size)
 {
     ssize_t ret;
+    guint watch;
 
     assert(qemu_in_coroutine());
     /* Negotiation are always in main loop. */
-    qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
-                        qemu_coroutine_self());
-    ret = read_sync(fd, buffer, size);
-    qemu_set_fd_handler(fd, NULL, NULL, NULL);
+    watch = qio_channel_add_watch(ioc,
+                                  G_IO_IN,
+                                  nbd_negotiate_continue,
+                                  qemu_coroutine_self(),
+                                  NULL);
+    ret = read_sync(ioc, buffer, size);
+    g_source_remove(watch);
     return ret;
 
 }
 
-static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
+static ssize_t nbd_negotiate_write(QIOChannel *ioc, void *buffer, size_t size)
 {
     ssize_t ret;
+    guint watch;
 
     assert(qemu_in_coroutine());
     /* Negotiation are always in main loop. */
-    qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
-                        qemu_coroutine_self());
-    ret = write_sync(fd, buffer, size);
-    qemu_set_fd_handler(fd, NULL, NULL, NULL);
+    watch = qio_channel_add_watch(ioc,
+                                  G_IO_OUT,
+                                  nbd_negotiate_continue,
+                                  qemu_coroutine_self(),
+                                  NULL);
+    ret = write_sync(ioc, buffer, size);
+    g_source_remove(watch);
     return ret;
 }
 
-static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
+static ssize_t nbd_negotiate_drop_sync(QIOChannel *ioc, size_t size)
 {
     ssize_t ret, dropped = size;
     uint8_t *buffer = g_malloc(MIN(65536, size));
 
     while (size > 0) {
-        ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
+        ret = nbd_negotiate_read(ioc, buffer, MIN(65536, size));
         if (ret < 0) {
             g_free(buffer);
             return ret;
@@ -172,66 +184,66 @@  static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
 
 */
 
-static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
+static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt)
 {
     uint64_t magic;
     uint32_t len;
 
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (rep magic)");
         return -EINVAL;
     }
     opt = cpu_to_be32(opt);
-    if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (rep opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(type);
-    if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (rep type)");
         return -EINVAL;
     }
     len = cpu_to_be32(0);
-    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (rep data length)");
         return -EINVAL;
     }
     return 0;
 }
 
-static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
+static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp)
 {
     uint64_t magic, name_len;
     uint32_t opt, type, len;
 
     name_len = strlen(exp->name);
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (magic)");
         return -EINVAL;
      }
     opt = cpu_to_be32(NBD_OPT_LIST);
-    if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(NBD_REP_SERVER);
-    if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (reply type)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len + sizeof(len));
-    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len);
-    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
-    if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
+    if (nbd_negotiate_write(ioc, exp->name, name_len) != name_len) {
         LOG("write failed (buffer)");
         return -EINVAL;
     }
@@ -240,30 +252,29 @@  static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
 
 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
 {
-    int csock;
     NBDExport *exp;
 
-    csock = client->sock;
     if (length) {
-        if (nbd_negotiate_drop_sync(csock, length) != length) {
+        if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
             return -EIO;
         }
-        return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
+        return nbd_negotiate_send_rep(client->ioc,
+                                      NBD_REP_ERR_INVALID, NBD_OPT_LIST);
     }
 
     /* For each export, send a NBD_REP_SERVER reply. */
     QTAILQ_FOREACH(exp, &exports, next) {
-        if (nbd_negotiate_send_rep_list(csock, exp)) {
+        if (nbd_negotiate_send_rep_list(client->ioc, exp)) {
             return -EINVAL;
         }
     }
     /* Finish with a NBD_REP_ACK. */
-    return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
+    return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST);
 }
 
 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
 {
-    int rc = -EINVAL, csock = client->sock;
+    int rc = -EINVAL;
     char name[256];
 
     /* Client sends:
@@ -274,7 +285,7 @@  static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
         LOG("Bad length received");
         goto fail;
     }
-    if (nbd_negotiate_read(csock, name, length) != length) {
+    if (nbd_negotiate_read(client->ioc, name, length) != length) {
         LOG("read failed");
         goto fail;
     }
@@ -295,7 +306,6 @@  fail:
 
 static int nbd_negotiate_options(NBDClient *client)
 {
-    int csock = client->sock;
     uint32_t flags;
 
     /* Client sends:
@@ -312,7 +322,8 @@  static int nbd_negotiate_options(NBDClient *client)
         ...           Rest of request
     */
 
-    if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+    if (nbd_negotiate_read(client->ioc, &flags, sizeof(flags)) !=
+        sizeof(flags)) {
         LOG("read failed");
         return -EIO;
     }
@@ -328,7 +339,8 @@  static int nbd_negotiate_options(NBDClient *client)
         uint32_t tmp, length;
         uint64_t magic;
 
-        if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+        if (nbd_negotiate_read(client->ioc, &magic, sizeof(magic)) !=
+            sizeof(magic)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -338,13 +350,13 @@  static int nbd_negotiate_options(NBDClient *client)
             return -EINVAL;
         }
 
-        if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (nbd_negotiate_read(client->ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             LOG("read failed");
             return -EINVAL;
         }
 
-        if (nbd_negotiate_read(csock, &length,
-                               sizeof(length)) != sizeof(length)) {
+        if (nbd_negotiate_read(client->ioc, &length, sizeof(length)) !=
+            sizeof(length)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -368,7 +380,7 @@  static int nbd_negotiate_options(NBDClient *client)
         default:
             tmp = be32_to_cpu(tmp);
             LOG("Unsupported option 0x%x", tmp);
-            nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
+            nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_UNSUP, tmp);
             return -EINVAL;
         }
     }
@@ -382,7 +394,6 @@  typedef struct {
 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
 {
     NBDClient *client = data->client;
-    int csock = client->sock;
     char buf[8 + 8 + 8 + 128];
     int rc;
     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
@@ -407,6 +418,7 @@  static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
         [28 .. 151]   reserved     (0)
      */
 
+    qio_channel_set_blocking(client->ioc, false, NULL);
     rc = -EINVAL;
 
     TRACE("Beginning negotiation.");
@@ -423,12 +435,12 @@  static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
     }
 
     if (client->exp) {
-        if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
+        if (nbd_negotiate_write(client->ioc, buf, sizeof(buf)) != sizeof(buf)) {
             LOG("write failed");
             goto fail;
         }
     } else {
-        if (nbd_negotiate_write(csock, buf, 18) != 18) {
+        if (nbd_negotiate_write(client->ioc, buf, 18) != 18) {
             LOG("write failed");
             goto fail;
         }
@@ -441,8 +453,8 @@  static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
         assert ((client->exp->nbdflags & ~65535) == 0);
         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
-        if (nbd_negotiate_write(csock, buf + 18,
-                                sizeof(buf) - 18) != sizeof(buf) - 18) {
+        if (nbd_negotiate_write(client->ioc, buf + 18, sizeof(buf) - 18) !=
+            sizeof(buf) - 18) {
             LOG("write failed");
             goto fail;
         }
@@ -472,13 +484,13 @@  int nbd_disconnect(int fd)
 }
 #endif
 
-static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
+static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request *request)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     ssize_t ret;
 
-    ret = read_sync(csock, buf, sizeof(buf));
+    ret = read_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -513,7 +525,7 @@  static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
     return 0;
 }
 
-static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
+static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply)
 {
     uint8_t buf[NBD_REPLY_SIZE];
     ssize_t ret;
@@ -531,7 +543,7 @@  static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
 
     TRACE("Sending response to client");
 
-    ret = write_sync(csock, buf, sizeof(buf));
+    ret = write_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -559,8 +571,8 @@  void nbd_client_put(NBDClient *client)
         assert(client->closing);
 
         nbd_unset_handlers(client);
-        close(client->sock);
-        client->sock = -1;
+        object_unref(OBJECT(client->sioc));
+        object_unref(OBJECT(client->ioc));
         if (client->exp) {
             QTAILQ_REMOVE(&client->exp->clients, client, next);
             nbd_export_put(client->exp);
@@ -580,7 +592,8 @@  static void client_close(NBDClient *client)
     /* Force requests to finish.  They will drop their own references,
      * then we'll close the socket and free the NBDClient.
      */
-    shutdown(client->sock, 2);
+    qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
+                         NULL);
 
     /* Also tell the client, so that they release their reference.  */
     if (client->close) {
@@ -773,25 +786,25 @@  static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
                                  int len)
 {
     NBDClient *client = req->client;
-    int csock = client->sock;
     ssize_t rc, ret;
 
+    g_assert(qemu_in_coroutine());
     qemu_co_mutex_lock(&client->send_lock);
     client->send_coroutine = qemu_coroutine_self();
     nbd_set_handlers(client);
 
     if (!len) {
-        rc = nbd_send_reply(csock, reply);
+        rc = nbd_send_reply(client->ioc, reply);
     } else {
-        socket_set_cork(csock, 1);
-        rc = nbd_send_reply(csock, reply);
+        qio_channel_set_cork(client->ioc, true);
+        rc = nbd_send_reply(client->ioc, reply);
         if (rc >= 0) {
-            ret = qemu_co_send(csock, req->data, len);
+            ret = write_sync(client->ioc, req->data, len);
             if (ret != len) {
                 rc = -EIO;
             }
         }
-        socket_set_cork(csock, 0);
+        qio_channel_set_cork(client->ioc, false);
     }
 
     client->send_coroutine = NULL;
@@ -803,14 +816,14 @@  static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
 {
     NBDClient *client = req->client;
-    int csock = client->sock;
     uint32_t command;
     ssize_t rc;
 
+    g_assert(qemu_in_coroutine());
     client->recv_coroutine = qemu_coroutine_self();
     nbd_update_can_read(client);
 
-    rc = nbd_receive_request(csock, request);
+    rc = nbd_receive_request(client->ioc, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
             rc = -EIO;
@@ -845,7 +858,7 @@  static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
     if (command == NBD_CMD_WRITE) {
         TRACE("Reading %u byte(s)", request->len);
 
-        if (qemu_co_recv(csock, req->data, request->len) != request->len) {
+        if (read_sync(client->ioc, req->data, request->len) != request->len) {
             LOG("reading from socket failed");
             rc = -EIO;
             goto out;
@@ -1040,7 +1053,7 @@  static void nbd_restart_write(void *opaque)
 static void nbd_set_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sock,
+        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
                            true,
                            client->can_read ? nbd_read : NULL,
                            client->send_coroutine ? nbd_restart_write : NULL,
@@ -1051,7 +1064,7 @@  static void nbd_set_handlers(NBDClient *client)
 static void nbd_unset_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sock,
+        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
                            true, NULL, NULL, NULL);
     }
 }
@@ -1093,7 +1106,9 @@  out:
     g_free(data);
 }
 
-void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
+void nbd_client_new(NBDExport *exp,
+                    QIOChannelSocket *sioc,
+                    void (*close_fn)(NBDClient *))
 {
     NBDClient *client;
     NBDClientNewData *data = g_new(NBDClientNewData, 1);
@@ -1101,7 +1116,10 @@  void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
     client = g_malloc0(sizeof(NBDClient));
     client->refcount = 1;
     client->exp = exp;
-    client->sock = csock;
+    client->sioc = sioc;
+    object_ref(OBJECT(client->sioc));
+    client->ioc = QIO_CHANNEL(sioc);
+    object_ref(OBJECT(client->ioc));
     client->can_read = true;
     client->close = close_fn;
 
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 1c9e3ea..2346e99 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -252,7 +252,7 @@  static void *nbd_client_thread(void *arg)
         goto out;
     }
 
-    ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
+    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, &nbdflags,
                                 &size, &local_error);
     if (ret < 0) {
         if (local_error) {
@@ -268,7 +268,7 @@  static void *nbd_client_thread(void *arg)
         goto out_socket;
     }
 
-    ret = nbd_init(fd, sioc->fd, nbdflags, size);
+    ret = nbd_init(fd, sioc, nbdflags, size);
     if (ret < 0) {
         goto out_fd;
     }
@@ -328,7 +328,6 @@  static void nbd_client_closed(NBDClient *client)
 static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
 {
     QIOChannelSocket *cioc;
-    int fd;
 
     cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      NULL);
@@ -343,10 +342,7 @@  static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
 
     nb_fds++;
     nbd_update_server_watch();
-    fd = dup(cioc->fd);
-    if (fd >= 0) {
-        nbd_client_new(exp, fd, nbd_client_closed);
-    }
+    nbd_client_new(exp, cioc, nbd_client_closed);
     object_unref(OBJECT(cioc));
 
     return TRUE;