
[RFC,v8,15/21] aio: replace stack of bottom halves with queue

Message ID 20150122085254.5276.53380.stgit@PASHA-ISP.def.inno
State New

Commit Message

Pavel Dovgalyuk Jan. 22, 2015, 8:52 a.m. UTC
Bottom halves in an AIO context are stored and removed in LIFO
order, which makes their execution order non-deterministic. This
patch replaces the stack with a queue to preserve the order in
which bottom halves are processed.

Signed-off-by: Pavel Dovgalyuk <pavel.dovgaluk@ispras.ru>
---
 async.c              |   25 +++++++++----------------
 include/block/aio.h  |    4 ++--
 include/qemu/queue.h |    7 +++++++
 3 files changed, 18 insertions(+), 18 deletions(-)
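
As an aside (not part of the patch), a minimal standalone sketch of the
behaviour being fixed: with head insertion into a singly linked list,
bottom halves scheduled as A then B are dispatched as B then A, while
appending at a queue's tail preserves the A-then-B order.

    /* Illustrative only, not QEMU code. */
    #include <stdio.h>

    struct bh { const char *name; struct bh *next; };

    int main(void)
    {
        struct bh a = { "A", NULL }, b = { "B", NULL };

        /* Old scheme: each new bh is pushed onto the head -> LIFO dispatch. */
        struct bh *top = NULL;
        a.next = top; top = &a;
        b.next = top; top = &b;
        for (struct bh *p = top; p; p = p->next) {
            printf("stack dispatches %s\n", p->name);   /* B, then A */
        }

        /* New scheme: each new bh is appended at the tail -> FIFO dispatch. */
        struct bh *head = NULL, **tail = &head;
        a.next = b.next = NULL;
        *tail = &a; tail = &a.next;
        *tail = &b; tail = &b.next;
        for (struct bh *p = head; p; p = p->next) {
            printf("queue dispatches %s\n", p->name);   /* A, then B */
        }
        return 0;
    }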

Comments

Paolo Bonzini Jan. 30, 2015, 10:43 a.m. UTC | #1
On 22/01/2015 09:52, Pavel Dovgalyuk wrote:
> Bottom halves in an AIO context are stored and removed in LIFO
> order, which makes their execution order non-deterministic. This
> patch replaces the stack with a queue to preserve the order in
> which bottom halves are processed.
> 
> Signed-off-by: Pavel Dovgalyuk <pavel.dovgaluk@ispras.ru>
> ---
>  async.c              |   25 +++++++++----------------
>  include/block/aio.h  |    4 ++--
>  include/qemu/queue.h |    7 +++++++
>  3 files changed, 18 insertions(+), 18 deletions(-)
> 
> diff --git a/async.c b/async.c
> index 2be88cc..bc6e83b 100644
> --- a/async.c
> +++ b/async.c
> @@ -35,7 +35,7 @@ struct QEMUBH {
>      AioContext *ctx;
>      QEMUBHFunc *cb;
>      void *opaque;
> -    QEMUBH *next;
> +    QSIMPLEQ_ENTRY(QEMUBH) next;
>      bool scheduled;
>      bool idle;
>      bool deleted;
> @@ -51,10 +51,7 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>          .opaque = opaque,
>      };
>      qemu_mutex_lock(&ctx->bh_lock);
> -    bh->next = ctx->first_bh;
> -    /* Make sure that the members are ready before putting bh into list */
> -    smp_wmb();
> -    ctx->first_bh = bh;
> +    QSIMPLEQ_INSERT_TAIL_RCU(&ctx->bh_queue, bh, next);
>      qemu_mutex_unlock(&ctx->bh_lock);
>      return bh;
>  }
> @@ -62,16 +59,15 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>  /* Multiple occurrences of aio_bh_poll cannot be called concurrently */
>  int aio_bh_poll(AioContext *ctx)
>  {
> -    QEMUBH *bh, **bhp, *next;
> +    QEMUBH *bh, *next;
>      int ret;
>  
>      ctx->walking_bh++;
>  
>      ret = 0;
> -    for (bh = ctx->first_bh; bh; bh = next) {
> +    QSIMPLEQ_FOREACH_SAFE(bh, &ctx->bh_queue, next, next) {
>          /* Make sure that fetching bh happens before accessing its members */
>          smp_read_barrier_depends();
> -        next = bh->next;

Must use QSIMPLEQ_FOREACH.  Otherwise, the access of next is before the
smp_read_barrier_depends().
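
A sketch of the loop shape this comment asks for (illustrative, with the
rest of the body assumed unchanged from async.c): plain QSIMPLEQ_FOREACH
loads the next pointer only in the loop's step expression, i.e. after the
body has run smp_read_barrier_depends() for the current element. Deleted
bhs are unlinked only under bh_lock in the cleanup pass, so the loop does
not need a pre-fetched safe copy of next:

    QSIMPLEQ_FOREACH(bh, &ctx->bh_queue, next) {
        /* Make sure that fetching bh happens before accessing its members */
        smp_read_barrier_depends();
        if (!bh->deleted && bh->scheduled) {
            bh->scheduled = 0;
            /* Paired with write barrier in bh schedule to ensure reading for
             * idle & callbacks coming after bh's scheduling.
             */
            smp_rmb();
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            bh->cb(bh->opaque);
        }
    }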

>          if (!bh->deleted && bh->scheduled) {
>              bh->scheduled = 0;
>              /* Paired with write barrier in bh schedule to ensure reading for
> @@ -90,14 +86,10 @@ int aio_bh_poll(AioContext *ctx)
>      /* remove deleted bhs */
>      if (!ctx->walking_bh) {
>          qemu_mutex_lock(&ctx->bh_lock);
> -        bhp = &ctx->first_bh;
> -        while (*bhp) {
> -            bh = *bhp;
> +        QSIMPLEQ_FOREACH_SAFE(bh, &ctx->bh_queue, next, next) {
>              if (bh->deleted) {
> -                *bhp = bh->next;
> +                QSIMPLEQ_REMOVE(&ctx->bh_queue, bh, QEMUBH, next);
>                  g_free(bh);
> -            } else {
> -                bhp = &bh->next;
>              }
>          }
>          qemu_mutex_unlock(&ctx->bh_lock);
> @@ -161,7 +153,7 @@ aio_compute_timeout(AioContext *ctx)
>      int timeout = -1;
>      QEMUBH *bh;
>  
> -    for (bh = ctx->first_bh; bh; bh = bh->next) {
> +    QSIMPLEQ_FOREACH(bh, &ctx->bh_queue, next) {
>          if (!bh->deleted && bh->scheduled) {
>              if (bh->idle) {
>                  /* idle bottom halves will be polled at least
> @@ -204,7 +196,7 @@ aio_ctx_check(GSource *source)
>      AioContext *ctx = (AioContext *) source;
>      QEMUBH *bh;
>  
> -    for (bh = ctx->first_bh; bh; bh = bh->next) {
> +    QSIMPLEQ_FOREACH(bh, &ctx->bh_queue, next) {
>          if (!bh->deleted && bh->scheduled) {
>              return true;
>  	}
> @@ -311,6 +303,7 @@ AioContext *aio_context_new(Error **errp)
>      qemu_mutex_init(&ctx->bh_lock);
>      rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
>      timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
> +    QSIMPLEQ_INIT(&ctx->bh_queue);
>  
>      return ctx;
>  }
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 7d1e26b..82cdf78 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -71,8 +71,8 @@ struct AioContext {
>      /* lock to protect between bh's adders and deleter */
>      QemuMutex bh_lock;
>  
> -    /* Anchor of the list of Bottom Halves belonging to the context */
> -    struct QEMUBH *first_bh;
> +    /* List of Bottom Halves belonging to the context */
> +    QSIMPLEQ_HEAD(, QEMUBH) bh_queue;
>  
>      /* A simple lock used to protect the first_bh list, and ensure that
>       * no callbacks are removed while we're walking and dispatching callbacks.
> diff --git a/include/qemu/queue.h b/include/qemu/queue.h
> index a98eb3a..b94c4d4 100644
> --- a/include/qemu/queue.h
> +++ b/include/qemu/queue.h
> @@ -268,6 +268,13 @@ struct {                                                                \
>      (head)->sqh_last = &(elm)->field.sqe_next;                          \
>  } while (/*CONSTCOND*/0)
>  
> +#define QSIMPLEQ_INSERT_TAIL_RCU(head, elm, field) do {                 \
> +    (elm)->field.sqe_next = NULL;                                       \
> +    *(head)->sqh_last = (elm);                                          \
> +    (head)->sqh_last = &(elm)->field.sqe_next;                          \
> +    smp_wmb();                                                          \

smp_wmb() must be after the (elm) assignment.

Paolo

> +} while (/*CONSTCOND*/0)
> +
>  #define QSIMPLEQ_INSERT_AFTER(head, listelm, elm, field) do {           \
>      if (((elm)->field.sqe_next = (listelm)->field.sqe_next) == NULL)    \
>          (head)->sqh_last = &(elm)->field.sqe_next;                      \
>
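
For reference, a sketch of QSIMPLEQ_INSERT_TAIL_RCU with the barrier placed
as requested above (illustrative, not the committed version): smp_wmb() must
sit between initializing the element and the store that publishes it, so a
lockless reader that sees the element linked in also sees its sqe_next
already cleared.

    #define QSIMPLEQ_INSERT_TAIL_RCU(head, elm, field) do {                 \
        (elm)->field.sqe_next = NULL;                                       \
        smp_wmb(); /* fill in elm before making it reachable */             \
        *(head)->sqh_last = (elm);                                          \
        (head)->sqh_last = &(elm)->field.sqe_next;                          \
    } while (/*CONSTCOND*/0)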