diff mbox

[v2,06/15] nbd: convert to using I/O channels for actual socket I/O

Message ID 1452524459-4132-7-git-send-email-berrange@redhat.com
State New
Headers show

Commit Message

Daniel P. Berrangé Jan. 11, 2016, 3 p.m. UTC
Now that all callers are converted to use I/O channels for
initial connection setup, it is possible to switch the core
NBD protocol handling core over to use QIOChannel APIs for
actual sockets I/O.

Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
---
 block/nbd-client.c  |  19 ++--
 blockdev-nbd.c      |   7 +-
 include/block/nbd.h |  19 ++--
 nbd.c               | 289 ++++++++++++++++++++++++++++++++--------------------
 qemu-nbd.c          |   9 +-
 5 files changed, 205 insertions(+), 138 deletions(-)
diff mbox

Patch

diff --git a/block/nbd-client.c b/block/nbd-client.c
index bf5b0a3..ed69d4b 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -71,7 +71,7 @@  static void nbd_reply_ready(void *opaque)
          * that another thread has done the same thing in parallel, so
          * the socket is not readable anymore.
          */
-        ret = nbd_receive_reply(s->sioc->fd, &s->reply);
+        ret = nbd_receive_reply(s->ioc, &s->reply);
         if (ret == -EAGAIN) {
             return;
         }
@@ -122,6 +122,7 @@  static int nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
+    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
     request->handle = INDEX_TO_HANDLE(s, i);
     s->send_coroutine = qemu_coroutine_self();
@@ -131,17 +132,17 @@  static int nbd_co_send_request(BlockDriverState *bs,
                        nbd_reply_ready, nbd_restart_write, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
-        rc = nbd_send_request(s->sioc->fd, request);
+        rc = nbd_send_request(s->ioc, request);
         if (rc >= 0) {
-            ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov,
-                                offset, request->len);
+            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+                               offset, request->len, 0);
             if (ret != request->len) {
                 rc = -EIO;
             }
         }
         qio_channel_set_cork(s->ioc, false);
     } else {
-        rc = nbd_send_request(s->sioc->fd, request);
+        rc = nbd_send_request(s->ioc, request);
     }
     aio_set_fd_handler(aio_context, s->sioc->fd, false,
                        nbd_reply_ready, NULL, bs);
@@ -164,8 +165,8 @@  static void nbd_co_receive_reply(NbdClientSession *s,
         reply->error = EIO;
     } else {
         if (qiov && reply->error == 0) {
-            ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov,
-                                offset, request->len);
+            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+                               offset, request->len, 1);
             if (ret != request->len) {
                 reply->error = EIO;
             }
@@ -372,7 +373,7 @@  void nbd_client_close(BlockDriverState *bs)
         return;
     }
 
-    nbd_send_request(client->sioc->fd, &request);
+    nbd_send_request(client->ioc, &request);
 
     nbd_teardown_connection(bs);
 }
@@ -387,7 +388,7 @@  int nbd_client_init(BlockDriverState *bs, QIOChannelSocket *sioc,
     logout("session init %s\n", export);
     qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 
-    ret = nbd_receive_negotiate(sioc->fd, export,
+    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
                                 &client->nbdflags, &client->size, errp);
     if (ret < 0) {
         logout("Failed to negotiate with the NBD server\n");
diff --git a/blockdev-nbd.c b/blockdev-nbd.c
index 71ee021..7d4c415 100644
--- a/blockdev-nbd.c
+++ b/blockdev-nbd.c
@@ -26,7 +26,6 @@  static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
                            gpointer opaque)
 {
     QIOChannelSocket *cioc;
-    int fd;
 
     cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      NULL);
@@ -34,11 +33,7 @@  static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition,
         return TRUE;
     }
 
-    fd = dup(cioc->fd);
-    if (fd >= 0 &&
-        !nbd_client_new(NULL, fd, nbd_client_put)) {
-        close(fd);
-    }
+    nbd_client_new(NULL, cioc, nbd_client_put);
     object_unref(OBJECT(cioc));
     return TRUE;
 }
diff --git a/include/block/nbd.h b/include/block/nbd.h
index 65f409d..0230c7f 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -23,6 +23,7 @@ 
 
 #include "qemu-common.h"
 #include "qemu/option.h"
+#include "io/channel-socket.h"
 
 struct nbd_request {
     uint32_t magic;
@@ -73,12 +74,17 @@  enum {
 /* Maximum size of a single READ/WRITE data buffer */
 #define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024)
 
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+                     struct iovec *iov,
+                     size_t niov,
+                     size_t offset,
+                     size_t length,
+                     bool do_read);
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
                           off_t *size, Error **errp);
-int nbd_init(int fd, int csock, uint32_t flags, off_t size);
-ssize_t nbd_send_request(int csock, struct nbd_request *request);
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size);
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request);
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply);
 int nbd_client(int fd);
 int nbd_disconnect(int fd);
 
@@ -98,7 +104,8 @@  NBDExport *nbd_export_find(const char *name);
 void nbd_export_set_name(NBDExport *exp, const char *name);
 void nbd_export_close_all(void);
 
-NBDClient *nbd_client_new(NBDExport *exp, int csock,
+NBDClient *nbd_client_new(NBDExport *exp,
+                          QIOChannelSocket *sioc,
                           void (*close)(NBDClient *));
 void nbd_client_get(NBDClient *client);
 void nbd_client_put(NBDClient *client);
diff --git a/nbd.c b/nbd.c
index b3d9654..701b951 100644
--- a/nbd.c
+++ b/nbd.c
@@ -20,6 +20,7 @@ 
 #include "sysemu/block-backend.h"
 
 #include "qemu/coroutine.h"
+#include "qemu/iov.h"
 
 #include <errno.h>
 #include <string.h>
@@ -36,7 +37,6 @@ 
 #include <linux/fs.h>
 #endif
 
-#include "qemu/sockets.h"
 #include "qemu/queue.h"
 #include "qemu/main-loop.h"
 
@@ -171,7 +171,8 @@  struct NBDClient {
     void (*close)(NBDClient *client);
 
     NBDExport *exp;
-    int sock;
+    QIOChannelSocket *sioc; /* The underlying data channel */
+    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
 
     Coroutine *recv_coroutine;
 
@@ -191,68 +192,127 @@  static void nbd_set_handlers(NBDClient *client);
 static void nbd_unset_handlers(NBDClient *client);
 static void nbd_update_can_read(NBDClient *client);
 
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
+/**
+ * @skip: number of bytes to advance head of @iov
+ * @iov: pointer to iov array, updated on success
+ * @niov: number of elements in @iov, updated on success
+ * @oldiov: pointer single element to hold old info from @iov
+ *
+ * This will update @iov so that its head is advanced
+ * by @skip bytes. To do this, zero or more complete
+ * elements of @iov will be skipped over. The new head
+ * of @iov will then have its base & len updated to
+ * skip the remaining number of bytes. @oldiov will
+ * hold the original data from the new head of @iov.
+ */
+static void nbd_skip_iov(size_t skip,
+                         struct iovec **iov,
+                         int *niov,
+                         struct iovec *oldiov)
 {
-    size_t offset = 0;
-    int err;
-
-    if (qemu_in_coroutine()) {
-        if (do_read) {
-            return qemu_co_recv(fd, buffer, size);
+    ssize_t done = 0;
+    size_t i;
+    for (i = 0; i < *niov; i++) {
+        if ((*iov)[i].iov_len <= skip) {
+            done += (*iov)[i].iov_len;
+            skip -= (*iov)[i].iov_len;
         } else {
-            return qemu_co_send(fd, buffer, size);
+            done += skip;
+            oldiov->iov_base = (*iov)[i].iov_base;
+            oldiov->iov_len = (*iov)[i].iov_len;
+            (*iov)[i].iov_len -= skip;
+            (*iov)[i].iov_base += skip;
+            *iov = *iov + i;
+            *niov = *niov - i;
+            break;
         }
     }
+}
+
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+                     struct iovec *iov,
+                     size_t niov,
+                     size_t offset,
+                     size_t length,
+                     bool do_read)
+{
+    ssize_t done = 0;
+    size_t iovlen = iov_size(iov, niov);
+    struct iovec oldiov = { NULL, 0 };
+
+    /* Shouldn't happen but just to be sure */
+    if (offset > iovlen) {
+        return -EINVAL;
+    }
+    iovlen -= offset;
+    if (length > iovlen) {
+        return -EINVAL;
+    }
 
-    while (offset < size) {
+    while (done < length) {
         ssize_t len;
+        struct iovec *cur = iov;
+        int curcnt = niov;
+
+        nbd_skip_iov(offset + done, &cur, &curcnt, &oldiov);
 
         if (do_read) {
-            len = qemu_recv(fd, buffer + offset, size - offset, 0);
+            len = qio_channel_readv(ioc, cur, curcnt, NULL);
         } else {
-            len = send(fd, buffer + offset, size - offset, 0);
+            len = qio_channel_writev(ioc, cur, curcnt, NULL);
         }
-
-        if (len < 0) {
-            err = socket_error();
-
-            /* recoverable error */
-            if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
-                continue;
+        if (oldiov.iov_base) {
+            /* Restore original caller's info in @iov */
+            cur[0].iov_base = oldiov.iov_base;
+            cur[0].iov_len = oldiov.iov_len;
+            oldiov.iov_base = NULL;
+            oldiov.iov_len = 0;
+        }
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            if (qemu_in_coroutine()) {
+                /* XXX see about using qio_channel_yield() in
+                 * future if we can use normal watches instead
+                 * of the AIO handlers */
+                qemu_coroutine_yield();
+            } else {
+                qio_channel_wait(ioc,
+                                 do_read ? G_IO_IN : G_IO_OUT);
             }
-
-            /* unrecoverable error */
-            return -err;
+            continue;
+        }
+        if (len < 0) {
+            /* XXX handle Error objects */
+            return -EIO;
         }
 
-        /* eof */
-        if (len == 0) {
+        if (do_read && len == 0) {
             break;
         }
 
-        offset += len;
+        done += len;
     }
-
-    return offset;
+    return done;
 }
 
-static ssize_t read_sync(int fd, void *buffer, size_t size)
+
+static ssize_t read_sync(QIOChannel *ioc, void *buffer, size_t size)
 {
+    struct iovec iov = { .iov_base = buffer, .iov_len = size };
     /* Sockets are kept in blocking mode in the negotiation phase.  After
      * that, a non-readable socket simply means that another thread stole
      * our request/reply.  Synchronization is done with recv_coroutine, so
      * that this is coroutine-safe.
      */
-    return nbd_wr_sync(fd, buffer, size, true);
+    return nbd_wr_syncv(ioc, &iov, 1, 0, size, true);
 }
 
-static ssize_t drop_sync(int fd, size_t size)
+static ssize_t drop_sync(QIOChannel *ioc, size_t size)
 {
     ssize_t ret, dropped = size;
     uint8_t *buffer = g_malloc(MIN(65536, size));
 
     while (size > 0) {
-        ret = read_sync(fd, buffer, MIN(65536, size));
+        ret = read_sync(ioc, buffer, MIN(65536, size));
         if (ret < 0) {
             g_free(buffer);
             return ret;
@@ -266,14 +326,11 @@  static ssize_t drop_sync(int fd, size_t size)
     return dropped;
 }
 
-static ssize_t write_sync(int fd, void *buffer, size_t size)
+static ssize_t write_sync(QIOChannel *ioc, void *buffer, size_t size)
 {
-    int ret;
-    do {
-        /* For writes, we do expect the socket to be writable.  */
-        ret = nbd_wr_sync(fd, buffer, size, false);
-    } while (ret == -EAGAIN);
-    return ret;
+    struct iovec iov = { .iov_base = buffer, .iov_len = size };
+
+    return nbd_wr_syncv(ioc, &iov, 1, 0, size, false);
 }
 
 /* Basic flow for negotiation
@@ -303,66 +360,66 @@  static ssize_t write_sync(int fd, void *buffer, size_t size)
 
 */
 
-static int nbd_send_rep(int csock, uint32_t type, uint32_t opt)
+static int nbd_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt)
 {
     uint64_t magic;
     uint32_t len;
 
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (rep magic)");
         return -EINVAL;
     }
     opt = cpu_to_be32(opt);
-    if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (rep opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(type);
-    if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (write_sync(ioc, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (rep type)");
         return -EINVAL;
     }
     len = cpu_to_be32(0);
-    if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (write_sync(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (rep data length)");
         return -EINVAL;
     }
     return 0;
 }
 
-static int nbd_send_rep_list(int csock, NBDExport *exp)
+static int nbd_send_rep_list(QIOChannel *ioc, NBDExport *exp)
 {
     uint64_t magic, name_len;
     uint32_t opt, type, len;
 
     name_len = strlen(exp->name);
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (magic)");
         return -EINVAL;
      }
     opt = cpu_to_be32(NBD_OPT_LIST);
-    if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(NBD_REP_SERVER);
-    if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (write_sync(ioc, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (reply type)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len + sizeof(len));
-    if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (write_sync(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len);
-    if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (write_sync(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
-    if (write_sync(csock, exp->name, name_len) != name_len) {
+    if (write_sync(ioc, exp->name, name_len) != name_len) {
         LOG("write failed (buffer)");
         return -EINVAL;
     }
@@ -371,30 +428,31 @@  static int nbd_send_rep_list(int csock, NBDExport *exp)
 
 static int nbd_handle_list(NBDClient *client, uint32_t length)
 {
-    int csock;
+    QIOChannel *ioc;
     NBDExport *exp;
 
-    csock = client->sock;
+    ioc = client->ioc;
     if (length) {
-        if (drop_sync(csock, length) != length) {
+        if (drop_sync(ioc, length) != length) {
             return -EIO;
         }
-        return nbd_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
+        return nbd_send_rep(ioc, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
     }
 
     /* For each export, send a NBD_REP_SERVER reply. */
     QTAILQ_FOREACH(exp, &exports, next) {
-        if (nbd_send_rep_list(csock, exp)) {
+        if (nbd_send_rep_list(ioc, exp)) {
             return -EINVAL;
         }
     }
     /* Finish with a NBD_REP_ACK. */
-    return nbd_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
+    return nbd_send_rep(ioc, NBD_REP_ACK, NBD_OPT_LIST);
 }
 
 static int nbd_handle_export_name(NBDClient *client, uint32_t length)
 {
-    int rc = -EINVAL, csock = client->sock;
+    int rc = -EINVAL;
+    QIOChannel *ioc = client->ioc;
     char name[256];
 
     /* Client sends:
@@ -405,7 +463,7 @@  static int nbd_handle_export_name(NBDClient *client, uint32_t length)
         LOG("Bad length received");
         goto fail;
     }
-    if (read_sync(csock, name, length) != length) {
+    if (read_sync(ioc, name, length) != length) {
         LOG("read failed");
         goto fail;
     }
@@ -426,7 +484,7 @@  fail:
 
 static int nbd_receive_options(NBDClient *client)
 {
-    int csock = client->sock;
+    QIOChannel *ioc = client->ioc;
     uint32_t flags;
 
     /* Client sends:
@@ -443,7 +501,7 @@  static int nbd_receive_options(NBDClient *client)
         ...           Rest of request
     */
 
-    if (read_sync(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+    if (read_sync(ioc, &flags, sizeof(flags)) != sizeof(flags)) {
         LOG("read failed");
         return -EIO;
     }
@@ -459,7 +517,7 @@  static int nbd_receive_options(NBDClient *client)
         uint32_t tmp, length;
         uint64_t magic;
 
-        if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+        if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -469,12 +527,12 @@  static int nbd_receive_options(NBDClient *client)
             return -EINVAL;
         }
 
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             LOG("read failed");
             return -EINVAL;
         }
 
-        if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
+        if (read_sync(ioc, &length, sizeof(length)) != sizeof(length)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -498,7 +556,7 @@  static int nbd_receive_options(NBDClient *client)
         default:
             tmp = be32_to_cpu(tmp);
             LOG("Unsupported option 0x%x", tmp);
-            nbd_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
+            nbd_send_rep(client->ioc, NBD_REP_ERR_UNSUP, tmp);
             return -EINVAL;
         }
     }
@@ -506,7 +564,7 @@  static int nbd_receive_options(NBDClient *client)
 
 static int nbd_send_negotiate(NBDClient *client)
 {
-    int csock = client->sock;
+    QIOChannel *ioc = client->ioc;
     char buf[8 + 8 + 8 + 128];
     int rc;
     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
@@ -531,7 +589,7 @@  static int nbd_send_negotiate(NBDClient *client)
         [28 .. 151]   reserved     (0)
      */
 
-    qemu_set_block(csock);
+    qio_channel_set_blocking(ioc, true, NULL);
     rc = -EINVAL;
 
     TRACE("Beginning negotiation.");
@@ -548,12 +606,12 @@  static int nbd_send_negotiate(NBDClient *client)
     }
 
     if (client->exp) {
-        if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+        if (write_sync(ioc, buf, sizeof(buf)) != sizeof(buf)) {
             LOG("write failed");
             goto fail;
         }
     } else {
-        if (write_sync(csock, buf, 18) != 18) {
+        if (write_sync(ioc, buf, 18) != 18) {
             LOG("write failed");
             goto fail;
         }
@@ -566,7 +624,7 @@  static int nbd_send_negotiate(NBDClient *client)
         assert ((client->exp->nbdflags & ~65535) == 0);
         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
-        if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
+        if (write_sync(ioc, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
             LOG("write failed");
             goto fail;
         }
@@ -575,11 +633,11 @@  static int nbd_send_negotiate(NBDClient *client)
     TRACE("Negotiation succeeded.");
     rc = 0;
 fail:
-    qemu_set_nonblock(csock);
+    qio_channel_set_blocking(ioc, false, NULL);
     return rc;
 }
 
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
                           off_t *size, Error **errp)
 {
     char buf[256];
@@ -591,7 +649,7 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
 
     rc = -EINVAL;
 
-    if (read_sync(csock, buf, 8) != 8) {
+    if (read_sync(ioc, buf, 8) != 8) {
         error_setg(errp, "Failed to read data");
         goto fail;
     }
@@ -617,7 +675,7 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
         goto fail;
     }
 
-    if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         error_setg(errp, "Failed to read magic");
         goto fail;
     }
@@ -638,35 +696,35 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
             }
             goto fail;
         }
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             error_setg(errp, "Failed to read server flags");
             goto fail;
         }
         *flags = be16_to_cpu(tmp) << 16;
         /* reserved for future use */
-        if (write_sync(csock, &reserved, sizeof(reserved)) !=
+        if (write_sync(ioc, &reserved, sizeof(reserved)) !=
             sizeof(reserved)) {
             error_setg(errp, "Failed to read reserved field");
             goto fail;
         }
         /* write the export name */
         magic = cpu_to_be64(magic);
-        if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+        if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
             error_setg(errp, "Failed to send export name magic");
             goto fail;
         }
         opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
-        if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+        if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
             error_setg(errp, "Failed to send export name option number");
             goto fail;
         }
         namesize = cpu_to_be32(strlen(name));
-        if (write_sync(csock, &namesize, sizeof(namesize)) !=
+        if (write_sync(ioc, &namesize, sizeof(namesize)) !=
             sizeof(namesize)) {
             error_setg(errp, "Failed to send export name length");
             goto fail;
         }
-        if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
+        if (write_sync(ioc, (char *)name, strlen(name)) != strlen(name)) {
             error_setg(errp, "Failed to send export name");
             goto fail;
         }
@@ -683,7 +741,7 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
         }
     }
 
-    if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
+    if (read_sync(ioc, &s, sizeof(s)) != sizeof(s)) {
         error_setg(errp, "Failed to read export length");
         goto fail;
     }
@@ -691,19 +749,19 @@  int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
     TRACE("Size is %" PRIu64, *size);
 
     if (!name) {
-        if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
+        if (read_sync(ioc, flags, sizeof(*flags)) != sizeof(*flags)) {
             error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags = be32_to_cpup(flags);
     } else {
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags |= be16_to_cpu(tmp);
     }
-    if (read_sync(csock, &buf, 124) != 124) {
+    if (read_sync(ioc, &buf, 124) != 124) {
         error_setg(errp, "Failed to read reserved block");
         goto fail;
     }
@@ -714,11 +772,11 @@  fail:
 }
 
 #ifdef __linux__
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size)
 {
     TRACE("Setting NBD socket");
 
-    if (ioctl(fd, NBD_SET_SOCK, csock) < 0) {
+    if (ioctl(fd, NBD_SET_SOCK, sioc->fd) < 0) {
         int serrno = errno;
         LOG("Failed to set NBD socket");
         return -serrno;
@@ -799,7 +857,7 @@  int nbd_client(int fd)
     return ret;
 }
 #else
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannel *ioc, uint32_t flags, off_t size)
 {
     return -ENOTSUP;
 }
@@ -815,7 +873,7 @@  int nbd_client(int fd)
 }
 #endif
 
-ssize_t nbd_send_request(int csock, struct nbd_request *request)
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     ssize_t ret;
@@ -830,7 +888,7 @@  ssize_t nbd_send_request(int csock, struct nbd_request *request)
           "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
           request->from, request->len, request->handle, request->type);
 
-    ret = write_sync(csock, buf, sizeof(buf));
+    ret = write_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -842,13 +900,13 @@  ssize_t nbd_send_request(int csock, struct nbd_request *request)
     return 0;
 }
 
-static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
+static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request *request)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     ssize_t ret;
 
-    ret = read_sync(csock, buf, sizeof(buf));
+    ret = read_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -883,13 +941,13 @@  static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
     return 0;
 }
 
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply)
 {
     uint8_t buf[NBD_REPLY_SIZE];
     uint32_t magic;
     ssize_t ret;
 
-    ret = read_sync(csock, buf, sizeof(buf));
+    ret = read_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -922,7 +980,7 @@  ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
     return 0;
 }
 
-static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
+static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply)
 {
     uint8_t buf[NBD_REPLY_SIZE];
     ssize_t ret;
@@ -940,7 +998,7 @@  static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
 
     TRACE("Sending response to client");
 
-    ret = write_sync(csock, buf, sizeof(buf));
+    ret = write_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -968,8 +1026,8 @@  void nbd_client_put(NBDClient *client)
         assert(client->closing);
 
         nbd_unset_handlers(client);
-        close(client->sock);
-        client->sock = -1;
+        object_unref(OBJECT(client->sioc));
+        object_unref(OBJECT(client->ioc));
         if (client->exp) {
             QTAILQ_REMOVE(&client->exp->clients, client, next);
             nbd_export_put(client->exp);
@@ -989,7 +1047,8 @@  static void client_close(NBDClient *client)
     /* Force requests to finish.  They will drop their own references,
      * then we'll close the socket and free the NBDClient.
      */
-    shutdown(client->sock, 2);
+    qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
+                         NULL);
 
     /* Also tell the client, so that they release their reference.  */
     if (client->close) {
@@ -1182,25 +1241,26 @@  static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
                                  int len)
 {
     NBDClient *client = req->client;
-    int csock = client->sock;
+    QIOChannel *ioc = client->ioc;
     ssize_t rc, ret;
 
+    g_assert(qemu_in_coroutine());
     qemu_co_mutex_lock(&client->send_lock);
     client->send_coroutine = qemu_coroutine_self();
     nbd_set_handlers(client);
 
     if (!len) {
-        rc = nbd_send_reply(csock, reply);
+        rc = nbd_send_reply(ioc, reply);
     } else {
-        socket_set_cork(csock, 1);
-        rc = nbd_send_reply(csock, reply);
+        qio_channel_set_cork(ioc, true);
+        rc = nbd_send_reply(ioc, reply);
         if (rc >= 0) {
-            ret = qemu_co_send(csock, req->data, len);
+            ret = write_sync(ioc, req->data, len);
             if (ret != len) {
                 rc = -EIO;
             }
         }
-        socket_set_cork(csock, 0);
+        qio_channel_set_cork(ioc, false);
     }
 
     client->send_coroutine = NULL;
@@ -1212,14 +1272,15 @@  static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
 {
     NBDClient *client = req->client;
-    int csock = client->sock;
+    QIOChannel *ioc = client->ioc;
     uint32_t command;
     ssize_t rc;
 
+    g_assert(qemu_in_coroutine());
     client->recv_coroutine = qemu_coroutine_self();
     nbd_update_can_read(client);
 
-    rc = nbd_receive_request(csock, request);
+    rc = nbd_receive_request(ioc, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
             rc = -EIO;
@@ -1250,7 +1311,7 @@  static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
     if (command == NBD_CMD_WRITE) {
         TRACE("Reading %u byte(s)", request->len);
 
-        if (qemu_co_recv(csock, req->data, request->len) != request->len) {
+        if (read_sync(ioc, req->data, request->len) != request->len) {
             LOG("reading from socket failed");
             rc = -EIO;
             goto out;
@@ -1445,7 +1506,7 @@  static void nbd_restart_write(void *opaque)
 static void nbd_set_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sock,
+        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
                            true,
                            client->can_read ? nbd_read : NULL,
                            client->send_coroutine ? nbd_restart_write : NULL,
@@ -1456,7 +1517,7 @@  static void nbd_set_handlers(NBDClient *client)
 static void nbd_unset_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sock,
+        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
                            true, NULL, NULL, NULL);
     }
 }
@@ -1475,16 +1536,22 @@  static void nbd_update_can_read(NBDClient *client)
     }
 }
 
-NBDClient *nbd_client_new(NBDExport *exp, int csock,
+NBDClient *nbd_client_new(NBDExport *exp,
+                          QIOChannelSocket *sioc,
                           void (*close)(NBDClient *))
 {
     NBDClient *client;
     client = g_malloc0(sizeof(NBDClient));
     client->refcount = 1;
     client->exp = exp;
-    client->sock = csock;
+    client->sioc = sioc;
+    object_ref(OBJECT(client->sioc));
+    client->ioc = QIO_CHANNEL(sioc);
+    object_ref(OBJECT(client->ioc));
     client->can_read = true;
     if (nbd_send_negotiate(client)) {
+        object_unref(OBJECT(client->sioc));
+        object_unref(OBJECT(client->ioc));
         g_free(client);
         return NULL;
     }
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 1941324..2c47fc6 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -252,7 +252,7 @@  static void *nbd_client_thread(void *arg)
         goto out;
     }
 
-    ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
+    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, &nbdflags,
                                 &size, &local_error);
     if (ret < 0) {
         if (local_error) {
@@ -269,7 +269,7 @@  static void *nbd_client_thread(void *arg)
         goto out_socket;
     }
 
-    ret = nbd_init(fd, sioc->fd, nbdflags, size);
+    ret = nbd_init(fd, sioc, nbdflags, size);
     if (ret < 0) {
         goto out_fd;
     }
@@ -329,7 +329,6 @@  static void nbd_client_closed(NBDClient *client)
 static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
 {
     QIOChannelSocket *cioc;
-    int fd;
 
     cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      NULL);
@@ -342,9 +341,7 @@  static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
         return TRUE;
     }
 
-    fd = dup(cioc->fd);
-    if (fd >= 0 &&
-        nbd_client_new(exp, fd, nbd_client_closed)) {
+    if (nbd_client_new(exp, cioc, nbd_client_closed)) {
         nb_fds++;
         nbd_update_server_watch();
     }