Patchwork rbd block driver fix race between aio completion and aio cancel

login
register
mail settings
Submitter Stefan Priebe - Profihost AG
Date Nov. 22, 2012, 10 a.m.
Message ID <1353578419-5481-1-git-send-email-s.priebe@profihost.ag>
Download mbox | patch
Permalink /patch/201031/
State New
Headers show

Comments

Stefan Priebe - Profihost AG - Nov. 22, 2012, 10 a.m.
This one fixes a race, which qemu also had in the iscsi block driver,
between cancellation and I/O completion.

qemu_rbd_aio_cancel was not synchronously waiting for the end of
the command.

To achieve this, it introduces a new status flag which uses
-EINPROGRESS.

Signed-off-by: Stefan Priebe <s.priebe@profihost.ag>
---
 block/rbd.c |   23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)
Blue Swirl - Nov. 24, 2012, 7:54 p.m.
On Thu, Nov 22, 2012 at 10:00 AM, Stefan Priebe <s.priebe@profihost.ag> wrote:
> This one fixes a race, which qemu also had in the iscsi block driver,
> between cancellation and I/O completion.
>
> qemu_rbd_aio_cancel was not synchronously waiting for the end of
> the command.
>
> To achieve this, it introduces a new status flag which uses
> -EINPROGRESS.
>
> Signed-off-by: Stefan Priebe <s.priebe@profihost.ag>
> ---
>  block/rbd.c |   23 ++++++++++++++---------
>  1 file changed, 14 insertions(+), 9 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 0384c6c..783c3d7 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -77,6 +77,7 @@ typedef struct RBDAIOCB {
>      int error;
>      struct BDRVRBDState *s;
>      int cancelled;
> +    int status;
>  } RBDAIOCB;
>
>  typedef struct RADOSCB {
> @@ -376,12 +377,6 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>      RBDAIOCB *acb = rcb->acb;
>      int64_t r;
>
> -    if (acb->cancelled) {
> -        qemu_vfree(acb->bounce);
> -        qemu_aio_release(acb);
> -        goto done;
> -    }
> -
>      r = rcb->ret;
>
>      if (acb->cmd == RBD_AIO_WRITE ||
> @@ -406,10 +401,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>              acb->ret = r;
>          }
>      }
> +    acb->status = 0;
> +
>      /* Note that acb->bh can be NULL in case where the aio was cancelled */
>      acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>      qemu_bh_schedule(acb->bh);
> -done:
>      g_free(rcb);
>  }
>
> @@ -574,6 +570,12 @@ static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>  {
>      RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>      acb->cancelled = 1;
> +
> +    while (acb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +
> +    qemu_aio_release(acb);
>  }
>
>  static AIOPool rbd_aio_pool = {
> @@ -646,7 +648,8 @@ static void rbd_aio_bh_cb(void *opaque)
>      qemu_bh_delete(acb->bh);
>      acb->bh = NULL;
>
> -    qemu_aio_release(acb);
> +    if (!acb->cancelled)

Missing braces, please read CODING_STYLE.

> +        qemu_aio_release(acb);
>  }
>
>  static int rbd_aio_discard_wrapper(rbd_image_t image,
> @@ -691,6 +694,7 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>      acb->s = s;
>      acb->cancelled = 0;
>      acb->bh = NULL;
> +    acb->status = -EINPROGRESS;
>
>      if (cmd == RBD_AIO_WRITE) {
>          qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
> @@ -737,7 +741,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>  failed:
>      g_free(rcb);
>      s->qemu_aio_count--;
> -    qemu_aio_release(acb);
> +    if (!acb->cancelled)
> +        qemu_aio_release(acb);

Also here.

>      return NULL;
>  }
>
> --
> 1.7.10.4
>
>
Stefan Priebe - Profihost AG - Nov. 24, 2012, 8:21 p.m.
Am 24.11.2012 20:54, schrieb Blue Swirl:
> On Thu, Nov 22, 2012 at 10:00 AM, Stefan Priebe <s.priebe@profihost.ag> wrote:
>> This one fixes a race, which qemu also had in the iscsi block driver,
>> between cancellation and I/O completion.
>>
>> qemu_rbd_aio_cancel was not synchronously waiting for the end of
>> the command.
>>
>> To achieve this, it introduces a new status flag which uses
>> -EINPROGRESS.
>>
>> Signed-off-by: Stefan Priebe <s.priebe@profihost.ag>
...
>>
>> -    qemu_aio_release(acb);
>> +    if (!acb->cancelled)
>
> Missing braces, please read CODING_STYLE.

Will fix this if the rest is OK. Waiting for Stefan and Paolo.

Stefan
Josh Durgin - Nov. 27, 2012, 10:42 p.m.
On 11/22/2012 02:00 AM, Stefan Priebe wrote:
> This one fixes a race, which qemu also had in the iscsi block driver,
> between cancellation and I/O completion.
>
> qemu_rbd_aio_cancel was not synchronously waiting for the end of
> the command.
>
> To achieve this, it introduces a new status flag which uses
> -EINPROGRESS.
>
> Signed-off-by: Stefan Priebe <s.priebe@profihost.ag>
> ---
>   block/rbd.c |   23 ++++++++++++++---------
>   1 file changed, 14 insertions(+), 9 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 0384c6c..783c3d7 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -77,6 +77,7 @@ typedef struct RBDAIOCB {
>       int error;
>       struct BDRVRBDState *s;
>       int cancelled;
> +    int status;
>   } RBDAIOCB;
>
>   typedef struct RADOSCB {
> @@ -376,12 +377,6 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>       RBDAIOCB *acb = rcb->acb;
>       int64_t r;
>
> -    if (acb->cancelled) {
> -        qemu_vfree(acb->bounce);
> -        qemu_aio_release(acb);
> -        goto done;
> -    }
> -
>       r = rcb->ret;
>
>       if (acb->cmd == RBD_AIO_WRITE ||
> @@ -406,10 +401,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>               acb->ret = r;
>           }
>       }
> +    acb->status = 0;
> +
>       /* Note that acb->bh can be NULL in case where the aio was cancelled */
>       acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>       qemu_bh_schedule(acb->bh);
> -done:
>       g_free(rcb);
>   }
>
> @@ -574,6 +570,12 @@ static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>   {
>       RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>       acb->cancelled = 1;
> +
> +    while (acb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +

There should be a qemu_vfree(acb->bounce); here

> +    qemu_aio_release(acb);
>   }
>
>   static AIOPool rbd_aio_pool = {
> @@ -646,7 +648,8 @@ static void rbd_aio_bh_cb(void *opaque)
>       qemu_bh_delete(acb->bh);
>       acb->bh = NULL;
>
> -    qemu_aio_release(acb);
> +    if (!acb->cancelled)
> +        qemu_aio_release(acb);
>   }
>
>   static int rbd_aio_discard_wrapper(rbd_image_t image,
> @@ -691,6 +694,7 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>       acb->s = s;
>       acb->cancelled = 0;
>       acb->bh = NULL;
> +    acb->status = -EINPROGRESS;
>
>       if (cmd == RBD_AIO_WRITE) {
>           qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
> @@ -737,7 +741,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>   failed:
>       g_free(rcb);
>       s->qemu_aio_count--;
> -    qemu_aio_release(acb);
> +    if (!acb->cancelled)

qemu_vfree(acb->bounce) should be here as well, although that's a
separate bug that's probably never hit.

> +        qemu_aio_release(acb);
>       return NULL;
>   }
>
>
Stefan Hajnoczi - Nov. 29, 2012, 1:58 p.m.
On Thu, Nov 22, 2012 at 11:00:19AM +0100, Stefan Priebe wrote:
> @@ -406,10 +401,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>              acb->ret = r;
>          }
>      }
> +    acb->status = 0;
> +

I suggest doing this in the BH.  The qemu_aio_wait() loop in
qemu_rbd_aio_cancel() needs to wait until the BH has executed.  By
clearing status in the BH we ensure that no matter in which order
qemu_aio_wait() invokes BHs and callbacks, we'll always wait until the
BH has completed before ending the while loop in qemu_rbd_aio_cancel().

> @@ -737,7 +741,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>  failed:
>      g_free(rcb);
>      s->qemu_aio_count--;
> -    qemu_aio_release(acb);
> +    if (!acb->cancelled)
> +        qemu_aio_release(acb);
>      return NULL;
>  }

This scenario is impossible.  We haven't returned the acb back to the
caller yet so they could not have invoked qemu_aio_cancel().

Stefan
Stefan Priebe - Profihost AG - Nov. 29, 2012, 2:32 p.m.
Hi,

I hope I've done everything correctly. I've sent a new v4 patch.

Am 29.11.2012 14:58, schrieb Stefan Hajnoczi:
> On Thu, Nov 22, 2012 at 11:00:19AM +0100, Stefan Priebe wrote:
>> @@ -406,10 +401,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>>               acb->ret = r;
>>           }
>>       }
>> +    acb->status = 0;
>> +
>
> I suggest doing this in the BH.  The qemu_aio_wait() loop in
> qemu_rbd_aio_cancel() needs to wait until the BH has executed.  By
> clearing status in the BH we ensure that no matter in which order
> qemu_aio_wait() invokes BHs and callbacks, we'll always wait until the
> BH has completed before ending the while loop in qemu_rbd_aio_cancel().
>
>> @@ -737,7 +741,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>>   failed:
>>       g_free(rcb);
>>       s->qemu_aio_count--;
>> -    qemu_aio_release(acb);
>> +    if (!acb->cancelled)
>> +        qemu_aio_release(acb);
>>       return NULL;
>>   }
>
> This scenario is impossible.  We haven't returned the acb back to the
> caller yet so they could not have invoked qemu_aio_cancel().

Greets,
Stefan
Paolo Bonzini - Nov. 29, 2012, 3:24 p.m.
> > @@ -574,6 +570,12 @@ static void
> > qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
> >   {
> >       RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> >       acb->cancelled = 1;
> > +
> > +    while (acb->status == -EINPROGRESS) {
> > +        qemu_aio_wait();
> > +    }
> > +
> 
> There should be a qemu_vfree(acb->bounce); here

No, because the BH will have run at this point and you'd double-free
the buffer.

Paolo

> > +    qemu_aio_release(acb);
> >   }
> >
> >   static AIOPool rbd_aio_pool = {
> > @@ -646,7 +648,8 @@ static void rbd_aio_bh_cb(void *opaque)
> >       qemu_bh_delete(acb->bh);
> >       acb->bh = NULL;
> >
> > -    qemu_aio_release(acb);
> > +    if (!acb->cancelled)
> > +        qemu_aio_release(acb);
> >   }
> >
> >   static int rbd_aio_discard_wrapper(rbd_image_t image,
> > @@ -691,6 +694,7 @@ static BlockDriverAIOCB
> > *rbd_start_aio(BlockDriverState *bs,
> >       acb->s = s;
> >       acb->cancelled = 0;
> >       acb->bh = NULL;
> > +    acb->status = -EINPROGRESS;
> >
> >       if (cmd == RBD_AIO_WRITE) {
> >           qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
> > @@ -737,7 +741,8 @@ static BlockDriverAIOCB
> > *rbd_start_aio(BlockDriverState *bs,
> >   failed:
> >       g_free(rcb);
> >       s->qemu_aio_count--;
> > -    qemu_aio_release(acb);
> > +    if (!acb->cancelled)
> 
> qemu_vfree(acb->bounce) should be here as well, although that's a
> separate bug that's probably never hit.
> 
> > +        qemu_aio_release(acb);
> >       return NULL;
> >   }
> >
> >
> 
>

Patch

diff --git a/block/rbd.c b/block/rbd.c
index 0384c6c..783c3d7 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -77,6 +77,7 @@  typedef struct RBDAIOCB {
     int error;
     struct BDRVRBDState *s;
     int cancelled;
+    int status;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
@@ -376,12 +377,6 @@  static void qemu_rbd_complete_aio(RADOSCB *rcb)
     RBDAIOCB *acb = rcb->acb;
     int64_t r;
 
-    if (acb->cancelled) {
-        qemu_vfree(acb->bounce);
-        qemu_aio_release(acb);
-        goto done;
-    }
-
     r = rcb->ret;
 
     if (acb->cmd == RBD_AIO_WRITE ||
@@ -406,10 +401,11 @@  static void qemu_rbd_complete_aio(RADOSCB *rcb)
             acb->ret = r;
         }
     }
+    acb->status = 0;
+
     /* Note that acb->bh can be NULL in case where the aio was cancelled */
     acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
     qemu_bh_schedule(acb->bh);
-done:
     g_free(rcb);
 }
 
@@ -574,6 +570,12 @@  static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
     acb->cancelled = 1;
+
+    while (acb->status == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+
+    qemu_aio_release(acb);
 }
 
 static AIOPool rbd_aio_pool = {
@@ -646,7 +648,8 @@  static void rbd_aio_bh_cb(void *opaque)
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
 
-    qemu_aio_release(acb);
+    if (!acb->cancelled)
+        qemu_aio_release(acb);
 }
 
 static int rbd_aio_discard_wrapper(rbd_image_t image,
@@ -691,6 +694,7 @@  static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     acb->s = s;
     acb->cancelled = 0;
     acb->bh = NULL;
+    acb->status = -EINPROGRESS;
 
     if (cmd == RBD_AIO_WRITE) {
         qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
@@ -737,7 +741,8 @@  static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
 failed:
     g_free(rcb);
     s->qemu_aio_count--;
-    qemu_aio_release(acb);
+    if (!acb->cancelled)
+        qemu_aio_release(acb);
     return NULL;
 }