From patchwork Tue Aug 27 14:39:45 2013
X-Patchwork-Submitter: Stefan Hajnoczi
X-Patchwork-Id: 270132
From: Stefan Hajnoczi
To: qemu-devel@nongnu.org
Cc: Kevin Wolf, Paolo Bonzini, Wenchao Xia, Stefan Hajnoczi
Date: Tue, 27 Aug 2013 16:39:45 +0200
Message-Id: <1377614385-20466-1-git-send-email-stefanha@redhat.com>
Subject: [Qemu-devel] [RFC] aio: add aio_context_acquire() and aio_context_release()

It can be useful to run an AioContext from a thread which normally does
not "own" the AioContext.  For example, request draining can be
implemented by acquiring the AioContext and looping aio_poll() until all
requests have been completed.

The following pattern should work:

    /* Event loop thread */
    while (running) {
        aio_context_acquire(ctx);
        aio_poll(ctx, true);
        aio_context_release(ctx);
    }

    /* Another thread */
    aio_context_acquire(ctx);
    bdrv_read(bs, 0x1000, buf, 1);
    aio_context_release(ctx);

This patch implements aio_context_acquire() and aio_context_release().
Note that existing aio_poll() callers do not need to worry about
acquiring and releasing - it is only needed when multiple threads will
call aio_poll() on the same AioContext.

Signed-off-by: Stefan Hajnoczi
---
I previously sent patches that implement bdrv_drain_all() by stopping
dataplane threads.  AioContext acquire()/release() is a more general
solution than temporarily stopping dataplane threads; a rough sketch of
the draining use case follows below.
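This is illustration only, not part of the patch: bdrv_requests_pending()
is a placeholder predicate, and the final series may track outstanding
requests differently.  The point is simply that any thread can drive the
event loop once it owns the AioContext:

    /* Sketch only.  bdrv_requests_pending() is a placeholder for
     * whatever check the real drain code uses to decide it is done. */
    static void drain_example(AioContext *ctx, BlockDriverState *bs)
    {
        aio_context_acquire(ctx);
        while (bdrv_requests_pending(bs)) {   /* placeholder predicate */
            aio_poll(ctx, true);              /* run completions in this thread */
        }
        aio_context_release(ctx);
    }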
Compared with stopping threads, the acquire/release approach is less
hacky and is also supported by other event loops such as GMainContext
(a short GMainContext comparison follows the patch).  No need to commit
this patch yet; I still want to build things on top of it before
submitting a final version.

 async.c             | 27 +++++++++++++++++++++++++
 include/block/aio.h | 13 ++++++++++++
 tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+)

diff --git a/include/block/aio.h b/include/block/aio.h
index 5743bf1..9035e87 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque);
 typedef struct AioContext {
     GSource source;
 
+    QemuMutex acquire_lock;
+    QemuCond acquire_cond;
+    QemuThread owner_thread;
+    QemuThread *owner;
+
     /* The list of registered AIO handlers */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
@@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx);
  */
 void aio_context_unref(AioContext *ctx);
 
+/* Take ownership of the AioContext.  If the AioContext will be shared between
+ * threads, a thread must have ownership when calling aio_poll().
+ */
+void aio_context_acquire(AioContext *ctx);
+
+/* Relinquish ownership of the AioContext. */
+void aio_context_release(AioContext *ctx);
+
 /**
  * aio_bh_new: Allocate a new bottom half structure.
  *
diff --git a/async.c b/async.c
index 9791d8e..9fec07c 100644
--- a/async.c
+++ b/async.c
@@ -203,6 +203,8 @@ aio_ctx_finalize(GSource *source)
     thread_pool_free(ctx->thread_pool);
     aio_set_event_notifier(ctx, &ctx->notifier, NULL);
     event_notifier_cleanup(&ctx->notifier);
+    qemu_cond_destroy(&ctx->acquire_cond);
+    qemu_mutex_destroy(&ctx->acquire_lock);
     qemu_mutex_destroy(&ctx->bh_lock);
     g_array_free(ctx->pollfds, TRUE);
 }
@@ -240,6 +242,9 @@ AioContext *aio_context_new(void)
     ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->acquire_lock);
+    qemu_cond_init(&ctx->acquire_cond);
+    ctx->owner = NULL;
     event_notifier_init(&ctx->notifier, false);
     aio_set_event_notifier(ctx, &ctx->notifier,
                            (EventNotifierHandler *)
@@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx)
 {
     g_source_unref(&ctx->source);
 }
+
+void aio_context_acquire(AioContext *ctx)
+{
+    qemu_mutex_lock(&ctx->acquire_lock);
+    while (ctx->owner) {
+        assert(!qemu_thread_is_self(ctx->owner));
+        aio_notify(ctx); /* kick current owner */
+        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
+    }
+    qemu_thread_get_self(&ctx->owner_thread);
+    ctx->owner = &ctx->owner_thread;
+    qemu_mutex_unlock(&ctx->acquire_lock);
+}
+
+void aio_context_release(AioContext *ctx)
+{
+    qemu_mutex_lock(&ctx->acquire_lock);
+    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
+    ctx->owner = NULL;
+    qemu_cond_signal(&ctx->acquire_cond);
+    qemu_mutex_unlock(&ctx->acquire_lock);
+}
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 1ab5637..324c099 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -88,6 +88,63 @@ static void test_notify(void)
     g_assert(!aio_poll(ctx, false));
 }
 
+typedef struct {
+    QemuMutex start_lock;
+    bool thread_acquired;
+} AcquireTestData;
+
+static void *test_acquire_thread(void *opaque)
+{
+    AcquireTestData *data = opaque;
+
+    /* Wait for other thread to let us start */
+    qemu_mutex_lock(&data->start_lock);
+    qemu_mutex_unlock(&data->start_lock);
+
+    aio_context_acquire(ctx);
+    aio_context_release(ctx);
+
+    data->thread_acquired = true; /* success, we got here */
+
+    return NULL;
+}
+
+static void dummy_notifier_read(EventNotifier *unused)
+{
+    g_assert(false); /* should never be invoked */
+}
+
+static void test_acquire(void)
+{
+    QemuThread thread;
+    EventNotifier notifier;
+    AcquireTestData data;
+
+    /* Dummy event notifier ensures aio_poll() will block */
+    event_notifier_init(&notifier, false);
+    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
+    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
+
+    qemu_mutex_init(&data.start_lock);
+    qemu_mutex_lock(&data.start_lock);
+    data.thread_acquired = false;
+
+    qemu_thread_create(&thread, test_acquire_thread,
+                       &data, QEMU_THREAD_JOINABLE);
+
+    /* Block in aio_poll(), let other thread kick us and acquire context */
+    aio_context_acquire(ctx);
+    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
+    g_assert(!aio_poll(ctx, true));
+    aio_context_release(ctx);
+
+    qemu_thread_join(&thread);
+    aio_set_event_notifier(ctx, &notifier, NULL);
+    event_notifier_cleanup(&notifier);
+
+    g_assert(data.thread_acquired);
+}
+
 static void test_bh_schedule(void)
 {
     BHTestData data = { .n = 0 };
@@ -639,6 +696,7 @@ int main(int argc, char **argv)
     g_test_init(&argc, &argv, NULL);
 
     g_test_add_func("/aio/notify",                  test_notify);
+    g_test_add_func("/aio/acquire",                 test_acquire);
     g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
     g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
     g_test_add_func("/aio/bh/cancel",               test_bh_cancel);
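
For comparison, the GMainContext ownership API mentioned above follows
the same protocol.  This is illustration only, using standard GLib calls
rather than anything from this patch:

    #include <glib.h>

    /* A thread that is not the owner either becomes the owner and runs
     * one loop iteration itself, or wakes the current owner so it can
     * give up the context. */
    static void run_one_iteration(GMainContext *context)
    {
        if (g_main_context_acquire(context)) {
            g_main_context_iteration(context, TRUE);  /* may block once */
            g_main_context_release(context);
        } else {
            g_main_context_wakeup(context);           /* kick the owner */
        }
    }

g_main_context_acquire() returns FALSE when another thread owns the
context, which is roughly the role the acquire_cond wait plays in the
aio_context_acquire() implementation above.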