[v2,05/10] block: add block job transactions

Message ID 1436192669-10062-6-git-send-email-stefanha@redhat.com
State New

Commit Message

Stefan Hajnoczi July 6, 2015, 2:24 p.m. UTC
Sometimes block jobs must execute as a transaction group.  Finishing
jobs wait until all other jobs are ready to complete successfully.
Failure or cancellation of one job cancels the other jobs in the group.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
v2:
 * Set txn pointer to NULL in block_job_txn_begin() [jsnow]
 * Rename block_job_txn_prepare_to_complete to block_job_txn_job_done [jsnow]
 * Rename block_job_txn_complete to block_job_txn_kick [jsnow]
 * Add BLOCK_JOB_TXN_CANCEL_PENDING to solve race condition on cancel [jsnow]
 * Document when txn may be NULL
---
 blockjob.c               | 193 +++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h    |   1 +
 include/block/blockjob.h |  52 +++++++++++++
 trace-events             |   4 +
 4 files changed, 250 insertions(+)

Comments

Fam Zheng July 7, 2015, 7:32 a.m. UTC | #1
On Mon, 07/06 15:24, Stefan Hajnoczi wrote:
> +/**
> + * block_job_txn_add_job:
> + * @txn: The transaction (may be NULL)
> + * @job: Job to add to the transaction
> + *
> + * Add @job to the transaction.  The @job must not already be in a transaction.
> + * The block job driver must call block_job_txn_prepare_to_complete() before

s/block_job_txn_prepare_to_complete/block_job_txn_job_done/

Reading this a second time, I am starting to feel it is too complicated for its
own good.

I have another idea: in block_job_completed, check whether other jobs have
failed and, if so, call this job driver's (imaginary) "abort()" callback; if all
jobs have succeeded, call a "commit" callback during the last
block_job_completed.

Does that make sense?

Fam
Stefan Hajnoczi July 7, 2015, 12:59 p.m. UTC | #2
On Tue, Jul 07, 2015 at 03:32:45PM +0800, Fam Zheng wrote:
> On Mon, 07/06 15:24, Stefan Hajnoczi wrote:
> > +/**
> > + * block_job_txn_add_job:
> > + * @txn: The transaction (may be NULL)
> > + * @job: Job to add to the transaction
> > + *
> > + * Add @job to the transaction.  The @job must not already be in a transaction.
> > + * The block job driver must call block_job_txn_prepare_to_complete() before
> 
> s/block_job_txn_prepare_to_complete/block_job_txn_job_done/
> 
> Reading this a second time, I am starting to feel it is too complicated for its
> own good.
> 
> I have another idea: in block_job_completed, check whether other jobs have
> failed and, if so, call this job driver's (imaginary) "abort()" callback; if all
> jobs have succeeded, call a "commit" callback during the last
> block_job_completed.
> 
> Does that make sense?

I think you've skipped the hard part: immediate cancellation.  If a job
is cancelled by the user or a job fails, then all other jobs are
cancelled immediately.

Immediate cancellation has the problem that jobs could be running in any
AioContext, so you need to handle concurrency.  That's where the
locking, juggling AioContexts, and interaction between blockjobs comes
in.

If immediate cancellation is removed then this patch becomes simple:
block_job_txn_job_done() becomes a yield until the jobs_pending
counter reaches 0.

But now the user must handle cancellation and error cases manually by
invoking QMP block-job-cancel on all the other jobs.  In other words,
we're requiring that the user implements their own BlockJobTxn struct
that keeps track of related jobs.  If they don't keep track then they
may not be able to terminate jobs after failure.

This is why QEMU should be the one to complete or fail all block jobs in
the transaction.  It's simpler if we don't do that but the API then
requires more complexity in the user to be used correctly.

Stefan
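
As a rough illustration of the simplified variant described above (no immediate
cancellation), the sketch below models only the jobs_pending counter.  It is not
QEMU code: SimpleTxn, simple_txn_add_job() and simple_txn_job_done() are
hypothetical names, there is no locking, and the coroutine yield is reduced to a
boolean that says whether the caller would still have to wait.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical single-threaded model: the transaction only counts jobs. */
typedef struct {
    unsigned int jobs_pending; /* jobs that have not reported completion yet */
} SimpleTxn;

/* Register one more job with the transaction. */
void simple_txn_add_job(SimpleTxn *txn)
{
    txn->jobs_pending++;
}

/* Report that one job is done.  Returns true if the job would still have to
 * yield because other jobs are pending, false once the counter reaches 0.
 */
bool simple_txn_job_done(SimpleTxn *txn)
{
    assert(txn->jobs_pending > 0);
    txn->jobs_pending--;
    return txn->jobs_pending > 0;
}
```

With two jobs, the first call returns true (keep waiting) and the second returns
false (all jobs may finish).  Nothing in this model cancels the remaining jobs
when one of them fails, which is the gap the paragraphs above describe.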
Fam Zheng July 8, 2015, 1:59 a.m. UTC | #3
On Tue, 07/07 13:59, Stefan Hajnoczi wrote:
> On Tue, Jul 07, 2015 at 03:32:45PM +0800, Fam Zheng wrote:
> > On Mon, 07/06 15:24, Stefan Hajnoczi wrote:
> > > +/**
> > > + * block_job_txn_add_job:
> > > + * @txn: The transaction (may be NULL)
> > > + * @job: Job to add to the transaction
> > > + *
> > > + * Add @job to the transaction.  The @job must not already be in a transaction.
> > > + * The block job driver must call block_job_txn_prepare_to_complete() before
> > 
> > s/block_job_txn_prepare_to_complete/block_job_txn_job_done/
> > 
> > Reading this a second time, I am starting to feel it is too complicated for
> > its own good.
> > 
> > I have another idea: in block_job_completed, check whether other jobs have
> > failed and, if so, call this job driver's (imaginary) "abort()" callback; if
> > all jobs have succeeded, call a "commit" callback during the last
> > block_job_completed.
> > 
> > Does that make sense?
> 
> I think you've skipped the hard part: immediate cancellation.  If a job
> is cancelled by the user or a job fails, then all other jobs are
> cancelled immediately.
> 
> Immediate cancellation has the problem that jobs could be running in any
> AioContext, so you need to handle concurrency.  That's where the
> locking, juggling AioContexts, and interaction between blockjobs comes
> in.

OK, let me try again:

The idea is to intercept job->cb so we can handle jobs entirely in
block_job_completed (which runs in the main loop), rather than in the
coroutines, which can be in any AioContext.

1) If a job is cancelled or fails, it goes to block_job_completed immediately,
with block_job_is_cancelled() == true. In this case, we call
block_job_cancel_sync on all other jobs, then call the "abort()" callbacks to
reclaim any bitmaps, and then emit QMP events. Some other jobs may have already
completed before this point, but that is not a problem because we always defer
the actual completions (abort/commit and QMP) and perform them all together.

2) If no job has failed or been cancelled, then in the last
block_job_completed we call "commit()" to abdicate bitmaps and emit the QMP
events.

This would still require BlockJobTxn to track the block jobs in a group, but
hopefully it could reduce the complexity of interactions between block jobs.

I can prototype it if this isn't missing anything obvious.

Fam
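
The two steps proposed above can be sketched roughly as follows.  This is a
single-threaded model under stated assumptions, not a prototype of the real
change: ModelJob, ModelTxn and the model_* functions are hypothetical names,
block_job_cancel_sync() is replaced by simply marking the other jobs as
cancelled and completed, and the "abort()"/"commit()" callbacks and QMP events
are reduced to recording an outcome.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MODEL_MAX_JOBS 8

typedef enum { TXN_PENDING, TXN_COMMITTED, TXN_ABORTED } ModelOutcome;

typedef struct {
    bool completed; /* job reached completion, or was cancelled by the txn */
    bool cancelled;
    int ret;
} ModelJob;

typedef struct {
    ModelJob *jobs[MODEL_MAX_JOBS];
    size_t njobs;
    ModelOutcome outcome;
} ModelTxn;

void model_txn_init(ModelTxn *txn)
{
    txn->njobs = 0;
    txn->outcome = TXN_PENDING;
}

void model_txn_add_job(ModelTxn *txn, ModelJob *job)
{
    assert(txn->njobs < MODEL_MAX_JOBS);
    job->completed = false;
    job->cancelled = false;
    job->ret = 0;
    txn->jobs[txn->njobs++] = job;
}

/* Stand-in for block_job_completed(): called once per job in the main loop. */
void model_job_completed(ModelTxn *txn, ModelJob *job, int ret)
{
    size_t i;

    job->completed = true;
    job->ret = ret;

    if (txn->outcome != TXN_PENDING) {
        return; /* outcome was already decided by an earlier job */
    }

    if (ret != 0 || job->cancelled) {
        /* Step 1: stand-in for block_job_cancel_sync() on all other jobs */
        for (i = 0; i < txn->njobs; i++) {
            if (!txn->jobs[i]->completed) {
                txn->jobs[i]->cancelled = true;
                txn->jobs[i]->completed = true;
            }
        }
        txn->outcome = TXN_ABORTED; /* "abort()" callbacks, then QMP events */
        return;
    }

    /* Step 2: commit only once every job has completed successfully */
    for (i = 0; i < txn->njobs; i++) {
        if (!txn->jobs[i]->completed) {
            return;
        }
    }
    txn->outcome = TXN_COMMITTED; /* "commit()" callbacks, then QMP events */
}
```

In this model a successful job leaves the outcome pending until the last job
reports in, while the first failure decides the outcome immediately and marks
every unfinished job as cancelled.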

> 
> If immediate cancellation is removed then this patch becomes simple:
> block_job_txn_job_done() becomes a yield until the jobs_pending
> counter reaches 0.
> 
> But now the user must handle cancellation and error cases manually by
> invoking QMP block-job-cancel on all the other jobs.  In other words,
> we're requiring that the user implements their own BlockJobTxn struct
> that keeps track of related jobs.  If they don't keep track then they
> may not be able to terminate jobs after failure.
> 
> This is why QEMU should be the one to complete or fail all block jobs in
> the transaction.  It's simpler if we don't do that but the API then
> requires more complexity in the user to be used correctly.
Stefan Hajnoczi July 8, 2015, 1:36 p.m. UTC | #4
On Wed, Jul 08, 2015 at 09:59:24AM +0800, Fam Zheng wrote:
> On Tue, 07/07 13:59, Stefan Hajnoczi wrote:
> > On Tue, Jul 07, 2015 at 03:32:45PM +0800, Fam Zheng wrote:
> > > On Mon, 07/06 15:24, Stefan Hajnoczi wrote:
> > > > +/**
> > > > + * block_job_txn_add_job:
> > > > + * @txn: The transaction (may be NULL)
> > > > + * @job: Job to add to the transaction
> > > > + *
> > > > + * Add @job to the transaction.  The @job must not already be in a transaction.
> > > > + * The block job driver must call block_job_txn_prepare_to_complete() before
> > > 
> > > s/block_job_txn_prepare_to_complete/block_job_txn_job_done/
> > > 
> > > Reading this a second time, I am starting to feel it is too complicated
> > > for its own good.
> > > 
> > > I have another idea: in block_job_completed, check whether other jobs have
> > > failed and, if so, call this job driver's (imaginary) "abort()" callback;
> > > if all jobs have succeeded, call a "commit" callback during the last
> > > block_job_completed.
> > > 
> > > Does that make sense?
> > 
> > I think you've skipped the hard part: immediate cancellation.  If a job
> > is cancelled by the user or a job fails, then all other jobs are
> > cancelled immediately.
> > 
> > Immediate cancellation has the problem that jobs could be running in any
> > AioContext, so you need to handle concurrency.  That's where the
> > locking, juggling AioContexts, and interaction between blockjobs comes
> > in.
> 
> OK, let me try again:
> 
> The idea is to intercept job->cb so we can handle jobs entirely in
> block_job_completed (which runs in the main loop), rather than in the
> coroutines, which can be in any AioContext.
> 
> 1) If a job is cancelled or fails, it goes to block_job_completed immediately,
> with block_job_is_cancelled() == true. In this case, we call
> block_job_cancel_sync on all other jobs, then call the "abort()" callbacks to
> reclaim any bitmaps, and then emit QMP events. Some other jobs may have already
> completed before this point, but that is not a problem because we always defer
> the actual completions (abort/commit and QMP) and perform them all together.
> 
> 2) If no job has failed or been cancelled, then in the last
> block_job_completed we call "commit()" to abdicate bitmaps and emit the QMP
> events.
> 
> This would still require BlockJobTxn to track the block jobs in a group, but
> hopefully it could reduce the complexity of interactions between block jobs.
> 
> I can prototype it if this isn't missing anything obvious.

Yes, please try it.  It's half-way between what John originally did and
what I did.  It might be the simplest solution.

Be careful with the final piece of code used to complete jobs from
block_job_defer_to_main_loop().  It runs from a BH in the main loop
after the coroutine has terminated.  In the fail/cancel case you might
need to protect against race conditions - especially if two jobs finish
in the same event loop iteration.

I didn't handle that since block_job_txn_job_done() is called while the
coroutine is still alive.

Patch

diff --git a/blockjob.c b/blockjob.c
index ec46fad..d1f0206 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -400,3 +400,196 @@  void block_job_defer_to_main_loop(BlockJob *job,
 
     qemu_bh_schedule(data->bh);
 }
+
+typedef enum {
+    BLOCK_JOB_TXN_OK,               /* no job failed yet */
+    BLOCK_JOB_TXN_CANCEL_PENDING,   /* kick scheduled to cancel jobs */
+    BLOCK_JOB_TXN_CANCEL_DONE,      /* cancelled jobs can terminate now */
+} BlockJobTxnState;
+
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+    /* Jobs may be in different AioContexts so protect all fields */
+    QemuMutex lock;
+
+    /* Reference count for txn object */
+    unsigned int ref;
+
+    /* Is this txn ok or are jobs being cancelled? */
+    BlockJobTxnState state;
+
+    /* Number of jobs still running */
+    unsigned int jobs_pending;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
+    qemu_mutex_init(&txn->lock);
+    txn->ref = 1; /* dropped by block_job_txn_begin() */
+    txn->state = BLOCK_JOB_TXN_OK;
+    txn->jobs_pending = 0;
+    QLIST_INIT(&txn->jobs);
+    return txn;
+}
+
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    qemu_mutex_lock(&txn->lock);
+
+    if (--txn->ref > 0) {
+        qemu_mutex_unlock(&txn->lock);
+        return;
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    qemu_mutex_destroy(&txn->lock);
+    g_free(txn);
+}
+
+/* The purpose of this is to keep txn alive until all jobs have been added */
+void block_job_txn_begin(BlockJobTxn **txn)
+{
+    block_job_txn_unref(*txn);
+    *txn = NULL;
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++;
+    txn->jobs_pending++;
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    qemu_mutex_unlock(&txn->lock);
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion.  Runs from main loop.
+ */
+static void block_job_txn_kick(BlockJob *job, void *opaque)
+{
+    BlockJobTxn *txn = opaque;
+    BlockJob *other_job;
+    GSList *ctxs = NULL;
+    GSList *ctxs_iter;
+    bool cancel = false;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++; /* keep txn alive until the end of this loop */
+
+    /* Acquire AioContexts so jobs cannot race with us */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        AioContext *ctx;
+
+        qemu_mutex_unlock(&txn->lock);
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+        ctxs = g_slist_prepend(ctxs, ctx);
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    /* From here on block_job_txn_job_done() callers should not wait */
+    if (txn->state == BLOCK_JOB_TXN_CANCEL_PENDING) {
+        txn->state = BLOCK_JOB_TXN_CANCEL_DONE;
+        cancel = true;
+    }
+
+    /* Kick jobs */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        qemu_mutex_unlock(&txn->lock);
+
+        /* Don't cancel our own failed job since cancellation throws away the
+         * error value.
+         */
+        if (cancel && other_job != job) {
+            block_job_cancel(other_job);
+        } else {
+            block_job_enter(other_job);
+        }
+
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+
+    /* Release AioContexts */
+    for (ctxs_iter = ctxs; ctxs_iter; ctxs_iter = ctxs_iter->next) {
+        aio_context_release(ctxs_iter->data);
+    }
+    g_slist_free(ctxs);
+}
+
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job,
+                                         int ret)
+{
+    if (!txn) {
+        return;
+    }
+
+    qemu_mutex_lock(&txn->lock);
+
+    /* This function is entered in 4 cases:
+     *
+     * 1. Successful job completion - wait for other jobs
+     * 2. First failed/cancelled job in txn - cancel other jobs and wait
+     * 3. Subsequent cancelled jobs - finish immediately, don't wait
+     * 4. While kick is pending - wait for kick to cancel or wake us
+     */
+    trace_block_job_txn_job_done_entry(txn, job, ret,
+                                       block_job_is_cancelled(job),
+                                       txn->state,
+                                       txn->jobs_pending);
+
+    if (txn->state == BLOCK_JOB_TXN_CANCEL_DONE) { /* Case 3 */
+        assert(block_job_is_cancelled(job));
+        goto out; /* already cancelled, don't yield */
+    }
+
+    if (txn->state == BLOCK_JOB_TXN_OK) {
+        if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+            txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+            block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+        } else { /* Case 1 */
+            if (--txn->jobs_pending == 0) {
+                block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+            }
+        }
+    }
+
+    /* Wait for block_job_txn_kick() in cases 1, 2, and 4 */
+    do {
+        qemu_mutex_unlock(&txn->lock);
+        job->busy = false;
+        qemu_coroutine_yield();
+        job->busy = true;
+        qemu_mutex_lock(&txn->lock);
+
+        /* Did the user just cancel this job? */
+        if (block_job_is_cancelled(job) && txn->state == BLOCK_JOB_TXN_OK) {
+            txn->state = BLOCK_JOB_TXN_CANCEL_PENDING;
+            block_job_defer_to_main_loop(job, block_job_txn_kick, txn);
+        }
+    } while (txn->state != BLOCK_JOB_TXN_CANCEL_DONE &&
+             txn->jobs_pending > 0);
+
+out:
+    trace_block_job_txn_job_done_return(txn, job, ret,
+                                        block_job_is_cancelled(job),
+                                        txn->state,
+                                        txn->jobs_pending);
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index 7437590..c7fc5b6 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -13,6 +13,7 @@ 
 typedef struct BlockDriver BlockDriver;
 typedef struct BlockJob BlockJob;
 typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..7d6ffb7 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@  struct BlockJob {
 
     /** The opaque value that is passed to the completion function.  */
     void *opaque;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
 };
 
 /**
@@ -348,4 +352,52 @@  void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_job_done() before
+ * final cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction pointer
+ *
+ * Call this to mark the end of adding jobs to the transaction.  This must be
+ * called even if no jobs were added.
+ *
+ * The caller may not add jobs after the transaction begins so the @txn pointer
+ * is set to NULL to show that the caller has released ownership.
+ */
+void block_job_txn_begin(BlockJobTxn **txn);
+
+/**
+ * block_job_txn_job_done:
+ * @txn: The transaction (may be NULL)
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete.  If @ret is non-zero or
+ * @job is cancelled, all other jobs in the transaction will be cancelled.
+ */
+void coroutine_fn block_job_txn_job_done(BlockJobTxn *txn,
+                                         BlockJob *job, int ret);
+
 #endif
diff --git a/trace-events b/trace-events
index 52b7efa..5877289 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@  virtio_blk_data_plane_start(void *s) "dataplane %p"
 virtio_blk_data_plane_stop(void *s) "dataplane %p"
 virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
 
+# blockjob.c
+block_job_txn_job_done_entry(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+block_job_txn_job_done_return(void *txn, void *job, int ret, bool cancelled, int status, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d status %d jobs_pending %u"
+
 # hw/virtio/dataplane/vring.c
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
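
The four entry cases listed in the comment inside block_job_txn_job_done() can
be read as a small decision table.  The sketch below is a simplified model of
that table only, written under assumptions: TxnModel, TxnModelAction and
txn_model_job_done() are hypothetical names, and the mutex, the coroutine
yield, the re-check after wake-up and the deferral to the main loop are all
left out.  It returns what the caller would do next instead of doing it.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
    TXN_MODEL_OK,             /* no job failed yet */
    TXN_MODEL_CANCEL_PENDING, /* kick scheduled to cancel jobs */
    TXN_MODEL_CANCEL_DONE,    /* cancelled jobs can terminate now */
} TxnModelState;

typedef enum {
    ACT_WAIT,          /* yield until the kick wakes or cancels us */
    ACT_KICK_AND_WAIT, /* schedule the kick, then yield */
    ACT_FINISH,        /* return immediately, do not yield */
} TxnModelAction;

typedef struct {
    TxnModelState state;
    unsigned int jobs_pending;
} TxnModel;

/* Decide what a job entering the "job done" path would do. */
TxnModelAction txn_model_job_done(TxnModel *txn, int ret, bool cancelled)
{
    if (txn->state == TXN_MODEL_CANCEL_DONE) {
        assert(cancelled);
        return ACT_FINISH;                     /* case 3 */
    }

    if (txn->state == TXN_MODEL_OK) {
        if (ret != 0 || cancelled) {
            txn->state = TXN_MODEL_CANCEL_PENDING;
            return ACT_KICK_AND_WAIT;          /* case 2 */
        }
        if (--txn->jobs_pending == 0) {
            return ACT_KICK_AND_WAIT;          /* case 1, last job */
        }
        return ACT_WAIT;                       /* case 1, others pending */
    }

    return ACT_WAIT;                           /* case 4, kick is pending */
}
```

Note that in this model, as in the patch, even the last successful job waits
once: it schedules the kick and is then woken by it together with the others.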