diff mbox

[2/3] Make paio subsystem use threadlets

Message ID 20101021121050.25166.4411.stgit@localhost6.localdomain6
State New
Headers show

Commit Message

Arun Bharadwaj Oct. 21, 2010, 12:10 p.m. UTC
From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework thereby
decoupling asynchronous threading framework portion out of
posix-aio-compat.c

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
---
 posix-aio-compat.c |  170 ++++++++++------------------------------------------
 1 files changed, 33 insertions(+), 137 deletions(-)

Comments

Stefan Hajnoczi Oct. 23, 2010, 11:57 a.m. UTC | #1
On Thu, Oct 21, 2010 at 1:10 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>     ssize_t ret;
>
> -    mutex_lock(&lock);
>     ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>     return ret;
>  }
>
> @@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>     int active = 0;
>
> -    mutex_lock(&lock);
>     if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!deque_threadletwork(&acb->work)) {
> +            acb->ret = -ECANCELED;
> +         } else {
> +            active = 1;
> +         }
>     } else if (acb->ret == -EINPROGRESS) {
>         active = 1;
>     }
> -    mutex_unlock(&lock);
>
>     if (active) {
>         /* fail safe: if the aio could not be canceled, we wait for

Here is the assembly listing of what happens next:

 454:posix-aio-compat.c ****         while (qemu_paio_error(acb) == EINPROGRESS)
 539 0347 48F7D8   		negq	%rax
 540 034a 83F873   		cmpl	$115, %eax
 541 034d 7581     		jne	.L46
 543              	.L58:
 545 0350 EBFD     		jmp	.L58

This while loop is an infinite loop.  The compiler doesn't need to
load acb->ret from memory.

The reason this loop worked before threadlets was because
qemu_paio_return() used to acquire a lock to access acb->ret, forcing
the return value to be loaded each iteration:

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

Please use synchronization to wait on the request like Anthony suggested.

Stefan
diff mbox

Patch

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..2e47736 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@ 
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -44,13 +45,13 @@  struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
     int active;
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +60,6 @@  typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +77,6 @@  static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +260,53 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
+    aiocb->ret = ret;
 
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) {
+        die("kill failed");
+    }
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +442,15 @@  static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!deque_threadletwork(&acb->work)) {
+            acb->ret = -ECANCELED;
+         } else {
+            active = 1;
+         }
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +525,6 @@  int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +551,6 @@  int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }