Message ID: 20110104052739.15887.60037.stgit@localhost6.localdomain6
State: New
On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
> @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
>          }
>      }
> 
> -static void paio_cancel(BlockDriverAIOCB *blockacb)
> +/**
> + * dequeue_work: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
> + */
> +static int dequeue_work(ThreadletWork *work)
>  {
> -    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> -    int active = 0;
> +    int ret = 1;
> 
>      qemu_mutex_lock(&globalqueue.lock);
> -    if (!acb->active) {
> -        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
> -        acb->ret = -ECANCELED;
> -    } else if (acb->ret == -EINPROGRESS) {
> -        active = 1;
> -    }
> +    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
> +    ret = 0;
>      qemu_mutex_unlock(&globalqueue.lock);
> 
> -    qemu_mutex_lock(&aiocb_mutex);
> -    if (!active) {
> -        acb->ret = -ECANCELED;
> -    } else {
> -        while (acb->ret == -EINPROGRESS) {
> -            /*
> -             * fail safe: if the aio could not be canceled,
> -             * we wait for it
> -             */
> -            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> +    return ret;

It always succeeds? Why bother with the ret local variable?

> +}
> +
> +static void paio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> +    if (!acb->active) {
> +        if (dequeue_work(&acb->work) != 0) {
> +            /* Wait for running work item to complete */
> +            qemu_mutex_lock(&aiocb_mutex);
> +            while (acb->ret == -EINPROGRESS) {
> +                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> +            }
> +            qemu_mutex_unlock(&aiocb_mutex);
>          }
>      }
> -    qemu_mutex_unlock(&aiocb_mutex);
> +
>      paio_remove(acb);

I'm not convinced this function works. If the request is active in a
worker thread and paio_cancel() is called then we invoke paio_remove().

Stefan
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:55:46]:

> On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
> > @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
> >          }
> >      }
> > 
> > -static void paio_cancel(BlockDriverAIOCB *blockacb)
> > +/**
> > + * dequeue_work: Cancel a task queued on the global queue.
> > + * @work: Contains the information of the task that needs to be cancelled.
> > + *
> > + * Returns: 0 if the task is successfully cancelled.
> > + * 1 otherwise.
> > + */
> > +static int dequeue_work(ThreadletWork *work)
> >  {
> > -    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > -    int active = 0;
> > +    int ret = 1;
> > 
> >      qemu_mutex_lock(&globalqueue.lock);
> > -    if (!acb->active) {
> > -        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
> > -        acb->ret = -ECANCELED;
> > -    } else if (acb->ret == -EINPROGRESS) {
> > -        active = 1;
> > -    }
> > +    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
> > +    ret = 0;
> >      qemu_mutex_unlock(&globalqueue.lock);
> > 
> > -    qemu_mutex_lock(&aiocb_mutex);
> > -    if (!active) {
> > -        acb->ret = -ECANCELED;
> > -    } else {
> > -        while (acb->ret == -EINPROGRESS) {
> > -            /*
> > -             * fail safe: if the aio could not be canceled,
> > -             * we wait for it
> > -             */
> > -            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > +    return ret;
>
> It always succeeds? Why bother with the ret local variable?
>

Yes, I'll remove this.

> > +}
> > +
> > +static void paio_cancel(BlockDriverAIOCB *blockacb)
> > +{
> > +    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > +    if (!acb->active) {
> > +        if (dequeue_work(&acb->work) != 0) {
> > +            /* Wait for running work item to complete */
> > +            qemu_mutex_lock(&aiocb_mutex);
> > +            while (acb->ret == -EINPROGRESS) {
> > +                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > +            }
> > +            qemu_mutex_unlock(&aiocb_mutex);
> >          }
> >      }
> > -    qemu_mutex_unlock(&aiocb_mutex);
> > +
> >      paio_remove(acb);
>
> I'm not convinced this function works. If the request is active in a
> worker thread and paio_cancel() is called then we invoke paio_remove().
>

True. So can we do this: since we already have a patch that separately
removes the active field [PATCH 7/13], can we fold patch 7 and this
patch into a single patch? That way we maintain correctness, because we
actually wait for the active work to complete via the
while (acb->ret == -EINPROGRESS) loop.

-arun

> Stefan
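As a side note on the "always succeeds" question above: one way dequeue_work() could genuinely report failure is to check whether the work item is still sitting on the global queue before removing it. The sketch below is illustrative only and is not the posted patch; it reuses the ThreadletWork, globalqueue and QTAILQ names from the series in the context of posix-aio-compat.c, and everything else is an assumption.

/* Hypothetical variant of dequeue_work(), for illustration only: return
 * 1 when the work item has already been taken off the queue by a worker
 * thread, instead of unconditionally removing it and returning 0. */
static int dequeue_work(ThreadletWork *work)
{
    ThreadletWork *entry;
    int ret = 1;    /* assume the item is no longer queued */

    qemu_mutex_lock(&globalqueue.lock);
    QTAILQ_FOREACH(entry, &globalqueue.request_list, node) {
        if (entry == work) {
            QTAILQ_REMOVE(&globalqueue.request_list, work, node);
            ret = 0;    /* cancelled before any worker picked it up */
            break;
        }
    }
    qemu_mutex_unlock(&globalqueue.lock);

    return ret;
}

With such a variant, the return value actually distinguishes "cancelled while still queued" from "already claimed by a worker", which is what paio_cancel() needs in order to decide whether it must wait.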
On Thu, Jan 6, 2011 at 10:43 AM, Arun R Bharadwaj <arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:55:46]:
>
>> On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
>> > @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
>> >          }
>> >      }
>> > 
>> > -static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > +/**
>> > + * dequeue_work: Cancel a task queued on the global queue.
>> > + * @work: Contains the information of the task that needs to be cancelled.
>> > + *
>> > + * Returns: 0 if the task is successfully cancelled.
>> > + * 1 otherwise.
>> > + */
>> > +static int dequeue_work(ThreadletWork *work)
>> >  {
>> > -    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > -    int active = 0;
>> > +    int ret = 1;
>> > 
>> >      qemu_mutex_lock(&globalqueue.lock);
>> > -    if (!acb->active) {
>> > -        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
>> > -        acb->ret = -ECANCELED;
>> > -    } else if (acb->ret == -EINPROGRESS) {
>> > -        active = 1;
>> > -    }
>> > +    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
>> > +    ret = 0;
>> >      qemu_mutex_unlock(&globalqueue.lock);
>> > 
>> > -    qemu_mutex_lock(&aiocb_mutex);
>> > -    if (!active) {
>> > -        acb->ret = -ECANCELED;
>> > -    } else {
>> > -        while (acb->ret == -EINPROGRESS) {
>> > -            /*
>> > -             * fail safe: if the aio could not be canceled,
>> > -             * we wait for it
>> > -             */
>> > -            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
>> > +    return ret;
>>
>> It always succeeds? Why bother with the ret local variable?
>>
>
> Yes, I'll remove this.
>
>> > +}
>> > +
>> > +static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > +{
>> > +    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > +    if (!acb->active) {
>> > +        if (dequeue_work(&acb->work) != 0) {
>> > +            /* Wait for running work item to complete */
>> > +            qemu_mutex_lock(&aiocb_mutex);
>> > +            while (acb->ret == -EINPROGRESS) {
>> > +                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
>> > +            }
>> > +            qemu_mutex_unlock(&aiocb_mutex);
>> >          }
>> >      }
>> > -    qemu_mutex_unlock(&aiocb_mutex);
>> > +
>> >      paio_remove(acb);
>>
>> I'm not convinced this function works. If the request is active in a
>> worker thread and paio_cancel() is called then we invoke paio_remove().
>>
>
> True. So can we do this: since we already have a patch that separately
> removes the active field [PATCH 7/13], can we fold patch 7 and this
> patch into a single patch? That way we maintain correctness, because we
> actually wait for the active work to complete via the
> while (acb->ret == -EINPROGRESS) loop.

Sounds good. IIRC the paio_cancel() code in the last version of the
patch looked correct, so it probably *is* just a case of not splitting
this part up into separate patches.

Stefan
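To make the fold-up being discussed concrete, here is a rough sketch of what the combined paio_cancel() might look like once the active field is gone (patch 7/13) and dequeue_work() can actually fail as sketched earlier. This is a sketch under those assumptions, in the context of posix-aio-compat.c, not the code that was actually posted:

/* Sketch of the folded cancellation path: if the request is still queued
 * we cancel it ourselves; otherwise a worker thread already owns it, so
 * we wait for it to finish before releasing the request. */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;

    if (dequeue_work(&acb->work) == 0) {
        /* Never picked up by a worker thread: mark it cancelled. */
        qemu_mutex_lock(&aiocb_mutex);
        acb->ret = -ECANCELED;
        qemu_mutex_unlock(&aiocb_mutex);
    } else {
        /* Fail safe: the request is running (or already done); wait. */
        qemu_mutex_lock(&aiocb_mutex);
        while (acb->ret == -EINPROGRESS) {
            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
        }
        qemu_mutex_unlock(&aiocb_mutex);
    }

    paio_remove(acb);
}

Either way paio_remove() only runs once the request is no longer in flight, which is the property Stefan's review comment is asking for.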
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index ff6e08b..8f1a9b6 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
         }
     }
 
-static void paio_cancel(BlockDriverAIOCB *blockacb)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
 {
-    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
-    int active = 0;
+    int ret = 1;
 
     qemu_mutex_lock(&globalqueue.lock);
-    if (!acb->active) {
-        QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
-        acb->ret = -ECANCELED;
-    } else if (acb->ret == -EINPROGRESS) {
-        active = 1;
-    }
+    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+    ret = 0;
     qemu_mutex_unlock(&globalqueue.lock);
 
-    qemu_mutex_lock(&aiocb_mutex);
-    if (!active) {
-        acb->ret = -ECANCELED;
-    } else {
-        while (acb->ret == -EINPROGRESS) {
-            /*
-             * fail safe: if the aio could not be canceled,
-             * we wait for it
-             */
-            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+    return ret;
+}
+
+static void paio_cancel(BlockDriverAIOCB *blockacb)
+{
+    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
+    if (!acb->active) {
+        if (dequeue_work(&acb->work) != 0) {
+            /* Wait for running work item to complete */
+            qemu_mutex_lock(&aiocb_mutex);
+            while (acb->ret == -EINPROGRESS) {
+                qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+            }
+            qemu_mutex_unlock(&aiocb_mutex);
         }
     }
-    qemu_mutex_unlock(&aiocb_mutex);
+
     paio_remove(acb);
 }
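The wait loop in paio_cancel() above only terminates because the worker side publishes its result under aiocb_mutex and signals aiocb_completion. A minimal sketch of that hand-off follows; finish_request() is a hypothetical helper name, and the real completion path in the series does additional work (such as notifying the I/O thread):

/* Hypothetical worker-side completion hand-off: publish the final return
 * value so acb->ret is no longer -EINPROGRESS, then wake any canceller
 * blocked in qemu_cond_wait(). */
static void finish_request(struct qemu_paiocb *acb, ssize_t ret)
{
    qemu_mutex_lock(&aiocb_mutex);
    acb->ret = ret;
    qemu_cond_broadcast(&aiocb_completion);
    qemu_mutex_unlock(&aiocb_mutex);
}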
This patch adds the dequeue_work threadlet API and shows how paio_cancel
changes to use dequeue_work.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 posix-aio-compat.c |   46 ++++++++++++++++++++++++++--------------------
 1 files changed, 26 insertions(+), 20 deletions(-)