diff mbox

[2/3] Make paio subsystem use threadlets

Message ID 20101019174320.16514.92329.stgit@localhost6.localdomain6
State New
Headers show

Commit Message

Arun Bharadwaj Oct. 19, 2010, 5:43 p.m. UTC
From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework portion out of
posix-aio-compat.c.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 posix-aio-compat.c |  166 +++++++++-------------------------------------------
 1 files changed, 30 insertions(+), 136 deletions(-)

Comments

Balbir Singh Oct. 20, 2010, 2:24 a.m. UTC | #1
* Arun R B <arun@linux.vnet.ibm.com> [2010-10-19 23:13:20]:

> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> This patch makes the paio subsystem use the threadlet framework thereby
> decoupling asynchronous threading framework portion out of
> posix-aio-compat.c
> 
> The patch has been tested with fstress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>

This change seems reasonable to me

Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
Kevin Wolf Oct. 20, 2010, 8:42 a.m. UTC | #2
Am 19.10.2010 19:43, schrieb Arun R Bharadwaj:
> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> This patch makes the paio subsystem use the threadlet framework thereby
> decoupling asynchronous threading framework portion out of
> posix-aio-compat.c
> 
> The patch has been tested with fstress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
>  posix-aio-compat.c |  166 +++++++++-------------------------------------------
>  1 files changed, 30 insertions(+), 136 deletions(-)
> 
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..6977c18 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,6 +29,7 @@
>  #include "block_int.h"
>  
>  #include "block/raw-posix-aio.h"
> +#include "qemu-threadlets.h"
>  
>  
>  struct qemu_paiocb {
> @@ -51,6 +52,7 @@ struct qemu_paiocb {
>      struct qemu_paiocb *next;
>  
>      int async_context_id;
> +    ThreadletWork work;
>  };
>  
>  typedef struct PosixAioState {
> @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>  } PosixAioState;
>  
>  
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> -
>  #ifdef CONFIG_PREADV
>  static int preadv_present = 1;
>  #else
> @@ -85,39 +78,6 @@ static void die(const char *what)
>      die2(errno, what);
>  }
>  
> -static void mutex_lock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_lock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_lock");
> -}
> -
> -static void mutex_unlock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_unlock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_unlock");
> -}
> -
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> -                           struct timespec *ts)
> -{
> -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> -    return ret;
> -}
> -
> -static void cond_signal(pthread_cond_t *cond)
> -{
> -    int ret = pthread_cond_signal(cond);
> -    if (ret) die2(ret, "pthread_cond_signal");
> -}
> -
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> -                          void *(*start_routine)(void*), void *arg)
> -{
> -    int ret = pthread_create(thread, attr, start_routine, arg);
> -    if (ret) die2(ret, "pthread_create");
> -}
> -
>  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>  {
>      int ret;
> @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>      return nbytes;
>  }
>  
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
>  {
>      pid_t pid;
> +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> +    ssize_t ret = 0;
>  
>      pid = getpid();
> +    aiocb->active = 1;
>  
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> -
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> -
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> -
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> -
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> -            ret = handle_aiocb_rw(aiocb);
> -            break;
> -        case QEMU_AIO_FLUSH:
> -            ret = handle_aiocb_flush(aiocb);
> -            break;
> -        case QEMU_AIO_IOCTL:
> -            ret = handle_aiocb_ioctl(aiocb);
> -            break;
> -        default:
> -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> -            ret = -EINVAL;
> -            break;
> -        }
> -
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
> +        ret = handle_aiocb_rw(aiocb);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
> +        ret = handle_aiocb_ioctl(aiocb);
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> +        ret = -EINVAL;
> +        break;
>      }
>  
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> +    aiocb->ret = ret;
>  
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> -
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) die("kill failed");

Please move the die() call onto a line of its own and add braces.

>  }
>  
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
>      aiocb->ret = -EINPROGRESS;
>      aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +
> +    aiocb->work.func = aio_thread;
> +    submit_threadletwork(&aiocb->work);
>  }
>  
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>      ssize_t ret;
>  
> -    mutex_lock(&lock);
>      ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>      return ret;
>  }
>  
> @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>      struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>      int active = 0;
>  
> -    mutex_lock(&lock);
>      if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!cancel_threadletwork(&acb->work))
> +            acb->ret = -ECANCELED;
> +         else
> +            active = 1;

Missing braces around the if/else bodies.

Kevin
Stefan Hajnoczi Oct. 20, 2010, 9:30 a.m. UTC | #3
On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> From: Gautham R Shenoy <ego@in.ibm.com>
>
> This patch makes the paio subsystem use the threadlet framework thereby
> decoupling asynchronous threading framework portion out of
> posix-aio-compat.c
>
> The patch has been tested with fstress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
>  posix-aio-compat.c |  166 +++++++++-------------------------------------------
>  1 files changed, 30 insertions(+), 136 deletions(-)
>
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..6977c18 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,6 +29,7 @@
>  #include "block_int.h"
>
>  #include "block/raw-posix-aio.h"
> +#include "qemu-threadlets.h"
>
>
>  struct qemu_paiocb {
> @@ -51,6 +52,7 @@ struct qemu_paiocb {
>     struct qemu_paiocb *next;
>
>     int async_context_id;
> +    ThreadletWork work;

The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.

>  };
>
>  typedef struct PosixAioState {
> @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>  } PosixAioState;
>
>
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> -
>  #ifdef CONFIG_PREADV
>  static int preadv_present = 1;
>  #else
> @@ -85,39 +78,6 @@ static void die(const char *what)
>     die2(errno, what);
>  }
>
> -static void mutex_lock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_lock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_lock");
> -}
> -
> -static void mutex_unlock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_unlock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_unlock");
> -}
> -
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> -                           struct timespec *ts)
> -{
> -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> -    return ret;
> -}
> -
> -static void cond_signal(pthread_cond_t *cond)
> -{
> -    int ret = pthread_cond_signal(cond);
> -    if (ret) die2(ret, "pthread_cond_signal");
> -}
> -
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> -                          void *(*start_routine)(void*), void *arg)
> -{
> -    int ret = pthread_create(thread, attr, start_routine, arg);
> -    if (ret) die2(ret, "pthread_create");
> -}
> -
>  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>  {
>     int ret;
> @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>     return nbytes;
>  }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
>  {
>     pid_t pid;
> +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> +    ssize_t ret = 0;
>
>     pid = getpid();
> +    aiocb->active = 1;
>
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> -
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> -
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> -
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> -
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> -            ret = handle_aiocb_rw(aiocb);
> -            break;
> -        case QEMU_AIO_FLUSH:
> -            ret = handle_aiocb_flush(aiocb);
> -            break;
> -        case QEMU_AIO_IOCTL:
> -            ret = handle_aiocb_ioctl(aiocb);
> -            break;
> -        default:
> -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> -            ret = -EINVAL;
> -            break;
> -        }
> -
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
> +        ret = handle_aiocb_rw(aiocb);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
> +        ret = handle_aiocb_ioctl(aiocb);
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> +        ret = -EINVAL;
> +        break;
>     }
>
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> +    aiocb->ret = ret;
>
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> -
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
>  }
>
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
>     aiocb->ret = -EINPROGRESS;
>     aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +
> +    aiocb->work.func = aio_thread;
> +    submit_threadletwork(&aiocb->work);
>  }
>
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>     ssize_t ret;
>
> -    mutex_lock(&lock);
>     ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>     return ret;
>  }
>
> @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>     int active = 0;
>
> -    mutex_lock(&lock);
>     if (!acb->active) {

I'm not sure the active field serves any purpose.  No memory barriers
are used, so the value of active seen by the cancelling thread is 0
before the work is executed and 0 *or* 1 while the work is executed.

The cancel_threadletwork() function already indicates whether
cancellation succeeded.  Why not just try to cancel instead of using
the active field?

> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!cancel_threadletwork(&acb->work))
> +            acb->ret = -ECANCELED;
> +         else
> +            active = 1;

The 0 and 1 return value from cancel_threadletwork() is inverted.  See
also my comment on patch 1/3 in this series.

>     } else if (acb->ret == -EINPROGRESS) {
>         active = 1;
>     }
> -    mutex_unlock(&lock);
>
>     if (active) {
>         /* fail safe: if the aio could not be canceled, we wait for

while (qemu_paio_error(acb) == EINPROGRESS)
    ;

Tight loop with no memory barrier reading a memory location that is
updated by another thread.  We shouldn't communicate between threads
without barriers.

Stefan
Anthony Liguori Oct. 20, 2010, 1:16 p.m. UTC | #4
On 10/20/2010 04:30 AM, Stefan Hajnoczi wrote:
>
>>      } else if (acb->ret == -EINPROGRESS) {
>>          active = 1;
>>      }
>> -    mutex_unlock(&lock);
>>
>>      if (active) {
>>          /* fail safe: if the aio could not be canceled, we wait for
>>      
> while (qemu_paio_error(acb) == EINPROGRESS)
>      ;
>
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread.  We shouldn't communicate between threads
> without barriers.
>    

We shouldn't use a tight loop, period.  A condition variable should be
used if signalling is needed.

And we shouldn't rely on atomic assignments to communicate between 
threads.  Just use a mutex and avoid being fancier than we need to be.

Regards,

Anthony Liguori

> Stefan
>
>
Arun Bharadwaj Oct. 21, 2010, 8:40 a.m. UTC | #5
* Stefan Hajnoczi <stefanha@gmail.com> [2010-10-20 10:30:38]:

> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > From: Gautham R Shenoy <ego@in.ibm.com>
> >
> > This patch makes the paio subsystem use the threadlet framework thereby
> > decoupling asynchronous threading framework portion out of
> > posix-aio-compat.c
> >
> > The patch has been tested with fstress.
> >
> > Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> > ---
> >  posix-aio-compat.c |  166 +++++++++-------------------------------------------
> >  1 files changed, 30 insertions(+), 136 deletions(-)
> >
> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> > index 7b862b5..6977c18 100644
> > --- a/posix-aio-compat.c
> > +++ b/posix-aio-compat.c
> > @@ -29,6 +29,7 @@
> >  #include "block_int.h"
> >
> >  #include "block/raw-posix-aio.h"
> > +#include "qemu-threadlets.h"
> >
> >
> >  struct qemu_paiocb {
> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
> >     struct qemu_paiocb *next;
> >
> >     int async_context_id;
> > +    ThreadletWork work;
> 
> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
> 
> >  };
> >
> >  typedef struct PosixAioState {
> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
> >  } PosixAioState;
> >
> >
> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> > -static pthread_t thread_id;
> > -static pthread_attr_t attr;
> > -static int max_threads = 64;
> > -static int cur_threads = 0;
> > -static int idle_threads = 0;
> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> > -
> >  #ifdef CONFIG_PREADV
> >  static int preadv_present = 1;
> >  #else
> > @@ -85,39 +78,6 @@ static void die(const char *what)
> >     die2(errno, what);
> >  }
> >
> > -static void mutex_lock(pthread_mutex_t *mutex)
> > -{
> > -    int ret = pthread_mutex_lock(mutex);
> > -    if (ret) die2(ret, "pthread_mutex_lock");
> > -}
> > -
> > -static void mutex_unlock(pthread_mutex_t *mutex)
> > -{
> > -    int ret = pthread_mutex_unlock(mutex);
> > -    if (ret) die2(ret, "pthread_mutex_unlock");
> > -}
> > -
> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> > -                           struct timespec *ts)
> > -{
> > -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> > -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> > -    return ret;
> > -}
> > -
> > -static void cond_signal(pthread_cond_t *cond)
> > -{
> > -    int ret = pthread_cond_signal(cond);
> > -    if (ret) die2(ret, "pthread_cond_signal");
> > -}
> > -
> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> > -                          void *(*start_routine)(void*), void *arg)
> > -{
> > -    int ret = pthread_create(thread, attr, start_routine, arg);
> > -    if (ret) die2(ret, "pthread_create");
> > -}
> > -
> >  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> >  {
> >     int ret;
> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> >     return nbytes;
> >  }
> >
> > -static void *aio_thread(void *unused)
> > +static void aio_thread(ThreadletWork *work)
> >  {
> >     pid_t pid;
> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> > +    ssize_t ret = 0;
> >
> >     pid = getpid();
> > +    aiocb->active = 1;
> >
> > -    while (1) {
> > -        struct qemu_paiocb *aiocb;
> > -        ssize_t ret = 0;
> > -        qemu_timeval tv;
> > -        struct timespec ts;
> > -
> > -        qemu_gettimeofday(&tv);
> > -        ts.tv_sec = tv.tv_sec + 10;
> > -        ts.tv_nsec = 0;
> > -
> > -        mutex_lock(&lock);
> > -
> > -        while (QTAILQ_EMPTY(&request_list) &&
> > -               !(ret == ETIMEDOUT)) {
> > -            ret = cond_timedwait(&cond, &lock, &ts);
> > -        }
> > -
> > -        if (QTAILQ_EMPTY(&request_list))
> > -            break;
> > -
> > -        aiocb = QTAILQ_FIRST(&request_list);
> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
> > -        aiocb->active = 1;
> > -        idle_threads--;
> > -        mutex_unlock(&lock);
> > -
> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > -        case QEMU_AIO_READ:
> > -        case QEMU_AIO_WRITE:
> > -            ret = handle_aiocb_rw(aiocb);
> > -            break;
> > -        case QEMU_AIO_FLUSH:
> > -            ret = handle_aiocb_flush(aiocb);
> > -            break;
> > -        case QEMU_AIO_IOCTL:
> > -            ret = handle_aiocb_ioctl(aiocb);
> > -            break;
> > -        default:
> > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > -            ret = -EINVAL;
> > -            break;
> > -        }
> > -
> > -        mutex_lock(&lock);
> > -        aiocb->ret = ret;
> > -        idle_threads++;
> > -        mutex_unlock(&lock);
> > -
> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > +    case QEMU_AIO_READ:
> > +    case QEMU_AIO_WRITE:
> > +        ret = handle_aiocb_rw(aiocb);
> > +        break;
> > +    case QEMU_AIO_FLUSH:
> > +        ret = handle_aiocb_flush(aiocb);
> > +        break;
> > +    case QEMU_AIO_IOCTL:
> > +        ret = handle_aiocb_ioctl(aiocb);
> > +        break;
> > +    default:
> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > +        ret = -EINVAL;
> > +        break;
> >     }
> >
> > -    idle_threads--;
> > -    cur_threads--;
> > -    mutex_unlock(&lock);
> > +    aiocb->ret = ret;
> >
> > -    return NULL;
> > -}
> > -
> > -static void spawn_thread(void)
> > -{
> > -    sigset_t set, oldset;
> > -
> > -    cur_threads++;
> > -    idle_threads++;
> > -
> > -    /* block all signals */
> > -    if (sigfillset(&set)) die("sigfillset");
> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> > -
> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
> > -
> > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
> >  }
> >
> >  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> >  {
> >     aiocb->ret = -EINPROGRESS;
> >     aiocb->active = 0;
> > -    mutex_lock(&lock);
> > -    if (idle_threads == 0 && cur_threads < max_threads)
> > -        spawn_thread();
> > -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> > -    mutex_unlock(&lock);
> > -    cond_signal(&cond);
> > +
> > +    aiocb->work.func = aio_thread;
> > +    submit_threadletwork(&aiocb->work);
> >  }
> >
> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> >  {
> >     ssize_t ret;
> >
> > -    mutex_lock(&lock);
> >     ret = aiocb->ret;
> > -    mutex_unlock(&lock);
> > -
> >     return ret;
> >  }
> >
> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> >     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> >     int active = 0;
> >
> > -    mutex_lock(&lock);
> >     if (!acb->active) {
> 
> I'm not sure the active field serves any purpose.  No memory barriers
> are used so the value of active is 0 before the work is executed and 0
> *or* 1 while the work is executed.
> 
> The cancel_threadletwork() function already indicates whether
> cancellation succeeded.  Why not just try to cancel instead of using
> the active field?
> 

This series does not touch the active field anywhere, so I feel we can
address this in a separate patch instead of bundling it with this one.

-arun
> > -        QTAILQ_REMOVE(&request_list, acb, node);
> > -        acb->ret = -ECANCELED;
> > +        if (!cancel_threadletwork(&acb->work))
> > +            acb->ret = -ECANCELED;
> > +         else
> > +            active = 1;
> 
> The 0 and 1 return value from cancel_threadletwork() is inverted.  See
> also my comment on patch 1/3 in this series.
> 
> >     } else if (acb->ret == -EINPROGRESS) {
> >         active = 1;
> >     }
> > -    mutex_unlock(&lock);
> >
> >     if (active) {
> >         /* fail safe: if the aio could not be canceled, we wait for
> 
> while (qemu_paio_error(acb) == EINPROGRESS)
>     ;
> 
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread.  We shouldn't communicate between threads
> without barriers.
> 
> Stefan
>
Stefan Hajnoczi Oct. 21, 2010, 9:17 a.m. UTC | #6
On Thu, Oct 21, 2010 at 9:40 AM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@gmail.com> [2010-10-20 10:30:38]:
>
>> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
>> <arun@linux.vnet.ibm.com> wrote:
>> > From: Gautham R Shenoy <ego@in.ibm.com>
>> >
>> > This patch makes the paio subsystem use the threadlet framework thereby
>> > decoupling asynchronous threading framework portion out of
>> > posix-aio-compat.c
>> >
>> > The patch has been tested with fstress.
>> >
>> > Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
>> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
>> > ---
>> >  posix-aio-compat.c |  166 +++++++++-------------------------------------------
>> >  1 files changed, 30 insertions(+), 136 deletions(-)
>> >
>> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>> > index 7b862b5..6977c18 100644
>> > --- a/posix-aio-compat.c
>> > +++ b/posix-aio-compat.c
>> > @@ -29,6 +29,7 @@
>> >  #include "block_int.h"
>> >
>> >  #include "block/raw-posix-aio.h"
>> > +#include "qemu-threadlets.h"
>> >
>> >
>> >  struct qemu_paiocb {
>> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
>> >     struct qemu_paiocb *next;
>> >
>> >     int async_context_id;
>> > +    ThreadletWork work;
>>
>> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
>>
>> >  };
>> >
>> >  typedef struct PosixAioState {
>> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>> >  } PosixAioState;
>> >
>> >
>> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>> > -static pthread_t thread_id;
>> > -static pthread_attr_t attr;
>> > -static int max_threads = 64;
>> > -static int cur_threads = 0;
>> > -static int idle_threads = 0;
>> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
>> > -
>> >  #ifdef CONFIG_PREADV
>> >  static int preadv_present = 1;
>> >  #else
>> > @@ -85,39 +78,6 @@ static void die(const char *what)
>> >     die2(errno, what);
>> >  }
>> >
>> > -static void mutex_lock(pthread_mutex_t *mutex)
>> > -{
>> > -    int ret = pthread_mutex_lock(mutex);
>> > -    if (ret) die2(ret, "pthread_mutex_lock");
>> > -}
>> > -
>> > -static void mutex_unlock(pthread_mutex_t *mutex)
>> > -{
>> > -    int ret = pthread_mutex_unlock(mutex);
>> > -    if (ret) die2(ret, "pthread_mutex_unlock");
>> > -}
>> > -
>> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
>> > -                           struct timespec *ts)
>> > -{
>> > -    int ret = pthread_cond_timedwait(cond, mutex, ts);
>> > -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
>> > -    return ret;
>> > -}
>> > -
>> > -static void cond_signal(pthread_cond_t *cond)
>> > -{
>> > -    int ret = pthread_cond_signal(cond);
>> > -    if (ret) die2(ret, "pthread_cond_signal");
>> > -}
>> > -
>> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
>> > -                          void *(*start_routine)(void*), void *arg)
>> > -{
>> > -    int ret = pthread_create(thread, attr, start_routine, arg);
>> > -    if (ret) die2(ret, "pthread_create");
>> > -}
>> > -
>> >  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>> >  {
>> >     int ret;
>> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>> >     return nbytes;
>> >  }
>> >
>> > -static void *aio_thread(void *unused)
>> > +static void aio_thread(ThreadletWork *work)
>> >  {
>> >     pid_t pid;
>> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
>> > +    ssize_t ret = 0;
>> >
>> >     pid = getpid();
>> > +    aiocb->active = 1;
>> >
>> > -    while (1) {
>> > -        struct qemu_paiocb *aiocb;
>> > -        ssize_t ret = 0;
>> > -        qemu_timeval tv;
>> > -        struct timespec ts;
>> > -
>> > -        qemu_gettimeofday(&tv);
>> > -        ts.tv_sec = tv.tv_sec + 10;
>> > -        ts.tv_nsec = 0;
>> > -
>> > -        mutex_lock(&lock);
>> > -
>> > -        while (QTAILQ_EMPTY(&request_list) &&
>> > -               !(ret == ETIMEDOUT)) {
>> > -            ret = cond_timedwait(&cond, &lock, &ts);
>> > -        }
>> > -
>> > -        if (QTAILQ_EMPTY(&request_list))
>> > -            break;
>> > -
>> > -        aiocb = QTAILQ_FIRST(&request_list);
>> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
>> > -        aiocb->active = 1;
>> > -        idle_threads--;
>> > -        mutex_unlock(&lock);
>> > -
>> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > -        case QEMU_AIO_READ:
>> > -        case QEMU_AIO_WRITE:
>> > -            ret = handle_aiocb_rw(aiocb);
>> > -            break;
>> > -        case QEMU_AIO_FLUSH:
>> > -            ret = handle_aiocb_flush(aiocb);
>> > -            break;
>> > -        case QEMU_AIO_IOCTL:
>> > -            ret = handle_aiocb_ioctl(aiocb);
>> > -            break;
>> > -        default:
>> > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
>> > -            ret = -EINVAL;
>> > -            break;
>> > -        }
>> > -
>> > -        mutex_lock(&lock);
>> > -        aiocb->ret = ret;
>> > -        idle_threads++;
>> > -        mutex_unlock(&lock);
>> > -
>> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
>> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > +    case QEMU_AIO_READ:
>> > +    case QEMU_AIO_WRITE:
>> > +        ret = handle_aiocb_rw(aiocb);
>> > +        break;
>> > +    case QEMU_AIO_FLUSH:
>> > +        ret = handle_aiocb_flush(aiocb);
>> > +        break;
>> > +    case QEMU_AIO_IOCTL:
>> > +        ret = handle_aiocb_ioctl(aiocb);
>> > +        break;
>> > +    default:
>> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
>> > +        ret = -EINVAL;
>> > +        break;
>> >     }
>> >
>> > -    idle_threads--;
>> > -    cur_threads--;
>> > -    mutex_unlock(&lock);
>> > +    aiocb->ret = ret;
>> >
>> > -    return NULL;
>> > -}
>> > -
>> > -static void spawn_thread(void)
>> > -{
>> > -    sigset_t set, oldset;
>> > -
>> > -    cur_threads++;
>> > -    idle_threads++;
>> > -
>> > -    /* block all signals */
>> > -    if (sigfillset(&set)) die("sigfillset");
>> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
>> > -
>> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
>> > -
>> > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
>> > +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
>> >  }
>> >
>> >  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>> >  {
>> >     aiocb->ret = -EINPROGRESS;
>> >     aiocb->active = 0;
>> > -    mutex_lock(&lock);
>> > -    if (idle_threads == 0 && cur_threads < max_threads)
>> > -        spawn_thread();
>> > -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
>> > -    mutex_unlock(&lock);
>> > -    cond_signal(&cond);
>> > +
>> > +    aiocb->work.func = aio_thread;
>> > +    submit_threadletwork(&aiocb->work);
>> >  }
>> >
>> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>> >  {
>> >     ssize_t ret;
>> >
>> > -    mutex_lock(&lock);
>> >     ret = aiocb->ret;
>> > -    mutex_unlock(&lock);
>> > -
>> >     return ret;
>> >  }
>> >
>> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>> >     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> >     int active = 0;
>> >
>> > -    mutex_lock(&lock);
>> >     if (!acb->active) {
>>
>> I'm not sure the active field serves any purpose.  No memory barriers
>> are used so the value of active is 0 before the work is executed and 0
>> *or* 1 while the work is executed.
>>
>> The cancel_threadletwork() function already indicates whether
>> cancellation succeeded.  Why not just try to cancel instead of using
>> the active field?
>>
>
> This series does not touch the active field anywhere. So I feel we can
> implement this as a separate patch instead of clubbing it with this.

I'd prefer for this to be addressed in this patch because the active
field served a function before but no longer works with threadlets.
You're right that the patch doesn't touch it, and QEMU still compiles
fine with it, but it's still broken as I described in the previous
email.

In other words, the patch breaks the active field, please fix it.

Stefan
diff mbox

Patch

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@ 
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -51,6 +52,7 @@  struct qemu_paiocb {
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +61,6 @@  typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +78,6 @@  static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +261,51 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
+    aiocb->ret = ret;
 
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) die("kill failed");
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +441,14 @@  static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!cancel_threadletwork(&acb->work))
+            acb->ret = -ECANCELED;
+         else
+            active = 1;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@  int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +549,6 @@  int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }