diff mbox series

thread-pool: Use an EventNotifier to coordinate with AioContext

Message ID 20190325133435.27953-1-slp@redhat.com
State New
Headers show
Series thread-pool: Use an EventNotifier to coordinate with AioContext | expand

Commit Message

Sergio Lopez March 25, 2019, 1:34 p.m. UTC
Our current ThreadPool implementation lacks support for AioContext's
event notifications. This not only means that it can't take advantage
of the IOThread's adaptive polling, but also that the latter works
against it, as it delays the execution of the bottom-half
completions.

Here we implement handlers for both io_poll and io_read, and an
EventNotifier which is signaled from the worker threads after
returning from the asynchronous function.

The original issue and the improvement obtained from this patch can be
illustrated by the following fio test:

 * Host: Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
 * pseudo-storage: null_blk gb=10 nr_devices=2 irqmode=2
   completion_nsec=30000 (latency ~= NVMe)
 * fio cmdline: fio --time_based --runtime=30 --rw=randread
   --name=randread  --filename=/dev/vdb --direct=1 --ioengine=libaio
   --iodepth=1

                               ================
 ==============================| latency (us) |
 | master (poll-max-ns=50000)  |        69.87 |
 | master (poll-max-ns=0)      |        56.11 |
 | patched (poll-max-ns=50000) |        49.45 |
 ==============================================

Signed-off-by: Sergio Lopez <slp@redhat.com>
---
 util/thread-pool.c | 48 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 46 insertions(+), 2 deletions(-)

Comments

Stefan Hajnoczi March 26, 2019, 9:03 a.m. UTC | #1
On Mon, Mar 25, 2019 at 02:34:36PM +0100, Sergio Lopez wrote:
> Our current ThreadPool implementation lacks support for AioContext's
> event notifications. This not only means that it can't take advantage
> from the IOThread's adaptive polling, but also that the latter works
> against it, as it delays the execution of the bottom-halves
> completions.

util/async.c:event_notifier_poll() participates in polling and should
detect that a bottom half was scheduled by a worker thread.

Can you investigate, because I'm not sure this commit description has
identified the root cause of the performance improvement?

> Here we implement handlers for both io_poll and io_read, and an
> EventNotifier which is signaled from the worker threads after
> returning from the asynchronous function.
> 
> The original issue and the improvement obtained from this patch can be
> illustrated by the following fio test:
> 
>  * Host: Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
>  * pseudo-storage: null_blk gb=10 nr_devices=2 irqmode=2
>    completion_nsec=30000 (latency ~= NVMe)
>  * fio cmdline: fio --time_based --runtime=30 --rw=randread
>    --name=randread  --filename=/dev/vdb --direct=1 --ioengine=libaio
>    --iodepth=1
> 
>                                ================
>  ==============================| latency (us) |
>  | master (poll-max-ns=50000)  |        69.87 |
 > | master (poll-max-ns=0)      |        56.11 |
>  | patched (poll-max-ns=50000) |        49.45 |
>  ==============================================
> 
> Signed-off-by: Sergio Lopez <slp@redhat.com>
> ---
>  util/thread-pool.c | 48 ++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 46 insertions(+), 2 deletions(-)
> 
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 610646d131..058ed7f0ae 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -61,6 +61,7 @@ struct ThreadPool {
>      QemuSemaphore sem;
>      int max_threads;
>      QEMUBH *new_thread_bh;
> +    EventNotifier e;
>  
>      /* The following variables are only accessed from one AioContext. */
>      QLIST_HEAD(, ThreadPoolElement) head;
> @@ -72,6 +73,7 @@ struct ThreadPool {
>      int new_threads;     /* backlog of threads we need to create */
>      int pending_threads; /* threads created but not running yet */
>      bool stopping;
> +    bool event_notifier_enabled;
>  };
>  
>  static void *worker_thread(void *opaque)
> @@ -108,6 +110,9 @@ static void *worker_thread(void *opaque)
>          /* Write ret before state.  */
>          smp_wmb();
>          req->state = THREAD_DONE;
> +        if (pool->event_notifier_enabled) {
> +            event_notifier_set(&pool->e);
> +        }
>  
>          qemu_mutex_lock(&pool->lock);
>  
> @@ -160,10 +165,10 @@ static void spawn_thread(ThreadPool *pool)
>      }
>  }
>  
> -static void thread_pool_completion_bh(void *opaque)
> +static bool thread_pool_process_completions(ThreadPool *pool)
>  {
> -    ThreadPool *pool = opaque;
>      ThreadPoolElement *elem, *next;
> +    bool progress = false;
>  
>      aio_context_acquire(pool->ctx);
>  restart:
> @@ -172,6 +177,8 @@ restart:
>              continue;
>          }
>  
> +        progress = true;
> +
>          trace_thread_pool_complete(pool, elem, elem->common.opaque,
>                                     elem->ret);
>          QLIST_REMOVE(elem, all);
> @@ -202,6 +209,32 @@ restart:
>          }
>      }
>      aio_context_release(pool->ctx);
> +
> +    return progress;
> +}
> +
> +static void thread_pool_completion_bh(void *opaque)
> +{
> +    ThreadPool *pool = opaque;
> +
> +    thread_pool_process_completions(pool);
> +}
> +
> +static void thread_pool_completion_cb(EventNotifier *e)
> +{
> +    ThreadPool *pool = container_of(e, ThreadPool, e);
> +
> +    if (event_notifier_test_and_clear(&pool->e)) {
> +        thread_pool_completion_bh(pool);
> +    }
> +}
> +
> +static bool thread_pool_poll_cb(void *opaque)
> +{
> +    EventNotifier *e = opaque;
> +    ThreadPool *pool = container_of(e, ThreadPool, e);
> +
> +    return thread_pool_process_completions(pool);
>  }
>  
>  static void thread_pool_cancel(BlockAIOCB *acb)
> @@ -311,6 +344,13 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
>      pool->max_threads = 64;
>      pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
>  
> +    if (event_notifier_init(&pool->e, false) >= 0) {
> +        aio_set_event_notifier(ctx, &pool->e, false,
> +                               thread_pool_completion_cb,
> +                               thread_pool_poll_cb);
> +        pool->event_notifier_enabled = true;
> +    }
> +
>      QLIST_INIT(&pool->head);
>      QTAILQ_INIT(&pool->request_list);
>  }
> @@ -346,6 +386,10 @@ void thread_pool_free(ThreadPool *pool)
>  
>      qemu_mutex_unlock(&pool->lock);
>  
> +    if (pool->event_notifier_enabled) {
> +        aio_set_event_notifier(pool->ctx, &pool->e, false, NULL, NULL);
> +        event_notifier_cleanup(&pool->e);
> +    }
>      qemu_bh_delete(pool->completion_bh);
>      qemu_sem_destroy(&pool->sem);
>      qemu_cond_destroy(&pool->worker_stopped);
> -- 
> 2.20.1
> 
>
Sergio Lopez March 26, 2019, 11:44 a.m. UTC | #2
Stefan Hajnoczi writes:

> On Mon, Mar 25, 2019 at 02:34:36PM +0100, Sergio Lopez wrote:
>> Our current ThreadPool implementation lacks support for AioContext's
>> event notifications. This not only means that it can't take advantage
>> from the IOThread's adaptive polling, but also that the latter works
>> against it, as it delays the execution of the bottom-halves
>> completions.
>
> util/async.c:event_notifier_poll() participates in polling and should
> detect that a bottom half was scheduled by a worker thread.
>
> Can you investigate, because I'm not sure this commit description has
> identified the root cause of the performance improvement?

The problem with event_notifier_poll() is that ctx->notifier is
explicitly ignored in run_poll_handlers_once(), so the BH notification
doesn't count as making progress and run_poll_handlers() keeps running
the polling loop:

513 static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
514 {   
515     bool progress = false;
516     AioHandler *node;
517 
518     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
519         if (!node->deleted && node->io_poll &&
520             aio_node_check(ctx, node->is_external) &&
521             node->io_poll(node->opaque)) {
522             *timeout = 0;
523             if (node->opaque != &ctx->notifier) {
524                 progress = true;
525             }
526         }
527 
528         /* Caller handles freeing deleted nodes.  Don't do it here. */
529     }
530 
531     return progress;
532 }

547 static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
548 {
549     bool progress;
550     int64_t start_time, elapsed_time;
551 
552     assert(ctx->notify_me);
553     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
554 
555     trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
556 
557     start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
558     do {
559         progress = run_poll_handlers_once(ctx, timeout);
560         elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
561     } while (!progress && elapsed_time < max_ns
562              && !atomic_read(&ctx->poll_disable_cnt));
(...)

Thanks,
Sergio.
Stefan Hajnoczi March 29, 2019, 10:48 a.m. UTC | #3
On Tue, Mar 26, 2019 at 12:44:11PM +0100, Sergio Lopez wrote:
> 
> Stefan Hajnoczi writes:
> 
> > On Mon, Mar 25, 2019 at 02:34:36PM +0100, Sergio Lopez wrote:
> >> Our current ThreadPool implementation lacks support for AioContext's
> >> event notifications. This not only means that it can't take advantage
> >> from the IOThread's adaptive polling, but also that the latter works
> >> against it, as it delays the execution of the bottom-halves
> >> completions.
> >
> > util/async.c:event_notifier_poll() participates in polling and should
> > detect that a bottom half was scheduled by a worker thread.
> >
> > Can you investigate, because I'm not sure this commit description has
> > identified the root cause of the performance improvement?
> 
> The problem with event_notifier_poll() is that ctx->notifier is
> explicitly ignored in run_poll_handlers_once(), so the BH notification
> doesn't count as making progress and run_poll_handlers() keeps running
> the polling loop:
> 
> 513 static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
> 514 {   
> 515     bool progress = false;
> 516     AioHandler *node;
> 517 
> 518     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
> 519         if (!node->deleted && node->io_poll &&
> 520             aio_node_check(ctx, node->is_external) &&
> 521             node->io_poll(node->opaque)) {
> 522             *timeout = 0;
> 523             if (node->opaque != &ctx->notifier) {
> 524                 progress = true;
> 525             }
> 526         }
> 527 
> 528         /* Caller handles freeing deleted nodes.  Don't do it here. */
> 529     }
> 530 
> 531     return progress;
> 532 }
> 
> 547 static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
> 548 {
> 549     bool progress;
> 550     int64_t start_time, elapsed_time;
> 551 
> 552     assert(ctx->notify_me);
> 553     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
> 554 
> 555     trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
> 556 
> 557     start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> 558     do {
> 559         progress = run_poll_handlers_once(ctx, timeout);
> 560         elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
> 561     } while (!progress && elapsed_time < max_ns
> 562              && !atomic_read(&ctx->poll_disable_cnt));
> (...)

For the record, Sergio, Paolo, and I discussed this more on IRC.  Sergio
is investigating the options here.

Stefan
diff mbox series

Patch

diff --git a/util/thread-pool.c b/util/thread-pool.c
index 610646d131..058ed7f0ae 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -61,6 +61,7 @@  struct ThreadPool {
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
+    EventNotifier e;
 
     /* The following variables are only accessed from one AioContext. */
     QLIST_HEAD(, ThreadPoolElement) head;
@@ -72,6 +73,7 @@  struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     bool stopping;
+    bool event_notifier_enabled;
 };
 
 static void *worker_thread(void *opaque)
@@ -108,6 +110,9 @@  static void *worker_thread(void *opaque)
         /* Write ret before state.  */
         smp_wmb();
         req->state = THREAD_DONE;
+        if (pool->event_notifier_enabled) {
+            event_notifier_set(&pool->e);
+        }
 
         qemu_mutex_lock(&pool->lock);
 
@@ -160,10 +165,10 @@  static void spawn_thread(ThreadPool *pool)
     }
 }
 
-static void thread_pool_completion_bh(void *opaque)
+static bool thread_pool_process_completions(ThreadPool *pool)
 {
-    ThreadPool *pool = opaque;
     ThreadPoolElement *elem, *next;
+    bool progress = false;
 
     aio_context_acquire(pool->ctx);
 restart:
@@ -172,6 +177,8 @@  restart:
             continue;
         }
 
+        progress = true;
+
         trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                    elem->ret);
         QLIST_REMOVE(elem, all);
@@ -202,6 +209,32 @@  restart:
         }
     }
     aio_context_release(pool->ctx);
+
+    return progress;
+}
+
+static void thread_pool_completion_bh(void *opaque)
+{
+    ThreadPool *pool = opaque;
+
+    thread_pool_process_completions(pool);
+}
+
+static void thread_pool_completion_cb(EventNotifier *e)
+{
+    ThreadPool *pool = container_of(e, ThreadPool, e);
+
+    if (event_notifier_test_and_clear(&pool->e)) {
+        thread_pool_completion_bh(pool);
+    }
+}
+
+static bool thread_pool_poll_cb(void *opaque)
+{
+    EventNotifier *e = opaque;
+    ThreadPool *pool = container_of(e, ThreadPool, e);
+
+    return thread_pool_process_completions(pool);
 }
 
 static void thread_pool_cancel(BlockAIOCB *acb)
@@ -311,6 +344,13 @@  static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
+    if (event_notifier_init(&pool->e, false) >= 0) {
+        aio_set_event_notifier(ctx, &pool->e, false,
+                               thread_pool_completion_cb,
+                               thread_pool_poll_cb);
+        pool->event_notifier_enabled = true;
+    }
+
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
 }
@@ -346,6 +386,10 @@  void thread_pool_free(ThreadPool *pool)
 
     qemu_mutex_unlock(&pool->lock);
 
+    if (pool->event_notifier_enabled) {
+        aio_set_event_notifier(pool->ctx, &pool->e, false, NULL, NULL);
+        event_notifier_cleanup(&pool->e);
+    }
     qemu_bh_delete(pool->completion_bh);
     qemu_sem_destroy(&pool->sem);
     qemu_cond_destroy(&pool->worker_stopped);