diff mbox

[06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock

Message ID 20161129114707.2975-7-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Nov. 29, 2016, 11:47 a.m. UTC
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 aio-posix.c | 51 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 30 insertions(+), 21 deletions(-)

Comments

Stefan Hajnoczi Nov. 30, 2016, 1:31 p.m. UTC | #1
On Tue, Nov 29, 2016 at 12:47:03PM +0100, Paolo Bonzini wrote:
> @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
>  bool aio_pending(AioContext *ctx)
>  {
>      AioHandler *node;
> +    bool result = false;
>  
> -    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
> +    /*
> +     * We have to walk very carefully in case aio_set_fd_handler is
> +     * called while we're walking.
> +     */
> +    qemu_lockcnt_inc(&ctx->list_lock);
> +
> +    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
>          int revents;
>  
>          revents = node->pfd.revents & node->pfd.events;
>          if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
>              aio_node_check(ctx, node->is_external)) {
> -            return true;
> +            result = true;
> +            break;
>          }
>          if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
>              aio_node_check(ctx, node->is_external)) {
> -            return true;
> +            result = true;
> +            break;
>          }
>      }
> +    qemu_lockcnt_dec(&ctx->list_lock);
>  
> -    return false;
> +    return result;
>  }
>  
>  bool aio_dispatch(AioContext *ctx)
> @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
>       * We have to walk very carefully in case aio_set_fd_handler is
>       * called while we're walking.
>       */
> -    ctx->walking_handlers++;
> +    qemu_lockcnt_inc(&ctx->list_lock);
>  
> -    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
> +    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
>          int revents;
>  
> -        revents = node->pfd.revents & node->pfd.events;
> -        node->pfd.revents = 0;
> +        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;

Why is node->pfd.revents accessed with atomic_*() here and in aio_poll()
but not in aio_pending()?
Paolo Bonzini Nov. 30, 2016, 1:36 p.m. UTC | #2
On 30/11/2016 14:31, Stefan Hajnoczi wrote:
> On Tue, Nov 29, 2016 at 12:47:03PM +0100, Paolo Bonzini wrote:
>> @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
>>  bool aio_pending(AioContext *ctx)
>>  {
>>      AioHandler *node;
>> +    bool result = false;
>>  
>> -    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
>> +    /*
>> +     * We have to walk very carefully in case aio_set_fd_handler is
>> +     * called while we're walking.
>> +     */
>> +    qemu_lockcnt_inc(&ctx->list_lock);
>> +
>> +    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
>>          int revents;
>>  
>>          revents = node->pfd.revents & node->pfd.events;
>>          if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
>>              aio_node_check(ctx, node->is_external)) {
>> -            return true;
>> +            result = true;
>> +            break;
>>          }
>>          if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
>>              aio_node_check(ctx, node->is_external)) {
>> -            return true;
>> +            result = true;
>> +            break;
>>          }
>>      }
>> +    qemu_lockcnt_dec(&ctx->list_lock);
>>  
>> -    return false;
>> +    return result;
>>  }
>>  
>>  bool aio_dispatch(AioContext *ctx)
>> @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
>>       * We have to walk very carefully in case aio_set_fd_handler is
>>       * called while we're walking.
>>       */
>> -    ctx->walking_handlers++;
>> +    qemu_lockcnt_inc(&ctx->list_lock);
>>  
>> -    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
>> +    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
>>          int revents;
>>  
>> -        revents = node->pfd.revents & node->pfd.events;
>> -        node->pfd.revents = 0;
>> +        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
> 
> Why is node->pfd.revents accessed with atomic_*() here and in aio_poll()
> but not in aio_pending()?

It could use atomic_read there, indeed.

Paolo
Paolo Bonzini Dec. 1, 2016, 3:32 p.m. UTC | #3
On 30/11/2016 14:36, Paolo Bonzini wrote:
> 
> 
> On 30/11/2016 14:31, Stefan Hajnoczi wrote:
>> On Tue, Nov 29, 2016 at 12:47:03PM +0100, Paolo Bonzini wrote:
>>> @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx)
>>>  bool aio_pending(AioContext *ctx)
>>>  {
>>>      AioHandler *node;
>>> +    bool result = false;
>>>  
>>> -    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
>>> +    /*
>>> +     * We have to walk very carefully in case aio_set_fd_handler is
>>> +     * called while we're walking.
>>> +     */
>>> +    qemu_lockcnt_inc(&ctx->list_lock);
>>> +
>>> +    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
>>>          int revents;
>>>  
>>>          revents = node->pfd.revents & node->pfd.events;
>>>          if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
>>>              aio_node_check(ctx, node->is_external)) {
>>> -            return true;
>>> +            result = true;
>>> +            break;
>>>          }
>>>          if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
>>>              aio_node_check(ctx, node->is_external)) {
>>> -            return true;
>>> +            result = true;
>>> +            break;
>>>          }
>>>      }
>>> +    qemu_lockcnt_dec(&ctx->list_lock);
>>>  
>>> -    return false;
>>> +    return result;
>>>  }
>>>  
>>>  bool aio_dispatch(AioContext *ctx)
>>> @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx)
>>>       * We have to walk very carefully in case aio_set_fd_handler is
>>>       * called while we're walking.
>>>       */
>>> -    ctx->walking_handlers++;
>>> +    qemu_lockcnt_inc(&ctx->list_lock);
>>>  
>>> -    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
>>> +    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
>>>          int revents;
>>>  
>>> -        revents = node->pfd.revents & node->pfd.events;
>>> -        node->pfd.revents = 0;
>>> +        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
>>
>> Why is node->pfd.revents accessed with atomic_*() here and in aio_poll()
>> but not in aio_pending()?
> 
> It could use atomic_read there, indeed.

Actually, thanks to the (already committed) patches that limit aio_poll
to the I/O thread, these atomic accesses are not needed anymore.

Paolo
diff mbox

Patch

diff --git a/aio-posix.c b/aio-posix.c
index 93a50ad..c64d36d 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -16,7 +16,7 @@ 
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 #ifdef CONFIG_EPOLL_CREATE1
 #include <sys/epoll.h>
@@ -206,6 +206,8 @@  void aio_set_fd_handler(AioContext *ctx,
     bool is_new = false;
     bool deleted = false;
 
+    qemu_lockcnt_lock(&ctx->list_lock);
+
     node = find_aio_handler(ctx, fd);
 
     /* Are we deleting the fd handler? */
@@ -217,7 +219,7 @@  void aio_set_fd_handler(AioContext *ctx,
         g_source_remove_poll(&ctx->source, &node->pfd);
 
         /* If the lock is held, just mark the node as deleted */
-        if (ctx->walking_handlers) {
+        if (qemu_lockcnt_count(&ctx->list_lock)) {
             node->deleted = 1;
             node->pfd.revents = 0;
         } else {
@@ -233,7 +235,7 @@  void aio_set_fd_handler(AioContext *ctx,
             /* Alloc and insert if it's not already there */
             node = g_new0(AioHandler, 1);
             node->pfd.fd = fd;
-            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
 
             g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
@@ -249,6 +251,7 @@  void aio_set_fd_handler(AioContext *ctx,
     }
 
     aio_epoll_update(ctx, node, is_new);
+    qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
     if (deleted) {
         g_free(node);
@@ -272,22 +275,32 @@  bool aio_prepare(AioContext *ctx)
 bool aio_pending(AioContext *ctx)
 {
     AioHandler *node;
+    bool result = false;
 
-    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+    /*
+     * We have to walk very carefully in case aio_set_fd_handler is
+     * called while we're walking.
+     */
+    qemu_lockcnt_inc(&ctx->list_lock);
+
+    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
         int revents;
 
         revents = node->pfd.revents & node->pfd.events;
         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
             aio_node_check(ctx, node->is_external)) {
-            return true;
+            result = true;
+            break;
         }
     }
+    qemu_lockcnt_dec(&ctx->list_lock);
 
-    return false;
+    return result;
 }
 
 bool aio_dispatch(AioContext *ctx)
@@ -308,13 +321,12 @@  bool aio_dispatch(AioContext *ctx)
      * We have to walk very carefully in case aio_set_fd_handler is
      * called while we're walking.
      */
-    ctx->walking_handlers++;
+    qemu_lockcnt_inc(&ctx->list_lock);
 
-    QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) {
+    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
         int revents;
 
-        revents = node->pfd.revents & node->pfd.events;
-        node->pfd.revents = 0;
+        revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events;
 
         if (!node->deleted &&
             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
@@ -336,16 +348,15 @@  bool aio_dispatch(AioContext *ctx)
         }
 
         if (node->deleted) {
-            ctx->walking_handlers--;
-            if (!ctx->walking_handlers) {
+            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                 QLIST_REMOVE(node, node);
                 g_free(node);
+                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
             }
-            ctx->walking_handlers++;
         }
     }
 
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run our timers */
     progress |= timerlistgroup_run_timers(&ctx->tlg);
@@ -420,14 +431,12 @@  bool aio_poll(AioContext *ctx, bool blocking)
         atomic_add(&ctx->notify_me, 2);
     }
 
-    ctx->walking_handlers++;
-
+    qemu_lockcnt_inc(&ctx->list_lock);
     assert(npfd == 0);
 
     /* fill pollfds */
-
     if (!aio_epoll_enabled(ctx)) {
-        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
             if (!node->deleted && node->pfd.events
                 && aio_node_check(ctx, node->is_external)) {
                 add_pollfd(node);
@@ -464,12 +473,12 @@  bool aio_poll(AioContext *ctx, bool blocking)
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         for (i = 0; i < npfd; i++) {
-            nodes[i]->pfd.revents = pollfds[i].revents;
+            atomic_or(&nodes[i]->pfd.revents, pollfds[i].revents);
         }
     }
 
     npfd = 0;
-    ctx->walking_handlers--;
+    qemu_lockcnt_dec(&ctx->list_lock);
 
     /* Run dispatch even if there were no readable fds to run timers */
     if (aio_dispatch(ctx)) {