diff mbox

[v3,4/6] blockjob: add block_job_start

Message ID 1478109056-25198-5-git-send-email-jsnow@redhat.com
State New
Headers show

Commit Message

John Snow Nov. 2, 2016, 5:50 p.m. UTC
Instead of automatically starting jobs at creation time via backup_start
et al, we'd like to return a job object pointer that can be started
manually at a later point in time.

For now, add the block_job_start mechanism and start the jobs
automatically as we have been doing, with conversions job-by-job coming
in later patches.

Of note: cancellation of unstarted jobs will perform all the normal
cleanup as if the job had started, particularly abort and clean. The
only difference is that we will not emit any events, because the job
never actually started.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 block/backup.c            |  3 +--
 block/commit.c            |  3 +--
 block/mirror.c            |  3 +--
 block/stream.c            |  3 +--
 blockjob.c                | 51 ++++++++++++++++++++++++++++++++++++-----------
 include/block/blockjob.h  |  9 +++++++++
 tests/test-blockjob-txn.c | 12 +++++------
 7 files changed, 58 insertions(+), 26 deletions(-)

Comments

Kevin Wolf Nov. 3, 2016, 12:17 p.m. UTC | #1
Am 02.11.2016 um 18:50 hat John Snow geschrieben:
> Instead of automatically starting jobs at creation time via backup_start
> et al, we'd like to return a job object pointer that can be started
> manually at later point in time.
> 
> For now, add the block_job_start mechanism and start the jobs
> automatically as we have been doing, with conversions job-by-job coming
> in later patches.
> 
> Of note: cancellation of unstarted jobs will perform all the normal
> cleanup as if the job had started, particularly abort and clean. The
> only difference is that we will not emit any events, because the job
> never actually started.
> 
> Signed-off-by: John Snow <jsnow@redhat.com>

> diff --git a/block/commit.c b/block/commit.c
> index 20d27e2..5b7c454 100644
> --- a/block/commit.c
> +++ b/block/commit.c
> @@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
>      s->backing_file_str = g_strdup(backing_file_str);
>  
>      s->on_error = on_error;
> -    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
>  
>      trace_commit_start(bs, base, top, s, s->common.co);

s->common.co is now uninitialised and should probably be removed from
the tracepoint arguments. The same is true for mirror and stream.

> -    qemu_coroutine_enter(s->common.co);
> +    block_job_start(&s->common);
>  }

> diff --git a/blockjob.c b/blockjob.c
> index e3c458c..16c5159 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -174,7 +174,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
>      job->blk           = blk;
>      job->cb            = cb;
>      job->opaque        = opaque;
> -    job->busy          = true;
> +    job->busy          = false;
> +    job->paused        = true;
>      job->refcnt        = 1;
>      bs->job = job;
>  
> @@ -202,6 +203,21 @@ bool block_job_is_internal(BlockJob *job)
>      return (job->id == NULL);
>  }
>  
> +static bool block_job_started(BlockJob *job)
> +{
> +    return job->co;
> +}
> +
> +void block_job_start(BlockJob *job)
> +{
> +    assert(job && !block_job_started(job) && job->paused &&
> +           !job->busy && job->driver->start);
> +    job->paused = false;
> +    job->busy = true;
> +    job->co = qemu_coroutine_create(job->driver->start, job);
> +    qemu_coroutine_enter(job->co);
> +}

We allow the user to pause a job while it's not started yet. You
classified this as "harmless". But if we accept this, can we really
unconditionally enter the coroutine even if the job has been paused?
Can't a user expect that a job remains in paused state when they
explicitly requested a pause and the job was already internally paused,
like in this case by block_job_create()?

The same probably also applies to the internal job pausing during
bdrv_drain_all_begin/end, though as you know there is a larger problem
with starting jobs under drain_all anyway. For now, we just need to keep
in mind that we can neither create nor start a job in such sections.

Kevin
John Snow Nov. 8, 2016, 2:02 a.m. UTC | #2
On 11/03/2016 08:17 AM, Kevin Wolf wrote:
> Am 02.11.2016 um 18:50 hat John Snow geschrieben:
>> Instead of automatically starting jobs at creation time via backup_start
>> et al, we'd like to return a job object pointer that can be started
>> manually at later point in time.
>>
>> For now, add the block_job_start mechanism and start the jobs
>> automatically as we have been doing, with conversions job-by-job coming
>> in later patches.
>>
>> Of note: cancellation of unstarted jobs will perform all the normal
>> cleanup as if the job had started, particularly abort and clean. The
>> only difference is that we will not emit any events, because the job
>> never actually started.
>>
>> Signed-off-by: John Snow <jsnow@redhat.com>
>
>> diff --git a/block/commit.c b/block/commit.c
>> index 20d27e2..5b7c454 100644
>> --- a/block/commit.c
>> +++ b/block/commit.c
>> @@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
>>      s->backing_file_str = g_strdup(backing_file_str);
>>
>>      s->on_error = on_error;
>> -    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
>>
>>      trace_commit_start(bs, base, top, s, s->common.co);
>
> s->common.co is now uninitialised and should probably be removed from
> the tracepoint arguments. The same is true for mirror and stream.
>
>> -    qemu_coroutine_enter(s->common.co);
>> +    block_job_start(&s->common);
>>  }
>
>> diff --git a/blockjob.c b/blockjob.c
>> index e3c458c..16c5159 100644
>> --- a/blockjob.c
>> +++ b/blockjob.c
>> @@ -174,7 +174,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
>>      job->blk           = blk;
>>      job->cb            = cb;
>>      job->opaque        = opaque;
>> -    job->busy          = true;
>> +    job->busy          = false;
>> +    job->paused        = true;
>>      job->refcnt        = 1;
>>      bs->job = job;
>>
>> @@ -202,6 +203,21 @@ bool block_job_is_internal(BlockJob *job)
>>      return (job->id == NULL);
>>  }
>>
>> +static bool block_job_started(BlockJob *job)
>> +{
>> +    return job->co;
>> +}
>> +
>> +void block_job_start(BlockJob *job)
>> +{
>> +    assert(job && !block_job_started(job) && job->paused &&
>> +           !job->busy && job->driver->start);
>> +    job->paused = false;
>> +    job->busy = true;
>> +    job->co = qemu_coroutine_create(job->driver->start, job);
>> +    qemu_coroutine_enter(job->co);
>> +}
>
> We allow the user to pause a job while it's not started yet. You
> classified this as "harmless". But if we accept this, can we really
> unconditionally enter the coroutine even if the job has been paused?
> Can't a user expect that a job remains in paused state when they
> explicitly requested a pause and the job was already internally paused,
> like in this case by block_job_create()?
>

What will end up happening is that we'll enter the job, and then it'll 
pause immediately upon entrance. Is that a problem?

If the jobs themselves are not checking their pause state fastidiously, 
it could be (but block/backup does -- after it creates a write notifier.)

Do we want a stronger guarantee here?

Naively I think it's OK as-is, but I could add a stronger boolean in 
that lets us know if it's okay to start or not, and we could delay the 
actual creation and start until the 'resume' comes in if you'd like.

I'd like to avoid the complexity if we can help it, but perhaps I'm not 
thinking carefully enough about the existing edge cases.

> The same probably also applies to the internal job pausing during
> bdrv_drain_all_begin/end, though as you know there is a larger problem
> with starting jobs under drain_all anyway. For now, we just need to keep
> in mind that we can neither create nor start a job in such sections.
>

Yeah, there are deeper problems there. As long as the existing critical 
sections don't allow us to create jobs (started or not) I think we're 
probably already OK.

> Kevin
>
Jeff Cody Nov. 8, 2016, 2:05 a.m. UTC | #3
On Mon, Nov 07, 2016 at 09:02:14PM -0500, John Snow wrote:
> 
> 
> On 11/03/2016 08:17 AM, Kevin Wolf wrote:
> >Am 02.11.2016 um 18:50 hat John Snow geschrieben:
> >>Instead of automatically starting jobs at creation time via backup_start
> >>et al, we'd like to return a job object pointer that can be started
> >>manually at later point in time.
> >>
> >>For now, add the block_job_start mechanism and start the jobs
> >>automatically as we have been doing, with conversions job-by-job coming
> >>in later patches.
> >>
> >>Of note: cancellation of unstarted jobs will perform all the normal
> >>cleanup as if the job had started, particularly abort and clean. The
> >>only difference is that we will not emit any events, because the job
> >>never actually started.
> >>
> >>Signed-off-by: John Snow <jsnow@redhat.com>
> >
> >>diff --git a/block/commit.c b/block/commit.c
> >>index 20d27e2..5b7c454 100644
> >>--- a/block/commit.c
> >>+++ b/block/commit.c
> >>@@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
> >>     s->backing_file_str = g_strdup(backing_file_str);
> >>
> >>     s->on_error = on_error;
> >>-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
> >>
> >>     trace_commit_start(bs, base, top, s, s->common.co);
> >
> >s->common.co is now uninitialised and should probably be removed from
> >the tracepoint arguments. The same is true for mirror and stream.
> >
> >>-    qemu_coroutine_enter(s->common.co);
> >>+    block_job_start(&s->common);
> >> }
> >
> >>diff --git a/blockjob.c b/blockjob.c
> >>index e3c458c..16c5159 100644
> >>--- a/blockjob.c
> >>+++ b/blockjob.c
> >>@@ -174,7 +174,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
> >>     job->blk           = blk;
> >>     job->cb            = cb;
> >>     job->opaque        = opaque;
> >>-    job->busy          = true;
> >>+    job->busy          = false;
> >>+    job->paused        = true;
> >>     job->refcnt        = 1;
> >>     bs->job = job;
> >>
> >>@@ -202,6 +203,21 @@ bool block_job_is_internal(BlockJob *job)
> >>     return (job->id == NULL);
> >> }
> >>
> >>+static bool block_job_started(BlockJob *job)
> >>+{
> >>+    return job->co;
> >>+}
> >>+
> >>+void block_job_start(BlockJob *job)
> >>+{
> >>+    assert(job && !block_job_started(job) && job->paused &&
> >>+           !job->busy && job->driver->start);
> >>+    job->paused = false;
> >>+    job->busy = true;
> >>+    job->co = qemu_coroutine_create(job->driver->start, job);
> >>+    qemu_coroutine_enter(job->co);
> >>+}
> >
> >We allow the user to pause a job while it's not started yet. You
> >classified this as "harmless". But if we accept this, can we really
> >unconditionally enter the coroutine even if the job has been paused?
> >Can't a user expect that a job remains in paused state when they
> >explicitly requested a pause and the job was already internally paused,
> >like in this case by block_job_create()?
> >
> 
> What will end up happening is that we'll enter the job, and then it'll pause
> immediately upon entrance. Is that a problem?
> 
> If the jobs themselves are not checking their pause state fastidiously, it
> could be (but block/backup does -- after it creates a write notifier.)
> 
> Do we want a stronger guarantee here?
> 
> Naively I think it's OK as-is, but I could add a stronger boolean in that
> lets us know if it's okay to start or not, and we could delay the actual
> creation and start until the 'resume' comes in if you'd like.
> 
> I'd like to avoid the complexity if we can help it, but perhaps I'm not
> thinking carefully enough about the existing edge cases.
> 

Is there any reason we can't just use job->pause_count here?  When the job
is created, set job->paused = true, and job->pause_count = 1.  In the
block_job_start(), check the pause_count prior to qemu_coroutine_enter():

    void block_job_start(BlockJob *job)
    {
        assert(job && !block_job_started(job) && job->paused &&
              !job->busy && job->driver->start);
        job->co = qemu_coroutine_create(job->driver->start, job);
        job->paused = --job->pause_count > 0;
        if (!job->paused) {
            job->busy = true;
            qemu_coroutine_enter(job->co);
        }
    }


> >The same probably also applies to the internal job pausing during
> >bdrv_drain_all_begin/end, though as you know there is a larger problem
> >with starting jobs under drain_all anyway. For now, we just need to keep
> >in mind that we can neither create nor start a job in such sections.
> >
> 
> Yeah, there are deeper problems there. As long as the existing critical
> sections don't allow us to create jobs (started or not) I think we're
> probably already OK.
> 
> >Kevin
> >
John Snow Nov. 8, 2016, 2:20 a.m. UTC | #4
On 11/07/2016 09:05 PM, Jeff Cody wrote:
> On Mon, Nov 07, 2016 at 09:02:14PM -0500, John Snow wrote:
>>
>>
>> On 11/03/2016 08:17 AM, Kevin Wolf wrote:
>>> Am 02.11.2016 um 18:50 hat John Snow geschrieben:
>>>> Instead of automatically starting jobs at creation time via backup_start
>>>> et al, we'd like to return a job object pointer that can be started
>>>> manually at later point in time.
>>>>
>>>> For now, add the block_job_start mechanism and start the jobs
>>>> automatically as we have been doing, with conversions job-by-job coming
>>>> in later patches.
>>>>
>>>> Of note: cancellation of unstarted jobs will perform all the normal
>>>> cleanup as if the job had started, particularly abort and clean. The
>>>> only difference is that we will not emit any events, because the job
>>>> never actually started.
>>>>
>>>> Signed-off-by: John Snow <jsnow@redhat.com>
>>>
>>>> diff --git a/block/commit.c b/block/commit.c
>>>> index 20d27e2..5b7c454 100644
>>>> --- a/block/commit.c
>>>> +++ b/block/commit.c
>>>> @@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
>>>>     s->backing_file_str = g_strdup(backing_file_str);
>>>>
>>>>     s->on_error = on_error;
>>>> -    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
>>>>
>>>>     trace_commit_start(bs, base, top, s, s->common.co);
>>>
>>> s->common.co is now uninitialised and should probably be removed from
>>> the tracepoint arguments. The same is true for mirror and stream.
>>>
>>>> -    qemu_coroutine_enter(s->common.co);
>>>> +    block_job_start(&s->common);
>>>> }
>>>
>>>> diff --git a/blockjob.c b/blockjob.c
>>>> index e3c458c..16c5159 100644
>>>> --- a/blockjob.c
>>>> +++ b/blockjob.c
>>>> @@ -174,7 +174,8 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
>>>>     job->blk           = blk;
>>>>     job->cb            = cb;
>>>>     job->opaque        = opaque;
>>>> -    job->busy          = true;
>>>> +    job->busy          = false;
>>>> +    job->paused        = true;
>>>>     job->refcnt        = 1;
>>>>     bs->job = job;
>>>>
>>>> @@ -202,6 +203,21 @@ bool block_job_is_internal(BlockJob *job)
>>>>     return (job->id == NULL);
>>>> }
>>>>
>>>> +static bool block_job_started(BlockJob *job)
>>>> +{
>>>> +    return job->co;
>>>> +}
>>>> +
>>>> +void block_job_start(BlockJob *job)
>>>> +{
>>>> +    assert(job && !block_job_started(job) && job->paused &&
>>>> +           !job->busy && job->driver->start);
>>>> +    job->paused = false;
>>>> +    job->busy = true;
>>>> +    job->co = qemu_coroutine_create(job->driver->start, job);
>>>> +    qemu_coroutine_enter(job->co);
>>>> +}
>>>
>>> We allow the user to pause a job while it's not started yet. You
>>> classified this as "harmless". But if we accept this, can we really
>>> unconditionally enter the coroutine even if the job has been paused?
>>> Can't a user expect that a job remains in paused state when they
>>> explicitly requested a pause and the job was already internally paused,
>>> like in this case by block_job_create()?
>>>
>>
>> What will end up happening is that we'll enter the job, and then it'll pause
>> immediately upon entrance. Is that a problem?
>>
>> If the jobs themselves are not checking their pause state fastidiously, it
>> could be (but block/backup does -- after it creates a write notifier.)
>>
>> Do we want a stronger guarantee here?
>>
>> Naively I think it's OK as-is, but I could add a stronger boolean in that
>> lets us know if it's okay to start or not, and we could delay the actual
>> creation and start until the 'resume' comes in if you'd like.
>>
>> I'd like to avoid the complexity if we can help it, but perhaps I'm not
>> thinking carefully enough about the existing edge cases.
>>
>
> Is there any reason we can't just use job->pause_count here?  When the job
> is created, set job->paused = true, and job->pause_count = 1.  In the
> block_job_start(), check the pause_count prior to qemu_coroutine_enter():
>
>     void block_job_start(BlockJob *job)
>     {
>         assert(job && !block_job_started(job) && job->paused &&
>               !job->busy && job->driver->start);
>         job->co = qemu_coroutine_create(job->driver->start, job);
>         job->paused = --job->pause_count > 0;
>         if (!job->paused) {
>             job->busy = true;
>             qemu_coroutine_enter(job->co);
>         }
>     }
>

Solid point. Let's do it this way.
Thanks!

>
>>> The same probably also applies to the internal job pausing during
>>> bdrv_drain_all_begin/end, though as you know there is a larger problem
>>> with starting jobs under drain_all anyway. For now, we just need to keep
>>> in mind that we can neither create nor start a job in such sections.
>>>
>>
>> Yeah, there are deeper problems there. As long as the existing critical
>> sections don't allow us to create jobs (started or not) I think we're
>> probably already OK.
>>
>>> Kevin
>>>
Kevin Wolf Nov. 8, 2016, 9:16 a.m. UTC | #5
Am 08.11.2016 um 03:05 hat Jeff Cody geschrieben:
> On Mon, Nov 07, 2016 at 09:02:14PM -0500, John Snow wrote:
> > On 11/03/2016 08:17 AM, Kevin Wolf wrote:
> > >Am 02.11.2016 um 18:50 hat John Snow geschrieben:
> > >>+void block_job_start(BlockJob *job)
> > >>+{
> > >>+    assert(job && !block_job_started(job) && job->paused &&
> > >>+           !job->busy && job->driver->start);
> > >>+    job->paused = false;
> > >>+    job->busy = true;
> > >>+    job->co = qemu_coroutine_create(job->driver->start, job);
> > >>+    qemu_coroutine_enter(job->co);
> > >>+}
> > >
> > >We allow the user to pause a job while it's not started yet. You
> > >classified this as "harmless". But if we accept this, can we really
> > >unconditionally enter the coroutine even if the job has been paused?
> > >Can't a user expect that a job remains in paused state when they
> > >explicitly requested a pause and the job was already internally paused,
> > >like in this case by block_job_create()?
> > >
> > 
> > What will end up happening is that we'll enter the job, and then it'll pause
> > immediately upon entrance. Is that a problem?
> > 
> > If the jobs themselves are not checking their pause state fastidiously, it
> > could be (but block/backup does -- after it creates a write notifier.)
> > 
> > Do we want a stronger guarantee here?
> > 
> > Naively I think it's OK as-is, but I could add a stronger boolean in that
> > lets us know if it's okay to start or not, and we could delay the actual
> > creation and start until the 'resume' comes in if you'd like.
> > 
> > I'd like to avoid the complexity if we can help it, but perhaps I'm not
> > thinking carefully enough about the existing edge cases.
> > 
> 
> Is there any reason we can't just use job->pause_count here?  When the job
> is created, set job->paused = true, and job->pause_count = 1.  In the
> block_job_start(), check the pause_count prior to qemu_coroutine_enter():
> 
>     void block_job_start(BlockJob *job)
>     {
>         assert(job && !block_job_started(job) && job->paused &&
>               !job->busy && job->driver->start);
>         job->co = qemu_coroutine_create(job->driver->start, job);
>         job->paused = --job->pause_count > 0;
>         if (!job->paused) {
>             job->busy = true;
>             qemu_coroutine_enter(job->co);
>         }
>     }

Yes, something like this is what I had in mind.

> > >The same probably also applies to the internal job pausing during
> > >bdrv_drain_all_begin/end, though as you know there is a larger problem
> > >with starting jobs under drain_all anyway. For now, we just need to keep
> > >in mind that we can neither create nor start a job in such sections.
> > >
> > 
> > Yeah, there are deeper problems there. As long as the existing critical
> > sections don't allow us to create jobs (started or not) I think we're
> > probably already OK.

My point here was that we would like to get rid of that restriction
eventually, and if we add more and more things that depend on the
restriction, getting rid of it will only become harder.

But with the above code, I think this specific problem is solved.

Kevin
diff mbox

Patch

diff --git a/block/backup.c b/block/backup.c
index 4ed4494..ae1b99a 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -654,9 +654,8 @@  void backup_start(const char *job_id, BlockDriverState *bs,
 
     block_job_add_bdrv(&job->common, target);
     job->common.len = len;
-    job->common.co = qemu_coroutine_create(job->common.driver->start, job);
     block_job_txn_add_job(txn, &job->common);
-    qemu_coroutine_enter(job->common.co);
+    block_job_start(&job->common);
     return;
 
  error:
diff --git a/block/commit.c b/block/commit.c
index 20d27e2..5b7c454 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -289,10 +289,9 @@  void commit_start(const char *job_id, BlockDriverState *bs,
     s->backing_file_str = g_strdup(backing_file_str);
 
     s->on_error = on_error;
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
 
     trace_commit_start(bs, base, top, s, s->common.co);
-    qemu_coroutine_enter(s->common.co);
+    block_job_start(&s->common);
 }
 
 
diff --git a/block/mirror.c b/block/mirror.c
index 659e09c..c078d45 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1009,9 +1009,8 @@  static void mirror_start_job(const char *job_id, BlockDriverState *bs,
         }
     }
 
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
     trace_mirror_start(bs, s, s->common.co, opaque);
-    qemu_coroutine_enter(s->common.co);
+    block_job_start(&s->common);
 }
 
 void mirror_start(const char *job_id, BlockDriverState *bs,
diff --git a/block/stream.c b/block/stream.c
index 92309ff..2de8d38 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -255,7 +255,6 @@  void stream_start(const char *job_id, BlockDriverState *bs,
     s->bs_flags = orig_bs_flags;
 
     s->on_error = on_error;
-    s->common.co = qemu_coroutine_create(s->common.driver->start, s);
     trace_stream_start(bs, base, s, s->common.co);
-    qemu_coroutine_enter(s->common.co);
+    block_job_start(&s->common);
 }
diff --git a/blockjob.c b/blockjob.c
index e3c458c..16c5159 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -174,7 +174,8 @@  void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->blk           = blk;
     job->cb            = cb;
     job->opaque        = opaque;
-    job->busy          = true;
+    job->busy          = false;
+    job->paused        = true;
     job->refcnt        = 1;
     bs->job = job;
 
@@ -202,6 +203,21 @@  bool block_job_is_internal(BlockJob *job)
     return (job->id == NULL);
 }
 
+static bool block_job_started(BlockJob *job)
+{
+    return job->co;
+}
+
+void block_job_start(BlockJob *job)
+{
+    assert(job && !block_job_started(job) && job->paused &&
+           !job->busy && job->driver->start);
+    job->paused = false;
+    job->busy = true;
+    job->co = qemu_coroutine_create(job->driver->start, job);
+    qemu_coroutine_enter(job->co);
+}
+
 void block_job_ref(BlockJob *job)
 {
     ++job->refcnt;
@@ -248,14 +264,18 @@  static void block_job_completed_single(BlockJob *job)
     if (job->cb) {
         job->cb(job->opaque, job->ret);
     }
-    if (block_job_is_cancelled(job)) {
-        block_job_event_cancelled(job);
-    } else {
-        const char *msg = NULL;
-        if (job->ret < 0) {
-            msg = strerror(-job->ret);
+
+    /* Emit events only if we actually started */
+    if (block_job_started(job)) {
+        if (block_job_is_cancelled(job)) {
+            block_job_event_cancelled(job);
+        } else {
+            const char *msg = NULL;
+            if (job->ret < 0) {
+                msg = strerror(-job->ret);
+            }
+            block_job_event_completed(job, msg);
         }
-        block_job_event_completed(job, msg);
     }
 
     if (job->txn) {
@@ -363,7 +383,8 @@  void block_job_complete(BlockJob *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
     assert(job->id);
-    if (job->pause_count || job->cancelled || !job->driver->complete) {
+    if (job->pause_count || job->cancelled ||
+        !block_job_started(job) || !job->driver->complete) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
@@ -395,6 +416,8 @@  bool block_job_user_paused(BlockJob *job)
 
 void coroutine_fn block_job_pause_point(BlockJob *job)
 {
+    assert(job && block_job_started(job));
+
     if (!block_job_should_pause(job)) {
         return;
     }
@@ -446,9 +469,13 @@  void block_job_enter(BlockJob *job)
 
 void block_job_cancel(BlockJob *job)
 {
-    job->cancelled = true;
-    block_job_iostatus_reset(job);
-    block_job_enter(job);
+    if (block_job_started(job)) {
+        job->cancelled = true;
+        block_job_iostatus_reset(job);
+        block_job_enter(job);
+    } else {
+        block_job_completed(job, -ECANCELED);
+    }
 }
 
 bool block_job_is_cancelled(BlockJob *job)
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 356cacf..1acb256 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -189,6 +189,15 @@  void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs);
 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 
 /**
+ * block_job_start:
+ * @job: A job that has not yet been started.
+ *
+ * Begins execution of a block job.
+ * Takes ownership of one reference to the job object.
+ */
+void block_job_start(BlockJob *job);
+
+/**
  * block_job_cancel:
  * @job: The job to be canceled.
  *
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index f9afc3b..b132e39 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -24,10 +24,6 @@  typedef struct {
     int *result;
 } TestBlockJob;
 
-static const BlockJobDriver test_block_job_driver = {
-    .instance_size = sizeof(TestBlockJob),
-};
-
 static void test_block_job_complete(BlockJob *job, void *opaque)
 {
     BlockDriverState *bs = blk_bs(job->blk);
@@ -77,6 +73,11 @@  static void test_block_job_cb(void *opaque, int ret)
     g_free(data);
 }
 
+static const BlockJobDriver test_block_job_driver = {
+    .instance_size = sizeof(TestBlockJob),
+    .start = test_block_job_run,
+};
+
 /* Create a block job that completes with a given return code after a given
  * number of event loop iterations.  The return code is stored in the given
  * result pointer.
@@ -104,10 +105,9 @@  static BlockJob *test_block_job_start(unsigned int iterations,
     s->use_timer = use_timer;
     s->rc = rc;
     s->result = result;
-    s->common.co = qemu_coroutine_create(test_block_job_run, s);
     data->job = s;
     data->result = result;
-    qemu_coroutine_enter(s->common.co);
+    block_job_start(&s->common);
     return &s->common;
 }