From patchwork Fri Dec 23 15:26:07 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paolo Bonzini X-Patchwork-Id: 133072 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [140.186.70.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 1D8F1B7023 for ; Sat, 24 Dec 2011 02:28:04 +1100 (EST) Received: from localhost ([::1]:48910 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Re72I-0007kL-H0 for incoming@patchwork.ozlabs.org; Fri, 23 Dec 2011 10:27:54 -0500 Received: from eggs.gnu.org ([140.186.70.92]:50253) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Re722-0007KK-LH for qemu-devel@nongnu.org; Fri, 23 Dec 2011 10:27:44 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1Re71x-0006I8-LD for qemu-devel@nongnu.org; Fri, 23 Dec 2011 10:27:38 -0500 Received: from mail-iy0-f173.google.com ([209.85.210.173]:47637) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Re71x-0006Em-5s for qemu-devel@nongnu.org; Fri, 23 Dec 2011 10:27:33 -0500 Received: by mail-iy0-f173.google.com with SMTP id j37so17390749iag.4 for ; Fri, 23 Dec 2011 07:27:33 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=sender:from:to:subject:date:message-id:x-mailer:in-reply-to :references; bh=NjgHVupg8sMpQp4kKjyhvRJAW1/cLIfKBd3qYnbSI0E=; b=ioIZ9LeupzyYYpxJjrJE7yBvMb7QfMeiz0cegn3I8RO22pk/6BdWAGKm0f1kVO73xP efKNTUKXsJ0uO8hU/T8kEBu8Fixq0djTolccrtlCXrj8qhrsy3HJmeM5SNbpwjTRNll5 0uIJ2aDXjBjTiFpwYp8MkZe6Y+iSjnUkS8i10= Received: by 10.43.43.130 with SMTP id uc2mr15677263icb.35.1324654052985; Fri, 23 Dec 2011 07:27:32 -0800 (PST) Received: from localhost.localdomain (93-34-178-147.ip50.fastwebnet.it. [93.34.178.147]) by mx.google.com with ESMTPS id aq5sm42055557igc.5.2011.12.23.07.27.27 (version=TLSv1/SSLv3 cipher=OTHER); Fri, 23 Dec 2011 07:27:31 -0800 (PST) From: Paolo Bonzini To: qemu-devel@nongnu.org Date: Fri, 23 Dec 2011 16:26:07 +0100 Message-Id: <1324653990-20074-4-git-send-email-pbonzini@redhat.com> X-Mailer: git-send-email 1.7.7.1 In-Reply-To: <1324653990-20074-1-git-send-email-pbonzini@redhat.com> References: <1324653990-20074-1-git-send-email-pbonzini@redhat.com> X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6 (newer, 2) X-Received-From: 209.85.210.173 Subject: [Qemu-devel] [PATCH 03/26] nbd: switch to asynchronous operation X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Signed-off-by: Paolo Bonzini --- block/nbd.c | 188 ++++++++++++++++++++++++++++++++++++++-------------------- nbd.c | 8 +++ 2 files changed, 131 insertions(+), 65 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 95212da..bea7acd 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -47,13 +47,17 @@ #endif typedef struct BDRVNBDState { - CoMutex lock; int sock; uint32_t nbdflags; off_t size; size_t blocksize; char *export_name; /* An NBD server may export several devices */ + CoMutex mutex; + Coroutine *coroutine; + + struct nbd_reply reply; + /* If it begins with '/', this is a UNIX domain socket. Otherwise, * it's a string of the form :port */ @@ -106,6 +110,95 @@ out: return err; } +static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) +{ + qemu_co_mutex_lock(&s->mutex); + s->coroutine = qemu_coroutine_self(); + request->handle = (uint64_t)(intptr_t)s; +} + +static int nbd_have_request(void *opaque) +{ + BDRVNBDState *s = opaque; + + return !!s->coroutine; +} + +static void nbd_reply_ready(void *opaque) +{ + BDRVNBDState *s = opaque; + + if (s->reply.handle == 0) { + /* No reply already in flight. Fetch a header. */ + if (nbd_receive_reply(s->sock, &s->reply) < 0) { + s->reply.handle = 0; + } + } + + /* There's no need for a mutex on the receive side, because the + * handler acts as a synchronization point and ensures that only + * one coroutine is called until the reply finishes. */ + if (s->coroutine) { + qemu_coroutine_enter(s->coroutine, NULL); + } +} + +static void nbd_restart_write(void *opaque) +{ + BDRVNBDState *s = opaque; + qemu_coroutine_enter(s->coroutine, NULL); +} + +static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, + struct iovec *iov, int offset) +{ + int rc, ret; + + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, + nbd_have_request, NULL, s); + rc = nbd_send_request(s->sock, request); + if (rc != -1 && iov) { + ret = qemu_co_sendv(s->sock, iov, request->len, offset); + if (ret != request->len) { + errno = -EIO; + rc = -1; + } + } + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); + return rc; +} + +static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, + struct nbd_reply *reply, + struct iovec *iov, int offset) +{ + int ret; + + /* Wait until we're woken up by the read handler. */ + qemu_coroutine_yield(); + *reply = s->reply; + if (reply->handle != request->handle) { + reply->error = EIO; + } else { + if (iov && reply->error == 0) { + ret = qemu_co_recvv(s->sock, iov, request->len, offset); + if (ret != request->len) { + reply->error = EIO; + } + } + + /* Tell the read handler to read another header. */ + s->reply.handle = 0; + } +} + +static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) +{ + s->coroutine = NULL; + qemu_co_mutex_unlock(&s->mutex); +} + static int nbd_establish_connection(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; @@ -135,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs) return -errno; } - /* Now that we're connected, set the socket to be non-blocking */ + /* Now that we're connected, set the socket to be non-blocking and + * kick the reply mechanism. */ socket_set_nonblock(sock); + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); s->sock = sock; s->size = size; @@ -152,11 +248,11 @@ static void nbd_teardown_connection(BlockDriverState *bs) struct nbd_request request; request.type = NBD_CMD_DISC; - request.handle = (uint64_t)(intptr_t)bs; request.from = 0; request.len = 0; nbd_send_request(s->sock, &request); + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); closesocket(s->sock); } @@ -165,6 +261,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; + qemu_co_mutex_init(&s->mutex); + /* Pop the config into our state object. Exit if invalid. */ result = nbd_config(s, filename, flags); if (result != 0) { @@ -176,90 +274,50 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) */ result = nbd_establish_connection(bs); - qemu_co_mutex_init(&s->lock); return result; } -static int nbd_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_READ; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - return -EIO; - - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) - return -EIO; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; - return 0; } -static int nbd_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_WRITE; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) - return -EIO; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - return -EIO; - - return 0; -} - -static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) -{ - int ret; - BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_read(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; -} - -static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) -{ - int ret; - BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_write(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; } static void nbd_close(BlockDriverState *bs) @@ -282,8 +340,8 @@ static BlockDriver bdrv_nbd = { .format_name = "nbd", .instance_size = sizeof(BDRVNBDState), .bdrv_file_open = nbd_open, - .bdrv_read = nbd_co_read, - .bdrv_write = nbd_co_write, + .bdrv_co_readv = nbd_co_readv, + .bdrv_co_writev = nbd_co_writev, .bdrv_close = nbd_close, .bdrv_getlength = nbd_getlength, .protocol_name = "nbd", diff --git a/nbd.c b/nbd.c index de880fe..ff701d3 100644 --- a/nbd.c +++ b/nbd.c @@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; + if (qemu_in_coroutine()) { + if (do_read) { + return qemu_co_recv(fd, buffer, size); + } else { + return qemu_co_send(fd, buffer, size); + } + } + while (offset < size) { ssize_t len;