diff mbox series

[v2,for-7.1,8/9] nbd: take receive_mutex when reading requests[].receiving

Message ID 20220414175756.671165-9-pbonzini@redhat.com
State New
Headers show
Series nbd: actually make s->state thread-safe | expand

Commit Message

Paolo Bonzini April 14, 2022, 5:57 p.m. UTC
requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
read it under the same mutex as well.  Waking up receivers on errors happens
after each reply finishes processing, in nbd_co_receive_one_chunk().
If there is no currently-active reply, there are two cases:

* either there is no active request at all, in which case no
element of requests[] can have .receiving = true

* or nbd_receive_replies() must be running and owns receive_mutex;
in that case it will get back to nbd_co_receive_one_chunk() because
the socket has been shut down, and all waiting coroutines will wake up
in turn.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

Comments

Eric Blake April 14, 2022, 7:42 p.m. UTC | #1
On Thu, Apr 14, 2022 at 07:57:55PM +0200, Paolo Bonzini wrote:
> requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
> Read it under the same mutex as well.  Waking up receivers on errors happens
> after each reply finishes processing, in nbd_co_receive_one_chunk().
> If there is no currently-active reply, there are two cases:
> 
> * either there is no active request at all, in which case no
> element of request[] can have .receiving = true
> 
> * or nbd_receive_replies() must be running and owns receive_mutex;
> in that case it will get back to nbd_co_receive_one_chunk() because
> the socket has been shutdown, and all waiting coroutines will wake up
> in turn.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 15 +++++++--------
>  1 file changed, 7 insertions(+), 8 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>
Vladimir Sementsov-Ogievskiy April 16, 2022, 1:54 p.m. UTC | #2
14.04.2022 20:57, Paolo Bonzini wrote:
> requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
> Read it under the same mutex as well.  Waking up receivers on errors happens
> after each reply finishes processing, in nbd_co_receive_one_chunk().
> If there is no currently-active reply, there are two cases:
> 
> * either there is no active request at all, in which case no
> element of request[] can have .receiving = true
> 
> * or nbd_receive_replies() must be running and owns receive_mutex;
> in that case it will get back to nbd_co_receive_one_chunk() because
> the socket has been shutdown, and all waiting coroutines will wake up
> in turn.
> 

Seems that removing "nbd_recv_coroutines_wake(s, true);" from nbd_channel_error_locked() could be a separate preparatory patch. It's a separate thing:
no sense to wake all receiving coroutines on error, if they will wake each other in a chain anyway..

> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>


Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>

> ---
>   block/nbd.c | 15 +++++++--------
>   1 file changed, 7 insertions(+), 8 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index b5aac2548c..31c684772e 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -131,6 +131,7 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
>       s->x_dirty_bitmap = NULL;
>   }
>   
> +/* Called with s->receive_mutex taken.  */
>   static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
>   {
>       if (req->receiving) {
> @@ -142,12 +143,13 @@ static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
>       return false;
>   }
>   
> -static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
> +static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
>   {
>       int i;
>   
> +    QEMU_LOCK_GUARD(&s->receive_mutex);
>       for (i = 0; i < MAX_NBD_REQUESTS; i++) {
> -        if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
> +        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
>               return;
>           }
>       }
> @@ -168,8 +170,6 @@ static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
>       } else {
>           s->state = NBD_CLIENT_QUIT;
>       }
> -
> -    nbd_recv_coroutines_wake(s, true);
>   }
>   
>   static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
> @@ -432,11 +432,10 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
>   
>               qemu_coroutine_yield();
>               /*
> -             * We may be woken for 3 reasons:
> +             * We may be woken for 2 reasons:
>                * 1. From this function, executing in parallel coroutine, when our
>                *    handle is received.
> -             * 2. From nbd_channel_error(), when connection is lost.
> -             * 3. From nbd_co_receive_one_chunk(), when previous request is
> +             * 2. From nbd_co_receive_one_chunk(), when previous request is
>                *    finished and s->reply.handle set to 0.
>                * Anyway, it's OK to lock the mutex and go to the next iteration.
>                */
> @@ -928,7 +927,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
>       }
>       s->reply.handle = 0;
>   
> -    nbd_recv_coroutines_wake(s, false);
> +    nbd_recv_coroutines_wake(s);
>   
>       return ret;
>   }
diff mbox series

Patch

diff --git a/block/nbd.c b/block/nbd.c
index b5aac2548c..31c684772e 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -131,6 +131,7 @@  static void nbd_clear_bdrvstate(BlockDriverState *bs)
     s->x_dirty_bitmap = NULL;
 }
 
+/* Called with s->receive_mutex taken.  */
 static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
 {
     if (req->receiving) {
@@ -142,12 +143,13 @@  static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
     return false;
 }
 
-static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
+static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
 {
     int i;
 
+    QEMU_LOCK_GUARD(&s->receive_mutex);
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
-        if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
+        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
             return;
         }
     }
@@ -168,8 +170,6 @@  static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
     } else {
         s->state = NBD_CLIENT_QUIT;
     }
-
-    nbd_recv_coroutines_wake(s, true);
 }
 
 static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
@@ -432,11 +432,10 @@  static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
 
             qemu_coroutine_yield();
             /*
-             * We may be woken for 3 reasons:
+             * We may be woken for 2 reasons:
              * 1. From this function, executing in parallel coroutine, when our
              *    handle is received.
-             * 2. From nbd_channel_error(), when connection is lost.
-             * 3. From nbd_co_receive_one_chunk(), when previous request is
+             * 2. From nbd_co_receive_one_chunk(), when previous request is
              *    finished and s->reply.handle set to 0.
              * Anyway, it's OK to lock the mutex and go to the next iteration.
              */
@@ -928,7 +927,7 @@  static coroutine_fn int nbd_co_receive_one_chunk(
     }
     s->reply.handle = 0;
 
-    nbd_recv_coroutines_wake(s, false);
+    nbd_recv_coroutines_wake(s);
 
     return ret;
 }