From patchwork Fri Dec 23 15:26:09 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paolo Bonzini X-Patchwork-Id: 133087 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [140.186.70.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id E1E2FB71B7 for ; Sat, 24 Dec 2011 02:50:13 +1100 (EST) Received: from localhost ([::1]:52946 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Re732-0001TP-DD for incoming@patchwork.ozlabs.org; Fri, 23 Dec 2011 10:28:40 -0500 Received: from eggs.gnu.org ([140.186.70.92]:50643) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Re72q-0001I8-2R for qemu-devel@nongnu.org; Fri, 23 Dec 2011 10:28:29 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1Re72G-0006RQ-M0 for qemu-devel@nongnu.org; Fri, 23 Dec 2011 10:28:08 -0500 Received: from mail-tul01m020-f173.google.com ([209.85.214.173]:53250) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Re72F-0006Qc-MV for qemu-devel@nongnu.org; Fri, 23 Dec 2011 10:27:52 -0500 Received: by obcwp18 with SMTP id wp18so5559728obc.4 for ; Fri, 23 Dec 2011 07:27:50 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=sender:from:to:subject:date:message-id:x-mailer:in-reply-to :references; bh=MXzdy88bWWZnMrwAFiUeG+QaTdLsUEJ6UvaSn6wyNFw=; b=fQVcnUG5LwGGVDvrM2+jJiCtwLxlTMwUSYqKUgFkXZ9tRWgAFg2ArwMISqb1ivgoK/ m932t9GgEG7lHLwM8bm3/zaWZ7d5zuj5p85LSIyIkb9yD0oCqF1cS7SUTyzgdmaO927w dAttZYvk5eRdn33CJQBXLzdI4w7Sv/y3grgQo= Received: by 10.50.195.164 with SMTP id if4mr14252234igc.11.1324654070566; Fri, 23 Dec 2011 07:27:50 -0800 (PST) Received: from localhost.localdomain (93-34-178-147.ip50.fastwebnet.it. [93.34.178.147]) by mx.google.com with ESMTPS id aq5sm42055557igc.5.2011.12.23.07.27.42 (version=TLSv1/SSLv3 cipher=OTHER); Fri, 23 Dec 2011 07:27:49 -0800 (PST) From: Paolo Bonzini To: qemu-devel@nongnu.org Date: Fri, 23 Dec 2011 16:26:09 +0100 Message-Id: <1324653990-20074-6-git-send-email-pbonzini@redhat.com> X-Mailer: git-send-email 1.7.7.1 In-Reply-To: <1324653990-20074-1-git-send-email-pbonzini@redhat.com> References: <1324653990-20074-1-git-send-email-pbonzini@redhat.com> X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6 (newer, 2) X-Received-From: 209.85.214.173 Subject: [Qemu-devel] [PATCH 05/26] nbd: allow multiple in-flight requests X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Allow sending up to 16 requests, and drive the replies to the coroutine that did the request. The code is written to be exactly the same as before this patch when MAX_NBD_REQUESTS == 1 (modulo the extra mutex and state). Signed-off-by: Paolo Bonzini --- block/nbd.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 56 insertions(+), 13 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 9d661c1..3f693e3 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -46,6 +46,10 @@ #define logout(fmt, ...) ((void)0) #endif +#define MAX_NBD_REQUESTS 16 +#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) +#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) + typedef struct BDRVNBDState { int sock; uint32_t nbdflags; @@ -53,9 +57,12 @@ typedef struct BDRVNBDState { size_t blocksize; char *export_name; /* An NBD server may export several devices */ - CoMutex mutex; - Coroutine *coroutine; + CoMutex send_mutex; + CoMutex free_sema; + Coroutine *send_coroutine; + int in_flight; + Coroutine *recv_coroutine[MAX_NBD_REQUESTS]; struct nbd_reply reply; /* If it begins with '/', this is a UNIX domain socket. Otherwise, @@ -112,41 +119,68 @@ out: static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) { - qemu_co_mutex_lock(&s->mutex); - s->coroutine = qemu_coroutine_self(); - request->handle = (uint64_t)(intptr_t)s; + int i; + + /* Poor man semaphore. The free_sema is locked when no other request + * can be accepted, and unlocked after receiving one reply. */ + if (s->in_flight >= MAX_NBD_REQUESTS - 1) { + qemu_co_mutex_lock(&s->free_sema); + assert(s->in_flight < MAX_NBD_REQUESTS); + } + s->in_flight++; + + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i] == NULL) { + s->recv_coroutine[i] = qemu_coroutine_self(); + break; + } + } + + assert(i < MAX_NBD_REQUESTS); + request->handle = INDEX_TO_HANDLE(s, i); } static int nbd_have_request(void *opaque) { BDRVNBDState *s = opaque; - return !!s->coroutine; + return s->in_flight > 0; } static void nbd_reply_ready(void *opaque) { BDRVNBDState *s = opaque; + int i; if (s->reply.handle == 0) { /* No reply already in flight. Fetch a header. */ if (nbd_receive_reply(s->sock, &s->reply) < 0) { s->reply.handle = 0; + goto fail; } } /* There's no need for a mutex on the receive side, because the * handler acts as a synchronization point and ensures that only * one coroutine is called until the reply finishes. */ - if (s->coroutine) { - qemu_coroutine_enter(s->coroutine, NULL); + i = HANDLE_TO_INDEX(s, s->reply.handle); + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + return; + } + +fail: + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + } } } static void nbd_restart_write(void *opaque) { BDRVNBDState *s = opaque; - qemu_coroutine_enter(s->coroutine, NULL); + qemu_coroutine_enter(s->send_coroutine, NULL); } static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, @@ -154,6 +188,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, { int rc, ret; + qemu_co_mutex_lock(&s->send_mutex); + s->send_coroutine = qemu_coroutine_self(); qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, nbd_have_request, NULL, s); rc = nbd_send_request(s->sock, request); @@ -166,6 +202,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, } qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, nbd_have_request, NULL, s); + s->send_coroutine = NULL; + qemu_co_mutex_unlock(&s->send_mutex); return rc; } @@ -175,7 +213,8 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, { int ret; - /* Wait until we're woken up by the read handler. */ + /* Wait until we're woken up by the read handler. TODO: perhaps + * peek at the next reply and avoid yielding if it's ours? */ qemu_coroutine_yield(); *reply = s->reply; if (reply->handle != request->handle) { @@ -195,8 +234,11 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) { - s->coroutine = NULL; - qemu_co_mutex_unlock(&s->mutex); + int i = HANDLE_TO_INDEX(s, request->handle); + s->recv_coroutine[i] = NULL; + if (s->in_flight-- == MAX_NBD_REQUESTS) { + qemu_co_mutex_unlock(&s->free_sema); + } } static int nbd_establish_connection(BlockDriverState *bs) @@ -261,7 +303,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; - qemu_co_mutex_init(&s->mutex); + qemu_co_mutex_init(&s->send_mutex); + qemu_co_mutex_init(&s->free_sema); /* Pop the config into our state object. Exit if invalid. */ result = nbd_config(s, filename, flags);