diff mbox series

[11/15] block-backend: make queued_requests thread-safe

Message ID 20221212125920.248567-12-pbonzini@redhat.com
State New
Headers show
Series More cleanups and fixes for drain | expand

Commit Message

Paolo Bonzini Dec. 12, 2022, 12:59 p.m. UTC
Protect quiesce_counter and queued_requests with a QemuMutex, so that
they are protected from concurrent access in the main thread (for example
blk_root_drained_end() reached from bdrv_drain_all()) and in the iothread
(where any I/O operation will call blk_inc_in_flight()).

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/block-backend.c | 44 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 8 deletions(-)

Comments

Stefan Hajnoczi Jan. 11, 2023, 8:44 p.m. UTC | #1
On Mon, Dec 12, 2022 at 01:59:16PM +0100, Paolo Bonzini wrote:
> Protect quiesce_counter and queued_requests with a QemuMutex, so that
> they are protected from concurrent access in the main thread (for example
> blk_root_drained_end() reached from bdrv_drain_all()) and in the iothread
> (where any I/O operation will call blk_inc_in_flight()).
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/block-backend.c | 44 +++++++++++++++++++++++++++++++++++--------
>  1 file changed, 36 insertions(+), 8 deletions(-)
> 
> diff --git a/block/block-backend.c b/block/block-backend.c
> index 627d491d4155..fdf82f1f1599 100644
> --- a/block/block-backend.c
> +++ b/block/block-backend.c
> @@ -23,6 +23,7 @@
>  #include "qapi/error.h"
>  #include "qapi/qapi-events-block.h"
>  #include "qemu/id.h"
> +#include "qemu/thread.h"
>  #include "qemu/main-loop.h"
>  #include "qemu/option.h"
>  #include "trace.h"
> @@ -85,6 +86,8 @@ struct BlockBackend {
>      NotifierList remove_bs_notifiers, insert_bs_notifiers;
>      QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
>  
> +    /* Protected by quiesce_lock.  */
> +    QemuMutex quiesce_lock;
>      int quiesce_counter;
>      CoQueue queued_requests;
>  
> @@ -372,6 +375,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
>  
>      block_acct_init(&blk->stats);
>  
> +    qemu_mutex_init(&blk->quiesce_lock);
>      qemu_co_queue_init(&blk->queued_requests);
>      notifier_list_init(&blk->remove_bs_notifiers);
>      notifier_list_init(&blk->insert_bs_notifiers);
> @@ -490,6 +494,7 @@ static void blk_delete(BlockBackend *blk)
>      assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
>      assert(QLIST_EMPTY(&blk->aio_notifiers));
>      QTAILQ_REMOVE(&block_backends, blk, link);
> +    qemu_mutex_destroy(&blk->quiesce_lock);
>      drive_info_del(blk->legacy_dinfo);
>      block_acct_cleanup(&blk->stats);
>      g_free(blk);
> @@ -1451,11 +1456,25 @@ void blk_inc_in_flight(BlockBackend *blk)
>  {
>      IO_CODE();
>      qatomic_inc(&blk->in_flight);
> -    if (!blk->disable_request_queuing) {
> -        /* TODO: this is not thread-safe! */
> +
> +    /*
> +     * Avoid a continuous stream of requests from AIO callbacks, which
> +     * call a user-provided callback while blk->in_flight is elevated,
> +     * if the BlockBackend is being quiesced.
> +     *
> +     * This initial test does not have to be perfect: a race might
> +     * cause an extra I/O to be queued, but sooner or later a nonzero
> +     * quiesce_counter will be observed.
> +     */
> +    if (!blk->disable_request_queuing && qatomic_read(&blk->quiesce_counter)) {
> +        /*
> +         * ... on the other hand, it is important that the final check and
> +	 * the wait are done under the lock, so that no wakeups are missed.
> +         */
> +        QEMU_LOCK_GUARD(&blk->quiesce_lock);
>          while (blk->quiesce_counter) {
>              qatomic_dec(&blk->in_flight);
> -            qemu_co_queue_wait(&blk->queued_requests, NULL);
> +            qemu_co_queue_wait(&blk->queued_requests, &blk->quiesce_lock);
>              qatomic_inc(&blk->in_flight);
>          }
>      }
> @@ -2542,7 +2561,8 @@ static void blk_root_drained_begin(BdrvChild *child)
>      BlockBackend *blk = child->opaque;
>      ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
>  
> -    if (++blk->quiesce_counter == 1) {
> +    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter + 1);

Can drained begin race with drained end? If yes, then this atomic set
looks racy because drained end might be updating the counter at around
the same time. We should probably hold quiesce_lock?

> +    if (blk->quiesce_counter == 1) {
>          if (blk->dev_ops && blk->dev_ops->drained_begin) {
>              blk->dev_ops->drained_begin(blk->dev_opaque);
>          }
> @@ -2560,6 +2580,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
>  {
>      BlockBackend *blk = child->opaque;
>      bool busy = false;
> +
>      assert(blk->quiesce_counter);
>  
>      if (blk->dev_ops && blk->dev_ops->drained_poll) {
> @@ -2576,14 +2597,21 @@ static void blk_root_drained_end(BdrvChild *child)
>      assert(blk->public.throttle_group_member.io_limits_disabled);
>      qatomic_dec(&blk->public.throttle_group_member.io_limits_disabled);
>  
> -    if (--blk->quiesce_counter == 0) {
> +    qemu_mutex_lock(&blk->quiesce_lock);
> +    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter - 1);
> +    if (blk->quiesce_counter == 0) {
> +        /* After this point, new I/O will not sleep on queued_requests.  */
> +        qemu_mutex_unlock(&blk->quiesce_lock);
> +
>          if (blk->dev_ops && blk->dev_ops->drained_end) {
>              blk->dev_ops->drained_end(blk->dev_opaque);
>          }
> -        while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
> -            /* Resume all queued requests */
> -        }
> +
> +        /* Now resume all queued requests */
> +        qemu_mutex_lock(&blk->quiesce_lock);
> +        qemu_co_enter_all(&blk->queued_requests, &blk->quiesce_lock);
>      }
> +    qemu_mutex_unlock(&blk->quiesce_lock);
>  }
>  
>  bool blk_register_buf(BlockBackend *blk, void *host, size_t size, Error **errp)
> -- 
> 2.38.1
Kevin Wolf Jan. 16, 2023, 4:55 p.m. UTC | #2
Am 12.12.2022 um 13:59 hat Paolo Bonzini geschrieben:
> Protect quiesce_counter and queued_requests with a QemuMutex, so that
> they are protected from concurrent access in the main thread (for example
> blk_root_drained_end() reached from bdrv_drain_all()) and in the iothread
> (where any I/O operation will call blk_inc_in_flight()).
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/block-backend.c | 44 +++++++++++++++++++++++++++++++++++--------
>  1 file changed, 36 insertions(+), 8 deletions(-)
> 
> diff --git a/block/block-backend.c b/block/block-backend.c
> index 627d491d4155..fdf82f1f1599 100644
> --- a/block/block-backend.c
> +++ b/block/block-backend.c
> @@ -23,6 +23,7 @@
>  #include "qapi/error.h"
>  #include "qapi/qapi-events-block.h"
>  #include "qemu/id.h"
> +#include "qemu/thread.h"
>  #include "qemu/main-loop.h"
>  #include "qemu/option.h"
>  #include "trace.h"
> @@ -85,6 +86,8 @@ struct BlockBackend {
>      NotifierList remove_bs_notifiers, insert_bs_notifiers;
>      QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
>  
> +    /* Protected by quiesce_lock.  */
> +    QemuMutex quiesce_lock;
>      int quiesce_counter;
>      CoQueue queued_requests;
>  
> @@ -372,6 +375,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
>  
>      block_acct_init(&blk->stats);
>  
> +    qemu_mutex_init(&blk->quiesce_lock);
>      qemu_co_queue_init(&blk->queued_requests);
>      notifier_list_init(&blk->remove_bs_notifiers);
>      notifier_list_init(&blk->insert_bs_notifiers);
> @@ -490,6 +494,7 @@ static void blk_delete(BlockBackend *blk)
>      assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
>      assert(QLIST_EMPTY(&blk->aio_notifiers));
>      QTAILQ_REMOVE(&block_backends, blk, link);
> +    qemu_mutex_destroy(&blk->quiesce_lock);
>      drive_info_del(blk->legacy_dinfo);
>      block_acct_cleanup(&blk->stats);
>      g_free(blk);
> @@ -1451,11 +1456,25 @@ void blk_inc_in_flight(BlockBackend *blk)
>  {
>      IO_CODE();
>      qatomic_inc(&blk->in_flight);
> -    if (!blk->disable_request_queuing) {
> -        /* TODO: this is not thread-safe! */
> +
> +    /*
> +     * Avoid a continuous stream of requests from AIO callbacks, which
> +     * call a user-provided callback while blk->in_flight is elevated,
> +     * if the BlockBackend is being quiesced.
> +     *
> +     * This initial test does not have to be perfect: a race might
> +     * cause an extra I/O to be queued, but sooner or later a nonzero
> +     * quiesce_counter will be observed.

This is true in the initial drain phase while we're still polling. But
generally this is not only about avoiding a continuous stream of
requests, but about making sure that absolutely no new requests come in
while a node is drained.

> +     */
> +    if (!blk->disable_request_queuing && qatomic_read(&blk->quiesce_counter)) {

So if no other requests were pending and we didn't even call aio_poll()
because the AIO_WAIT_WHILE() condition was false from the start, could
it be that bdrv_drained_begin() has already returned on the thread that
drains, but another thread still sees the old value here?

Starting a new request after bdrv_drained_begin() has returned would be
a bug.

Kevin
diff mbox series

Patch

diff --git a/block/block-backend.c b/block/block-backend.c
index 627d491d4155..fdf82f1f1599 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -23,6 +23,7 @@ 
 #include "qapi/error.h"
 #include "qapi/qapi-events-block.h"
 #include "qemu/id.h"
+#include "qemu/thread.h"
 #include "qemu/main-loop.h"
 #include "qemu/option.h"
 #include "trace.h"
@@ -85,6 +86,8 @@  struct BlockBackend {
     NotifierList remove_bs_notifiers, insert_bs_notifiers;
     QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
 
+    /* Protected by quiesce_lock.  */
+    QemuMutex quiesce_lock;
     int quiesce_counter;
     CoQueue queued_requests;
 
@@ -372,6 +375,7 @@  BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
 
     block_acct_init(&blk->stats);
 
+    qemu_mutex_init(&blk->quiesce_lock);
     qemu_co_queue_init(&blk->queued_requests);
     notifier_list_init(&blk->remove_bs_notifiers);
     notifier_list_init(&blk->insert_bs_notifiers);
@@ -490,6 +494,7 @@  static void blk_delete(BlockBackend *blk)
     assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
     assert(QLIST_EMPTY(&blk->aio_notifiers));
     QTAILQ_REMOVE(&block_backends, blk, link);
+    qemu_mutex_destroy(&blk->quiesce_lock);
     drive_info_del(blk->legacy_dinfo);
     block_acct_cleanup(&blk->stats);
     g_free(blk);
@@ -1451,11 +1456,25 @@  void blk_inc_in_flight(BlockBackend *blk)
 {
     IO_CODE();
     qatomic_inc(&blk->in_flight);
-    if (!blk->disable_request_queuing) {
-        /* TODO: this is not thread-safe! */
+
+    /*
+     * Avoid a continuous stream of requests from AIO callbacks, which
+     * call a user-provided callback while blk->in_flight is elevated,
+     * if the BlockBackend is being quiesced.
+     *
+     * This initial test does not have to be perfect: a race might
+     * cause an extra I/O to be queued, but sooner or later a nonzero
+     * quiesce_counter will be observed.
+     */
+    if (!blk->disable_request_queuing && qatomic_read(&blk->quiesce_counter)) {
+        /*
+         * ... on the other hand, it is important that the final check and
+	 * the wait are done under the lock, so that no wakeups are missed.
+         */
+        QEMU_LOCK_GUARD(&blk->quiesce_lock);
         while (blk->quiesce_counter) {
             qatomic_dec(&blk->in_flight);
-            qemu_co_queue_wait(&blk->queued_requests, NULL);
+            qemu_co_queue_wait(&blk->queued_requests, &blk->quiesce_lock);
             qatomic_inc(&blk->in_flight);
         }
     }
@@ -2542,7 +2561,8 @@  static void blk_root_drained_begin(BdrvChild *child)
     BlockBackend *blk = child->opaque;
     ThrottleGroupMember *tgm = &blk->public.throttle_group_member;
 
-    if (++blk->quiesce_counter == 1) {
+    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter + 1);
+    if (blk->quiesce_counter == 1) {
         if (blk->dev_ops && blk->dev_ops->drained_begin) {
             blk->dev_ops->drained_begin(blk->dev_opaque);
         }
@@ -2560,6 +2580,7 @@  static bool blk_root_drained_poll(BdrvChild *child)
 {
     BlockBackend *blk = child->opaque;
     bool busy = false;
+
     assert(blk->quiesce_counter);
 
     if (blk->dev_ops && blk->dev_ops->drained_poll) {
@@ -2576,14 +2597,21 @@  static void blk_root_drained_end(BdrvChild *child)
     assert(blk->public.throttle_group_member.io_limits_disabled);
     qatomic_dec(&blk->public.throttle_group_member.io_limits_disabled);
 
-    if (--blk->quiesce_counter == 0) {
+    qemu_mutex_lock(&blk->quiesce_lock);
+    qatomic_set(&blk->quiesce_counter, blk->quiesce_counter - 1);
+    if (blk->quiesce_counter == 0) {
+        /* After this point, new I/O will not sleep on queued_requests.  */
+        qemu_mutex_unlock(&blk->quiesce_lock);
+
         if (blk->dev_ops && blk->dev_ops->drained_end) {
             blk->dev_ops->drained_end(blk->dev_opaque);
         }
-        while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
-            /* Resume all queued requests */
-        }
+
+        /* Now resume all queued requests */
+        qemu_mutex_lock(&blk->quiesce_lock);
+        qemu_co_enter_all(&blk->queued_requests, &blk->quiesce_lock);
     }
+    qemu_mutex_unlock(&blk->quiesce_lock);
 }
 
 bool blk_register_buf(BlockBackend *blk, void *host, size_t size, Error **errp)