diff mbox series

[RFC,2/9] aio: Add drain begin/end API to AioContext

Message ID 20171129144956.11409-3-famz@redhat.com
State New
Headers show
Series block: Rewrite block drain begin/end | expand

Commit Message

Fam Zheng Nov. 29, 2017, 2:49 p.m. UTC
Signed-off-by: Fam Zheng <famz@redhat.com>
---
 include/block/aio.h | 27 +++++++++++++++++---
 util/async.c        | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+), 3 deletions(-)

Comments

Stefan Hajnoczi Nov. 30, 2017, 4:07 p.m. UTC | #1
On Wed, Nov 29, 2017 at 10:49:49PM +0800, Fam Zheng wrote:
> diff --git a/util/async.c b/util/async.c
> index 4dd9d95a9e..cca0efd263 100644
> --- a/util/async.c
> +++ b/util/async.c
> @@ -402,6 +402,7 @@ AioContext *aio_context_new(Error **errp)
>      AioContext *ctx;
>  
>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
> +    QTAILQ_INIT(&ctx->drain_ops);
>      aio_context_setup(ctx);
>  
>      ret = event_notifier_init(&ctx->notifier, false);
> @@ -506,3 +507,75 @@ void aio_context_release(AioContext *ctx)
>  {
>      qemu_rec_mutex_unlock(&ctx->lock);
>  }
> +
> +/* Called with ctx->lock */
> +void aio_context_drained_begin(AioContext *ctx)
> +{
> +    AioDrainOps *ops;
> +
> +    /* TODO: When all external fds are handled in the following drain_ops
> +     * callbacks, aio_disable_external can be dropped. */
> +    aio_disable_external(ctx);
> +restart:
> +    ctx->drain_ops_updated = false;
> +    QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
> +        ops->drained_begin(ops->opaque);
> +        if (ctx->drain_ops_updated) {
> +            goto restart;

drained_begin() can be called multiple times.  This needs to be clearly
documented to avoid surprises.

> +        }
> +    }
> +}
> +
> +/* Called with ctx->lock */
> +void aio_context_drained_end(AioContext *ctx)
> +{
> +    AioDrainOps *ops;
> +
> +restart:
> +    ctx->drain_ops_updated = false;
> +    QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
> +        if (ops->is_new) {
> +            continue;
> +        }
> +        ops->drained_end(ops->opaque);

drained_end() can be called multiple times.  This needs to be clearly
documented to avoid surprises.


> +        if (ctx->drain_ops_updated) {
> +            goto restart;
> +        }
> +    }
> +    if (aio_enable_external(ctx)) {
> +        QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
> +            ops->is_new = false;
> +        }
> +    }

This is weird, aio_context_drained_end() has nesting support for
->is_new but not for ->drained_end() calls.  I'm not sure where you're
going with these semantics yet.
diff mbox series

Patch

diff --git a/include/block/aio.h b/include/block/aio.h
index e9aeeaec94..40c2f64544 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -47,6 +47,15 @@  typedef void QEMUBHFunc(void *opaque);
 typedef bool AioPollFn(void *opaque);
 typedef void IOHandler(void *opaque);
 
+typedef void AioDrainFn(void *opaque);
+typedef struct AioDrainOps {
+    AioDrainFn *drained_begin;
+    AioDrainFn *drained_end;
+    void *opaque;
+    bool is_new;
+    QTAILQ_ENTRY(AioDrainOps) next;
+} AioDrainOps;
+
 struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
@@ -147,6 +156,9 @@  struct AioContext {
     int epollfd;
     bool epoll_enabled;
     bool epoll_available;
+
+    QTAILQ_HEAD(, AioDrainOps) drain_ops;
+    bool drain_ops_updated;
 };
 
 /**
@@ -441,9 +453,9 @@  int64_t aio_compute_timeout(AioContext *ctx);
  *
  * Disable the further processing of external clients.
  */
-static inline void aio_disable_external(AioContext *ctx)
+static inline bool aio_disable_external(AioContext *ctx)
 {
-    atomic_inc(&ctx->external_disable_cnt);
+    return atomic_fetch_inc(&ctx->external_disable_cnt) == 0;
 }
 
 /**
@@ -452,7 +464,7 @@  static inline void aio_disable_external(AioContext *ctx)
  *
  * Enable the processing of external clients.
  */
-static inline void aio_enable_external(AioContext *ctx)
+static inline bool aio_enable_external(AioContext *ctx)
 {
     int old;
 
@@ -462,6 +474,7 @@  static inline void aio_enable_external(AioContext *ctx)
         /* Kick event loop so it re-arms file descriptors */
         aio_notify(ctx);
     }
+    return old == 1;
 }
 
 /**
@@ -564,4 +577,12 @@  void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                  int64_t grow, int64_t shrink,
                                  Error **errp);
 
+void aio_context_drained_begin(AioContext *ctx);
+void aio_context_drained_end(AioContext *ctx);
+
+void aio_context_add_drain_ops(AioContext *ctx,
+                               AioDrainFn *begin, AioDrainFn *end, void *opaque);
+void aio_context_del_drain_ops(AioContext *ctx,
+                               AioDrainFn *begin, AioDrainFn *end, void *opaque);
+
 #endif
diff --git a/util/async.c b/util/async.c
index 4dd9d95a9e..cca0efd263 100644
--- a/util/async.c
+++ b/util/async.c
@@ -402,6 +402,7 @@  AioContext *aio_context_new(Error **errp)
     AioContext *ctx;
 
     ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+    QTAILQ_INIT(&ctx->drain_ops);
     aio_context_setup(ctx);
 
     ret = event_notifier_init(&ctx->notifier, false);
@@ -506,3 +507,75 @@  void aio_context_release(AioContext *ctx)
 {
     qemu_rec_mutex_unlock(&ctx->lock);
 }
+
+/* Called with ctx->lock */
+void aio_context_drained_begin(AioContext *ctx)
+{
+    AioDrainOps *ops;
+
+    /* TODO: When all external fds are handled in the following drain_ops
+     * callbacks, aio_disable_external can be dropped. */
+    aio_disable_external(ctx);
+restart:
+    ctx->drain_ops_updated = false;
+    QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+        ops->drained_begin(ops->opaque);
+        if (ctx->drain_ops_updated) {
+            goto restart;
+        }
+    }
+}
+
+/* Called with ctx->lock */
+void aio_context_drained_end(AioContext *ctx)
+{
+    AioDrainOps *ops;
+
+restart:
+    ctx->drain_ops_updated = false;
+    QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+        if (ops->is_new) {
+            continue;
+        }
+        ops->drained_end(ops->opaque);
+        if (ctx->drain_ops_updated) {
+            goto restart;
+        }
+    }
+    if (aio_enable_external(ctx)) {
+        QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+            ops->is_new = false;
+        }
+    }
+}
+
+/* Called with ctx->lock */
+void aio_context_add_drain_ops(AioContext *ctx,
+                               AioDrainFn *begin, AioDrainFn *end, void *opaque)
+{
+    AioDrainOps *ops = g_new0(AioDrainOps, 1);
+    ops->drained_begin = begin;
+    ops->drained_end = end;
+    ops->opaque = opaque;
+    ops->is_new = true;
+    QTAILQ_INSERT_TAIL(&ctx->drain_ops, ops, next);
+    ctx->drain_ops_updated = true;
+}
+
+/* Called with ctx->lock */
+void aio_context_del_drain_ops(AioContext *ctx,
+                               AioDrainFn *begin, AioDrainFn *end, void *opaque)
+{
+    AioDrainOps *ops;
+
+    QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+        if (ops->drained_begin == begin &&
+            ops->drained_end == end &&
+            ops->opaque == opaque) {
+            QTAILQ_REMOVE(&ctx->drain_ops, ops, next);
+            ctx->drain_ops_updated = true;
+            g_free(ops);
+            return;
+        }
+    }
+}