
[v2,6/7] block-copy: atomic .cancelled and .finished fields in BlockCopyCallState

Message ID 20210518100757.31243-7-eesposit@redhat.com
State New
Series block-copy: protect block-copy internal structures

Commit Message

Emanuele Giuseppe Esposito May 18, 2021, 10:07 a.m. UTC
By adding acquire/release pairs, we ensure that the .ret and .error_is_read
fields are written by block_copy_dirty_clusters before .finished is set to
true, so a reader that observes .finished as true also sees the final values
of those fields.
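
The pairing, in isolation, looks like this (the struct and the two functions
below are only an illustration of the pattern, not code from this patch; the
real fields live in BlockCopyCallState):

#include "qemu/osdep.h"
#include "qemu/atomic.h"

/* Illustration only: a cut-down version of the call state. */
typedef struct DemoCallState {
    int ret;
    bool error_is_read;
    bool finished;              /* atomic */
} DemoCallState;

/*
 * Writer side (cf. block_copy_common): plain stores to the result
 * fields, then a release store that publishes them.
 */
static void demo_finish(DemoCallState *s, int ret, bool error_is_read)
{
    s->ret = ret;
    s->error_is_read = error_is_read;
    qatomic_store_release(&s->finished, true);
}

/*
 * Reader side (cf. block_copy_call_status): the acquire load pairs with
 * the release store above, so once finished is observed as true the
 * plain loads of ret and error_is_read see the final values.
 */
static bool demo_poll(DemoCallState *s, int *ret, bool *error_is_read)
{
    if (!qatomic_load_acquire(&s->finished)) {
        return false;
    }
    *ret = s->ret;
    *error_is_read = s->error_is_read;
    return true;
}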

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 block/block-copy.c | 33 ++++++++++++++++++---------------
 1 file changed, 18 insertions(+), 15 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy May 20, 2021, 3:34 p.m. UTC | #1
18.05.2021 13:07, Emanuele Giuseppe Esposito wrote:
> By adding acquire/release pairs, we ensure that .ret and .error_is_read
> fields are written by block_copy_dirty_clusters before .finished is true.

As I already said, please, can we live with one mutex for now? finished, ret and error_is_read all change rarely. I doubt that these atomic operations improve performance, but the complexity of the architecture increases exponentially.
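
Something along these lines would already be enough (only a sketch of the
idea, completely untested; the field layout and lock placement are
illustrative):

#include "qemu/osdep.h"
#include "qemu/thread.h"

typedef struct BlockCopyCallStateSketch {
    QemuMutex lock;             /* protects the fields below */
    bool finished;
    bool cancelled;
    bool error_is_read;
    int ret;
} BlockCopyCallStateSketch;

static bool sketch_call_succeeded(BlockCopyCallStateSketch *call_state)
{
    bool ok;

    qemu_mutex_lock(&call_state->lock);
    ok = call_state->finished && !call_state->cancelled &&
         call_state->ret == 0;
    qemu_mutex_unlock(&call_state->lock);

    return ok;
}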

> 
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> ---
>   block/block-copy.c | 33 ++++++++++++++++++---------------
>   1 file changed, 18 insertions(+), 15 deletions(-)
> 
> diff --git a/block/block-copy.c b/block/block-copy.c
> index d5ed5932b0..573e96fefb 100644
> --- a/block/block-copy.c
> +++ b/block/block-copy.c
> @@ -55,11 +55,11 @@ typedef struct BlockCopyCallState {
>       QLIST_ENTRY(BlockCopyCallState) list;
>   
>       /* State */
> -    bool finished;
> +    bool finished; /* atomic */
>       QemuCoSleep sleep; /* TODO: protect API with a lock */
>   
>       /* OUT parameters */
> -    bool cancelled;
> +    bool cancelled; /* atomic */
>       /* Fields protected by calls_lock in BlockCopyState */
>       bool error_is_read;
>       int ret;
> @@ -646,7 +646,8 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state)
>       assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
>       assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
>   
> -    while (bytes && aio_task_pool_status(aio) == 0 && !call_state->cancelled) {
> +    while (bytes && aio_task_pool_status(aio) == 0 &&
> +           !qatomic_read(&call_state->cancelled)) {
>           BlockCopyTask *task;
>           int64_t status_bytes;
>   
> @@ -754,7 +755,7 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
>       do {
>           ret = block_copy_dirty_clusters(call_state);
>   
> -        if (ret == 0 && !call_state->cancelled) {
> +        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
>               ret = block_copy_wait_one(call_state->s, call_state->offset,
>                                         call_state->bytes);
>           }
> @@ -768,9 +769,9 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
>            * 2. We have waited for some intersecting block-copy request
>            *    It may have failed and produced new dirty bits.
>            */
> -    } while (ret > 0 && !call_state->cancelled);
> +    } while (ret > 0 && !qatomic_read(&call_state->cancelled));
>   
> -    call_state->finished = true;
> +    qatomic_store_release(&call_state->finished, true);
>   
>       if (call_state->cb) {
>           call_state->cb(call_state->cb_opaque);
> @@ -833,35 +834,37 @@ void block_copy_call_free(BlockCopyCallState *call_state)
>           return;
>       }
>   
> -    assert(call_state->finished);
> +    assert(qatomic_load_acquire(&call_state->finished));
>       g_free(call_state);
>   }
>   
>   bool block_copy_call_finished(BlockCopyCallState *call_state)
>   {
> -    return call_state->finished;
> +    return qatomic_load_acquire(&call_state->finished);
>   }
>   
>   bool block_copy_call_succeeded(BlockCopyCallState *call_state)
>   {
> -    return call_state->finished && !call_state->cancelled &&
> -        call_state->ret == 0;
> +    return qatomic_load_acquire(&call_state->finished) &&
> +           !qatomic_read(&call_state->cancelled) &&
> +           call_state->ret == 0;
>   }
>   
>   bool block_copy_call_failed(BlockCopyCallState *call_state)
>   {
> -    return call_state->finished && !call_state->cancelled &&
> -        call_state->ret < 0;
> +    return qatomic_load_acquire(&call_state->finished) &&
> +           !qatomic_read(&call_state->cancelled) &&
> +           call_state->ret < 0;
>   }
>   
>   bool block_copy_call_cancelled(BlockCopyCallState *call_state)
>   {
> -    return call_state->cancelled;
> +    return qatomic_read(&call_state->cancelled);
>   }
>   
>   int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
>   {
> -    assert(call_state->finished);
> +    assert(qatomic_load_acquire(&call_state->finished));
>       if (error_is_read) {
>           *error_is_read = call_state->error_is_read;
>       }
> @@ -870,7 +873,7 @@ int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
>   
>   void block_copy_call_cancel(BlockCopyCallState *call_state)
>   {
> -    call_state->cancelled = true;
> +    qatomic_set(&call_state->cancelled, true);
>       block_copy_kick(call_state);
>   }
>   
>
Paolo Bonzini May 21, 2021, 3:02 p.m. UTC | #2
On 20/05/21 17:34, Vladimir Sementsov-Ogievskiy wrote:
> 
>> By adding acquire/release pairs, we ensure that .ret and .error_is_read
>> fields are written by block_copy_dirty_clusters before .finished is true.
> 
> As I already said, please, can we live with one mutex for now? finished, 
> ret, error_is_read, all these variables are changing rarely. I doubt 
> that performance is improved by these atomic operations. But complexity 
> of the architecture increases exponentially.

The problem is that these are used outside coroutines. 
load-acquire/store-release is the simplest way to handle a "finished" 
flag really.

Paolo
Vladimir Sementsov-Ogievskiy May 21, 2021, 3:53 p.m. UTC | #3
21.05.2021 18:02, Paolo Bonzini wrote:
> On 20/05/21 17:34, Vladimir Sementsov-Ogievskiy wrote:
>>
>>> By adding acquire/release pairs, we ensure that .ret and .error_is_read
>>> fields are written by block_copy_dirty_clusters before .finished is true.
>>
>> As I already said, please, can we live with one mutex for now? finished, ret, error_is_read, all these variables are changing rarely. I doubt that performance is improved by these atomic operations. But complexity of the architecture increases exponentially.
> 
> The problem is that these are used outside coroutines.

Hmm. I think that if some bit of block-copy is not in a coroutine yet, it's simple to move it into a coroutine.

> load-acquire/store-release is the simplest way to handle a "finished" flag really.
> 
> Paolo
>
Vladimir Sementsov-Ogievskiy May 21, 2021, 3:56 p.m. UTC | #4
21.05.2021 18:02, Paolo Bonzini wrote:
> On 20/05/21 17:34, Vladimir Sementsov-Ogievskiy wrote:
>>
>>> By adding acquire/release pairs, we ensure that .ret and .error_is_read
>>> fields are written by block_copy_dirty_clusters before .finished is true.
>>
>> As I already said, please, can we live with one mutex for now? finished, ret, error_is_read, all these variables are changing rarely. I doubt that performance is improved by these atomic operations. But complexity of the architecture increases exponentially.
> 
> The problem is that these are used outside coroutines. load-acquire/store-release is the simplest way to handle a "finished" flag really.
> 

Related: maybe we can support CoMutex outside of coroutine context?

Create a coroutine that locks the mutex and unlocks it for us... or something like that. It would help a lot with the task of making everything thread-safe.
Paolo Bonzini May 21, 2021, 4:47 p.m. UTC | #5
On 21/05/21 17:56, Vladimir Sementsov-Ogievskiy wrote:
> 21.05.2021 18:02, Paolo Bonzini wrote:
>> On 20/05/21 17:34, Vladimir Sementsov-Ogievskiy wrote:
>>>
>>>> By adding acquire/release pairs, we ensure that .ret and .error_is_read
>>>> fields are written by block_copy_dirty_clusters before .finished is 
>>>> true.
>>>
>>> As I already said, please, can we live with one mutex for now? 
>>> finished, ret, error_is_read, all these variables are changing 
>>> rarely. I doubt that performance is improved by these atomic 
>>> operations. But complexity of the architecture increases exponentially.
>>
>> The problem is that these are used outside coroutines. 
>> load-acquire/store-release is the simplest way to handle a "finished" 
>> flag really.
>>
> 
> Related, maybe we can support CoMutex outside of coroutine context?
> 
> Create a coroutine, which will lock the mutex and unlock it for us... Or 
> something like this.. It will help the task of making everything 
> thread-safe a lot

No, it's not possible because the coroutine will yield to the caller if 
the mutex is contended, but the caller will not be able to use the data 
that is protected by the mutex.
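
For example, a bounce helper like the following (hypothetical, all names
invented here) would not give the caller what it needs:

#include "qemu/osdep.h"
#include "qemu/coroutine.h"

typedef struct LockBounce {
    CoMutex *lock;
    bool locked;
} LockBounce;

static void coroutine_fn lock_bounce_entry(void *opaque)
{
    LockBounce *b = opaque;

    /*
     * If the mutex is contended, qemu_co_mutex_lock() yields and control
     * goes back to co_mutex_lock_from_plain_code() before the lock is
     * actually held.
     */
    qemu_co_mutex_lock(b->lock);
    b->locked = true;
}

static void co_mutex_lock_from_plain_code(CoMutex *lock)
{
    LockBounce b = { .lock = lock, .locked = false };
    Coroutine *co = qemu_coroutine_create(lock_bounce_entry, &b);

    qemu_coroutine_enter(co);
    /*
     * b.locked may still be false here: the caller regains control
     * without holding the lock, so it cannot safely touch the data the
     * mutex protects.
     */
}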

There is no reason to have stuff like block_copy_call_status be a
coroutine_fn.  Even if it's only called from a coroutine today, I'd rather
keep the code more future-proof.
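
E.g. something like the following should remain possible from plain
main-loop code (illustrative only, not part of the series):

#include "qemu/osdep.h"
#include "qemu/main-loop.h"
#include "block/aio.h"
#include "block/block-copy.h"

/* Poll the call state from non-coroutine context until the copy is done. */
static int wait_for_block_copy(BlockCopyCallState *call_state)
{
    bool error_is_read;

    while (!block_copy_call_finished(call_state)) {
        aio_poll(qemu_get_aio_context(), true);
    }

    return block_copy_call_status(call_state, &error_is_read);
}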

Paolo

Patch

diff --git a/block/block-copy.c b/block/block-copy.c
index d5ed5932b0..573e96fefb 100644
--- a/block/block-copy.c
+++ b/block/block-copy.c
@@ -55,11 +55,11 @@  typedef struct BlockCopyCallState {
     QLIST_ENTRY(BlockCopyCallState) list;
 
     /* State */
-    bool finished;
+    bool finished; /* atomic */
     QemuCoSleep sleep; /* TODO: protect API with a lock */
 
     /* OUT parameters */
-    bool cancelled;
+    bool cancelled; /* atomic */
     /* Fields protected by calls_lock in BlockCopyState */
     bool error_is_read;
     int ret;
@@ -646,7 +646,8 @@  block_copy_dirty_clusters(BlockCopyCallState *call_state)
     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
 
-    while (bytes && aio_task_pool_status(aio) == 0 && !call_state->cancelled) {
+    while (bytes && aio_task_pool_status(aio) == 0 &&
+           !qatomic_read(&call_state->cancelled)) {
         BlockCopyTask *task;
         int64_t status_bytes;
 
@@ -754,7 +755,7 @@  static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
     do {
         ret = block_copy_dirty_clusters(call_state);
 
-        if (ret == 0 && !call_state->cancelled) {
+        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
             ret = block_copy_wait_one(call_state->s, call_state->offset,
                                       call_state->bytes);
         }
@@ -768,9 +769,9 @@  static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
          * 2. We have waited for some intersecting block-copy request
          *    It may have failed and produced new dirty bits.
          */
-    } while (ret > 0 && !call_state->cancelled);
+    } while (ret > 0 && !qatomic_read(&call_state->cancelled));
 
-    call_state->finished = true;
+    qatomic_store_release(&call_state->finished, true);
 
     if (call_state->cb) {
         call_state->cb(call_state->cb_opaque);
@@ -833,35 +834,37 @@  void block_copy_call_free(BlockCopyCallState *call_state)
         return;
     }
 
-    assert(call_state->finished);
+    assert(qatomic_load_acquire(&call_state->finished));
     g_free(call_state);
 }
 
 bool block_copy_call_finished(BlockCopyCallState *call_state)
 {
-    return call_state->finished;
+    return qatomic_load_acquire(&call_state->finished);
 }
 
 bool block_copy_call_succeeded(BlockCopyCallState *call_state)
 {
-    return call_state->finished && !call_state->cancelled &&
-        call_state->ret == 0;
+    return qatomic_load_acquire(&call_state->finished) &&
+           !qatomic_read(&call_state->cancelled) &&
+           call_state->ret == 0;
 }
 
 bool block_copy_call_failed(BlockCopyCallState *call_state)
 {
-    return call_state->finished && !call_state->cancelled &&
-        call_state->ret < 0;
+    return qatomic_load_acquire(&call_state->finished) &&
+           !qatomic_read(&call_state->cancelled) &&
+           call_state->ret < 0;
 }
 
 bool block_copy_call_cancelled(BlockCopyCallState *call_state)
 {
-    return call_state->cancelled;
+    return qatomic_read(&call_state->cancelled);
 }
 
 int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
 {
-    assert(call_state->finished);
+    assert(qatomic_load_acquire(&call_state->finished));
     if (error_is_read) {
         *error_is_read = call_state->error_is_read;
     }
@@ -870,7 +873,7 @@  int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
 
 void block_copy_call_cancel(BlockCopyCallState *call_state)
 {
-    call_state->cancelled = true;
+    qatomic_set(&call_state->cancelled, true);
     block_copy_kick(call_state);
 }