diff mbox

[05/10] block: add block job transactions

Message ID 1435234332-581-6-git-send-email-stefanha@redhat.com
State New
Headers show

Commit Message

Stefan Hajnoczi June 25, 2015, 12:12 p.m. UTC
Sometimes block jobs must execute as a transaction group.  Finishing
jobs wait until all other jobs are ready to complete successfully.
Failure or cancellation of one job cancels the other jobs in the group.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 blockjob.c                | 160 ++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h     |   1 +
 include/block/block_int.h |   3 +-
 include/block/blockjob.h  |  49 ++++++++++++++
 trace-events              |   4 ++
 5 files changed, 216 insertions(+), 1 deletion(-)

Comments

Fam Zheng June 26, 2015, 6:41 a.m. UTC | #1
On Thu, 06/25 13:12, Stefan Hajnoczi wrote:
> Sometimes block jobs must execute as a transaction group.  Finishing
> jobs wait until all other jobs are ready to complete successfully.
> Failure or cancellation of one job cancels the other jobs in the group.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>

Reviewed-by: Fam Zheng <famz@redhat.com>

> ---
>  blockjob.c                | 160 ++++++++++++++++++++++++++++++++++++++++++++++
>  include/block/block.h     |   1 +
>  include/block/block_int.h |   3 +-
>  include/block/blockjob.h  |  49 ++++++++++++++
>  trace-events              |   4 ++
>  5 files changed, 216 insertions(+), 1 deletion(-)
> 
> diff --git a/blockjob.c b/blockjob.c
> index ec46fad..3c6f1d4 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -400,3 +400,163 @@ void block_job_defer_to_main_loop(BlockJob *job,
>  
>      qemu_bh_schedule(data->bh);
>  }
> +
> +/* Transactional group of block jobs */
> +struct BlockJobTxn {
> +    /* Jobs may be in different AioContexts so protect all fields */
> +    QemuMutex lock;
> +
> +    /* Reference count for txn object */
> +    unsigned int ref;
> +
> +    /* Is this txn cancelling its jobs? */
> +    bool aborting;
> +
> +    /* Number of jobs still running */
> +    unsigned int jobs_pending;
> +
> +    /* List of jobs */
> +    QLIST_HEAD(, BlockJob) jobs;
> +};
> +
> +BlockJobTxn *block_job_txn_new(void)
> +{
> +    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
> +    qemu_mutex_init(&txn->lock);
> +    txn->ref = 1; /* dropped by block_job_txn_begin() */
> +    txn->aborting = false;
> +    txn->jobs_pending = 0;
> +    QLIST_INIT(&txn->jobs);
> +    return txn;
> +}
> +
> +static void block_job_txn_unref(BlockJobTxn *txn)
> +{
> +    qemu_mutex_lock(&txn->lock);
> +
> +    if (--txn->ref > 0) {
> +        qemu_mutex_unlock(&txn->lock);
> +        return;
> +    }
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    qemu_mutex_destroy(&txn->lock);
> +    g_free(txn);
> +}
> +
> +/* The purpose of this is to keep txn alive until all jobs have been added */
> +void block_job_txn_begin(BlockJobTxn *txn)
> +{
> +    block_job_txn_unref(txn);
> +}
> +
> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
> +{
> +    if (!txn) {
> +        return;
> +    }
> +
> +    assert(!job->txn);
> +    job->txn = txn;
> +
> +    qemu_mutex_lock(&txn->lock);
> +    txn->ref++;
> +    txn->jobs_pending++;
> +    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
> +    qemu_mutex_unlock(&txn->lock);
> +}
> +
> +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
> + * successful completion.  Runs from main loop.
> + */
> +static void block_job_txn_complete(BlockJob *job, void *opaque)
> +{
> +    BlockJobTxn *txn = opaque;
> +    BlockJob *other_job;
> +    bool aborting = txn->aborting;
> +
> +    qemu_mutex_lock(&txn->lock);
> +    txn->ref++; /* keep txn alive until the end of this loop */
> +
> +    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
> +        AioContext *ctx;
> +
> +        qemu_mutex_unlock(&txn->lock);
> +        ctx = bdrv_get_aio_context(other_job->bs);
> +        aio_context_acquire(ctx);
> +
> +        /* Cancel all other jobs if aborting.  Don't cancel our own failed job
> +         * since cancellation throws away the error value.
> +         */
> +        if (aborting && other_job != job) {
> +            block_job_cancel(other_job);
> +        } else {
> +            block_job_enter(other_job);
> +        }
> +
> +        aio_context_release(ctx);
> +        qemu_mutex_lock(&txn->lock);
> +    }
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    block_job_txn_unref(txn);
> +}
> +
> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> +                                                    BlockJob *job,
> +                                                    int ret)
> +{
> +    if (!txn) {
> +        return;
> +    }
> +
> +    qemu_mutex_lock(&txn->lock);
> +
> +    /* This function is entered in 3 cases:
> +     *
> +     * 1. Successful job completion - wait for other jobs
> +     * 2. First failed/cancelled job in txn - cancel other jobs and wait
> +     * 3. Subsequent cancelled jobs - finish immediately, don't wait
> +     */
> +    trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
> +                                                  block_job_is_cancelled(job),
> +                                                  txn->aborting,
> +                                                  txn->jobs_pending);
> +
> +    if (txn->aborting) { /* Case 3 */
> +        assert(block_job_is_cancelled(job));
> +        goto out; /* already cancelled, don't yield */
> +    }
> +
> +    if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
> +abort:
> +        txn->aborting = true;
> +        block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> +    } else { /* Case 1 */
> +        if (--txn->jobs_pending == 0) {
> +            block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> +        }
> +    }
> +
> +    /* Wait for block_job_txn_complete() */
> +    do {
> +        qemu_mutex_unlock(&txn->lock);
> +        job->busy = false;
> +        qemu_coroutine_yield();
> +        job->busy = true;
> +        qemu_mutex_lock(&txn->lock);
> +
> +        if (block_job_is_cancelled(job) && !txn->aborting) {
> +            goto abort; /* this job just got cancelled by the user */
> +        }
> +    } while (!txn->aborting && txn->jobs_pending > 0);
> +
> +out:
> +    trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
> +                                                   block_job_is_cancelled(job),
> +                                                   txn->aborting,
> +                                                   txn->jobs_pending);
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    block_job_txn_unref(txn);
> +}
> diff --git a/include/block/block.h b/include/block/block.h
> index a4c505d..cb19c73 100644
> --- a/include/block/block.h
> +++ b/include/block/block.h
> @@ -13,6 +13,7 @@
>  typedef struct BlockDriver BlockDriver;
>  typedef struct BlockJob BlockJob;
>  typedef struct BdrvChildRole BdrvChildRole;
> +typedef struct BlockJobTxn BlockJobTxn;
>  
>  typedef struct BlockDriverInfo {
>      /* in bytes, 0 if irrelevant */
> diff --git a/include/block/block_int.h b/include/block/block_int.h
> index ea3e7f0..812a18a 100644
> --- a/include/block/block_int.h
> +++ b/include/block/block_int.h
> @@ -639,6 +639,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target,
>   * @on_source_error: The action to take upon error reading from the source.
>   * @on_target_error: The action to take upon error writing to the target.
>   * @cb: Completion function for the job.
> + * @txn: Transaction that this job is part of (may be NULL).
>   * @opaque: Opaque pointer value passed to @cb.
>   *
>   * Start a backup operation on @bs.  Clusters in @bs are written to @target
> @@ -650,7 +651,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
>                    BlockdevOnError on_source_error,
>                    BlockdevOnError on_target_error,
>                    BlockCompletionFunc *cb, void *opaque,
> -                  Error **errp);
> +                  BlockJobTxn *txn, Error **errp);
>  
>  void blk_dev_change_media_cb(BlockBackend *blk, bool load);
>  bool blk_dev_has_removable_media(BlockBackend *blk);
> diff --git a/include/block/blockjob.h b/include/block/blockjob.h
> index 57d8ef1..ce57e98 100644
> --- a/include/block/blockjob.h
> +++ b/include/block/blockjob.h
> @@ -122,6 +122,10 @@ struct BlockJob {
>  
>      /** The opaque value that is passed to the completion function.  */
>      void *opaque;
> +
> +    /** Non-NULL if this job is part of a transaction */
> +    BlockJobTxn *txn;
> +    QLIST_ENTRY(BlockJob) txn_list;
>  };
>  
>  /**
> @@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
>                                    BlockJobDeferToMainLoopFn *fn,
>                                    void *opaque);
>  
> +/**
> + * block_job_txn_new:
> + *
> + * Allocate and return a new block job transaction.  Jobs can be added to the
> + * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
> + * called when all jobs (if any) have been added.
> + *
> + * All jobs in the transaction either complete successfully or fail/cancel as a
> + * group.  Jobs wait for each other before completing.  Cancelling one job
> + * cancels all jobs in the transaction.
> + */
> +BlockJobTxn *block_job_txn_new(void);
> +
> +/**
> + * block_job_txn_add_job:
> + * @txn: The transaction
> + * @job: Job to add to the transaction
> + *
> + * Add @job to the transaction.  The @job must not already be in a transaction.
> + * The block job driver must call block_job_txn_prepare_to_complete() before
> + * final cleanup and completion.
> + */
> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
> +
> +/**
> + * block_job_txn_begin:
> + * @txn: The transaction
> + *
> + * Call this to mark the end of adding jobs to the transaction.  This must be
> + * called even if no jobs were added.
> + */
> +void block_job_txn_begin(BlockJobTxn *txn);
> +
> +/**
> + * block_job_txn_prepare_to_complete:
> + * @txn: The transaction
> + * @job: The block job
> + * @ret: Block job return value (0 for success, otherwise job failure)
> + *
> + * Wait for other jobs in the transaction to complete.  If @ret is non-zero or
> + * @job is cancelled, all other jobs in the transaction will be cancelled.
> + */
> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> +                                                    BlockJob *job, int ret);
> +
>  #endif
> diff --git a/trace-events b/trace-events
> index 52b7efa..b6a43a0 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
>  virtio_blk_data_plane_stop(void *s) "dataplane %p"
>  virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
>  
> +# blockjob.c
> +block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
> +block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
> +
>  # hw/virtio/dataplane/vring.c
>  vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
>  
> -- 
> 2.4.3
> 
>
John Snow June 29, 2015, 10:38 p.m. UTC | #2
On 06/25/2015 08:12 AM, Stefan Hajnoczi wrote:
> Sometimes block jobs must execute as a transaction group.  Finishing
> jobs wait until all other jobs are ready to complete successfully.
> Failure or cancellation of one job cancels the other jobs in the group.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  blockjob.c                | 160 ++++++++++++++++++++++++++++++++++++++++++++++
>  include/block/block.h     |   1 +
>  include/block/block_int.h |   3 +-
>  include/block/blockjob.h  |  49 ++++++++++++++
>  trace-events              |   4 ++
>  5 files changed, 216 insertions(+), 1 deletion(-)
> 
> diff --git a/blockjob.c b/blockjob.c
> index ec46fad..3c6f1d4 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -400,3 +400,163 @@ void block_job_defer_to_main_loop(BlockJob *job,
>  
>      qemu_bh_schedule(data->bh);
>  }
> +
> +/* Transactional group of block jobs */
> +struct BlockJobTxn {
> +    /* Jobs may be in different AioContexts so protect all fields */
> +    QemuMutex lock;
> +
> +    /* Reference count for txn object */
> +    unsigned int ref;
> +
> +    /* Is this txn cancelling its jobs? */
> +    bool aborting;
> +
> +    /* Number of jobs still running */
> +    unsigned int jobs_pending;
> +
> +    /* List of jobs */
> +    QLIST_HEAD(, BlockJob) jobs;
> +};
> +
> +BlockJobTxn *block_job_txn_new(void)
> +{
> +    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
> +    qemu_mutex_init(&txn->lock);
> +    txn->ref = 1; /* dropped by block_job_txn_begin() */
> +    txn->aborting = false;
> +    txn->jobs_pending = 0;
> +    QLIST_INIT(&txn->jobs);
> +    return txn;
> +}
> +
> +static void block_job_txn_unref(BlockJobTxn *txn)
> +{
> +    qemu_mutex_lock(&txn->lock);
> +
> +    if (--txn->ref > 0) {
> +        qemu_mutex_unlock(&txn->lock);
> +        return;
> +    }
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    qemu_mutex_destroy(&txn->lock);
> +    g_free(txn);
> +}
> +
> +/* The purpose of this is to keep txn alive until all jobs have been added */
> +void block_job_txn_begin(BlockJobTxn *txn)
> +{
> +    block_job_txn_unref(txn);
> +}
> +

Maybe it's not entirely clear to the caller that "begin" may in fact
actually delete the BlockJobTxn. Shall we update the caller's pointer to
NULL in this case as a hint? Passing a **txn will imply that we are
giving up our ownership of the object.

> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
> +{
> +    if (!txn) {
> +        return;
> +    }
> +
> +    assert(!job->txn);
> +    job->txn = txn;
> +
> +    qemu_mutex_lock(&txn->lock);
> +    txn->ref++;
> +    txn->jobs_pending++;
> +    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
> +    qemu_mutex_unlock(&txn->lock);
> +}
> +
> +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
> + * successful completion.  Runs from main loop.
> + */
> +static void block_job_txn_complete(BlockJob *job, void *opaque)
> +{
> +    BlockJobTxn *txn = opaque;
> +    BlockJob *other_job;
> +    bool aborting = txn->aborting;
> +
> +    qemu_mutex_lock(&txn->lock);
> +    txn->ref++; /* keep txn alive until the end of this loop */
> +
> +    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
> +        AioContext *ctx;
> +
> +        qemu_mutex_unlock(&txn->lock);
> +        ctx = bdrv_get_aio_context(other_job->bs);
> +        aio_context_acquire(ctx);
> +
> +        /* Cancel all other jobs if aborting.  Don't cancel our own failed job
> +         * since cancellation throws away the error value.
> +         */
> +        if (aborting && other_job != job) {
> +            block_job_cancel(other_job);
> +        } else {
> +            block_job_enter(other_job);
> +        }
> +
> +        aio_context_release(ctx);
> +        qemu_mutex_lock(&txn->lock);
> +    }
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    block_job_txn_unref(txn);
> +}
> +

Maybe we can name this one along the lines of
block_job_txn_complete_impl or something clearly internal, so that we
can name the public interface simply "block_job_txn_complete."

Maybe I'm just bike shedding, but calling only a "prepare to X" function
without a matching "X" call in the exposed API seems odd.

I suppose it doesn't matter, because I can't think of anything nicer :)

> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> +                                                    BlockJob *job,
> +                                                    int ret)
> +{
> +    if (!txn) {
> +        return;
> +    }
> +
> +    qemu_mutex_lock(&txn->lock);
> +
> +    /* This function is entered in 3 cases:
> +     *
> +     * 1. Successful job completion - wait for other jobs
> +     * 2. First failed/cancelled job in txn - cancel other jobs and wait
> +     * 3. Subsequent cancelled jobs - finish immediately, don't wait
> +     */
> +    trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
> +                                                  block_job_is_cancelled(job),
> +                                                  txn->aborting,
> +                                                  txn->jobs_pending);
> +
> +    if (txn->aborting) { /* Case 3 */
> +        assert(block_job_is_cancelled(job));
> +        goto out; /* already cancelled, don't yield */
> +    }
> +

So the first failure forces all jobs not-yet-complete to cancel. Is
there any chance for a race condition of two jobs completing almost
simultaneously, where the first fails and the second completes, and the
2nd job makes it here before it gets canceled?

BOD: I really assume the answer is "no," but it's not immediately
evident to me.

> +    if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
> +abort:
> +        txn->aborting = true;
> +        block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> +    } else { /* Case 1 */
> +        if (--txn->jobs_pending == 0) {
> +            block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> +        }
> +    }
> +
> +    /* Wait for block_job_txn_complete() */
> +    do {
> +        qemu_mutex_unlock(&txn->lock);
> +        job->busy = false;
> +        qemu_coroutine_yield();
> +        job->busy = true;
> +        qemu_mutex_lock(&txn->lock);
> +
> +        if (block_job_is_cancelled(job) && !txn->aborting) {
> +            goto abort; /* this job just got cancelled by the user */
> +        }
> +    } while (!txn->aborting && txn->jobs_pending > 0);
> +
> +out:
> +    trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
> +                                                   block_job_is_cancelled(job),
> +                                                   txn->aborting,
> +                                                   txn->jobs_pending);
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    block_job_txn_unref(txn);
> +}
> diff --git a/include/block/block.h b/include/block/block.h
> index a4c505d..cb19c73 100644
> --- a/include/block/block.h
> +++ b/include/block/block.h
> @@ -13,6 +13,7 @@
>  typedef struct BlockDriver BlockDriver;
>  typedef struct BlockJob BlockJob;
>  typedef struct BdrvChildRole BdrvChildRole;
> +typedef struct BlockJobTxn BlockJobTxn;
>  
>  typedef struct BlockDriverInfo {
>      /* in bytes, 0 if irrelevant */
> diff --git a/include/block/block_int.h b/include/block/block_int.h
> index ea3e7f0..812a18a 100644
> --- a/include/block/block_int.h
> +++ b/include/block/block_int.h
> @@ -639,6 +639,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target,
>   * @on_source_error: The action to take upon error reading from the source.
>   * @on_target_error: The action to take upon error writing to the target.
>   * @cb: Completion function for the job.
> + * @txn: Transaction that this job is part of (may be NULL).
>   * @opaque: Opaque pointer value passed to @cb.
>   *
>   * Start a backup operation on @bs.  Clusters in @bs are written to @target
> @@ -650,7 +651,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
>                    BlockdevOnError on_source_error,
>                    BlockdevOnError on_target_error,
>                    BlockCompletionFunc *cb, void *opaque,
> -                  Error **errp);
> +                  BlockJobTxn *txn, Error **errp);
>  
>  void blk_dev_change_media_cb(BlockBackend *blk, bool load);
>  bool blk_dev_has_removable_media(BlockBackend *blk);
> diff --git a/include/block/blockjob.h b/include/block/blockjob.h
> index 57d8ef1..ce57e98 100644
> --- a/include/block/blockjob.h
> +++ b/include/block/blockjob.h
> @@ -122,6 +122,10 @@ struct BlockJob {
>  
>      /** The opaque value that is passed to the completion function.  */
>      void *opaque;
> +
> +    /** Non-NULL if this job is part of a transaction */
> +    BlockJobTxn *txn;
> +    QLIST_ENTRY(BlockJob) txn_list;
>  };
>  
>  /**
> @@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
>                                    BlockJobDeferToMainLoopFn *fn,
>                                    void *opaque);
>  
> +/**
> + * block_job_txn_new:
> + *
> + * Allocate and return a new block job transaction.  Jobs can be added to the
> + * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
> + * called when all jobs (if any) have been added.
> + *
> + * All jobs in the transaction either complete successfully or fail/cancel as a
> + * group.  Jobs wait for each other before completing.  Cancelling one job
> + * cancels all jobs in the transaction.
> + */
> +BlockJobTxn *block_job_txn_new(void);
> +
> +/**
> + * block_job_txn_add_job:
> + * @txn: The transaction
> + * @job: Job to add to the transaction
> + *
> + * Add @job to the transaction.  The @job must not already be in a transaction.
> + * The block job driver must call block_job_txn_prepare_to_complete() before
> + * final cleanup and completion.
> + */
> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
> +
> +/**
> + * block_job_txn_begin:
> + * @txn: The transaction
> + *
> + * Call this to mark the end of adding jobs to the transaction.  This must be
> + * called even if no jobs were added.
> + */
> +void block_job_txn_begin(BlockJobTxn *txn);
> +
> +/**
> + * block_job_txn_prepare_to_complete:
> + * @txn: The transaction
> + * @job: The block job
> + * @ret: Block job return value (0 for success, otherwise job failure)
> + *
> + * Wait for other jobs in the transaction to complete.  If @ret is non-zero or
> + * @job is cancelled, all other jobs in the transaction will be cancelled.
> + */
> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> +                                                    BlockJob *job, int ret);
> +
>  #endif
> diff --git a/trace-events b/trace-events
> index 52b7efa..b6a43a0 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
>  virtio_blk_data_plane_stop(void *s) "dataplane %p"
>  virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
>  
> +# blockjob.c
> +block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
> +block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
> +
>  # hw/virtio/dataplane/vring.c
>  vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
>  
> 

Bike-shedding comments and a benefit-of-the-doubt aside;

Reviewed-by: John Snow <jsnow@redhat.com>
Stefan Hajnoczi June 30, 2015, 4:20 p.m. UTC | #3
On Mon, Jun 29, 2015 at 06:38:13PM -0400, John Snow wrote:
> On 06/25/2015 08:12 AM, Stefan Hajnoczi wrote:
> > +/* The purpose of this is to keep txn alive until all jobs have been added */
> > +void block_job_txn_begin(BlockJobTxn *txn)
> > +{
> > +    block_job_txn_unref(txn);
> > +}
> > +
> 
> Maybe it's not entirely clear to the caller that "begin" may in fact
> actually delete the BlockJobTxn. Shall we update the caller's pointer to
> NULL in this case as a hint? Passing a **txn will imply that we are
> giving up our ownership of the object.

Good idea.

> > +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
> > + * successful completion.  Runs from main loop.
> > + */
> > +static void block_job_txn_complete(BlockJob *job, void *opaque)
> > +{
> > +    BlockJobTxn *txn = opaque;
> > +    BlockJob *other_job;
> > +    bool aborting = txn->aborting;
> > +
> > +    qemu_mutex_lock(&txn->lock);
> > +    txn->ref++; /* keep txn alive until the end of this loop */
> > +
> > +    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
> > +        AioContext *ctx;
> > +
> > +        qemu_mutex_unlock(&txn->lock);
> > +        ctx = bdrv_get_aio_context(other_job->bs);
> > +        aio_context_acquire(ctx);
> > +
> > +        /* Cancel all other jobs if aborting.  Don't cancel our own failed job
> > +         * since cancellation throws away the error value.
> > +         */
> > +        if (aborting && other_job != job) {
> > +            block_job_cancel(other_job);
> > +        } else {
> > +            block_job_enter(other_job);
> > +        }
> > +
> > +        aio_context_release(ctx);
> > +        qemu_mutex_lock(&txn->lock);
> > +    }
> > +
> > +    qemu_mutex_unlock(&txn->lock);
> > +    block_job_txn_unref(txn);
> > +}
> > +
> 
> Maybe we can name this one along the lines of
> block_job_txn_complete_impl or something clearly internal, so that we
> can name the public interface simply "block_job_txn_complete."
> 
> Maybe I'm just bike shedding, but calling only a "prepare to X" function
> without a matching "X" call in the exposed API seems odd.
> 
> I suppose it doesn't matter, because I can't think of anything nicer :)
> 
> > +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> > +                                                    BlockJob *job,
> > +                                                    int ret)

I'll rename the function to:
void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn, BlockJob *job, int ret)

"complete" is already overloaded.  block_job_completed() is called by
jobs to terminate.  block_job_complete() is called by the monitor to
nudge a waiting job to finish.  They are two different things.  Maybe
using the term in BlockJobTxn makes things more confusing.

(It's possible to drop the txn argument since it can be fetched from
job->txn, but that makes boundary between BlockJob and BlockJobTxn
harder to understand.)

> > +{
> > +    if (!txn) {
> > +        return;
> > +    }
> > +
> > +    qemu_mutex_lock(&txn->lock);
> > +
> > +    /* This function is entered in 3 cases:
> > +     *
> > +     * 1. Successful job completion - wait for other jobs
> > +     * 2. First failed/cancelled job in txn - cancel other jobs and wait
> > +     * 3. Subsequent cancelled jobs - finish immediately, don't wait
> > +     */
> > +    trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
> > +                                                  block_job_is_cancelled(job),
> > +                                                  txn->aborting,
> > +                                                  txn->jobs_pending);
> > +
> > +    if (txn->aborting) { /* Case 3 */
> > +        assert(block_job_is_cancelled(job));
> > +        goto out; /* already cancelled, don't yield */
> > +    }
> > +
> 
> So the first failure forces all jobs not-yet-complete to cancel. Is
> there any chance for a race condition of two jobs completing almost
> simultaneously, where the first fails and the second completes, and the
> 2nd job makes it here before it gets canceled?
> 
> BOD: I really assume the answer is "no," but it's not immediately
> evident to me.

Good catch!  It is possible if the 2nd job is entered after the 1st job
yielded but before block_job_txn_complete() is called.  There would have
to be a callback pending for the 2nd job.

It can be fixed by adding a new case for jobs that call
block_job_txn_prepare_to_complete() when block_job_txn_complete has been
scheduled.  The job should simply wait for block_job_txn_complete().

I'll add a test case for this scenario.
diff mbox

Patch

diff --git a/blockjob.c b/blockjob.c
index ec46fad..3c6f1d4 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -400,3 +400,163 @@  void block_job_defer_to_main_loop(BlockJob *job,
 
     qemu_bh_schedule(data->bh);
 }
+
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+    /* Jobs may be in different AioContexts so protect all fields */
+    QemuMutex lock;
+
+    /* Reference count for txn object */
+    unsigned int ref;
+
+    /* Is this txn cancelling its jobs? */
+    bool aborting;
+
+    /* Number of jobs still running */
+    unsigned int jobs_pending;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
+    qemu_mutex_init(&txn->lock);
+    txn->ref = 1; /* dropped by block_job_txn_begin() */
+    txn->aborting = false;
+    txn->jobs_pending = 0;
+    QLIST_INIT(&txn->jobs);
+    return txn;
+}
+
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    qemu_mutex_lock(&txn->lock);
+
+    if (--txn->ref > 0) {
+        qemu_mutex_unlock(&txn->lock);
+        return;
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    qemu_mutex_destroy(&txn->lock);
+    g_free(txn);
+}
+
+/* The purpose of this is to keep txn alive until all jobs have been added */
+void block_job_txn_begin(BlockJobTxn *txn)
+{
+    block_job_txn_unref(txn);
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++;
+    txn->jobs_pending++;
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    qemu_mutex_unlock(&txn->lock);
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion.  Runs from main loop.
+ */
+static void block_job_txn_complete(BlockJob *job, void *opaque)
+{
+    BlockJobTxn *txn = opaque;
+    BlockJob *other_job;
+    bool aborting = txn->aborting;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++; /* keep txn alive until the end of this loop */
+
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        AioContext *ctx;
+
+        qemu_mutex_unlock(&txn->lock);
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+
+        /* Cancel all other jobs if aborting.  Don't cancel our own failed job
+         * since cancellation throws away the error value.
+         */
+        if (aborting && other_job != job) {
+            block_job_cancel(other_job);
+        } else {
+            block_job_enter(other_job);
+        }
+
+        aio_context_release(ctx);
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
+
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+                                                    BlockJob *job,
+                                                    int ret)
+{
+    if (!txn) {
+        return;
+    }
+
+    qemu_mutex_lock(&txn->lock);
+
+    /* This function is entered in 3 cases:
+     *
+     * 1. Successful job completion - wait for other jobs
+     * 2. First failed/cancelled job in txn - cancel other jobs and wait
+     * 3. Subsequent cancelled jobs - finish immediately, don't wait
+     */
+    trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
+                                                  block_job_is_cancelled(job),
+                                                  txn->aborting,
+                                                  txn->jobs_pending);
+
+    if (txn->aborting) { /* Case 3 */
+        assert(block_job_is_cancelled(job));
+        goto out; /* already cancelled, don't yield */
+    }
+
+    if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+abort:
+        txn->aborting = true;
+        block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
+    } else { /* Case 1 */
+        if (--txn->jobs_pending == 0) {
+            block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
+        }
+    }
+
+    /* Wait for block_job_txn_complete() */
+    do {
+        qemu_mutex_unlock(&txn->lock);
+        job->busy = false;
+        qemu_coroutine_yield();
+        job->busy = true;
+        qemu_mutex_lock(&txn->lock);
+
+        if (block_job_is_cancelled(job) && !txn->aborting) {
+            goto abort; /* this job just got cancelled by the user */
+        }
+    } while (!txn->aborting && txn->jobs_pending > 0);
+
+out:
+    trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
+                                                   block_job_is_cancelled(job),
+                                                   txn->aborting,
+                                                   txn->jobs_pending);
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index a4c505d..cb19c73 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -13,6 +13,7 @@ 
 typedef struct BlockDriver BlockDriver;
 typedef struct BlockJob BlockJob;
 typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
diff --git a/include/block/block_int.h b/include/block/block_int.h
index ea3e7f0..812a18a 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -639,6 +639,7 @@  void mirror_start(BlockDriverState *bs, BlockDriverState *target,
  * @on_source_error: The action to take upon error reading from the source.
  * @on_target_error: The action to take upon error writing to the target.
  * @cb: Completion function for the job.
+ * @txn: Transaction that this job is part of (may be NULL).
  * @opaque: Opaque pointer value passed to @cb.
  *
  * Start a backup operation on @bs.  Clusters in @bs are written to @target
@@ -650,7 +651,7 @@  void backup_start(BlockDriverState *bs, BlockDriverState *target,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
                   BlockCompletionFunc *cb, void *opaque,
-                  Error **errp);
+                  BlockJobTxn *txn, Error **errp);
 
 void blk_dev_change_media_cb(BlockBackend *blk, bool load);
 bool blk_dev_has_removable_media(BlockBackend *blk);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..ce57e98 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@  struct BlockJob {
 
     /** The opaque value that is passed to the completion function.  */
     void *opaque;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
 };
 
 /**
@@ -348,4 +352,49 @@  void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_prepare_to_complete() before
+ * final cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction
+ *
+ * Call this to mark the end of adding jobs to the transaction.  This must be
+ * called even if no jobs were added.
+ */
+void block_job_txn_begin(BlockJobTxn *txn);
+
+/**
+ * block_job_txn_prepare_to_complete:
+ * @txn: The transaction
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete.  If @ret is non-zero or
+ * @job is cancelled, all other jobs in the transaction will be cancelled.
+ */
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+                                                    BlockJob *job, int ret);
+
 #endif
diff --git a/trace-events b/trace-events
index 52b7efa..b6a43a0 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@  virtio_blk_data_plane_start(void *s) "dataplane %p"
 virtio_blk_data_plane_stop(void *s) "dataplane %p"
 virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
 
+# blockjob.c
+block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+
 # hw/virtio/dataplane/vring.c
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"