diff mbox series

[3/8] qemu-thread-win32: cleanup, fix, document QemuEvent

Message ID 20230303171939.237819-4-pbonzini@redhat.com
State New
Headers show
Series Fix missing memory barriers on ARM | expand

Commit Message

Paolo Bonzini March 3, 2023, 5:19 p.m. UTC
QemuEvent is currently broken on ARM due to missing memory barriers
after qatomic_*().  Apart from adding the memory barrier, a closer look
reveals some unpaired memory barriers that are not really needed and
complicated the functions unnecessarily, as well as some optimizations
that I couldn't quite prove to be correct.

Finally, the code is relying on a memory barrier in ResetEvent(); the
barrier _ought_ to be there but there is really no documentation about
it; it only affects the slow path, so make it explicit.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 util/qemu-thread-win32.c | 78 +++++++++++++++++++++++++++-------------
 1 file changed, 53 insertions(+), 25 deletions(-)

Comments

Richard Henderson March 5, 2023, 7:14 p.m. UTC | #1
On 3/3/23 09:19, Paolo Bonzini wrote:
> QemuEvent is currently broken on ARM due to missing memory barriers
> after qatomic_*().  Apart from adding the memory barrier, a closer look
> reveals some unpaired memory barriers that are not really needed and
> complicated the functions unnecessarily, as well as some optimizations
> that I couldn't quite prove to be correct.
> 
> Finally, the code is relying on a memory barrier in ResetEvent(); the
> barrier_ought_  to be there but there is really no documentation about
> it; it only affects the slow path, so make it explicit.
> 
> Signed-off-by: Paolo Bonzini<pbonzini@redhat.com>
> ---
>   util/qemu-thread-win32.c | 78 +++++++++++++++++++++++++++-------------
>   1 file changed, 53 insertions(+), 25 deletions(-)

Reviewed-by: Richard Henderson <richard.henderson@linaro.org>

r~
David Hildenbrand March 6, 2023, 1:31 p.m. UTC | #2
On 03.03.23 18:19, Paolo Bonzini wrote:
> QemuEvent is currently broken on ARM due to missing memory barriers
> after qatomic_*().  Apart from adding the memory barrier, a closer look
> reveals some unpaired memory barriers that are not really needed and
> complicated the functions unnecessarily, as well as some optimizations
> that I couldn't quite prove to be correct.
> 
> Finally, the code is relying on a memory barrier in ResetEvent(); the
> barrier _ought_ to be there but there is really no documentation about
> it; it only affects the slow path, so make it explicit.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---

[...]

>   util/qemu-thread-win32.c | 78 +++++++++++++++++++++++++++-------------
>   1 file changed, 53 insertions(+), 25 deletions(-)
> 
> diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
> index 69db254ac7c1..eff664ae6b31 100644
> --- a/util/qemu-thread-win32.c
> +++ b/util/qemu-thread-win32.c
> @@ -272,12 +272,20 @@ void qemu_event_destroy(QemuEvent *ev)
>   void qemu_event_set(QemuEvent *ev)
>   {
>       assert(ev->initialized);
> -    /* qemu_event_set has release semantics, but because it *loads*
> +
> +    /*
> +     * Pairs with memory barrier in qemu_event_reset.
> +     *
> +     * qemu_event_set has release semantics, but because it *loads*
>        * ev->value we need a full memory barrier here.
>        */
>       smp_mb();
>       if (qatomic_read(&ev->value) != EV_SET) {
> -        if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
> +        int old = qatomic_xchg(&ev->value, EV_SET);
> +
> +        /* Pairs with memory barrier after ResetEvent.  */
> +        smp_mb__after_rmw();
> +        if (old == EV_BUSY) {
>               /* There were waiters, wake them up.  */
>               SetEvent(ev->event);
>           }
> @@ -286,17 +294,19 @@ void qemu_event_set(QemuEvent *ev)
>   
>   void qemu_event_reset(QemuEvent *ev)
>   {
> -    unsigned value;
> -
>       assert(ev->initialized);
> -    value = qatomic_read(&ev->value);
> -    smp_mb_acquire();
> -    if (value == EV_SET) {
> -        /* If there was a concurrent reset (or even reset+wait),
> -         * do nothing.  Otherwise change EV_SET->EV_FREE.
> -         */
> -        qatomic_or(&ev->value, EV_FREE);
> -    }
> +

[had the same thought on patch #2]

IIUC, the "read first" is an optimization to not unconditionally dirty 
the cache-line. But I assume we don't particularly care about that 
optimization on the reset path.

> +    /*
> +     * If there was a concurrent reset (or even reset+wait),
> +     * do nothing.  Otherwise change EV_SET->EV_FREE.
> +     */
> +    qatomic_or(&ev->value, EV_FREE);
> +
> +    /*
> +     * Order reset before checking the condition in the caller.
> +     * Pairs with the first memory barrier in qemu_event_set().
> +     */
> +    smp_mb__after_rmw();
>   }
>   
>   void qemu_event_wait(QemuEvent *ev)
> @@ -304,29 +314,47 @@ void qemu_event_wait(QemuEvent *ev)
>       unsigned value;
>   
>       assert(ev->initialized);
> +
> +    /*
> +     * This read does not have any particular ordering requirements;
> +     * if it moves earlier, we might miss qemu_event_set() and go down the
> +     * slow path unnecessarily, but ultimately the memory barrier below,
> +     * plus the internal synchronization of the kernel event, will ensure
> +     * the check is done correctly.
> +     */
>       value = qatomic_read(&ev->value);
> -    smp_mb_acquire();
>       if (value != EV_SET) {
>           if (value == EV_FREE) {
> -            /* qemu_event_set is not yet going to call SetEvent, but we are
> -             * going to do another check for EV_SET below when setting EV_BUSY.
> -             * At that point it is safe to call WaitForSingleObject.
> +            /*
> +             * Here the underlying kernel event is reset, but qemu_event_set is
> +             * not yet going to call SetEvent.  However, there will be another
> +             * check for EV_SET below when setting EV_BUSY.  At that point it
> +             * is safe to call WaitForSingleObject.
>                */
>               ResetEvent(ev->event);
>   
> -            /* Tell qemu_event_set that there are waiters.  No need to retry
> -             * because there cannot be a concurrent busy->free transition.
> -             * After the CAS, the event will be either set or busy.
> +            /*
> +             * It is not clear whether ResetEvent provides this barrier; kernel
> +             * APIs (KeResetEvent/KeClearEvent) do not.  Better safe than sorry!
> +             */
> +            smp_mb();
> +
> +            /*
> +             * Leave the event reset and tell qemu_event_set that there are
> +             * waiters.  No need to retry, because there cannot be a concurrent
> +             * busy->free transition.  After the CAS, the event will be either
> +             * set or busy.
>                */
>               if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
> -                value = EV_SET;
> -            } else {
> -                value = EV_BUSY;
> +                return;
>               }
>           }
> -        if (value == EV_BUSY) {
> -            WaitForSingleObject(ev->event, INFINITE);
> -        }
> +
> +        /*
> +         * ev->value is now EV_BUSY.  Since we didn't observe EV_SET,
> +         * qemu_event_set() must observe EV_BUSY and call SetEvent().
> +         */
> +        WaitForSingleObject(ev->event, INFINITE);
>       }
>   }
>   

Reviewed-by: David Hildenbrand <david@redhat.com>
Paolo Bonzini March 6, 2023, 2:20 p.m. UTC | #3
On 3/6/23 14:31, David Hildenbrand wrote:
>>
>> -    smp_mb_acquire();
>> -    if (value == EV_SET) {
>> -        /* If there was a concurrent reset (or even reset+wait),
>> -         * do nothing.  Otherwise change EV_SET->EV_FREE.
>> -         */
>> -        qatomic_or(&ev->value, EV_FREE);
>> -    }
>> +
> 
> [had the same thought on patch #2]
> 
> IIUC, the "read first" is an optimization to not unconditionally dirty 
> the cache-line. But I assume we don't particularly care about that 
> optimization on the reset path.

Thinking more about it, the intended usage of QemuEvent is either

     qemu_event_reset();
     if (!check()) {
          qemu_event_wait());
     }

or

     if (!check()) {
         qemu_event_reset();
         if (!check()) {
              qemu_event_wait());
         }
     }

If we don't care on the reset path we care much less on the wait path. 
Should I remove it and go straight to the cmpxchg, just for peace of mind?

Paolo
David Hildenbrand March 6, 2023, 2:32 p.m. UTC | #4
On 06.03.23 15:20, Paolo Bonzini wrote:
> On 3/6/23 14:31, David Hildenbrand wrote:
>>>
>>> -    smp_mb_acquire();
>>> -    if (value == EV_SET) {
>>> -        /* If there was a concurrent reset (or even reset+wait),
>>> -         * do nothing.  Otherwise change EV_SET->EV_FREE.
>>> -         */
>>> -        qatomic_or(&ev->value, EV_FREE);
>>> -    }
>>> +
>>
>> [had the same thought on patch #2]
>>
>> IIUC, the "read first" is an optimization to not unconditionally dirty
>> the cache-line. But I assume we don't particularly care about that
>> optimization on the reset path.
> 
> Thinking more about it, the intended usage of QemuEvent is either
> 
>       qemu_event_reset();
>       if (!check()) {
>            qemu_event_wait());
>       }
> 
> or
> 
>       if (!check()) {
>           qemu_event_reset();
>           if (!check()) {
>                qemu_event_wait());
>           }
>       }
> 
> If we don't care on the reset path we care much less on the wait path.
> Should I remove it and go straight to the cmpxchg, just for peace of mind?

Sounds reasonable to me at could simplify qemu_event_wait a bit.
Paolo Bonzini March 6, 2023, 3:17 p.m. UTC | #5
On 3/6/23 15:32, David Hildenbrand wrote:
>>
>> Thinking more about it, the intended usage of QemuEvent is either
>>
>>       qemu_event_reset();
>>       if (!check()) {
>>            qemu_event_wait());
>>       }
>>
>> or
>>
>>       if (!check()) {
>>           qemu_event_reset();
>>           if (!check()) {
>>                qemu_event_wait());
>>           }
>>       }
>>
>> If we don't care on the reset path we care much less on the wait path.
>> Should I remove it and go straight to the cmpxchg, just for peace of 
>> mind?
> 
> Sounds reasonable to me at could simplify qemu_event_wait a bit.

Hmm, it does avoid a whole system call in the Windows case, so I prefer 
to keep it.  And I prefer to keep the load-acquire on the fast path too, 
I don't think it's needed in the actual uses of QemuEvent but it's safer 
in case it's used as "just a boolean".

Paolo
diff mbox series

Patch

diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 69db254ac7c1..eff664ae6b31 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -272,12 +272,20 @@  void qemu_event_destroy(QemuEvent *ev)
 void qemu_event_set(QemuEvent *ev)
 {
     assert(ev->initialized);
-    /* qemu_event_set has release semantics, but because it *loads*
+
+    /*
+     * Pairs with memory barrier in qemu_event_reset.
+     *
+     * qemu_event_set has release semantics, but because it *loads*
      * ev->value we need a full memory barrier here.
      */
     smp_mb();
     if (qatomic_read(&ev->value) != EV_SET) {
-        if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
+        int old = qatomic_xchg(&ev->value, EV_SET);
+
+        /* Pairs with memory barrier after ResetEvent.  */
+        smp_mb__after_rmw();
+        if (old == EV_BUSY) {
             /* There were waiters, wake them up.  */
             SetEvent(ev->event);
         }
@@ -286,17 +294,19 @@  void qemu_event_set(QemuEvent *ev)
 
 void qemu_event_reset(QemuEvent *ev)
 {
-    unsigned value;
-
     assert(ev->initialized);
-    value = qatomic_read(&ev->value);
-    smp_mb_acquire();
-    if (value == EV_SET) {
-        /* If there was a concurrent reset (or even reset+wait),
-         * do nothing.  Otherwise change EV_SET->EV_FREE.
-         */
-        qatomic_or(&ev->value, EV_FREE);
-    }
+
+    /*
+     * If there was a concurrent reset (or even reset+wait),
+     * do nothing.  Otherwise change EV_SET->EV_FREE.
+     */
+    qatomic_or(&ev->value, EV_FREE);
+
+    /*
+     * Order reset before checking the condition in the caller.
+     * Pairs with the first memory barrier in qemu_event_set().
+     */
+    smp_mb__after_rmw();
 }
 
 void qemu_event_wait(QemuEvent *ev)
@@ -304,29 +314,47 @@  void qemu_event_wait(QemuEvent *ev)
     unsigned value;
 
     assert(ev->initialized);
+
+    /*
+     * This read does not have any particular ordering requirements;
+     * if it moves earlier, we might miss qemu_event_set() and go down the
+     * slow path unnecessarily, but ultimately the memory barrier below,
+     * plus the internal synchronization of the kernel event, will ensure
+     * the check is done correctly.
+     */
     value = qatomic_read(&ev->value);
-    smp_mb_acquire();
     if (value != EV_SET) {
         if (value == EV_FREE) {
-            /* qemu_event_set is not yet going to call SetEvent, but we are
-             * going to do another check for EV_SET below when setting EV_BUSY.
-             * At that point it is safe to call WaitForSingleObject.
+            /*
+             * Here the underlying kernel event is reset, but qemu_event_set is
+             * not yet going to call SetEvent.  However, there will be another
+             * check for EV_SET below when setting EV_BUSY.  At that point it
+             * is safe to call WaitForSingleObject.
              */
             ResetEvent(ev->event);
 
-            /* Tell qemu_event_set that there are waiters.  No need to retry
-             * because there cannot be a concurrent busy->free transition.
-             * After the CAS, the event will be either set or busy.
+            /*
+             * It is not clear whether ResetEvent provides this barrier; kernel
+             * APIs (KeResetEvent/KeClearEvent) do not.  Better safe than sorry!
+             */
+            smp_mb();
+
+            /*
+             * Leave the event reset and tell qemu_event_set that there are
+             * waiters.  No need to retry, because there cannot be a concurrent
+             * busy->free transition.  After the CAS, the event will be either
+             * set or busy.
              */
             if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
-                value = EV_SET;
-            } else {
-                value = EV_BUSY;
+                return;
             }
         }
-        if (value == EV_BUSY) {
-            WaitForSingleObject(ev->event, INFINITE);
-        }
+
+        /*
+         * ev->value is now EV_BUSY.  Since we didn't observe EV_SET,
+         * qemu_event_set() must observe EV_BUSY and call SetEvent().
+         */
+        WaitForSingleObject(ev->event, INFINITE);
     }
 }