
[02/20] blockjob: introduce .drain callback for jobs

Message ID 1477565348-5458-3-git-send-email-pbonzini@redhat.com
State New

Commit Message

Paolo Bonzini Oct. 27, 2016, 10:48 a.m. UTC
This is required to decouple block jobs from running in an
AioContext.  With multiqueue block devices, a BlockDriverState
does not really belong to a single AioContext.

The solution is to first wait until all I/O operations are
complete; then loop in the main thread for the block job to
complete entirely.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/backup.c           | 17 +++++++++++++++++
 block/mirror.c           | 35 +++++++++++++++++++++++++++--------
 blockjob.c               | 37 ++++++++++++++++++++-----------------
 include/block/blockjob.h |  7 +++++++
 4 files changed, 71 insertions(+), 25 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy Dec. 2, 2016, 2:01 p.m. UTC | #1
27.10.2016 13:48, Paolo Bonzini wrote:
> This is required to decouple block jobs from running in an
> AioContext.  With multiqueue block devices, a BlockDriverState
> does not really belong to a single AioContext.
>
> The solution is to first wait until all I/O operations are
> complete; then loop in the main thread for the block job to
> complete entirely.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---

[...]

> @@ -413,14 +409,21 @@ static int block_job_finish_sync(BlockJob *job,
>       assert(blk_bs(job->blk)->job == job);
>   
>       block_job_ref(job);
> +
>       finish(job, &local_err);
>       if (local_err) {
>           error_propagate(errp, local_err);
>           block_job_unref(job);
>           return -EBUSY;
>       }
> +    /* block_job_drain calls block_job_enter, and it should be enough to
> +     * induce progress until the job completes or moves to the main thread.
> +     */

Hi Paolo!

Looks like I have a problem with this. block_job_drain enters the job 
only if job.busy == false. But what if the job yielded with busy == true?

My case is the following: in the job I call co_aio_sleep_ns() for some 
time without setting job.busy to false, and it looks like the timer doesn't 
fire while we are in the "while() { block_job_drain() }" loop. If I just set 
"job.busy = false" before and "job.busy = true" after co_aio_sleep_ns(), 
everything starts to work.

I don't want to set job.busy to false, because the job actually is working: 
several additional coroutines do their work; only the main one (job.co) 
does nothing. I can remove the timer and make the other coroutines wake up 
the main one when needed, and that looks like the better way anyway.

But the question is: is it ok that we can't use a sleep timer in the job 
without setting busy = false? Is it right that only I/O can wake up the 
block job coroutine if it yielded without setting busy = false?

> +    while (!job->deferred_to_main_loop && !job->completed) {
> +        block_job_drain(job);
> +    }
>       while (!job->completed) {
> -        aio_poll(block_job_get_aio_context(job), true);
> +        aio_poll(qemu_get_aio_context(), true);
>       }
>       ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
>       block_job_unref(job);
>
Paolo Bonzini Dec. 3, 2016, 9:12 a.m. UTC | #2
----- Original Message -----
> From: "Vladimir Sementsov-Ogievskiy" <vsementsov@virtuozzo.com>
> To: "Paolo Bonzini" <pbonzini@redhat.com>, qemu-devel@nongnu.org
> Cc: kwolf@redhat.com, famz@redhat.com, stefanha@redhat.com
> Sent: Friday, December 2, 2016 3:01:30 PM
> Subject: Re: [Qemu-devel] [PATCH 02/20] blockjob: introduce .drain callback for jobs
> 
> 27.10.2016 13:48, Paolo Bonzini wrote:
> > This is required to decouple block jobs from running in an
> > AioContext.  With multiqueue block devices, a BlockDriverState
> > does not really belong to a single AioContext.
> >
> > The solution is to first wait until all I/O operations are
> > complete; then loop in the main thread for the block job to
> > complete entirely.
>
> Looks like I have a problem with this. block_job_drain enters the job
> only if job.busy == false. But what if the job yielded with busy == true?
> 
> My case is the following: in the job I call co_aio_sleep_ns() for some
> time without setting job.busy to false, and it looks like the timer doesn't
> fire while we are in the "while() { block_job_drain() }" loop. If I just set
> "job.busy = false" before and "job.busy = true" after co_aio_sleep_ns(),
> everything starts to work.
> 
> I don't want to set job.busy to false, because the job actually is working:
> several additional coroutines do their work; only the main one (job.co)
> does nothing. I can remove the timer and make the other coroutines wake up
> the main one when needed, and that looks like the better way anyway.
> 
> But the question is: is it ok that we can't use a sleep timer in the job
> without setting busy = false? Is it right that only I/O can wake up the
> block job coroutine if it yielded without setting busy = false?

That's more or less correct.  See for example mirror.c.  Whenever it yields
with busy=false, mirror_write_complete will wake the coroutine.

Note that it's also okay to use co_aio_sleep_ns if you _also_ wake the
coroutine on I/O.  Reentering a coroutine automatically interrupts the sleep.

Paolo

> > +    while (!job->deferred_to_main_loop && !job->completed) {
> > +        block_job_drain(job);
> > +    }
> >       while (!job->completed) {
> > -        aio_poll(block_job_get_aio_context(job), true);
> > +        aio_poll(qemu_get_aio_context(), true);
> >       }
> >       ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
> >       block_job_unref(job);
> >
> 
> 
> --
> Best regards,
> Vladimir
> 
>

Patch

diff --git a/block/backup.c b/block/backup.c
index 02dbe48..81d4042 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -300,6 +300,21 @@  void backup_cow_request_end(CowRequest *req)
     cow_request_end(req);
 }
 
+static void backup_drain(BlockJob *job)
+{
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+
+    /* Need to keep a reference in case blk_drain triggers execution
+     * of backup_complete...
+     */
+    if (s->target) {
+        BlockBackend *target = s->target;
+        blk_ref(target);
+        blk_drain(target);
+        blk_unref(target);
+    }
+}
+
 static const BlockJobDriver backup_job_driver = {
     .instance_size          = sizeof(BackupBlockJob),
     .job_type               = BLOCK_JOB_TYPE_BACKUP,
@@ -307,6 +322,7 @@  static const BlockJobDriver backup_job_driver = {
     .commit                 = backup_commit,
     .abort                  = backup_abort,
     .attached_aio_context   = backup_attached_aio_context,
+    .drain                  = backup_drain,
 };
 
 static BlockErrorAction backup_error_action(BackupBlockJob *job,
@@ -331,6 +347,7 @@  static void backup_complete(BlockJob *job, void *opaque)
     BackupCompleteData *data = opaque;
 
     blk_unref(s->target);
+    s->target = NULL;
 
     block_job_completed(job, data->ret);
     g_free(data);
diff --git a/block/mirror.c b/block/mirror.c
index a433e68..52f879a 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -469,7 +469,11 @@  static void mirror_free_init(MirrorBlockJob *s)
     }
 }
 
-static void mirror_drain(MirrorBlockJob *s)
+/* This is also used for the .pause callback. There is no matching
+ * mirror_resume() because mirror_run() will begin iterating again
+ * when the job is resumed.
+ */
+static void mirror_wait_for_all_io(MirrorBlockJob *s)
 {
     while (s->in_flight > 0) {
         mirror_wait_for_io(s);
@@ -528,6 +532,7 @@  static void mirror_exit(BlockJob *job, void *opaque)
     g_free(s->replaces);
     bdrv_op_unblock_all(target_bs, s->common.blocker);
     blk_unref(s->target);
+    s->target = NULL;
     block_job_completed(&s->common, data->ret);
     g_free(data);
     bdrv_drained_end(src);
@@ -582,7 +587,7 @@  static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
             sector_num += nb_sectors;
         }
 
-        mirror_drain(s);
+        mirror_wait_for_all_io(s);
     }
 
     /* First part, loop on the sectors and initialize the dirty bitmap.  */
@@ -787,7 +792,7 @@  immediate_exit:
          * the target is a copy of the source.
          */
         assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
-        mirror_drain(s);
+        mirror_wait_for_all_io(s);
     }
 
     assert(s->in_flight == 0);
@@ -872,14 +877,11 @@  static void mirror_complete(BlockJob *job, Error **errp)
     block_job_enter(&s->common);
 }
 
-/* There is no matching mirror_resume() because mirror_run() will begin
- * iterating again when the job is resumed.
- */
-static void coroutine_fn mirror_pause(BlockJob *job)
+static void mirror_pause(BlockJob *job)
 {
     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
 
-    mirror_drain(s);
+    mirror_wait_for_all_io(s);
 }
 
 static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
@@ -889,6 +891,21 @@  static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
     blk_set_aio_context(s->target, new_context);
 }
 
+static void mirror_drain(BlockJob *job)
+{
+    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+
+    /* Need to keep a reference in case blk_drain triggers execution
+     * of mirror_complete...
+     */
+    if (s->target) {
+        BlockBackend *target = s->target;
+        blk_ref(target);
+        blk_drain(target);
+        blk_unref(target);
+    }
+}
+
 static const BlockJobDriver mirror_job_driver = {
     .instance_size          = sizeof(MirrorBlockJob),
     .job_type               = BLOCK_JOB_TYPE_MIRROR,
@@ -896,6 +913,7 @@  static const BlockJobDriver mirror_job_driver = {
     .complete               = mirror_complete,
     .pause                  = mirror_pause,
     .attached_aio_context   = mirror_attached_aio_context,
+    .drain                  = mirror_drain,
 };
 
 static const BlockJobDriver commit_active_job_driver = {
@@ -905,6 +923,7 @@  static const BlockJobDriver commit_active_job_driver = {
     .complete               = mirror_complete,
     .pause                  = mirror_pause,
     .attached_aio_context   = mirror_attached_aio_context,
+    .drain                  = mirror_drain,
 };
 
 static void mirror_start_job(const char *job_id, BlockDriverState *bs,
diff --git a/blockjob.c b/blockjob.c
index 43fecbe..7c88b30 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -74,17 +74,6 @@  BlockJob *block_job_get(const char *id)
     return NULL;
 }
 
-/* Normally the job runs in its BlockBackend's AioContext.  The exception is
- * block_job_defer_to_main_loop() where it runs in the QEMU main loop.  Code
- * that supports both cases uses this helper function.
- */
-static AioContext *block_job_get_aio_context(BlockJob *job)
-{
-    return job->deferred_to_main_loop ?
-           qemu_get_aio_context() :
-           blk_get_aio_context(job->blk);
-}
-
 static void block_job_attached_aio_context(AioContext *new_context,
                                            void *opaque)
 {
@@ -97,6 +86,17 @@  static void block_job_attached_aio_context(AioContext *new_context,
     block_job_resume(job);
 }
 
+static void block_job_drain(BlockJob *job)
+{
+    /* If job is !job->busy this kicks it into the next pause point. */
+    block_job_enter(job);
+
+    blk_drain(job->blk);
+    if (job->driver->drain) {
+        job->driver->drain(job);
+    }
+}
+
 static void block_job_detach_aio_context(void *opaque)
 {
     BlockJob *job = opaque;
@@ -106,12 +106,8 @@  static void block_job_detach_aio_context(void *opaque)
 
     block_job_pause(job);
 
-    if (!job->paused) {
-        /* If job is !job->busy this kicks it into the next pause point. */
-        block_job_enter(job);
-    }
     while (!job->paused && !job->completed) {
-        aio_poll(block_job_get_aio_context(job), true);
+        block_job_drain(job);
     }
 
     block_job_unref(job);
@@ -413,14 +409,21 @@  static int block_job_finish_sync(BlockJob *job,
     assert(blk_bs(job->blk)->job == job);
 
     block_job_ref(job);
+
     finish(job, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         block_job_unref(job);
         return -EBUSY;
     }
+    /* block_job_drain calls block_job_enter, and it should be enough to
+     * induce progress until the job completes or moves to the main thread.
+     */
+    while (!job->deferred_to_main_loop && !job->completed) {
+        block_job_drain(job);
+    }
     while (!job->completed) {
-        aio_poll(block_job_get_aio_context(job), true);
+        aio_poll(qemu_get_aio_context(), true);
     }
     ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
     block_job_unref(job);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 4ddb4ae..2bb39f4 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -92,6 +92,13 @@  typedef struct BlockJobDriver {
      * besides job->blk to the new AioContext.
      */
     void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
+
+    /*
+     * If the callback is not NULL, it will be invoked when the job has to be
+     * synchronously cancelled or completed; it should drain BlockDriverStates
+     * as required to ensure progress.
+     */
+    void (*drain)(BlockJob *job);
 } BlockJobDriver;
 
 /**