Patchwork [v3,02/10] Switch POSIX compat AIO to QEMU abstractions

login
register
mail settings
Submitter Jan Kiszka
Date April 5, 2012, 10:59 a.m.
Message ID <e0644ce36bcb5d00e3764e181529ed1750c62976.1333623555.git.jan.kiszka@siemens.com>
Download mbox | patch
Permalink /patch/151219/
State New
Headers show

Comments

Jan Kiszka - April 5, 2012, 10:59 a.m.
Although there is nothing to wrap for non-POSIX here, redirecting thread
and synchronization services to our core simplifies managements jobs
like scheduling parameter adjustment. It also frees compat AIO from some
duplicate code (/wrt qemu-thread).

Signed-off-by: Jan Kiszka <jan.kiszka@siemens.com>
---
 posix-aio-compat.c |  118 +++++++++++++++------------------------------------
 1 files changed, 35 insertions(+), 83 deletions(-)

Patch

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index d311d13..c9b8ebf 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -15,7 +15,6 @@ 
 
 #include <sys/ioctl.h>
 #include <sys/types.h>
-#include <pthread.h>
 #include <unistd.h>
 #include <errno.h>
 #include <time.h>
@@ -29,9 +28,12 @@ 
 #include "qemu-common.h"
 #include "trace.h"
 #include "block_int.h"
+#include "qemu-thread.h"
 
 #include "block/raw-posix-aio.h"
 
+#define AIO_THREAD_IDLE_TIMEOUT 10000 /* 10 s */
+
 static void do_spawn_thread(void);
 
 struct qemu_paiocb {
@@ -59,10 +61,9 @@  typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
+static QemuMutex lock;
+static QemuCond cond;
+static QemuThread thread;
 static int max_threads = 64;
 static int cur_threads = 0;
 static int idle_threads = 0;
@@ -88,39 +89,6 @@  static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -313,28 +281,26 @@  static void posix_aio_notify_event(void);
 
 static void *aio_thread(void *unused)
 {
-    mutex_lock(&lock);
+    qemu_mutex_lock(&lock);
     pending_threads--;
-    mutex_unlock(&lock);
+    qemu_mutex_unlock(&lock);
     do_spawn_thread();
 
     while (1) {
         struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
+        bool timed_out;
+        ssize_t ret;
 
-        mutex_lock(&lock);
+        qemu_mutex_lock(&lock);
 
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
+        while (QTAILQ_EMPTY(&request_list)) {
             idle_threads++;
-            ret = cond_timedwait(&cond, &lock, &ts);
+            timed_out = !qemu_cond_timedwait(&cond, &lock,
+                                             AIO_THREAD_IDLE_TIMEOUT);
             idle_threads--;
+            if (timed_out) {
+                break;
+            }
         }
 
         if (QTAILQ_EMPTY(&request_list))
@@ -343,7 +309,7 @@  static void *aio_thread(void *unused)
         aiocb = QTAILQ_FIRST(&request_list);
         QTAILQ_REMOVE(&request_list, aiocb, node);
         aiocb->active = 1;
-        mutex_unlock(&lock);
+        qemu_mutex_unlock(&lock);
 
         switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
         case QEMU_AIO_READ:
@@ -375,41 +341,33 @@  static void *aio_thread(void *unused)
             break;
         }
 
-        mutex_lock(&lock);
+        qemu_mutex_lock(&lock);
         aiocb->ret = ret;
-        mutex_unlock(&lock);
+        qemu_mutex_unlock(&lock);
 
         posix_aio_notify_event();
     }
 
     cur_threads--;
-    mutex_unlock(&lock);
+    qemu_mutex_unlock(&lock);
 
     return NULL;
 }
 
 static void do_spawn_thread(void)
 {
-    sigset_t set, oldset;
-
-    mutex_lock(&lock);
+    qemu_mutex_lock(&lock);
     if (!new_threads) {
-        mutex_unlock(&lock);
+        qemu_mutex_unlock(&lock);
         return;
     }
 
     new_threads--;
     pending_threads++;
 
-    mutex_unlock(&lock);
+    qemu_mutex_unlock(&lock);
 
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    qemu_thread_create(&thread, aio_thread, NULL, QEMU_THREAD_DETACHED);
 }
 
 static void spawn_thread_bh_fn(void *opaque)
@@ -437,21 +395,21 @@  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
+    qemu_mutex_lock(&lock);
     if (idle_threads == 0 && cur_threads < max_threads)
         spawn_thread();
     QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+    qemu_mutex_unlock(&lock);
+    qemu_cond_signal(&cond);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
+    qemu_mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
+    qemu_mutex_unlock(&lock);
 
     return ret;
 }
@@ -582,14 +540,14 @@  static void paio_cancel(BlockDriverAIOCB *blockacb)
 
     trace_paio_cancel(acb, acb->common.opaque);
 
-    mutex_lock(&lock);
+    qemu_mutex_lock(&lock);
     if (!acb->active) {
         QTAILQ_REMOVE(&request_list, acb, node);
         acb->ret = -ECANCELED;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
+    qemu_mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -655,11 +613,13 @@  int paio_init(void)
 {
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
 
+    qemu_mutex_init(&lock);
+    qemu_cond_init(&cond);
+
     s = g_malloc(sizeof(PosixAioState));
 
     s->first_aio = NULL;
@@ -678,14 +638,6 @@  int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
     QTAILQ_INIT(&request_list);
     new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);