diff mbox series

[v4,3/3] thread-pool: use ThreadPool from the running thread

Message ID 20221031125936.3458740-4-eesposit@redhat.com
State New
Headers show
Series AioContext removal: LinuxAioState and ThreadPool | expand

Commit Message

Emanuele Giuseppe Esposito Oct. 31, 2022, 12:59 p.m. UTC
Use qemu_get_current_aio_context() where possible, since we always
submit work to the current thread anyways.

We want to also be sure that the thread submitting the work is
the same as the one processing the pool, to avoid adding
synchronization to the pool list.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 block/file-posix.c    | 21 ++++++++++-----------
 block/file-win32.c    |  2 +-
 block/qcow2-threads.c |  2 +-
 util/thread-pool.c    |  9 ++++-----
 4 files changed, 16 insertions(+), 18 deletions(-)

Comments

Stefan Hajnoczi Oct. 31, 2022, 7:48 p.m. UTC | #1
On Mon, Oct 31, 2022 at 08:59:36AM -0400, Emanuele Giuseppe Esposito wrote:
> @@ -251,6 +247,9 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,

Documentation must be added to explain that thread_pool_submit_aio(),
thread_pool_submit_co(), and thread_pool_submit() must be called on the
thread's current AioContext's ThreadPool.

>  {
>      ThreadPoolElement *req;
>  
> +    /* Assert that the thread submitting work is the same running the pool */
> +    assert(pool->ctx == qemu_get_current_aio_context());

Did you decide not to remove the ThreadPool *pool argument from this
function because it requires too many changes? All callers must pass
aio_get_thread_pool(qemu_get_current_aio_context()) so it seems like the
argument is unnecessary?
diff mbox series

Patch

diff --git a/block/file-posix.c b/block/file-posix.c
index 3800dbd222..28f12b08c8 100644
--- a/block/file-posix.c
+++ b/block/file-posix.c
@@ -2044,11 +2044,10 @@  out:
     return result;
 }
 
-static int coroutine_fn raw_thread_pool_submit(BlockDriverState *bs,
-                                               ThreadPoolFunc func, void *arg)
+static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
 {
     /* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
-    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
     return thread_pool_submit_co(pool, func, arg);
 }
 
@@ -2116,7 +2115,7 @@  static int coroutine_fn raw_co_prw(BlockDriverState *bs, uint64_t offset,
     };
 
     assert(qiov->size == bytes);
-    return raw_thread_pool_submit(bs, handle_aiocb_rw, &acb);
+    return raw_thread_pool_submit(handle_aiocb_rw, &acb);
 }
 
 static int coroutine_fn raw_co_preadv(BlockDriverState *bs, int64_t offset,
@@ -2186,7 +2185,7 @@  static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
         return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH);
     }
 #endif
-    return raw_thread_pool_submit(bs, handle_aiocb_flush, &acb);
+    return raw_thread_pool_submit(handle_aiocb_flush, &acb);
 }
 
 static void raw_aio_attach_aio_context(BlockDriverState *bs,
@@ -2248,7 +2247,7 @@  raw_regular_truncate(BlockDriverState *bs, int fd, int64_t offset,
         },
     };
 
-    return raw_thread_pool_submit(bs, handle_aiocb_truncate, &acb);
+    return raw_thread_pool_submit(handle_aiocb_truncate, &acb);
 }
 
 static int coroutine_fn raw_co_truncate(BlockDriverState *bs, int64_t offset,
@@ -2998,7 +2997,7 @@  raw_do_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes,
         acb.aio_type |= QEMU_AIO_BLKDEV;
     }
 
-    ret = raw_thread_pool_submit(bs, handle_aiocb_discard, &acb);
+    ret = raw_thread_pool_submit(handle_aiocb_discard, &acb);
     raw_account_discard(s, bytes, ret);
     return ret;
 }
@@ -3073,7 +3072,7 @@  raw_do_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
         handler = handle_aiocb_write_zeroes;
     }
 
-    return raw_thread_pool_submit(bs, handler, &acb);
+    return raw_thread_pool_submit(handler, &acb);
 }
 
 static int coroutine_fn raw_co_pwrite_zeroes(
@@ -3284,7 +3283,7 @@  static int coroutine_fn raw_co_copy_range_to(BlockDriverState *bs,
         },
     };
 
-    return raw_thread_pool_submit(bs, handle_aiocb_copy_range, &acb);
+    return raw_thread_pool_submit(handle_aiocb_copy_range, &acb);
 }
 
 BlockDriver bdrv_file = {
@@ -3614,7 +3613,7 @@  hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
         struct sg_io_hdr *io_hdr = buf;
         if (io_hdr->cmdp[0] == PERSISTENT_RESERVE_OUT ||
             io_hdr->cmdp[0] == PERSISTENT_RESERVE_IN) {
-            return pr_manager_execute(s->pr_mgr, bdrv_get_aio_context(bs),
+            return pr_manager_execute(s->pr_mgr, qemu_get_current_aio_context(),
                                       s->fd, io_hdr);
         }
     }
@@ -3630,7 +3629,7 @@  hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
         },
     };
 
-    return raw_thread_pool_submit(bs, handle_aiocb_ioctl, &acb);
+    return raw_thread_pool_submit(handle_aiocb_ioctl, &acb);
 }
 #endif /* linux */
 
diff --git a/block/file-win32.c b/block/file-win32.c
index ec9d64d0e4..3d7f59a592 100644
--- a/block/file-win32.c
+++ b/block/file-win32.c
@@ -167,7 +167,7 @@  static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
     acb->aio_offset = offset;
 
     trace_file_paio_submit(acb, opaque, offset, count, type);
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    pool = aio_get_thread_pool(qemu_get_current_aio_context());
     return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
diff --git a/block/qcow2-threads.c b/block/qcow2-threads.c
index 1914baf456..9e370acbb3 100644
--- a/block/qcow2-threads.c
+++ b/block/qcow2-threads.c
@@ -42,7 +42,7 @@  qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
 {
     int ret;
     BDRVQcow2State *s = bs->opaque;
-    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
 
     qemu_co_mutex_lock(&s->lock);
     while (s->nb_threads >= QCOW2_MAX_THREADS) {
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 31113b5860..a70abb8a59 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -48,7 +48,7 @@  struct ThreadPoolElement {
     /* Access to this list is protected by lock.  */
     QTAILQ_ENTRY(ThreadPoolElement) reqs;
 
-    /* Access to this list is protected by the global mutex.  */
+    /* This list is only written by the thread pool's mother thread.  */
     QLIST_ENTRY(ThreadPoolElement) all;
 };
 
@@ -175,7 +175,6 @@  static void thread_pool_completion_bh(void *opaque)
     ThreadPool *pool = opaque;
     ThreadPoolElement *elem, *next;
 
-    aio_context_acquire(pool->ctx);
 restart:
     QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
         if (elem->state != THREAD_DONE) {
@@ -195,9 +194,7 @@  restart:
              */
             qemu_bh_schedule(pool->completion_bh);
 
-            aio_context_release(pool->ctx);
             elem->common.cb(elem->common.opaque, elem->ret);
-            aio_context_acquire(pool->ctx);
 
             /* We can safely cancel the completion_bh here regardless of someone
              * else having scheduled it meanwhile because we reenter the
@@ -211,7 +208,6 @@  restart:
             qemu_aio_unref(elem);
         }
     }
-    aio_context_release(pool->ctx);
 }
 
 static void thread_pool_cancel(BlockAIOCB *acb)
@@ -251,6 +247,9 @@  BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
 {
     ThreadPoolElement *req;
 
+    /* Assert that the thread submitting work is the same running the pool */
+    assert(pool->ctx == qemu_get_current_aio_context());
+
     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
     req->func = func;
     req->arg = arg;