
sheepdog: use coroutines

Message ID 1313152395-25248-1-git-send-email-morita.kazutaka@lab.ntt.co.jp
State New

Commit Message

MORITA Kazutaka Aug. 12, 2011, 12:33 p.m. UTC
This makes the sheepdog block driver support bdrv_co_readv/writev
instead of bdrv_aio_readv/writev.

With this patch, Sheepdog network I/O becomes fully asynchronous.  The
block driver yields back when send/recv returns EAGAIN, and is resumed
when the sheepdog network connection is ready for the operation.

Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
---
 block/sheepdog.c |  150 +++++++++++++++++++++++++++++++++--------------------
 1 files changed, 93 insertions(+), 57 deletions(-)
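
The yield described above is the retry loop in do_readv_writev(), shown here in simplified form (the full hunk is in the patch at the bottom of the page):

/* Retry loop from do_readv_writev(), simplified.  do_send_recv() wraps
 * sendmsg()/recvmsg() on the sheepdog socket. */
again:
    ret = do_send_recv(sockfd, iov, len, iov_offset, write);
    if (ret < 0) {
        if (errno == EINTR) {
            goto again;                  /* interrupted, just retry */
        }
        if (errno == EAGAIN) {
            if (qemu_in_coroutine()) {
                qemu_coroutine_yield();  /* the socket's fd handler
                                          * re-enters the coroutine when
                                          * the connection is ready again */
            }
            goto again;
        }
        error_report("failed to recv a rsp, %s", strerror(errno));
    }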

Comments

Kevin Wolf Aug. 23, 2011, 12:29 p.m. UTC | #1
On 12.08.2011 14:33, MORITA Kazutaka wrote:
> This makes the sheepdog block driver support bdrv_co_readv/writev
> instead of bdrv_aio_readv/writev.
> 
> With this patch, Sheepdog network I/O becomes fully asynchronous.  The
> block driver yields back when send/recv returns EAGAIN, and is resumed
> when the sheepdog network connection is ready for the operation.
> 
> Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
> ---
>  block/sheepdog.c |  150 +++++++++++++++++++++++++++++++++--------------------
>  1 files changed, 93 insertions(+), 57 deletions(-)
> 
> diff --git a/block/sheepdog.c b/block/sheepdog.c
> index e150ac0..e283c34 100644
> --- a/block/sheepdog.c
> +++ b/block/sheepdog.c
> @@ -274,7 +274,7 @@ struct SheepdogAIOCB {
>      int ret;
>      enum AIOCBState aiocb_type;
>  
> -    QEMUBH *bh;
> +    Coroutine *coroutine;
>      void (*aio_done_func)(SheepdogAIOCB *);
>  
>      int canceled;
> @@ -295,6 +295,10 @@ typedef struct BDRVSheepdogState {
>      char *port;
>      int fd;
>  
> +    CoMutex lock;
> +    Coroutine *co_send;
> +    Coroutine *co_recv;
> +
>      uint32_t aioreq_seq_num;
>      QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
>  } BDRVSheepdogState;
> @@ -346,19 +350,16 @@ static const char * sd_strerror(int err)
>  /*
>   * Sheepdog I/O handling:
>   *
> - * 1. In the sd_aio_readv/writev, read/write requests are added to the
> - *    QEMU Bottom Halves.
> - *
> - * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
> - *    requests to the server and link the requests to the
> - *    outstanding_list in the BDRVSheepdogState.  we exits the
> - *    function without waiting for receiving the response.
> + * 1. In sd_co_rw_vector, we send the I/O requests to the server and
> + *    link the requests to the outstanding_list in the
> + *    BDRVSheepdogState.  The function exits without waiting for
> + *    receiving the response.
>   *
> - * 3. We receive the response in aio_read_response, the fd handler to
> + * 2. We receive the response in aio_read_response, the fd handler to
>   *    the sheepdog connection.  If metadata update is needed, we send
>   *    the write request to the vdi object in sd_write_done, the write
> - *    completion function.  The AIOCB callback is not called until all
> - *    the requests belonging to the AIOCB are finished.
> + *    completion function.  We switch back to sd_co_readv/writev after
> + *    all the requests belonging to the AIOCB are finished.
>   */
>  
>  static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
> @@ -398,7 +399,7 @@ static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
>  static void sd_finish_aiocb(SheepdogAIOCB *acb)
>  {
>      if (!acb->canceled) {
> -        acb->common.cb(acb->common.opaque, acb->ret);
> +        qemu_coroutine_enter(acb->coroutine, NULL);
>      }
>      qemu_aio_release(acb);
>  }
> @@ -411,7 +412,8 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
>       * Sheepdog cannot cancel the requests which are already sent to
>       * the servers, so we just complete the request with -EIO here.
>       */
> -    acb->common.cb(acb->common.opaque, -EIO);
> +    acb->ret = -EIO;
> +    qemu_coroutine_enter(acb->coroutine, NULL);
>      acb->canceled = 1;
>  }
>  
> @@ -435,24 +437,12 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
>  
>      acb->aio_done_func = NULL;
>      acb->canceled = 0;
> -    acb->bh = NULL;
> +    acb->coroutine = qemu_coroutine_self();
>      acb->ret = 0;
>      QLIST_INIT(&acb->aioreq_head);
>      return acb;
>  }
>  
> -static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
> -{
> -    if (acb->bh) {
> -        error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
> -        return -EIO;
> -    }
> -
> -    acb->bh = qemu_bh_new(cb, acb);
> -    qemu_bh_schedule(acb->bh);
> -    return 0;
> -}
> -
>  #ifdef _WIN32
>  
>  struct msghdr {
> @@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
>  again:
>      ret = do_send_recv(sockfd, iov, len, iov_offset, write);
>      if (ret < 0) {
> -        if (errno == EINTR || errno == EAGAIN) {
> +        if (errno == EINTR) {
> +            goto again;
> +        }
> +        if (errno == EAGAIN) {
> +            if (qemu_in_coroutine()) {
> +                qemu_coroutine_yield();
> +            }

Who reenters the coroutine if we yield here?

And of course for a coroutine based block driver I would like to see
much less code in callback functions. But it's a good start anyway.

Kevin
MORITA Kazutaka Aug. 23, 2011, 5:14 p.m. UTC | #2
At Tue, 23 Aug 2011 14:29:50 +0200,
Kevin Wolf wrote:
> 
> On 12.08.2011 14:33, MORITA Kazutaka wrote:
> > [...]
> > @@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
> >  again:
> >      ret = do_send_recv(sockfd, iov, len, iov_offset, write);
> >      if (ret < 0) {
> > -        if (errno == EINTR || errno == EAGAIN) {
> > +        if (errno == EINTR) {
> > +            goto again;
> > +        }
> > +        if (errno == EAGAIN) {
> > +            if (qemu_in_coroutine()) {
> > +                qemu_coroutine_yield();
> > +            }
> 
> Who reenters the coroutine if we yield here?

The fd handlers, co_write_request() and co_read_response(), will call
qemu_coroutine_enter() for the coroutine.  It will be restarted after
the sheepdog network connection becomes ready.
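
In code, the resume side is just the small fd handlers the patch adds (simplified; see co_read_response() and co_write_request() in the patch):

static void co_read_response(void *opaque)
{
    BDRVSheepdogState *s = opaque;

    if (!s->co_recv) {
        s->co_recv = qemu_coroutine_create(aio_read_response);
    }

    /* start (or resume) the receive coroutine; it yields again whenever
     * recv would block */
    qemu_coroutine_enter(s->co_recv, opaque);
}

static void co_write_request(void *opaque)
{
    BDRVSheepdogState *s = opaque;

    /* resume the coroutine that is currently sending a request */
    qemu_coroutine_enter(s->co_send, NULL);
}

/* registered on the sheepdog socket; the write handler is only installed
 * while add_aio_request() is sending */
qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
                        aio_flush_request, NULL, s);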

> 
> And of course for a coroutine based block driver I would like to see
> much less code in callback functions. But it's a good start anyway.

Yes, actually they can be much simpler.  This patch adds asynchronous
I/O support with a minimal change based on a coroutine, and I think
code simplification would be the next step.


Thanks,

Kazutaka
Kevin Wolf Aug. 24, 2011, 12:56 p.m. UTC | #3
On 23.08.2011 19:14, MORITA Kazutaka wrote:
> At Tue, 23 Aug 2011 14:29:50 +0200,
> Kevin Wolf wrote:
>>
>> On 12.08.2011 14:33, MORITA Kazutaka wrote:
>>> [...]
>>> @@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len,
>>>  again:
>>>      ret = do_send_recv(sockfd, iov, len, iov_offset, write);
>>>      if (ret < 0) {
>>> -        if (errno == EINTR || errno == EAGAIN) {
>>> +        if (errno == EINTR) {
>>> +            goto again;
>>> +        }
>>> +        if (errno == EAGAIN) {
>>> +            if (qemu_in_coroutine()) {
>>> +                qemu_coroutine_yield();
>>> +            }
>>
>> Who reenters the coroutine if we yield here?
> 
> The fd handlers, co_write_request() and co_read_response(), will call
> qemu_coroutine_enter() for the coroutine.  It will be restarted after
> the sheepdog network connection becomes ready.

Makes sense. Applied to the block branch,

>> And of course for a coroutine based block driver I would like to see
>> much less code in callback functions. But it's a good start anyway.
> 
> Yes, actually they can be much simpler.  This patch adds asynchronous
> I/O support with a minimal change based on a coroutine, and I think
> code simplification would be the next step.

Ok, if you're looking into this, perfect. :-)

Kevin
Christoph Hellwig Dec. 23, 2011, 1:38 p.m. UTC | #4
FYI, this causes segfaults when doing large streaming writes when
running against a sheepdog cluster which:

  a) has relatively fast SSDs

and

  b) uses buffered I/O.

Unfortunately I can't get a useful backtrace out of gdb.  When running just
this commit I at least get some debugging messages:

qemu-system-x86_64: failed to recv a rsp, Socket operation on non-socket
qemu-system-x86_64: failed to get the header, Socket operation on non-socket

but on least qemu these don't show up either.
Christoph Hellwig Dec. 29, 2011, 12:06 p.m. UTC | #5
On Fri, Dec 23, 2011 at 02:38:50PM +0100, Christoph Hellwig wrote:
> FYI, this causes segfaults when doing large streaming writes when
> running against a sheepdog cluster which:
> 
>   a) has relatively fast SSDs
> 
> and
> 
>   b) uses buffered I/O.
> 
> Unfortunately I can't get a useful backtrace out of gdb.  When running just
> this commit I at least get some debugging messages:
> 
> qemu-system-x86_64: failed to recv a rsp, Socket operation on non-socket
> qemu-system-x86_64: failed to get the header, Socket operation on non-socket
> 
> but on least qemu these don't show up either.

s/least/latest/

Some more debugging.  Just for the call that eventually segfaults, s->fd
turns from its normal value (normally 13 for me) into 0.  This is entirely
reproducible in my testing, and given that the sheepdog driver never
assigns to that value except when opening the device, this seems to point
to an issue in the coroutine code to me.
Stefan Hajnoczi Dec. 30, 2011, 10:35 a.m. UTC | #6
On Thu, Dec 29, 2011 at 01:06:26PM +0100, Christoph Hellwig wrote:
> On Fri, Dec 23, 2011 at 02:38:50PM +0100, Christoph Hellwig wrote:
> > FYI, this causes segfaults when doing large streaming writes when
> > running against a sheepdog cluster which:
> > 
> >   a) has relatively fast SSDs
> > 
> > and
> > 
> >   b) uses buffered I/O.
> > 
> > Unfortunately I can't get a useful backtrace out of gdb.  When running just
> > this commit I at least get some debugging messages:
> > 
> > qemu-system-x86_64: failed to recv a rsp, Socket operation on non-socket
> > qemu-system-x86_64: failed to get the header, Socket operation on non-socket
> > 
> > but on least qemu these don't show up either.
> 
> s/least/latest/
> 
> Some more debugging.  Just for the call that eventually segfaults, s->fd
> turns from its normal value (normally 13 for me) into 0.  This is entirely
> reproducible in my testing, and given that the sheepdog driver never
> assigns to that value except when opening the device, this seems to point
> to an issue in the coroutine code to me.

Are you building with gcc 4.5.3 or later?  (Earlier versions may
mis-compile, see https://bugs.launchpad.net/qemu/+bug/902148.)

If you can reproduce this bug and suspect coroutines are involved then I
suggest using gdb to observe the last valid field values of s and the
address of s.  When the coroutine re-enters make sure that s still has
the same address and check if the field values are the same as before.

I don't have a sheepdog setup here but if there's an easy way to
reproduce please let me know and I'll take a look.

Stefan
Christoph Hellwig Jan. 2, 2012, 3:39 p.m. UTC | #7
On Fri, Dec 30, 2011 at 10:35:01AM +0000, Stefan Hajnoczi wrote:
> Are you building with gcc 4.5.3 or later?  (Earlier versions may
> mis-compile, see https://bugs.launchpad.net/qemu/+bug/902148.)

I'm using "gcc version 4.6.2 (Debian 4.6.2-9)", so that should not
be an issue. 

> If you can reproduce this bug and suspect coroutines are involved then I

It's entirely reproducible.

I've played around a bit and switched from the ucontext to the gthreads
coroutine implementation.  The result seems odd, but starts to make sense.

Running the workload I now get the following message from qemu:

Co-routine re-entered recursively

and the gdb backtrace looks like:

(gdb) bt
#0  0x00007f2fff36f405 in *__GI_raise (sig=<optimized out>)
    at ../nptl/sysdeps/unix/sysv/linux/raise.c:64
#1  0x00007f2fff372680 in *__GI_abort () at abort.c:92
#2  0x00007f30019a6616 in qemu_coroutine_enter (co=0x7f3004d4d7b0, opaque=0x0)
    at qemu-coroutine.c:53
#3  0x00007f30019a5e82 in qemu_co_queue_next_bh (opaque=<optimized out>)
    at qemu-coroutine-lock.c:43
#4  0x00007f30018d5a72 in qemu_bh_poll () at async.c:71
#5  0x00007f3001982990 in main_loop_wait (nonblocking=<optimized out>)
    at main-loop.c:472
#6  0x00007f30018cf714 in main_loop () at /home/hch/work/qemu/vl.c:1481
#7  main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>)
    at /home/hch/work/qemu/vl.c:3479

adding some printks suggests this happens when calling add_aio_request from
aio_read_response when either delaying creates or updating metadata,
although not every time one of these cases happens.

I've tried to understand how the recursive calling happens, but unfortunately
the whole coroutine code lacks any sort of documentation how it should
behave or what it asserts about the callers.

> I don't have a sheepdog setup here but if there's an easy way to
> reproduce please let me know and I'll take a look.

With the small patch below applied to the sheepdog source I can reproduce
the issue on my laptop using the following setup:

for port in 7000 7001 7002; do
    mkdir -p /mnt/sheepdog/$port
    /usr/sbin/sheep -p $port -c local /mnt/sheepdog/$port
    sleep 2
done

collie cluster format
collie vdi create test 20G

then start a qemu instance that uses the sheepdog volume with the
following device and drive lines:

	-drive if=none,file=sheepdog:test,cache=none,id=test \
	-device virtio-blk-pci,drive=test,id=testdev \

finally, in the guest run:

	dd if=/dev/zero of=/dev/vdX bs=67108864 count=128 oflag=direct
Stefan Hajnoczi Jan. 2, 2012, 10:38 p.m. UTC | #8
On Mon, Jan 2, 2012 at 3:39 PM, Christoph Hellwig <hch@lst.de> wrote:
> On Fri, Dec 30, 2011 at 10:35:01AM +0000, Stefan Hajnoczi wrote:
>> If you can reproduce this bug and suspect coroutines are involved then I
>
> It's entirely reproducible.
>
> I've played around a bit and switched from the ucontext to the gthreads
> coroutine implementation.  The result seems odd, but starts to make sense.
>
> Running the workload I now get the following message from qemu:
>
> Co-routine re-entered recursively
>
> and the gdb backtrace looks like:
>
> (gdb) bt
> #0  0x00007f2fff36f405 in *__GI_raise (sig=<optimized out>)
>    at ../nptl/sysdeps/unix/sysv/linux/raise.c:64
> #1  0x00007f2fff372680 in *__GI_abort () at abort.c:92
> #2  0x00007f30019a6616 in qemu_coroutine_enter (co=0x7f3004d4d7b0, opaque=0x0)
>    at qemu-coroutine.c:53
> #3  0x00007f30019a5e82 in qemu_co_queue_next_bh (opaque=<optimized out>)
>    at qemu-coroutine-lock.c:43
> #4  0x00007f30018d5a72 in qemu_bh_poll () at async.c:71
> #5  0x00007f3001982990 in main_loop_wait (nonblocking=<optimized out>)
>    at main-loop.c:472
> #6  0x00007f30018cf714 in main_loop () at /home/hch/work/qemu/vl.c:1481
> #7  main (argc=<optimized out>, argv=<optimized out>, envp=<optimized out>)
>    at /home/hch/work/qemu/vl.c:3479
>
> adding some printks suggests this happens when calling add_aio_request from
> aio_read_response when either delaying creates or updating metadata,
> although not every time one of these cases happens.
>
> I've tried to understand how the recursive calling happens, but unfortunately
> the whole coroutine code lacks any sort of documentation how it should
> behave or what it asserts about the callers.
>
>> I don't have a sheepdog setup here but if there's an easy way to
>> reproduce please let me know and I'll take a look.
>
> With the small patch below applied to the sheepdog source I can reproduce
> the issue on my laptop using the following setup:
>
> for port in 7000 7001 7002; do
>    mkdir -p /mnt/sheepdog/$port
>    /usr/sbin/sheep -p $port -c local /mnt/sheepdog/$port
>    sleep 2
> done
>
> collie cluster format
> collie vdi create test 20G
>
> then start a qemu instance that uses the sheepdog volume with the
> following device and drive lines:
>
>        -drive if=none,file=sheepdog:test,cache=none,id=test \
>        -device virtio-blk-pci,drive=test,id=testdev \
>
> finally, in the guest run:
>
>        dd if=/dev/zero of=/dev/vdX bs=67108864 count=128 oflag=direct

Thanks for these instructions.  I can reproduce the issue here.

It seems suspicious the way that BDRVSheepdogState->co_recv and
->co_send work.  The code adds select(2) read/write callback functions
on the sheepdog socket file descriptor.  When the socket becomes
writeable or readable the co_send or co_recv coroutines are entered.
So far, so good, this is how a coroutine is integrated into the main
loop of QEMU.

The problem is that this patch is mixing things.  The co_recv
coroutine runs aio_read_response(), which invokes send_pending_req().
send_pending_req() invokes add_aio_request().  That function isn't
suitable for co_recv's context because it actually sends data and hits
a few blocking (yield) points.  It takes a coroutine mutex - but the
select(2) read callback is still in place.  We're now still in the
aio_read_response() call chain except we're actually not reading at
all, we're trying to write!  And we'll get spurious wakeups if there
is any data readable on the socket.

So the co_recv coroutine has two things in the system that will try to enter it:
1. The select(2) read callback on the sheepdog socket.
2. The add_aio_request() blocking operations, including a coroutine mutex.

This is bad, a yielded coroutine should only have one thing that will
enter it.  It's rare that it makes sense to have multiple things
entering a coroutine.
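
(For reference, the "Co-routine re-entered recursively" abort in the
backtrace above comes from a guard in qemu_coroutine_enter(), roughly the
following, paraphrased from qemu-coroutine.c:)

void qemu_coroutine_enter(Coroutine *co, void *opaque)
{
    Coroutine *self = qemu_coroutine_self();

    if (co->caller) {
        /* a non-NULL caller means 'co' is already active and has not
         * yielded back yet */
        fprintf(stderr, "Co-routine re-entered recursively\n");
        abort();
    }

    co->caller = self;
    co->entry_arg = opaque;
    coroutine_swap(self, co);   /* transfer control into 'co' */
}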

It's late here but I hope this gives Kazutaka some thoughts on what is
causing the issue with this patch.

Stefan
Stefan Hajnoczi Jan. 3, 2012, 8:16 a.m. UTC | #9
On Mon, Jan 02, 2012 at 04:39:59PM +0100, Christoph Hellwig wrote:
> I've tried to understand how the recursive calling happens, but unfortunately
> the whole coroutine code lacks any sort of documentation how it should
> behave or what it asserts about the callers.

There is documentation on the public coroutine functions, see
qemu-coroutine.h.
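
For example, the basic entry points look roughly like this (a minimal usage
sketch, not driver code):

#include "qemu-coroutine.h"

static void coroutine_fn my_coroutine(void *opaque)
{
    /* runs in coroutine context and may block cooperatively */
    qemu_coroutine_yield();            /* give control back to the caller */
    /* ... resumed here by the next qemu_coroutine_enter() ... */
}

static void caller(void)
{
    Coroutine *co = qemu_coroutine_create(my_coroutine);

    qemu_coroutine_enter(co, NULL);    /* run until the first yield */
    qemu_coroutine_enter(co, NULL);    /* resume after the yield */
}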

If you have specific questions I can improve the documentation.  Please
let me know.

Stefan
Christoph Hellwig Jan. 4, 2012, 3:58 p.m. UTC | #10
On Tue, Jan 03, 2012 at 08:16:55AM +0000, Stefan Hajnoczi wrote:
> On Mon, Jan 02, 2012 at 04:39:59PM +0100, Christoph Hellwig wrote:
> > I've tried to understand how the recursive calling happens, but unfortunately
> > the whole coroutine code lacks any sort of documentation how it should
> > behave or what it asserts about the callers.
> 
> There is documentation on the public coroutine functions, see
> qemu-coroutine.h.

Ok, I was looking in the source files and the documentation directory only
as that is where I expected the documentation to sit.

Btw, what is the plan forward for the block I/O interface?  Only
qcow2, sheepdog and nbd implement the coroutine interfaces, and none of the
hardware emulations calls them directly.  Also at least for interfaces
like the librbd callbacks coroutines don't even seem to be that useful.
Paolo Bonzini Jan. 4, 2012, 5:03 p.m. UTC | #11
On 01/04/2012 04:58 PM, Christoph Hellwig wrote:
> Btw, what is the plan forward for the block I/O interface?  Only
> qcow2, sheepdog and nbd implement the coroutine interfaces

Actually all formats implement a coroutine interface except raw, qed and 
vdi (IIRC).  But the "minor" formats do not implement scatter/gather 
I/O, and use a bounce buffer instead.
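
The bounce-buffer pattern is roughly this (an illustrative sketch, not the 
actual block.c emulation code):

static int readv_with_bounce(BlockDriverState *bs, int64_t sector_num,
                             int nb_sectors, QEMUIOVector *qiov)
{
    size_t size = (size_t)nb_sectors * BDRV_SECTOR_SIZE;
    uint8_t *buf = qemu_blockalign(bs, size);   /* contiguous bounce buffer */
    int ret;

    ret = bdrv_read(bs, sector_num, buf, nb_sectors);  /* linear read */
    if (ret >= 0) {
        qemu_iovec_from_buffer(qiov, buf, size);        /* scatter into iovec */
    }
    qemu_vfree(buf);
    return ret;
}

The write direction is symmetric: gather with qemu_iovec_to_buffer() into 
the bounce buffer, then do a linear bdrv_write().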

Among the protocols, only nbd implements coroutines, the others use AIO.

> , and none of the
> hardware emulations calls them directly.

Yes, AIO is often simpler to use for hardware emulation (or anyway it 
has smaller advantages).  But I do expect that SD could easily switch 
from synchronous to coroutines, for example.  Floppy is a mess.

IMO both coroutines and AIO have their uses.  Converting qed to 
coroutines would make sense, but this is much less true for converting 
raw and even less for rbd or iscsi (the external libraries are 
callback-based).

On the other hand, changing all formats to support scatter/gather I/O 
should not be hard, and would enable some nice simplifications in the 
code.  I have enough other cleanups on my plate, though. :/

Paolo

Patch

diff --git a/block/sheepdog.c b/block/sheepdog.c
index e150ac0..e283c34 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -274,7 +274,7 @@  struct SheepdogAIOCB {
     int ret;
     enum AIOCBState aiocb_type;
 
-    QEMUBH *bh;
+    Coroutine *coroutine;
     void (*aio_done_func)(SheepdogAIOCB *);
 
     int canceled;
@@ -295,6 +295,10 @@  typedef struct BDRVSheepdogState {
     char *port;
     int fd;
 
+    CoMutex lock;
+    Coroutine *co_send;
+    Coroutine *co_recv;
+
     uint32_t aioreq_seq_num;
     QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
 } BDRVSheepdogState;
@@ -346,19 +350,16 @@  static const char * sd_strerror(int err)
 /*
  * Sheepdog I/O handling:
  *
- * 1. In the sd_aio_readv/writev, read/write requests are added to the
- *    QEMU Bottom Halves.
- *
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- *    requests to the server and link the requests to the
- *    outstanding_list in the BDRVSheepdogState.  we exits the
- *    function without waiting for receiving the response.
+ * 1. In sd_co_rw_vector, we send the I/O requests to the server and
+ *    link the requests to the outstanding_list in the
+ *    BDRVSheepdogState.  The function exits without waiting for
+ *    receiving the response.
  *
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 2. We receive the response in aio_read_response, the fd handler to
  *    the sheepdog connection.  If metadata update is needed, we send
  *    the write request to the vdi object in sd_write_done, the write
- *    completion function.  The AIOCB callback is not called until all
- *    the requests belonging to the AIOCB are finished.
+ *    completion function.  We switch back to sd_co_readv/writev after
+ *    all the requests belonging to the AIOCB are finished.
  */
 
 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -398,7 +399,7 @@  static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 static void sd_finish_aiocb(SheepdogAIOCB *acb)
 {
     if (!acb->canceled) {
-        acb->common.cb(acb->common.opaque, acb->ret);
+        qemu_coroutine_enter(acb->coroutine, NULL);
     }
     qemu_aio_release(acb);
 }
@@ -411,7 +412,8 @@  static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
      * Sheepdog cannot cancel the requests which are already sent to
      * the servers, so we just complete the request with -EIO here.
      */
-    acb->common.cb(acb->common.opaque, -EIO);
+    acb->ret = -EIO;
+    qemu_coroutine_enter(acb->coroutine, NULL);
     acb->canceled = 1;
 }
 
@@ -435,24 +437,12 @@  static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
 
     acb->aio_done_func = NULL;
     acb->canceled = 0;
-    acb->bh = NULL;
+    acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     QLIST_INIT(&acb->aioreq_head);
     return acb;
 }
 
-static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
-{
-    if (acb->bh) {
-        error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
-        return -EIO;
-    }
-
-    acb->bh = qemu_bh_new(cb, acb);
-    qemu_bh_schedule(acb->bh);
-    return 0;
-}
-
 #ifdef _WIN32
 
 struct msghdr {
@@ -635,7 +625,13 @@  static int do_readv_writev(int sockfd, struct iovec *iov, int len,
 again:
     ret = do_send_recv(sockfd, iov, len, iov_offset, write);
     if (ret < 0) {
-        if (errno == EINTR || errno == EAGAIN) {
+        if (errno == EINTR) {
+            goto again;
+        }
+        if (errno == EAGAIN) {
+            if (qemu_in_coroutine()) {
+                qemu_coroutine_yield();
+            }
             goto again;
         }
         error_report("failed to recv a rsp, %s", strerror(errno));
@@ -793,14 +789,14 @@  static void aio_read_response(void *opaque)
     unsigned long idx;
 
     if (QLIST_EMPTY(&s->outstanding_aio_head)) {
-        return;
+        goto out;
     }
 
     /* read a header */
     ret = do_read(fd, &rsp, sizeof(rsp));
     if (ret) {
         error_report("failed to get the header, %s", strerror(errno));
-        return;
+        goto out;
     }
 
     /* find the right aio_req from the outstanding_aio list */
@@ -811,7 +807,7 @@  static void aio_read_response(void *opaque)
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        return;
+        goto out;
     }
 
     acb = aio_req->aiocb;
@@ -847,7 +843,7 @@  static void aio_read_response(void *opaque)
                        aio_req->iov_offset);
         if (ret) {
             error_report("failed to get the data, %s", strerror(errno));
-            return;
+            goto out;
         }
         break;
     }
@@ -861,10 +857,30 @@  static void aio_read_response(void *opaque)
     if (!rest) {
         /*
          * We've finished all requests which belong to the AIOCB, so
-         * we can call the callback now.
+         * we can switch back to sd_co_readv/writev now.
          */
         acb->aio_done_func(acb);
     }
+out:
+    s->co_recv = NULL;
+}
+
+static void co_read_response(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+
+    if (!s->co_recv) {
+        s->co_recv = qemu_coroutine_create(aio_read_response);
+    }
+
+    qemu_coroutine_enter(s->co_recv, opaque);
+}
+
+static void co_write_request(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+
+    qemu_coroutine_enter(s->co_send, NULL);
 }
 
 static int aio_flush_request(void *opaque)
@@ -924,7 +940,7 @@  static int get_sheep_fd(BDRVSheepdogState *s)
         return -1;
     }
 
-    qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
+    qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request,
                             NULL, s);
     return fd;
 }
@@ -1091,6 +1107,10 @@  static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 
     hdr.id = aio_req->id;
 
+    qemu_co_mutex_lock(&s->lock);
+    s->co_send = qemu_coroutine_self();
+    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
+                            aio_flush_request, NULL, s);
     set_cork(s->fd, 1);
 
     /* send a header */
@@ -1109,6 +1129,9 @@  static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     }
 
     set_cork(s->fd, 0);
+    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
+                            aio_flush_request, NULL, s);
+    qemu_co_mutex_unlock(&s->lock);
 
     return 0;
 }
@@ -1225,6 +1248,7 @@  static int sd_open(BlockDriverState *bs, const char *filename, int flags)
 
     bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
     strncpy(s->name, vdi, sizeof(s->name));
+    qemu_co_mutex_init(&s->lock);
     qemu_free(buf);
     return 0;
 out:
@@ -1491,7 +1515,7 @@  static int sd_truncate(BlockDriverState *bs, int64_t offset)
 /*
  * This function is called after writing data objects.  If we need to
  * update metadata, this sends a write request to the vdi object.
- * Otherwise, this calls the AIOCB callback.
+ * Otherwise, this switches back to sd_co_readv/writev.
  */
 static void sd_write_done(SheepdogAIOCB *acb)
 {
@@ -1587,8 +1611,11 @@  out:
  * waiting the response.  The responses are received in the
  * `aio_read_response' function which is called from the main loop as
  * a fd handler.
+ *
+ * Returns 1 when we need to wait a response, 0 when there is no sent
+ * request and -errno in error cases.
  */
-static void sd_readv_writev_bh_cb(void *p)
+static int sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
@@ -1600,9 +1627,6 @@  static void sd_readv_writev_bh_cb(void *p)
     SheepdogInode *inode = &s->inode;
     AIOReq *aio_req;
 
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-
     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
         /*
          * In the case we open the snapshot VDI, Sheepdog creates the
@@ -1684,42 +1708,47 @@  static void sd_readv_writev_bh_cb(void *p)
     }
 out:
     if (QLIST_EMPTY(&acb->aioreq_head)) {
-        sd_finish_aiocb(acb);
+        return acb->ret;
     }
+    return 1;
 }
 
-static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num,
-                                       QEMUIOVector *qiov, int nb_sectors,
-                                       BlockDriverCompletionFunc *cb,
-                                       void *opaque)
+static int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
+    int ret;
 
     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
         /* TODO: shouldn't block here */
         if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
-            return NULL;
+            return -EIO;
         }
         bs->total_sectors = sector_num + nb_sectors;
     }
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aio_done_func = sd_write_done;
     acb->aiocb_type = AIOCB_WRITE_UDATA;
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
-static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
-                                      QEMUIOVector *qiov, int nb_sectors,
-                                      BlockDriverCompletionFunc *cb,
-                                      void *opaque)
+static int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                       int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
-    int i;
+    int i, ret;
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aiocb_type = AIOCB_READ_UDATA;
     acb->aio_done_func = sd_finish_aiocb;
 
@@ -1731,8 +1760,15 @@  static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
         memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
     }
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2062,8 +2098,8 @@  BlockDriver bdrv_sheepdog = {
     .bdrv_getlength = sd_getlength,
     .bdrv_truncate  = sd_truncate,
 
-    .bdrv_aio_readv     = sd_aio_readv,
-    .bdrv_aio_writev    = sd_aio_writev,
+    .bdrv_co_readv  = sd_co_readv,
+    .bdrv_co_writev = sd_co_writev,
 
     .bdrv_snapshot_create   = sd_snapshot_create,
     .bdrv_snapshot_goto     = sd_snapshot_goto,