[5/5] async: use explicit memory barriers

Message ID 20200407140746.8041-6-pbonzini@redhat.com
State New
Series: async: fix hangs on weakly-ordered architectures

Commit Message

Paolo Bonzini April 7, 2020, 2:07 p.m. UTC
When using C11 atomics, non-seqcst reads and writes do not participate
in the total order of seqcst operations.  In util/async.c and util/aio-posix.c,
in particular, the pattern that we use

          write ctx->notify_me                 write bh->scheduled
          read bh->scheduled                   read ctx->notify_me
          if !bh->scheduled, sleep             if ctx->notify_me, notify

needs to use seqcst operations for both the write and the read.  In
general this is something that we do not want, because there can be
many sources that are polled in addition to bottom halves.  The
alternative is to place a seqcst memory barrier between the write
and the read.  This also comes with a disadvantage: on strongly-ordered
architectures, where much of the ordering is already implicit, the
explicit barrier instruction still wastes a few dozen clock cycles.
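
As a minimal illustration (this is a sketch, not QEMU code: the two
thread functions are hypothetical, and QEMU's smp_mb() is modeled with
C11 atomic_thread_fence), the pattern above corresponds to:

    #include <stdatomic.h>
    #include <stdbool.h>

    atomic_int  notify_me;   /* stands in for ctx->notify_me */
    atomic_bool scheduled;   /* stands in for bh->scheduled */

    /* aio_poll / aio_ctx_prepare side */
    void poller(void)
    {
        atomic_store_explicit(&notify_me, 1, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);       /* smp_mb() */
        if (!atomic_load_explicit(&scheduled, memory_order_relaxed)) {
            /* sleep until notified */
        }
    }

    /* aio_notify side */
    void notifier(void)
    {
        atomic_store_explicit(&scheduled, true, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);       /* smp_mb() */
        if (atomic_load_explicit(&notify_me, memory_order_relaxed)) {
            /* wake up the poller */
        }
    }

If either fence is missing, the store and the load around it can be
reordered so that both threads read the other flag as clear: the
notifier skips the wakeup and the poller sleeps through it, which is
the hang this series fixes.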

Fortunately, ctx->notify_me is never written concurrently by two
threads, so we can assert that and relax the writes to ctx->notify_me.
The resulting solution works and performs well on both aarch64 and x86.

Note that the atomic_set/atomic_read combination is not an atomic
read-modify-write, and is therefore even weaker than a C11 ATOMIC_RELAXED
read-modify-write; on x86, a relaxed atomic read-modify-write still
compiles to a locked operation.
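
To make the cost difference concrete, here is a hypothetical C11 sketch
(again not QEMU code) contrasting the two ways of adding 2 to the field:

    #include <stdatomic.h>

    atomic_int notify_me;

    void add_rmw(void)
    {
        /* atomic read-modify-write: a locked instruction on x86
         * (e.g. "lock add") even with memory_order_relaxed */
        atomic_fetch_add_explicit(&notify_me, 2, memory_order_relaxed);
    }

    void add_load_store(void)
    {
        /* separate load and store: two plain "mov" instructions on
         * x86; correct only while a single thread writes the field */
        int v = atomic_load_explicit(&notify_me, memory_order_relaxed);
        atomic_store_explicit(&notify_me, v + 2, memory_order_relaxed);
    }

The load/store variant is safe for ctx->notify_me only because, as
asserted in the patch, a single thread ever writes it.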

Analyzed-by: Ying Fang <fangying1@huawei.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 util/aio-posix.c | 16 ++++++++++++++--
 util/aio-win32.c | 17 ++++++++++++++---
 util/async.c     | 16 ++++++++++++----
 3 files changed, 40 insertions(+), 9 deletions(-)

Comments

Ying Fang April 9, 2020, 6:22 a.m. UTC
On 2020/4/7 22:07, Paolo Bonzini wrote:
> [commit message and patch quoted in full, trimmed]

Tested-by: Ying Fang <fangying1@huawei.com>

Patch

diff --git a/util/aio-posix.c b/util/aio-posix.c
index cd6cf0a4a9..c3613d299e 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -559,6 +559,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int64_t timeout;
     int64_t start = 0;
 
+    /*
+     * There cannot be two concurrent aio_poll calls for the same AioContext (or
+     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
+     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
+     */
     assert(in_aio_context_home_thread(ctx));
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -569,7 +574,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
      * so disable the optimization now.
      */
     if (blocking) {
-        atomic_add(&ctx->notify_me, 2);
+        atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
+        /*
+         * Write ctx->notify_me before computing the timeout
+         * (reading bottom half flags, etc.).  Pairs with
+         * smp_mb in aio_notify().
+         */
+        smp_mb();
     }
 
     qemu_lockcnt_inc(&ctx->list_lock);
@@ -590,7 +601,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     if (blocking) {
-        atomic_sub(&ctx->notify_me, 2);
+        /* Finish the poll before clearing the flag.  */
+        atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
         aio_notify_accept(ctx);
     }
 
diff --git a/util/aio-win32.c b/util/aio-win32.c
index a23b9c364d..729d533faf 100644
--- a/util/aio-win32.c
+++ b/util/aio-win32.c
@@ -321,6 +321,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
     int count;
     int timeout;
 
+    /*
+     * There cannot be two concurrent aio_poll calls for the same AioContext (or
+     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
+     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
+     */
+    assert(in_aio_context_home_thread(ctx));
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
@@ -331,7 +337,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
      * so disable the optimization now.
      */
     if (blocking) {
-        atomic_add(&ctx->notify_me, 2);
+        atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
+        /*
+         * Write ctx->notify_me before computing the timeout
+         * (reading bottom half flags, etc.).  Pairs with
+         * smp_mb in aio_notify().
+         */
+        smp_mb();
     }
 
     qemu_lockcnt_inc(&ctx->list_lock);
@@ -364,8 +376,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
         ret = WaitForMultipleObjects(count, events, FALSE, timeout);
         if (blocking) {
             assert(first);
-            assert(in_aio_context_home_thread(ctx));
-            atomic_sub(&ctx->notify_me, 2);
+            atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
             aio_notify_accept(ctx);
         }
 
diff --git a/util/async.c b/util/async.c
index b94518b948..3165a28f2f 100644
--- a/util/async.c
+++ b/util/async.c
@@ -249,7 +249,14 @@ aio_ctx_prepare(GSource *source, gint    *timeout)
 {
     AioContext *ctx = (AioContext *) source;
 
-    atomic_or(&ctx->notify_me, 1);
+    atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) | 1);
+
+    /*
+     * Write ctx->notify_me before computing the timeout
+     * (reading bottom half flags, etc.).  Pairs with
+     * smp_mb in aio_notify().
+     */
+    smp_mb();
 
     /* We assume there is no timeout already supplied */
     *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));
@@ -268,7 +275,8 @@ aio_ctx_check(GSource *source)
     QEMUBH *bh;
     BHListSlice *s;
 
-    atomic_and(&ctx->notify_me, ~1);
+    /* Finish computing the timeout before clearing the flag.  */
+    atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) & ~1);
     aio_notify_accept(ctx);
 
     QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
@@ -411,10 +419,10 @@ LuringState *aio_get_linux_io_uring(AioContext *ctx)
 void aio_notify(AioContext *ctx)
 {
     /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
-     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
+     * with smp_mb in aio_ctx_prepare or aio_poll.
      */
     smp_mb();
-    if (ctx->notify_me) {
+    if (atomic_read(&ctx->notify_me)) {
         event_notifier_set(&ctx->notifier);
         atomic_mb_set(&ctx->notified, true);
     }