diff mbox

rbd block driver fix race between aio completion and aio cancel

Message ID 1353357585-31746-2-git-send-email-s.priebe@profihost.ag
State New
Headers show

Commit Message

Stefan Priebe - Profihost AG Nov. 19, 2012, 8:39 p.m. UTC
From: Stefan Priebe <s.priebe@profhost.ag>

This one fixes a race, which qemu also had in the iscsi block driver,
between cancellation and I/O completion.

qemu_rbd_aio_cancel was not synchronously waiting for the end of
the command.

It also removes the useless cancelled flag and instead introduces
a status flag with EINPROGRESS, like the iscsi block driver.

Signed-off-by: Stefan Priebe <s.priebe@profihost.ag>
---
 block/rbd.c |   19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

Comments

Josh Durgin Nov. 21, 2012, 8:56 a.m. UTC | #1
On 11/19/2012 12:39 PM, Stefan Priebe wrote:
> From: Stefan Priebe <s.priebe@profhost.ag>
>
> This one fixes a race, which qemu also had in the iscsi block driver,
> between cancellation and I/O completion.
>
> qemu_rbd_aio_cancel was not synchronously waiting for the end of
> the command.
>
> It also removes the useless cancelled flag and instead introduces
> a status flag with EINPROGRESS, like the iscsi block driver.
>
> Signed-off-by: Stefan Priebe <s.priebe@profihost.ag>
> ---
>   block/rbd.c |   19 ++++++++++++-------
>   1 file changed, 12 insertions(+), 7 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 5a0f79f..7b3bcbb 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -76,7 +76,7 @@ typedef struct RBDAIOCB {
>       int64_t sector_num;
>       int error;
>       struct BDRVRBDState *s;
> -    int cancelled;
> +    int status;
>   } RBDAIOCB;
>
>   typedef struct RADOSCB {
> @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>       RBDAIOCB *acb = rcb->acb;
>       int64_t r;
>
> -    if (acb->cancelled) {
> -        qemu_vfree(acb->bounce);
> -        qemu_aio_release(acb);
> +    if (acb->bh) {
>           goto done;
>       }

I don't think this is necessary at all anymore, since this callback
will never be called more than once, and it's the only thing that will
allocate acb->bh. Removing this block (and the done label) altogether
should have the intended effect, and the bh scheduled will free/release
things as usual.

>
> @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>               acb->ret = r;
>           }
>       }
> +    acb->status = acb->ret;
> +
>       /* Note that acb->bh can be NULL in case where the aio was cancelled */
>       acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
>       qemu_bh_schedule(acb->bh);
> +
>   done:
>       g_free(rcb);
>   }
> @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs)
>   static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>   {
>       RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> -    acb->cancelled = 1;
> +
> +    while (acb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
>   }
>
>   static AIOPool rbd_aio_pool = {
> @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque)
>           qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
>       }
>       qemu_vfree(acb->bounce);
> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>       qemu_bh_delete(acb->bh);
>       acb->bh = NULL;
>
> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> +

I'm not sure changing the order matters here, but maybe I'm missing
something.

>       qemu_aio_release(acb);
>   }
>
> @@ -689,8 +694,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
>       acb->ret = 0;
>       acb->error = 0;
>       acb->s = s;
> -    acb->cancelled = 0;
>       acb->bh = NULL;
> +    acb->status = -EINPROGRESS;
>
>       if (cmd == RBD_AIO_WRITE) {
>           qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
>
Stefan Hajnoczi Nov. 21, 2012, 9:07 a.m. UTC | #2
On Mon, Nov 19, 2012 at 09:39:45PM +0100, Stefan Priebe wrote:
> @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>      RBDAIOCB *acb = rcb->acb;
>      int64_t r;
>  
> -    if (acb->cancelled) {
> -        qemu_vfree(acb->bounce);
> -        qemu_aio_release(acb);
> +    if (acb->bh) {
>          goto done;
>      }

When is acb->bh != NULL here?

>  
> @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>              acb->ret = r;
>          }
>      }
> +    acb->status = acb->ret;

How about initializing acb->ret with -EINPROGRESS and using it instead
of adding a new status field?

> @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs)
>  static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>  {
>      RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> -    acb->cancelled = 1;
> +
> +    while (acb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
>  }

Need to take care that acb is still valid (not yet released!) when the
while loop iterates.

One way of doing this is to mark the acb as cancelled so the completion
handler won't release it.  Then the cancellation function can release
the acb - it's the last piece of code that needs a reference to acb.  In
this case the acb->cancelled field is useful.

> @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque)
>          qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
>      }
>      qemu_vfree(acb->bounce);
> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>      qemu_bh_delete(acb->bh);
>      acb->bh = NULL;
>  
> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> +
>      qemu_aio_release(acb);
>  }
>  

What does this hunk do?
Stefan Priebe - Profihost AG Nov. 21, 2012, 9:19 a.m. UTC | #3
Hello Stefan,
    hello Paolo,

Most of the ideas, including removing the whole cancellation stuff, came
from Paolo. Maybe he can comment as well?

I would then make a new patch.

Greets,
Stefan
Am 21.11.2012 10:07, schrieb Stefan Hajnoczi:
> On Mon, Nov 19, 2012 at 09:39:45PM +0100, Stefan Priebe wrote:
>> @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>>       RBDAIOCB *acb = rcb->acb;
>>       int64_t r;
>>
>> -    if (acb->cancelled) {
>> -        qemu_vfree(acb->bounce);
>> -        qemu_aio_release(acb);
>> +    if (acb->bh) {
>>           goto done;
>>       }
>
> When is acb->bh != NULL here?
>
>>
>> @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>>               acb->ret = r;
>>           }
>>       }
>> +    acb->status = acb->ret;
>
> How about initializing acb->ret with -EINPROGRESS and using it instead
> of adding a new status field?
>
>> @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs)
>>   static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>>   {
>>       RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> -    acb->cancelled = 1;
>> +
>> +    while (acb->status == -EINPROGRESS) {
>> +        qemu_aio_wait();
>> +    }
>>   }
>
> Need to take care that acb is still valid (not yet released!) when the
> while loop iterates.
>
> One way of doing this is to mark the acb as cancelled so the completion
> handler won't release it.  Then the cancellation function can release
> the acb - it's the last piece of code that needs a reference to acb.  In
> this case the acb->cancelled field is useful.
>
>> @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque)
>>           qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
>>       }
>>       qemu_vfree(acb->bounce);
>> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>>       qemu_bh_delete(acb->bh);
>>       acb->bh = NULL;
>>
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +
>>       qemu_aio_release(acb);
>>   }
>>
>
> What does this hunk do?
>
Paolo Bonzini Nov. 21, 2012, 3:34 p.m. UTC | #4
Il 21/11/2012 10:19, Stefan Priebe - Profihost AG ha scritto:
> Hello Stefan,
>    hello Paolo,
> 
> most of the ideas and removing the whole cancellation stuff came from
> Paolo. Maybe he can comment also?

I agree with all of Stefan's comments.  This includes putting back
acb->cancelled---but this time done right: note that
qemu_rbd_complete_aio was always releasing the AIOCB

    if (acb->cancelled) {
        qemu_vfree(acb->bounce);
        qemu_aio_release(acb);
        goto done;
    }

and according to Stefan's review it shouldn't.

Paolo
Stefan Priebe - Profihost AG Nov. 22, 2012, 10:01 a.m. UTC | #5
Hello,

I've sent a new patch which hopefully takes care of all your comments.

[PATCH] rbd block driver fix race between aio completition and aio cancel

Greets
Stefan

Am 21.11.2012 10:07, schrieb Stefan Hajnoczi:
> On Mon, Nov 19, 2012 at 09:39:45PM +0100, Stefan Priebe wrote:
>> @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>>       RBDAIOCB *acb = rcb->acb;
>>       int64_t r;
>>
>> -    if (acb->cancelled) {
>> -        qemu_vfree(acb->bounce);
>> -        qemu_aio_release(acb);
>> +    if (acb->bh) {
>>           goto done;
>>       }
>
> When is acb->bh != NULL here?
>
>>
>> @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
>>               acb->ret = r;
>>           }
>>       }
>> +    acb->status = acb->ret;
>
> How about initializing acb->ret with -EINPROGRESS and using it instead
> of adding a new status field?
>
>> @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs)
>>   static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
>>   {
>>       RBDAIOCB *acb = (RBDAIOCB *) blockacb;
>> -    acb->cancelled = 1;
>> +
>> +    while (acb->status == -EINPROGRESS) {
>> +        qemu_aio_wait();
>> +    }
>>   }
>
> Need to take care that acb is still valid (not yet released!) when the
> while loop iterates.
>
> One way of doing this is to mark the acb as cancelled so the completion
> handler won't release it.  Then the cancellation function can release
> the acb - it's the last piece of code that needs a reference to acb.  In
> this case the acb->cancelled field is useful.
>
>> @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque)
>>           qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
>>       }
>>       qemu_vfree(acb->bounce);
>> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>>       qemu_bh_delete(acb->bh);
>>       acb->bh = NULL;
>>
>> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> +
>>       qemu_aio_release(acb);
>>   }
>>
>
> What does this hunk do?
>
diff mbox

Patch

diff --git a/block/rbd.c b/block/rbd.c
index 5a0f79f..7b3bcbb 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -76,7 +76,7 @@  typedef struct RBDAIOCB {
     int64_t sector_num;
     int error;
     struct BDRVRBDState *s;
-    int cancelled;
+    int status;
 } RBDAIOCB;
 
 typedef struct RADOSCB {
@@ -376,9 +376,7 @@  static void qemu_rbd_complete_aio(RADOSCB *rcb)
     RBDAIOCB *acb = rcb->acb;
     int64_t r;
 
-    if (acb->cancelled) {
-        qemu_vfree(acb->bounce);
-        qemu_aio_release(acb);
+    if (acb->bh) {
         goto done;
     }
 
@@ -406,9 +404,12 @@  static void qemu_rbd_complete_aio(RADOSCB *rcb)
             acb->ret = r;
         }
     }
+    acb->status = acb->ret;
+
     /* Note that acb->bh can be NULL in case where the aio was cancelled */
     acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
     qemu_bh_schedule(acb->bh);
+
 done:
     g_free(rcb);
 }
@@ -573,7 +574,10 @@  static void qemu_rbd_close(BlockDriverState *bs)
 static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     RBDAIOCB *acb = (RBDAIOCB *) blockacb;
-    acb->cancelled = 1;
+
+    while (acb->status == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
 }
 
 static AIOPool rbd_aio_pool = {
@@ -642,10 +646,11 @@  static void rbd_aio_bh_cb(void *opaque)
         qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
     }
     qemu_vfree(acb->bounce);
-    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
 
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+
     qemu_aio_release(acb);
 }
 
@@ -689,8 +694,8 @@  static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-    acb->cancelled = 0;
     acb->bh = NULL;
+    acb->status = -EINPROGRESS;
 
     if (cmd == RBD_AIO_WRITE) {
         qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);