diff mbox

[RFC/,2/2] qemu: Convert paio code to use the generic threading infrastructure.

Message ID 20100510061555.6600.69442.stgit@localhost.localdomain
State New
Headers show

Commit Message

Gautham R Shenoy May 10, 2010, 6:15 a.m. UTC
This patch makes the paio subsystem use the generic work offloading
infrastructure.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 posix-aio-compat.c |  156 ++++++++++------------------------------------------
 1 files changed, 30 insertions(+), 126 deletions(-)

Comments

jvrao May 13, 2010, 5:52 a.m. UTC | #1
Gautham R Shenoy wrote:
> This patch makes the paio subsystem use the generic work offloading
> infrastructure.
> 
> The patch has been tested with fstress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>

Gautham, surprisingly, this makes the code really readable.

A few comments below.

> ---
>  posix-aio-compat.c |  156 ++++++++++------------------------------------------
>  1 files changed, 30 insertions(+), 126 deletions(-)
> 
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index b43c531..1b405e4 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -28,6 +28,7 @@
>  #include "block_int.h"

<SKIP>

> -
>  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>  {
>  	int ret;
> @@ -300,47 +262,27 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>      return nbytes;
>  }
> 
> -static void *aio_thread(void *unused)
> +static void aio_thread(struct work_item *work)

Maybe we should name it differently, as it is not a thread by itself?

>  {
> -    pid_t pid;
> 
> -    pid = getpid();
> -
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> +    pid_t pid;
> 
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> +    struct qemu_paiocb *aiocb = (struct qemu_paiocb *) work->private;
> +    ssize_t ret = 0;
> 
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> +    pid = getpid();
> 
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> +    aiocb->active = 1;
> 
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
>  		ret = handle_aiocb_rw(aiocb);
>  		break;
> -        case QEMU_AIO_FLUSH:
> -                ret = handle_aiocb_flush(aiocb);
> -                break;
> -        case QEMU_AIO_IOCTL:
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
>  		ret = handle_aiocb_ioctl(aiocb);
>  		break;
>  	default:
> @@ -349,57 +291,29 @@ static void *aio_thread(void *unused)
>  		break;
>  	}
> 
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> -    }
> -
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> +    aiocb->ret = ret;
> 
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> -
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) die("kill failed");

Instead of doing a write/sending a signal in the handler context, why not move this
to the thread pool infrastructure as well? That way, handlers can be a little more segregated
from the aio thread mechanism.

Thanks,
JV

> +    async_work_release(&aio_request_list, work);
>  }
> 
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
> +    struct work_item *work;
> +
>      aiocb->ret = -EINPROGRESS;
>      aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +
> +    work = async_work_init(&aio_request_list, aio_thread, aiocb);
> +    aiocb->work = work;
> +    qemu_async_submit(&aio_request_list, work);
>  }
> 
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>      ssize_t ret;
> 
> -    mutex_lock(&lock);
>      ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>      return ret;
>  }
> 
> @@ -535,14 +449,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>      struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>      int active = 0;
> 
> -    mutex_lock(&lock);
>      if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!qemu_async_cancel_work(&aio_request_list, acb->work))
> +            acb->ret = -ECANCELED;
> +         else
> +            active = 1;
>      } else if (acb->ret == -EINPROGRESS) {
>          active = 1;
>      }
> -    mutex_unlock(&lock);
> 
>      if (active) {
>          /* fail safe: if the aio could not be canceled, we wait for
> @@ -615,7 +529,6 @@ int paio_init(void)
>      struct sigaction act;
>      PosixAioState *s;
>      int fds[2];
> -    int ret;
> 
>      if (posix_aio_state)
>          return 0;
> @@ -642,16 +555,7 @@ int paio_init(void)
>      qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
>          posix_aio_process_queue, s);
> 
> -    ret = pthread_attr_init(&attr);
> -    if (ret)
> -        die2(ret, "pthread_attr_init");
> -
> -    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> -    if (ret)
> -        die2(ret, "pthread_attr_setdetachstate");
> -
> -    QTAILQ_INIT(&request_list);
> -
>      posix_aio_state = s;
> +    async_queue_init(&aio_request_list, max_threads, max_threads);
>      return 0;
>  }
> 
> 
>
diff mbox

Patch

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index b43c531..1b405e4 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,6 +28,7 @@ 
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "async-work.h"
 
 
 struct qemu_paiocb {
@@ -50,6 +51,7 @@  struct qemu_paiocb {
     struct qemu_paiocb *next;
 
     int async_context_id;
+    struct work_item *work;
 };
 
 typedef struct PosixAioState {
@@ -57,15 +59,8 @@  typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
 static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static struct async_queue aio_request_list;
 
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
@@ -84,39 +79,6 @@  static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
 	int ret;
@@ -300,47 +262,27 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(struct work_item *work)
 {
-    pid_t pid;
 
-    pid = getpid();
-
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
+    pid_t pid;
 
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
+    struct qemu_paiocb *aiocb = (struct qemu_paiocb *) work->private;
+    ssize_t ret = 0;
 
-        if (QTAILQ_EMPTY(&request_list))
-            break;
+    pid = getpid();
 
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
+    aiocb->active = 1;
 
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
 		ret = handle_aiocb_rw(aiocb);
 		break;
-        case QEMU_AIO_FLUSH:
-                ret = handle_aiocb_flush(aiocb);
-                break;
-        case QEMU_AIO_IOCTL:
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
 		ret = handle_aiocb_ioctl(aiocb);
 		break;
 	default:
@@ -349,57 +291,29 @@  static void *aio_thread(void *unused)
 		break;
 	}
 
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
-    }
-
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
+    aiocb->ret = ret;
 
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    async_work_release(&aio_request_list, work);
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
+    struct work_item *work;
+
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    work = async_work_init(&aio_request_list, aio_thread, aiocb);
+    aiocb->work = work;
+    qemu_async_submit(&aio_request_list, work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -535,14 +449,14 @@  static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!qemu_async_cancel_work(&aio_request_list, acb->work))
+            acb->ret = -ECANCELED;
+         else
+            active = 1;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -615,7 +529,6 @@  int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -642,16 +555,7 @@  int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
+    async_queue_init(&aio_request_list, max_threads, max_threads);
     return 0;
 }