diff mbox series

[v9,14/21] jobs: protect job.aio_context with BQL and job_mutex

Message ID 20220706201533.289775-15-eesposit@redhat.com
State New
Headers show
Series job: replace AioContext lock with job_mutex | expand

Commit Message

Emanuele Giuseppe Esposito July 6, 2022, 8:15 p.m. UTC
In order to make it thread safe, implement a "fake rwlock",
where we allow reads under BQL *or* job_mutex held, but
writes only under BQL *and* job_mutex.

The only write we have is in child_job_set_aio_ctx, which always
happens under drain (so the job is paused).
For this reason, introduce job_set_aio_context and make sure that
the context is set under BQL, job_mutex and drain.
Also make sure all other places where the aiocontext is read
are protected.

Note: at this stage, job_{lock/unlock} and job lock guard macros
are *nop*.

Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 block/replication.c |  6 ++++--
 blockjob.c          |  3 ++-
 include/qemu/job.h  | 19 ++++++++++++++++++-
 job.c               | 12 ++++++++++++
 4 files changed, 36 insertions(+), 4 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy July 11, 2022, 2:19 p.m. UTC | #1
On 7/6/22 23:15, Emanuele Giuseppe Esposito wrote:
> In order to make it thread safe, implement a "fake rwlock",
> where we allow reads under BQL *or* job_mutex held, but
> writes only under BQL *and* job_mutex.
> 
> The only write we have is in child_job_set_aio_ctx

also in job_create of course, but it seems safe anyway

> , which always
> happens under drain (so the job is paused).
> For this reason, introduce job_set_aio_context and make sure that
> the context is set under BQL, job_mutex and drain.
> Also make sure all other places where the aiocontext is read
> are protected.
> 
> Note: at this stage, job_{lock/unlock} and job lock guard macros
> are *nop*.
> 
> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>   block/replication.c |  6 ++++--
>   blockjob.c          |  3 ++-
>   include/qemu/job.h  | 19 ++++++++++++++++++-
>   job.c               | 12 ++++++++++++
>   4 files changed, 36 insertions(+), 4 deletions(-)
> 
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..2189863df1 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
>       }
>       if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>           commit_job = &s->commit_job->job;
> -        assert(commit_job->aio_context == qemu_get_current_aio_context());
> -        job_cancel_sync(commit_job, false);
> +        WITH_JOB_LOCK_GUARD() {
> +            assert(commit_job->aio_context == qemu_get_current_aio_context());
> +            job_cancel_sync_locked(commit_job, false);
> +        }
>       }
>   
>       if (s->mode == REPLICATION_MODE_SECONDARY) {
> diff --git a/blockjob.c b/blockjob.c
> index bce05a9096..0d120ed126 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -166,12 +166,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
>           bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>       }
>   
> -    job->job.aio_context = ctx;
> +    job_set_aio_context(&job->job, ctx);
>   }
>   
>   static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>   {
>       BlockJob *job = c->opaque;
> +    assert(qemu_in_main_thread());
>   
>       return job->job.aio_context;
>   }
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 5709e8d4a8..c144aabefc 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -77,7 +77,12 @@ typedef struct Job {
>   
>       /** Protected by AioContext lock */
>   
> -    /** AioContext to run the job coroutine in */
> +    /**
> +     * AioContext to run the job coroutine in.
> +     * This field can be read when holding either the BQL (so we are in
> +     * the main loop) or the job_mutex.
> +     * It can be only written when we hold *both* BQL and job_mutex.
> +     */
>       AioContext *aio_context;
>   
>       /** Reference count of the block job */
> @@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
>   int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
>                              Error **errp);
>   
> +/**
> + * Sets the @job->aio_context.
> + * Called with job_mutex *not* held.
> + *
> + * This function must run in the main thread to protect against
> + * concurrent read in job_finish_sync_locked(),
> + * takes the job_mutex lock to protect against the read in
> + * job_do_yield_locked(), and must be called when the coroutine
> + * is quiescent.
> + */
> +void job_set_aio_context(Job *job, AioContext *ctx);
> +
>   #endif
> diff --git a/job.c b/job.c
> index 405f39566b..66cae82593 100644
> --- a/job.c
> +++ b/job.c
> @@ -394,6 +394,17 @@ Job *job_get(const char *id)
>       return job_get_locked(id);
>   }
>   
> +void job_set_aio_context(Job *job, AioContext *ctx)
> +{
> +    /* protect against read in job_finish_sync_locked and job_start */
> +    assert(qemu_in_main_thread());
> +    /* protect against read in job_do_yield_locked */
> +    JOB_LOCK_GUARD();
> +    /* ensure the coroutine is quiescent while the AioContext is changed */
> +    assert(job->pause_count > 0);
> +    job->aio_context = ctx;
> +}
> +
>   /* Called with job_mutex *not* held. */
>   static void job_sleep_timer_cb(void *opaque)
>   {
> @@ -1380,6 +1391,7 @@ int job_finish_sync_locked(Job *job,
>   {
>       Error *local_err = NULL;
>       int ret;
> +    assert(qemu_in_main_thread());
>   
>       job_ref_locked(job);
>   


Missed update of block_job_add_bdrv(), block_job_get_aio_context(). Should they have an assertion too?

Also some occurences in test, but that doesn't seem significant.
Emanuele Giuseppe Esposito July 19, 2022, 12:48 p.m. UTC | #2
Am 11/07/2022 um 16:19 schrieb Vladimir Sementsov-Ogievskiy:
> On 7/6/22 23:15, Emanuele Giuseppe Esposito wrote:
>> In order to make it thread safe, implement a "fake rwlock",
>> where we allow reads under BQL *or* job_mutex held, but
>> writes only under BQL *and* job_mutex.
>>
>> The only write we have is in child_job_set_aio_ctx
> 
> also in job_create of course, but it seems safe anyway
> 
>> , which always
>> happens under drain (so the job is paused).
>> For this reason, introduce job_set_aio_context and make sure that
>> the context is set under BQL, job_mutex and drain.
>> Also make sure all other places where the aiocontext is read
>> are protected.
>>
>> Note: at this stage, job_{lock/unlock} and job lock guard macros
>> are *nop*.
>>
>> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
>> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
>> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
>> ---
>>   block/replication.c |  6 ++++--
>>   blockjob.c          |  3 ++-
>>   include/qemu/job.h  | 19 ++++++++++++++++++-
>>   job.c               | 12 ++++++++++++
>>   4 files changed, 36 insertions(+), 4 deletions(-)
>>
>> diff --git a/block/replication.c b/block/replication.c
>> index 55c8f894aa..2189863df1 100644
>> --- a/block/replication.c
>> +++ b/block/replication.c
>> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
>>       }
>>       if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>>           commit_job = &s->commit_job->job;
>> -        assert(commit_job->aio_context ==
>> qemu_get_current_aio_context());
>> -        job_cancel_sync(commit_job, false);
>> +        WITH_JOB_LOCK_GUARD() {
>> +            assert(commit_job->aio_context ==
>> qemu_get_current_aio_context());
>> +            job_cancel_sync_locked(commit_job, false);
>> +        }
>>       }
>>         if (s->mode == REPLICATION_MODE_SECONDARY) {
>> diff --git a/blockjob.c b/blockjob.c
>> index bce05a9096..0d120ed126 100644
>> --- a/blockjob.c
>> +++ b/blockjob.c
>> @@ -166,12 +166,13 @@ static void child_job_set_aio_ctx(BdrvChild *c,
>> AioContext *ctx,
>>           bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>>       }
>>   -    job->job.aio_context = ctx;
>> +    job_set_aio_context(&job->job, ctx);
>>   }
>>     static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>>   {
>>       BlockJob *job = c->opaque;
>> +    assert(qemu_in_main_thread());
>>         return job->job.aio_context;
>>   }
>> diff --git a/include/qemu/job.h b/include/qemu/job.h
>> index 5709e8d4a8..c144aabefc 100644
>> --- a/include/qemu/job.h
>> +++ b/include/qemu/job.h
>> @@ -77,7 +77,12 @@ typedef struct Job {
>>         /** Protected by AioContext lock */
>>   -    /** AioContext to run the job coroutine in */
>> +    /**
>> +     * AioContext to run the job coroutine in.
>> +     * This field can be read when holding either the BQL (so we are in
>> +     * the main loop) or the job_mutex.
>> +     * It can be only written when we hold *both* BQL and job_mutex.
>> +     */
>>       AioContext *aio_context;
>>         /** Reference count of the block job */
>> @@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job
>> *, Error **errp),
>>   int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error
>> **errp),
>>                              Error **errp);
>>   +/**
>> + * Sets the @job->aio_context.
>> + * Called with job_mutex *not* held.
>> + *
>> + * This function must run in the main thread to protect against
>> + * concurrent read in job_finish_sync_locked(),
>> + * takes the job_mutex lock to protect against the read in
>> + * job_do_yield_locked(), and must be called when the coroutine
>> + * is quiescent.
>> + */
>> +void job_set_aio_context(Job *job, AioContext *ctx);
>> +
>>   #endif
>> diff --git a/job.c b/job.c
>> index 405f39566b..66cae82593 100644
>> --- a/job.c
>> +++ b/job.c
>> @@ -394,6 +394,17 @@ Job *job_get(const char *id)
>>       return job_get_locked(id);
>>   }
>>   +void job_set_aio_context(Job *job, AioContext *ctx)
>> +{
>> +    /* protect against read in job_finish_sync_locked and job_start */
>> +    assert(qemu_in_main_thread());
>> +    /* protect against read in job_do_yield_locked */
>> +    JOB_LOCK_GUARD();
>> +    /* ensure the coroutine is quiescent while the AioContext is
>> changed */
>> +    assert(job->pause_count > 0);
>> +    job->aio_context = ctx;
>> +}
>> +
>>   /* Called with job_mutex *not* held. */
>>   static void job_sleep_timer_cb(void *opaque)
>>   {
>> @@ -1380,6 +1391,7 @@ int job_finish_sync_locked(Job *job,
>>   {
>>       Error *local_err = NULL;
>>       int ret;
>> +    assert(qemu_in_main_thread());
>>         job_ref_locked(job);
>>   
> 
> 
> Missed update of block_job_add_bdrv(), block_job_get_aio_context().
> Should they have an assertion too?
> 
> Also some occurences in test, but that doesn't seem significant.
> 

Both have GLOBAL_STATE_CODE.

Emanuele
diff mbox series

Patch

diff --git a/block/replication.c b/block/replication.c
index 55c8f894aa..2189863df1 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -148,8 +148,10 @@  static void replication_close(BlockDriverState *bs)
     }
     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
         commit_job = &s->commit_job->job;
-        assert(commit_job->aio_context == qemu_get_current_aio_context());
-        job_cancel_sync(commit_job, false);
+        WITH_JOB_LOCK_GUARD() {
+            assert(commit_job->aio_context == qemu_get_current_aio_context());
+            job_cancel_sync_locked(commit_job, false);
+        }
     }
 
     if (s->mode == REPLICATION_MODE_SECONDARY) {
diff --git a/blockjob.c b/blockjob.c
index bce05a9096..0d120ed126 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -166,12 +166,13 @@  static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
         bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
     }
 
-    job->job.aio_context = ctx;
+    job_set_aio_context(&job->job, ctx);
 }
 
 static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
+    assert(qemu_in_main_thread());
 
     return job->job.aio_context;
 }
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 5709e8d4a8..c144aabefc 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -77,7 +77,12 @@  typedef struct Job {
 
     /** Protected by AioContext lock */
 
-    /** AioContext to run the job coroutine in */
+    /**
+     * AioContext to run the job coroutine in.
+     * This field can be read when holding either the BQL (so we are in
+     * the main loop) or the job_mutex.
+     * It can be only written when we hold *both* BQL and job_mutex.
+     */
     AioContext *aio_context;
 
     /** Reference count of the block job */
@@ -741,4 +746,16 @@  int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
 int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
                            Error **errp);
 
+/**
+ * Sets the @job->aio_context.
+ * Called with job_mutex *not* held.
+ *
+ * This function must run in the main thread to protect against
+ * concurrent read in job_finish_sync_locked(),
+ * takes the job_mutex lock to protect against the read in
+ * job_do_yield_locked(), and must be called when the coroutine
+ * is quiescent.
+ */
+void job_set_aio_context(Job *job, AioContext *ctx);
+
 #endif
diff --git a/job.c b/job.c
index 405f39566b..66cae82593 100644
--- a/job.c
+++ b/job.c
@@ -394,6 +394,17 @@  Job *job_get(const char *id)
     return job_get_locked(id);
 }
 
+void job_set_aio_context(Job *job, AioContext *ctx)
+{
+    /* protect against read in job_finish_sync_locked and job_start */
+    assert(qemu_in_main_thread());
+    /* protect against read in job_do_yield_locked */
+    JOB_LOCK_GUARD();
+    /* ensure the coroutine is quiescent while the AioContext is changed */
+    assert(job->pause_count > 0);
+    job->aio_context = ctx;
+}
+
 /* Called with job_mutex *not* held. */
 static void job_sleep_timer_cb(void *opaque)
 {
@@ -1380,6 +1391,7 @@  int job_finish_sync_locked(Job *job,
 {
     Error *local_err = NULL;
     int ret;
+    assert(qemu_in_main_thread());
 
     job_ref_locked(job);