Patchwork [06/13] Threadlet: Add dequeue_work threadlet API

login
register
mail settings
Submitter Arun Bharadwaj
Date Jan. 4, 2011, 5:27 a.m.
Message ID <20110104052739.15887.60037.stgit@localhost6.localdomain6>
Download mbox | patch
Permalink /patch/77383/
State New
Headers show

Comments

Arun Bharadwaj - Jan. 4, 2011, 5:27 a.m.
This patch adds dequeue_work threadlet API and shows how
the paio_cancel changes to dequeue_work.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 posix-aio-compat.c |   46 ++++++++++++++++++++++++++--------------------
 1 files changed, 26 insertions(+), 20 deletions(-)
Stefan Hajnoczi - Jan. 5, 2011, 7:55 p.m.
On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
> @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
>      }
>  }
>  
> -static void paio_cancel(BlockDriverAIOCB *blockacb)
> +/**
> + * dequeue_work: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +static int dequeue_work(ThreadletWork *work)
>  {
> -    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> -    int active = 0;
> +    int ret = 1;
>  
>      qemu_mutex_lock(&globalqueue.lock);
> -    if (!acb->active) {
> -        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
> -        acb->ret = -ECANCELED;
> -    } else if (acb->ret == -EINPROGRESS) {
> -        active = 1;
> -    }
> +    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
> +    ret = 0;
>      qemu_mutex_unlock(&globalqueue.lock);
>  
> -    qemu_mutex_lock(&aiocb_mutex);
> -    if (!active) {
> -        acb->ret = -ECANCELED;
> -    } else {
> -        while (acb->ret == -EINPROGRESS) {
> -            /*
> -             * fail safe: if the aio could not be canceled,
> -             * we wait for it
> -             */
> -            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> +    return ret;

It always succeeds?  Why bother with the ret local variable?

> +}
> +
> +static void paio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> +    if (!acb->active) {
> +        if (dequeue_work(&acb->work) != 0) {
> +            /* Wait for running work item to complete */
> +            qemu_mutex_lock(&aiocb_mutex);
> +            while (acb->ret == -EINPROGRESS) {
> +                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> +            }
> +            qemu_mutex_unlock(&aiocb_mutex);
>          }
>      }
> -    qemu_mutex_unlock(&aiocb_mutex);
> +
>      paio_remove(acb);

I'm not convinced this function works.  If the request is active in a
worker thread and paio_cancel() is called then we invoke paio_remove().

Stefan
Arun Bharadwaj - Jan. 6, 2011, 10:43 a.m.
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:55:46]:

> On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
> > @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
> >      }
> >  }
> >  
> > -static void paio_cancel(BlockDriverAIOCB *blockacb)
> > +/**
> > + * dequeue_work: Cancel a task queued on the global queue.
> > + * @work: Contains the information of the task that needs to be cancelled.
> > + *
> > + * Returns: 0 if the task is successfully cancelled.
> > + *          1 otherwise.
> > + */
> > +static int dequeue_work(ThreadletWork *work)
> >  {
> > -    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > -    int active = 0;
> > +    int ret = 1;
> >  
> >      qemu_mutex_lock(&globalqueue.lock);
> > -    if (!acb->active) {
> > -        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
> > -        acb->ret = -ECANCELED;
> > -    } else if (acb->ret == -EINPROGRESS) {
> > -        active = 1;
> > -    }
> > +    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
> > +    ret = 0;
> >      qemu_mutex_unlock(&globalqueue.lock);
> >  
> > -    qemu_mutex_lock(&aiocb_mutex);
> > -    if (!active) {
> > -        acb->ret = -ECANCELED;
> > -    } else {
> > -        while (acb->ret == -EINPROGRESS) {
> > -            /*
> > -             * fail safe: if the aio could not be canceled,
> > -             * we wait for it
> > -             */
> > -            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > +    return ret;
> 
> It always succeeds?  Why bother with the ret local variable?
>

Yes, I'll remove this.
 
> > +}
> > +
> > +static void paio_cancel(BlockDriverAIOCB *blockacb)
> > +{
> > +    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > +    if (!acb->active) {
> > +        if (dequeue_work(&acb->work) != 0) {
> > +            /* Wait for running work item to complete */
> > +            qemu_mutex_lock(&aiocb_mutex);
> > +            while (acb->ret == -EINPROGRESS) {
> > +                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > +            }
> > +            qemu_mutex_unlock(&aiocb_mutex);
> >          }
> >      }
> > -    qemu_mutex_unlock(&aiocb_mutex);
> > +
> >      paio_remove(acb);
> 
> I'm not convinced this function works.  If the request is active in a
> worker thread and paio_cancel() is called then we invoke paio_remove().
> 

True. So can we do this: Since we have a patch which separately
removes the active field [PATCH 7/13], can we fold patch 7 and this
patch into a single patch? So that way we can maintain the
correctness, because we are actually waiting for the active work to
complete by doing a while (acb->ret == -EINPROGRESS)

-arun

> Stefan
Stefan Hajnoczi - Jan. 7, 2011, 11:06 a.m.
On Thu, Jan 6, 2011 at 10:43 AM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:55:46]:
>
>> On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
>> > @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
>> >      }
>> >  }
>> >
>> > -static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > +/**
>> > + * dequeue_work: Cancel a task queued on the global queue.
>> > + * @work: Contains the information of the task that needs to be cancelled.
>> > + *
>> > + * Returns: 0 if the task is successfully cancelled.
>> > + *          1 otherwise.
>> > + */
>> > +static int dequeue_work(ThreadletWork *work)
>> >  {
>> > -    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > -    int active = 0;
>> > +    int ret = 1;
>> >
>> >      qemu_mutex_lock(&globalqueue.lock);
>> > -    if (!acb->active) {
>> > -        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
>> > -        acb->ret = -ECANCELED;
>> > -    } else if (acb->ret == -EINPROGRESS) {
>> > -        active = 1;
>> > -    }
>> > +    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
>> > +    ret = 0;
>> >      qemu_mutex_unlock(&globalqueue.lock);
>> >
>> > -    qemu_mutex_lock(&aiocb_mutex);
>> > -    if (!active) {
>> > -        acb->ret = -ECANCELED;
>> > -    } else {
>> > -        while (acb->ret == -EINPROGRESS) {
>> > -            /*
>> > -             * fail safe: if the aio could not be canceled,
>> > -             * we wait for it
>> > -             */
>> > -            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
>> > +    return ret;
>>
>> It always succeeds?  Why bother with the ret local variable?
>>
>
> Yes, I'll remove this.
>
>> > +}
>> > +
>> > +static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > +{
>> > +    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > +    if (!acb->active) {
>> > +        if (dequeue_work(&acb->work) != 0) {
>> > +            /* Wait for running work item to complete */
>> > +            qemu_mutex_lock(&aiocb_mutex);
>> > +            while (acb->ret == -EINPROGRESS) {
>> > +                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
>> > +            }
>> > +            qemu_mutex_unlock(&aiocb_mutex);
>> >          }
>> >      }
>> > -    qemu_mutex_unlock(&aiocb_mutex);
>> > +
>> >      paio_remove(acb);
>>
>> I'm not convinced this function works.  If the request is active in a
>> worker thread and paio_cancel() is called then we invoke paio_remove().
>>
>
> True. So can we do this: Since we have a patch which separately
> removes the active field [PATCH 7/13], can we fold patch 7 and this
> patch into a single patch? So that way we can maintain the
> correctness, because we are actually waiting for the active work to
> complete by doing a while (acb->ret == -EINPROGRESS)

Sounds good.  IIRC the paio_cancel() code in the last version of the
patch looked correct, so it probably *is* just a case of not splitting
this part up into separate patches.

Stefan

Patch

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index ff6e08b..8f1a9b6 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -574,33 +574,39 @@  static void paio_remove(struct qemu_paiocb *acb)
     }
 }
 
-static void paio_cancel(BlockDriverAIOCB *blockacb)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
 {
-    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
-    int active = 0;
+    int ret = 1;
 
     qemu_mutex_lock(&globalqueue.lock);
-    if (!acb->active) {
-        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
-        acb->ret = -ECANCELED;
-    } else if (acb->ret == -EINPROGRESS) {
-        active = 1;
-    }
+    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+    ret = 0;
     qemu_mutex_unlock(&globalqueue.lock);
 
-    qemu_mutex_lock(&aiocb_mutex);
-    if (!active) {
-        acb->ret = -ECANCELED;
-    } else {
-        while (acb->ret == -EINPROGRESS) {
-            /*
-             * fail safe: if the aio could not be canceled,
-             * we wait for it
-             */
-            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+    return ret;
+}
+
+static void paio_cancel(BlockDriverAIOCB *blockacb)
+{
+    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
+    if (!acb->active) {
+        if (dequeue_work(&acb->work) != 0) {
+            /* Wait for running work item to complete */
+            qemu_mutex_lock(&aiocb_mutex);
+            while (acb->ret == -EINPROGRESS) {
+                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+            }
+            qemu_mutex_unlock(&aiocb_mutex);
         }
     }
-    qemu_mutex_unlock(&aiocb_mutex);
+
     paio_remove(acb);
 }