From patchwork Fri Jun 12 10:09:17 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Stefan Hajnoczi X-Patchwork-Id: 483470 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 7AD0C140280 for ; Fri, 12 Jun 2015 20:14:19 +1000 (AEST) Received: from localhost ([::1]:50438 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Z3Lyb-0005Kd-C1 for incoming@patchwork.ozlabs.org; Fri, 12 Jun 2015 06:14:17 -0400 Received: from eggs.gnu.org ([2001:4830:134:3::10]:58376) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Z3Lu5-0005bI-L4 for qemu-devel@nongnu.org; Fri, 12 Jun 2015 06:09:39 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1Z3Lu3-00067k-GB for qemu-devel@nongnu.org; Fri, 12 Jun 2015 06:09:37 -0400 Received: from mx1.redhat.com ([209.132.183.28]:46761) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Z3Lu3-00067g-6x for qemu-devel@nongnu.org; Fri, 12 Jun 2015 06:09:35 -0400 Received: from int-mx10.intmail.prod.int.phx2.redhat.com (int-mx10.intmail.prod.int.phx2.redhat.com [10.5.11.23]) by mx1.redhat.com (Postfix) with ESMTPS id ED0A82CAF3F; Fri, 12 Jun 2015 10:09:34 +0000 (UTC) Received: from localhost (ovpn-112-61.ams2.redhat.com [10.36.112.61]) by int-mx10.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with ESMTP id t5CA9X7k013737; Fri, 12 Jun 2015 06:09:34 -0400 From: Stefan Hajnoczi To: qemu-devel@nongnu.org Date: Fri, 12 Jun 2015 11:09:17 +0100 Message-Id: <1434103761-29871-6-git-send-email-stefanha@redhat.com> In-Reply-To: <1434103761-29871-1-git-send-email-stefanha@redhat.com> References: 
<1434103761-29871-1-git-send-email-stefanha@redhat.com> X-Scanned-By: MIMEDefang 2.68 on 10.5.11.23 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 3.x X-Received-From: 209.132.183.28 Cc: Kevin Wolf , famz@redhat.com, Jeff Cody , Max Reitz , vsementsov@parallels.com, Stefan Hajnoczi , John Snow Subject: [Qemu-devel] [RFC 5/9] block: add block job transactions X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sometimes block jobs must execute as a transaction group. Finishing jobs wait until all other jobs are ready to complete successfully. Failure or cancellation of one job cancels the other jobs in the group. Signed-off-by: Stefan Hajnoczi --- blockjob.c | 160 ++++++++++++++++++++++++++++++++++++++++++++++ include/block/block.h | 1 + include/block/block_int.h | 3 +- include/block/blockjob.h | 49 ++++++++++++++ trace-events | 4 ++ 5 files changed, 216 insertions(+), 1 deletion(-) diff --git a/blockjob.c b/blockjob.c index 2755465..ff622f5 100644 --- a/blockjob.c +++ b/blockjob.c @@ -399,3 +399,163 @@ void block_job_defer_to_main_loop(BlockJob *job, qemu_bh_schedule(data->bh); } + +/* Transactional group of block jobs */ +struct BlockJobTxn { + /* Jobs may be in different AioContexts so protect all fields */ + QemuMutex lock; + + /* Reference count for txn object */ + unsigned int ref; + + /* Is this txn cancelling its jobs? 
*/ + bool aborting; + + /* Number of jobs still running */ + unsigned int jobs_pending; + + /* List of jobs */ + QLIST_HEAD(, BlockJob) jobs; +}; + +BlockJobTxn *block_job_txn_new(void) +{ + BlockJobTxn *txn = g_new(BlockJobTxn, 1); + qemu_mutex_init(&txn->lock); + txn->ref = 1; /* dropped by block_job_txn_begin() */ + txn->aborting = false; + txn->jobs_pending = 0; + QLIST_INIT(&txn->jobs); + return txn; +} + +static void block_job_txn_unref(BlockJobTxn *txn) +{ + qemu_mutex_lock(&txn->lock); + + if (--txn->ref > 0) { + qemu_mutex_unlock(&txn->lock); + return; + } + + qemu_mutex_unlock(&txn->lock); + qemu_mutex_destroy(&txn->lock); + g_free(txn); +} + +/* The purpose of this is to keep txn alive until all jobs have been added */ +void block_job_txn_begin(BlockJobTxn *txn) +{ + block_job_txn_unref(txn); +} + +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job) +{ + if (!txn) { + return; + } + + assert(!job->txn); + job->txn = txn; + + qemu_mutex_lock(&txn->lock); + txn->ref++; + txn->jobs_pending++; + QLIST_INSERT_HEAD(&txn->jobs, job, txn_list); + qemu_mutex_unlock(&txn->lock); +} + +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of + * successful completion. Runs from main loop. + */ +static void block_job_txn_complete(BlockJob *job, void *opaque) +{ + BlockJobTxn *txn = opaque; + BlockJob *other_job; + bool aborting = txn->aborting; + + qemu_mutex_lock(&txn->lock); + txn->ref++; /* keep txn alive until the end of this loop */ + + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { + AioContext *ctx; + + qemu_mutex_unlock(&txn->lock); + ctx = bdrv_get_aio_context(other_job->bs); + aio_context_acquire(ctx); + + /* Cancel all other jobs if aborting. Don't cancel our own failed job + * since cancellation throws away the error value. 
+ */ + if (aborting && other_job != job) { + block_job_cancel(other_job); + } else { + block_job_enter(other_job); + } + + aio_context_release(ctx); + qemu_mutex_lock(&txn->lock); + } + + qemu_mutex_unlock(&txn->lock); + block_job_txn_unref(txn); +} + +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn, + BlockJob *job, + int ret) +{ + if (!txn) { + return; + } + + qemu_mutex_lock(&txn->lock); + + /* This function is entered in 3 cases: + * + * 1. Successful job completion - wait for other jobs + * 2. First failed/cancelled job in txn - cancel other jobs and wait + * 3. Subsequent cancelled jobs - finish immediately, don't wait + */ + trace_block_job_txn_prepare_to_complete_entry(txn, job, ret, + block_job_is_cancelled(job), + txn->aborting, + txn->jobs_pending); + + if (txn->aborting) { /* Case 3 */ + assert(block_job_is_cancelled(job)); + goto out; /* already cancelled, don't yield */ + } + + if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */ +abort: + txn->aborting = true; + block_job_defer_to_main_loop(job, block_job_txn_complete, txn); + } else { /* Case 1 */ + if (--txn->jobs_pending == 0) { + block_job_defer_to_main_loop(job, block_job_txn_complete, txn); + } + } + + /* Wait for block_job_txn_complete() */ + do { + qemu_mutex_unlock(&txn->lock); + job->busy = false; + qemu_coroutine_yield(); + job->busy = true; + qemu_mutex_lock(&txn->lock); + + if (block_job_is_cancelled(job) && !txn->aborting) { + goto abort; /* this job just got cancelled by the user */ + } + } while (!txn->aborting && txn->jobs_pending > 0); + +out: + trace_block_job_txn_prepare_to_complete_return(txn, job, ret, + block_job_is_cancelled(job), + txn->aborting, + txn->jobs_pending); + + qemu_mutex_unlock(&txn->lock); + block_job_txn_unref(txn); +} diff --git a/include/block/block.h b/include/block/block.h index 3d62d3e..439925c 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -12,6 +12,7 @@ /* block.c */ typedef struct BlockDriver BlockDriver; 
typedef struct BlockJob BlockJob;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 5af712f..9464142 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -617,6 +617,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target,
  * @on_source_error: The action to take upon error reading from the source.
  * @on_target_error: The action to take upon error writing to the target.
  * @cb: Completion function for the job.
+ * @txn: Transaction that this job is part of (may be NULL).
  * @opaque: Opaque pointer value passed to @cb.
  *
  * Start a backup operation on @bs. Clusters in @bs are written to @target
@@ -628,7 +629,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
                   BlockCompletionFunc *cb, void *opaque,
-                  Error **errp);
+                  BlockJobTxn *txn, Error **errp);
 
 void blk_dev_change_media_cb(BlockBackend *blk, bool load);
 bool blk_dev_has_removable_media(BlockBackend *blk);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..ce57e98 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@ struct BlockJob {
 
     /** The opaque value that is passed to the completion function. */
     void *opaque;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list; /* entry in the txn job list */
 };
 
 /**
@@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction. Jobs can be added to the
+ * transaction using block_job_txn_add_job(). block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group. Jobs wait for each other before completing. Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction; this is a no-op if @txn is NULL. The @job must
+ * not already be in a transaction. The block job driver must call
+ * block_job_txn_prepare_to_complete() before final cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction
+ *
+ * Call this once all jobs have been added. It drops the reference taken by
+ * block_job_txn_new(), so it must be called even if no jobs were added.
+ */
+void block_job_txn_begin(BlockJobTxn *txn);
+
+/**
+ * block_job_txn_prepare_to_complete:
+ * @txn: The transaction
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete (no-op if @txn is NULL).
+ * If @ret is non-zero or @job is cancelled, all other jobs are cancelled.
+ */
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+                                                    BlockJob *job, int ret);
+
 #endif
diff --git a/trace-events b/trace-events
index a589650..526e6ac 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
 virtio_blk_data_plane_stop(void *s) "dataplane %p"
 virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
 
+# blockjob.c
+block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+
 # hw/virtio/dataplane/vring.c
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"