
[RFC,11/12] qcow2: Convert qcow2 to use coroutines for async I/O

Message ID 1295688567-25496-12-git-send-email-stefanha@linux.vnet.ibm.com
State New

Commit Message

Stefan Hajnoczi Jan. 22, 2011, 9:29 a.m. UTC
Converting qcow2 to use coroutines is fairly simple since most of qcow2
is synchronous.  The synchronous I/O functions like bdrv_pread() now
transparently work when called from a coroutine, so all the synchronous
code just works.

The explicitly asynchronous code is adjusted to repeatedly call
qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
At that point the coroutine will return from its entry function and its
resources are freed.

The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
from a BH.  This is necessary since the user callback code does not
expect to be executed from a coroutine.

This conversion is not completely correct because the safety of the
synchronous code does not carry over to the coroutine version.
Previously, a synchronous code path could assume that it will never be
interleaved with another request executing.  This is no longer true
because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
other requests can be processed during that time.

The solution is to carefully introduce checks so that pending requests
do not step on each other's toes.  That is left for a future patch...

Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 block/qcow2.c |  160 ++++++++++++++++++++++++++++++---------------------------
 1 files changed, 85 insertions(+), 75 deletions(-)

Comments

Anthony Liguori Jan. 23, 2011, 11:40 p.m. UTC | #1
On 01/22/2011 03:29 AM, Stefan Hajnoczi wrote:
> Converting qcow2 to use coroutines is fairly simple since most of qcow2
> is synchronous.  The synchronous I/O functions like bdrv_pread() now
> transparently work when called from a coroutine, so all the synchronous
> code just works.
>
> The explicitly asynchronous code is adjusted to repeatedly call
> qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
> At that point the coroutine will return from its entry function and its
> resources are freed.
>
> The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
> from a BH.  This is necessary since the user callback code does not
> expect to be executed from a coroutine.
>
> This conversion is not completely correct because the safety of the
> synchronous code does not carry over to the coroutine version.
> Previously, a synchronous code path could assume that it will never be
> interleaved with another request executing.  This is no longer true
> because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
> other requests can be processed during that time.
>
> The solution is to carefully introduce checks so that pending requests
> do not step on each other's toes.  That is left for a future patch...
>
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>    

As an alternative approach, could we trap async calls from the block 
device, implement them in a synchronous fashion, then issue the callback 
immediately?

This would mean that qcow_aio_write() would become fully synchronous 
which also means that you can track when the operation is completed 
entirely within the block layer.  IOW, it should be possible to do this 
with almost no change to qcow2.

I think this is the right approach too.  If we're using coroutines, we 
shouldn't do anything asynchronous in the image formats.  The good bit 
about this is that we can probably dramatically simplify the block layer 
API by eliminating the sync/async versions of everything.

Regards,

Anthony Liguori

> ---
>   block/qcow2.c |  160 ++++++++++++++++++++++++++++++---------------------------
>   1 files changed, 85 insertions(+), 75 deletions(-)
>
> diff --git a/block/qcow2.c b/block/qcow2.c
> index b6b094c..4b33ef3 100644
> --- a/block/qcow2.c
> +++ b/block/qcow2.c
> @@ -361,19 +361,20 @@ typedef struct QCowAIOCB {
>       uint64_t bytes_done;
>       uint64_t cluster_offset;
>       uint8_t *cluster_data;
> -    BlockDriverAIOCB *hd_aiocb;
>       QEMUIOVector hd_qiov;
>       QEMUBH *bh;
>       QCowL2Meta l2meta;
>       QLIST_ENTRY(QCowAIOCB) next_depend;
> +    Coroutine *coroutine;
> +    int ret; /* return code for user callback */
>   } QCowAIOCB;
>
>   static void qcow2_aio_cancel(BlockDriverAIOCB *blockacb)
>   {
>       QCowAIOCB *acb = container_of(blockacb, QCowAIOCB, common);
> -    if (acb->hd_aiocb)
> -        bdrv_aio_cancel(acb->hd_aiocb);
>       qemu_aio_release(acb);
> +    /* XXX This function looks broken, we could be in the middle of a request
> +     * and releasing the acb is not a good idea */
>   }
>
>   static AIOPool qcow2_aio_pool = {
> @@ -381,13 +382,14 @@ static AIOPool qcow2_aio_pool = {
>       .cancel             = qcow2_aio_cancel,
>   };
>
> -static void qcow2_aio_read_cb(void *opaque, int ret);
> -static void qcow2_aio_read_bh(void *opaque)
> +static void qcow2_aio_bh(void *opaque)
>   {
>       QCowAIOCB *acb = opaque;
>       qemu_bh_delete(acb->bh);
>       acb->bh = NULL;
> -    qcow2_aio_read_cb(opaque, 0);
> +    acb->common.cb(acb->common.opaque, acb->ret);
> +    qemu_iovec_destroy(&acb->hd_qiov);
> +    qemu_aio_release(acb);
>   }
>
>   static int qcow2_schedule_bh(QEMUBHFunc *cb, QCowAIOCB *acb)
> @@ -404,14 +406,13 @@ static int qcow2_schedule_bh(QEMUBHFunc *cb, QCowAIOCB *acb)
>       return 0;
>   }
>
> -static void qcow2_aio_read_cb(void *opaque, int ret)
> +static int coroutine_fn qcow2_aio_read_cb(void *opaque, int ret)
>   {
>       QCowAIOCB *acb = opaque;
>       BlockDriverState *bs = acb->common.bs;
>       BDRVQcowState *s = bs->opaque;
>       int index_in_cluster, n1;
>
> -    acb->hd_aiocb = NULL;
>       if (ret < 0)
>           goto done;
>
> @@ -469,22 +470,13 @@ static void qcow2_aio_read_cb(void *opaque, int ret)
>                   acb->sector_num, acb->cur_nr_sectors);
>               if (n1 > 0) {
>                   BLKDBG_EVENT(bs->file, BLKDBG_READ_BACKING_AIO);
> -                acb->hd_aiocb = bdrv_aio_readv(bs->backing_hd, acb->sector_num,
> -                                    &acb->hd_qiov, acb->cur_nr_sectors,
> -                                    qcow2_aio_read_cb, acb);
> -                if (acb->hd_aiocb == NULL)
> -                    goto done;
> -            } else {
> -                ret = qcow2_schedule_bh(qcow2_aio_read_bh, acb);
> -                if (ret < 0)
> +                ret = bdrv_co_readv(bs->backing_hd, acb->sector_num, &acb->hd_qiov, acb->cur_nr_sectors);
> +                if (ret < 0) {
>                       goto done;
> +                }
>               }
>           } else {
> -            /* Note: in this case, no need to wait */
>               qemu_iovec_memset(&acb->hd_qiov, 0, 512 * acb->cur_nr_sectors);
> -            ret = qcow2_schedule_bh(qcow2_aio_read_bh, acb);
> -            if (ret < 0)
> -                goto done;
>           }
>       } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
>           /* add AIO support for compressed blocks ? */
> @@ -494,10 +486,6 @@ static void qcow2_aio_read_cb(void *opaque, int ret)
>           qemu_iovec_from_buffer(&acb->hd_qiov,
>               s->cluster_cache + index_in_cluster * 512,
>               512 * acb->cur_nr_sectors);
> -
> -        ret = qcow2_schedule_bh(qcow2_aio_read_bh, acb);
> -        if (ret < 0)
> -            goto done;
>       } else {
>           if ((acb->cluster_offset & 511) != 0) {
>               ret = -EIO;
> @@ -522,34 +510,50 @@ static void qcow2_aio_read_cb(void *opaque, int ret)
>           }
>
>           BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
> -        acb->hd_aiocb = bdrv_aio_readv(bs->file,
> +        ret = bdrv_co_readv(bs->file,
>                             (acb->cluster_offset >> 9) + index_in_cluster,
> -                            &acb->hd_qiov, acb->cur_nr_sectors,
> -                            qcow2_aio_read_cb, acb);
> -        if (acb->hd_aiocb == NULL) {
> -            ret = -EIO;
> +                            &acb->hd_qiov, acb->cur_nr_sectors);
> +        if (ret < 0) {
>               goto done;
>           }
>       }
>
> -    return;
> +    return 1;
>   done:
> -    acb->common.cb(acb->common.opaque, ret);
> -    qemu_iovec_destroy(&acb->hd_qiov);
> -    qemu_aio_release(acb);
> +    acb->ret = ret;
> +    qcow2_schedule_bh(qcow2_aio_bh, acb);
> +    return 0;
> +}
> +
> +static void * coroutine_fn qcow2_co_read(void *opaque)
> +{
> +    QCowAIOCB *acb = opaque;
> +
> +    while (qcow2_aio_read_cb(acb, 0)) {
> +    }
> +    return NULL;
> +}
> +
> +static int coroutine_fn qcow2_aio_write_cb(void *opaque, int ret);
> +static void * coroutine_fn qcow2_co_write(void *opaque)
> +{
> +    QCowAIOCB *acb = opaque;
> +
> +    while (qcow2_aio_write_cb(acb, 0)) {
> +    }
> +    return NULL;
>   }
>
> -static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
> -                                  QEMUIOVector *qiov, int nb_sectors,
> -                                  BlockDriverCompletionFunc *cb,
> -                                  void *opaque, int is_write)
> +static BlockDriverAIOCB *qcow2_aio_setup(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque, int is_write)
>   {
>       QCowAIOCB *acb;
> +    Coroutine *coroutine;
>
>       acb = qemu_aio_get(&qcow2_aio_pool, bs, cb, opaque);
>       if (!acb)
>           return NULL;
> -    acb->hd_aiocb = NULL;
>       acb->sector_num = sector_num;
>       acb->qiov = qiov;
>
> @@ -561,7 +565,12 @@ static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
>       acb->cluster_offset = 0;
>       acb->l2meta.nb_clusters = 0;
>       QLIST_INIT(&acb->l2meta.dependent_requests);
> -    return acb;
> +
> +    coroutine = qemu_coroutine_create(is_write ? qcow2_co_write
> +                                               : qcow2_co_read);
> +    acb->coroutine = coroutine;
> +    qemu_coroutine_enter(coroutine, acb);
> +    return &acb->common;
>   }
>
>   static BlockDriverAIOCB *qcow2_aio_readv(BlockDriverState *bs,
> @@ -570,38 +579,48 @@ static BlockDriverAIOCB *qcow2_aio_readv(BlockDriverState *bs,
>                                            BlockDriverCompletionFunc *cb,
>                                            void *opaque)
>   {
> -    QCowAIOCB *acb;
> -
> -    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> -    if (!acb)
> -        return NULL;
> -
> -    qcow2_aio_read_cb(acb, 0);
> +    return &acb->common;
> +    return qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
>   }
>
> -static void qcow2_aio_write_cb(void *opaque, int ret);
> -
> -static void run_dependent_requests(QCowL2Meta *m)
> +static void qcow2_co_run_dependent_requests(void *opaque)
>   {
> +    QCowAIOCB *acb = opaque;
>       QCowAIOCB *req;
>       QCowAIOCB *next;
>
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +
> +    /* Restart all dependent requests */
> +    QLIST_FOREACH_SAFE(req, &acb->l2meta.dependent_requests, next_depend, next) {
> +        qemu_coroutine_enter(req->coroutine, NULL);
> +    }
> +
> +    /* Reenter the original request */
> +    qemu_coroutine_enter(acb->coroutine, NULL);
> +}
> +
> +static void run_dependent_requests(QCowL2Meta *m)
> +{
>       /* Take the request off the list of running requests */
>       if (m->nb_clusters != 0) {
>           QLIST_REMOVE(m, next_in_flight);
>       }
>
> -    /* Restart all dependent requests */
> -    QLIST_FOREACH_SAFE(req, &m->dependent_requests, next_depend, next) {
> -        qcow2_aio_write_cb(req, 0);
> +    if (!QLIST_EMPTY(&m->dependent_requests)) {
> +        /* TODO This is a hack to get at the acb, may not be correct if called
> +         * with a QCowL2Meta that is not part of a QCowAIOCB.
> +         */
> +        QCowAIOCB *acb = container_of(m, QCowAIOCB, l2meta);
> +        qcow2_schedule_bh(qcow2_co_run_dependent_requests, acb);
> +        qemu_coroutine_yield(NULL);
>       }
>
>       /* Empty the list for the next part of the request */
>       QLIST_INIT(&m->dependent_requests);
>   }
>
> -static void qcow2_aio_write_cb(void *opaque, int ret)
> +static int coroutine_fn qcow2_aio_write_cb(void *opaque, int ret)
>   {
>       QCowAIOCB *acb = opaque;
>       BlockDriverState *bs = acb->common.bs;
> @@ -609,8 +628,6 @@ static void qcow2_aio_write_cb(void *opaque, int ret)
>       int index_in_cluster;
>       int n_end;
>
> -    acb->hd_aiocb = NULL;
> -
>       if (ret >= 0) {
>           ret = qcow2_alloc_cluster_link_l2(bs, &acb->l2meta);
>       }
> @@ -648,7 +665,8 @@ static void qcow2_aio_write_cb(void *opaque, int ret)
>       if (acb->l2meta.nb_clusters == 0 && acb->l2meta.depends_on != NULL) {
>           QLIST_INSERT_HEAD(&acb->l2meta.depends_on->dependent_requests,
>               acb, next_depend);
> -        return;
> +        qemu_coroutine_yield(NULL);
> +        return 1;
>       }
>
>       assert((acb->cluster_offset & 511) == 0);
> @@ -675,25 +693,22 @@ static void qcow2_aio_write_cb(void *opaque, int ret)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
> -    acb->hd_aiocb = bdrv_aio_writev(bs->file,
> -                                    (acb->cluster_offset >> 9) + index_in_cluster,
> -                                    &acb->hd_qiov, acb->cur_nr_sectors,
> -                                    qcow2_aio_write_cb, acb);
> -    if (acb->hd_aiocb == NULL) {
> -        ret = -EIO;
> +    ret = bdrv_co_writev(bs->file,
> +                         (acb->cluster_offset >> 9) + index_in_cluster,
> +                         &acb->hd_qiov, acb->cur_nr_sectors);
> +    if (ret < 0) {
>           goto fail;
>       }
> -
> -    return;
> +    return 1;
>
>   fail:
>       if (acb->l2meta.nb_clusters != 0) {
>           QLIST_REMOVE(&acb->l2meta, next_in_flight);
>       }
>   done:
> -    acb->common.cb(acb->common.opaque, ret);
> -    qemu_iovec_destroy(&acb->hd_qiov);
> -    qemu_aio_release(acb);
> +    acb->ret = ret;
> +    qcow2_schedule_bh(qcow2_aio_bh, acb);
> +    return 0;
>   }
>
>   static BlockDriverAIOCB *qcow2_aio_writev(BlockDriverState *bs,
> @@ -703,16 +718,10 @@ static BlockDriverAIOCB *qcow2_aio_writev(BlockDriverState *bs,
>                                             void *opaque)
>   {
>       BDRVQcowState *s = bs->opaque;
> -    QCowAIOCB *acb;
>
>       s->cluster_cache_offset = -1; /* disable compressed cache */
>
> -    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> -    if (!acb)
> -        return NULL;
> -
> -    qcow2_aio_write_cb(acb, 0);
> +    return &acb->common;
> +    return qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
>   }
>
>   static void qcow2_close(BlockDriverState *bs)
> @@ -824,6 +833,7 @@ static int qcow2_change_backing_file(BlockDriverState *bs,
>       return qcow2_update_ext_header(bs, backing_file, backing_fmt);
>   }
>
> +/* TODO did we break this for coroutines? */
>   static int preallocate(BlockDriverState *bs)
>   {
>       uint64_t nb_sectors;
>
Stefan Hajnoczi Jan. 24, 2011, 11:09 a.m. UTC | #2
On Sun, Jan 23, 2011 at 11:40 PM, Anthony Liguori <anthony@codemonkey.ws> wrote:
> On 01/22/2011 03:29 AM, Stefan Hajnoczi wrote:
>>
>> Converting qcow2 to use coroutines is fairly simple since most of qcow2
>> is synchronous.  The synchronous I/O functions like bdrv_pread() now
>> transparently work when called from a coroutine, so all the synchronous
>> code just works.
>>
>> The explicitly asynchronous code is adjusted to repeatedly call
>> qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
>> At that point the coroutine will return from its entry function and its
>> resources are freed.
>>
>> The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
>> from a BH.  This is necessary since the user callback code does not
>> expect to be executed from a coroutine.
>>
>> This conversion is not completely correct because the safety of the
>> synchronous code does not carry over to the coroutine version.
>> Previously, a synchronous code path could assume that it will never be
>> interleaved with another request executing.  This is no longer true
>> because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
>> other requests can be processed during that time.
>>
>> The solution is to carefully introduce checks so that pending requests
>> do not step on each other's toes.  That is left for a future patch...
>>
>> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>>
>
> As an alternative approach, could we trap async calls from the block device,
> implement them in a synchronous fashion, then issue the callback
> immediately?
>
> This would mean that qcow_aio_write() would become fully synchronous which
> also means that you can track when the operation is completed entirely
> within the block layer.  IOW, it should be possible to do this with almost
> no change to qcow2.

I'm not sure I understand what you're suggesting.  Right now
bdrv_read() for coroutines is implemented on top of bdrv_aio_readv().
And bdrv_pread() is implemented on top of bdrv_read().  It doesn't
make sense to me to implement bdrv_aio_readv() in terms of
bdrv_read().  Also is it safe to invoke the callback without a BH?

> I think this is the right approach too.  If we're using coroutines, we
> shouldn't do anything asynchronous in the image formats.  The good bit about
> this is that we can probably dramatically simplify the block layer API by
> eliminating the sync/async versions of everything.

Hardware emulation needs the asynchronous API so I don't think we can
get rid of bdrv_aio_readv(), bdrv_aio_writev(), and bdrv_aio_flush()
completely.  IDE and SCSI also like to be able to cancel their aio
requests.

Non-invasive coroutine support in the block layer will allow us to
easily make the more obscure image formats asynchronous too :).

Stefan
Avi Kivity Jan. 26, 2011, 3:40 p.m. UTC | #3
On 01/22/2011 11:29 AM, Stefan Hajnoczi wrote:
> Converting qcow2 to use coroutines is fairly simple since most of qcow2
> is synchronous.  The synchronous I/O functions like bdrv_pread() now
> transparently work when called from a coroutine, so all the synchronous
> code just works.
>
> The explicitly asynchronous code is adjusted to repeatedly call
> qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
> At that point the coroutine will return from its entry function and its
> resources are freed.
>
> The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
> from a BH.  This is necessary since the user callback code does not
> expect to be executed from a coroutine.
>
> This conversion is not completely correct because the safety of the
> synchronous code does not carry over to the coroutine version.
> Previously, a synchronous code path could assume that it will never be
> interleaved with another request executing.  This is no longer true
> because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
> other requests can be processed during that time.
>
> The solution is to carefully introduce checks so that pending requests
> do not step on each other's toes.  That is left for a future patch...
>


The way I thought of doing this is:

qcow_aio_write(...)
{
     execute_in_coroutine {
         co_mutex_lock(&bs->mutex);
         do_qcow_aio_write(...); // original qcow code
         co_mutex_release(&bs->mutex);
     }
}

(similar changes for the other callbacks)

if the code happens to be asynchronous (no metadata changes), we'll take 
the mutex and release it immediately after submitting the I/O, so no 
extra serialization happens.  If the code does issue a synchronous 
metadata write, we'll lock out all other operations on the same block 
device, but still allow the vcpu to execute, since all the locking 
happens in a coroutine.

Essentially, a mutex becomes the dependency tracking mechanism.  A global 
mutex means all synchronous operations are dependent.  Later, we can 
convert the metadata cache entry dependency lists to local mutexes 
inside the cache entry structures.
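
To make the shape of that concrete, here is a minimal sketch; CoMutex and
co_mutex_lock()/co_mutex_unlock() are assumed primitives that don't exist
in the tree yet, and do_qcow_aio_write() stands for the renamed,
unmodified qcow write path:

/* Sketch only: CoMutex and co_mutex_lock()/unlock() are assumed
 * primitives; do_qcow_aio_write() is the original, mostly synchronous
 * qcow write code, renamed. */
typedef struct QcowWriteOp {
    BlockDriverState *bs;
    int64_t sector_num;
    QEMUIOVector *qiov;
    int nb_sectors;
    BlockDriverCompletionFunc *cb;
    void *opaque;
} QcowWriteOp;

static void coroutine_fn qcow_co_write_entry(void *p)
{
    QcowWriteOp *op = p;
    BDRVQcowState *s = op->bs->opaque;

    /* Only serializes while another request holds the mutex across a
     * blocking metadata access; the pure data path releases it quickly. */
    co_mutex_lock(&s->mutex);
    do_qcow_aio_write(op->bs, op->sector_num, op->qiov, op->nb_sectors,
                      op->cb, op->opaque);
    co_mutex_unlock(&s->mutex);
    qemu_free(op);
}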
Kevin Wolf Jan. 26, 2011, 3:50 p.m. UTC | #4
On 26.01.2011 16:40, Avi Kivity wrote:
> On 01/22/2011 11:29 AM, Stefan Hajnoczi wrote:
>> Converting qcow2 to use coroutines is fairly simple since most of qcow2
>> is synchronous.  The synchronous I/O functions like bdrv_pread() now
>> transparently work when called from a coroutine, so all the synchronous
>> code just works.
>>
>> The explicitly asynchronous code is adjusted to repeatedly call
>> qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
>> At that point the coroutine will return from its entry function and its
>> resources are freed.
>>
>> The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
>> from a BH.  This is necessary since the user callback code does not
>> expect to be executed from a coroutine.
>>
>> This conversion is not completely correct because the safety of the
>> synchronous code does not carry over to the coroutine version.
>> Previously, a synchronous code path could assume that it will never be
>> interleaved with another request executing.  This is no longer true
>> because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
>> other requests can be processed during that time.
>>
>> The solution is to carefully introduce checks so that pending requests
>> do not step on each other's toes.  That is left for a future patch...
> 
> The way I thought of doing this is:
> 
> qcow_aio_write(...)
> {
>      execute_in_coroutine {
>          co_mutex_lock(&bs->mutex);
>          do_qcow_aio_write(...); // original qcow code
>          co_mutex_release(&bs->mutex);
>      }
> }
> 
> (similar changes for the other callbacks)
> 
> if the code happens to be asynchronous (no metadata changes), we'll take 
> the mutex and release it immediately after submitting the I/O, so no 
> extra serialization happens.  If the code does issue a synchronous 
> metadata write, we'll lock out all other operations on the same block 
> device, but still allow the vcpu to execute, since all the locking 
> happens in a coroutine.
> 
> Essentially, a mutex becomes the dependency tracking mechanism.  A global 
> mutex means all synchronous operations are dependent.  Later, we can 
> convert the metadata cache entry dependency lists to local mutexes 
> inside the cache entry structures.

I thought a bit about it since you mentioned it in the call yesterday
and I think this approach makes sense. Even immediately after the
conversion we should be in a better state than with Stefan's approach
because I/O without metadata disk access won't be serialized.

In the other thread you mentioned that you have written some code
independently. Do you have it in some public git repository?

Kevin
Anthony Liguori Jan. 26, 2011, 4:08 p.m. UTC | #5
On 01/26/2011 09:50 AM, Kevin Wolf wrote:
> On 26.01.2011 16:40, Avi Kivity wrote:
>    
>> On 01/22/2011 11:29 AM, Stefan Hajnoczi wrote:
>>      
>>> Converting qcow2 to use coroutines is fairly simple since most of qcow2
>>> is synchronous.  The synchronous I/O functions like bdrv_pread() now
>>> transparently work when called from a coroutine, so all the synchronous
>>> code just works.
>>>
>>> The explicitly asynchronous code is adjusted to repeatedly call
>>> qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
>>> At that point the coroutine will return from its entry function and its
>>> resources are freed.
>>>
>>> The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
>>> from a BH.  This is necessary since the user callback code does not
>>> expect to be executed from a coroutine.
>>>
>>> This conversion is not completely correct because the safety of the
>>> synchronous code does not carry over to the coroutine version.
>>> Previously, a synchronous code path could assume that it will never be
>>> interleaved with another request executing.  This is no longer true
>>> because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
>>> other requests can be processed during that time.
>>>
>>> The solution is to carefully introduce checks so that pending requests
>>> do not step on each other's toes.  That is left for a future patch...
>>>        
>> The way I thought of doing this is:
>>
>> qcow_aio_write(...)
>> {
>>       execute_in_coroutine {
>>           co_mutex_lock(&bs->mutex);
>>           do_qcow_aio_write(...); // original qcow code
>>           co_mutex_release(&bs->mutex);
>>      

The release has to be executed in the callback.

I think it's a bit nicer to not do a mutex, but rather to have a notion 
of freezing/unfreezing the block queue and instead do:

completion() {
    bdrv_unfreeze(bs);
}

coroutine {
    bdrv_freeze(bs);
    do_qcow_aio_write(completion);
}

Freeze/unfreeze is useful in a number of other places too (like 
snapshotting).
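
A rough sketch of that flow; bdrv_freeze()/bdrv_unfreeze() and
do_qcow_aio_write() are assumed names here, not existing APIs:

/* Sketch only: bdrv_freeze()/bdrv_unfreeze() are assumed primitives
 * that hold back new requests on the device until thawed. */
static void qcow_write_completion(void *opaque, int ret)
{
    QCowAIOCB *acb = opaque;

    bdrv_unfreeze(acb->common.bs);      /* let blocked requests start */
    acb->common.cb(acb->common.opaque, ret);
    qemu_aio_release(acb);
}

static void coroutine_fn qcow_co_write(void *opaque)
{
    QCowAIOCB *acb = opaque;

    bdrv_freeze(acb->common.bs);        /* no new request may start */
    do_qcow_aio_write(acb, qcow_write_completion);  /* original code */
}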

Regards,

Anthony Liguori

>>       }
>> }
>>
>> (similar changes for the other callbacks)
>>
>> if the code happens to be asynchronous (no metadata changes), we'll take
>> the mutex and release it immediately after submitting the I/O, so no
>> extra serialization happens.  If the code does issue a synchronous
>> metadata write, we'll lock out all other operations on the same block
>> device, but still allow the vcpu to execute, since all the locking
>> happens in a coroutine.
>>
>> Essentially, a mutex becomes the dependency tracking mechanism.  A global
>> mutex means all synchronous operations are dependent.  Later, we can
>> convert the metadata cache entry dependency lists to local mutexes
>> inside the cache entry structures.
>>      
> I thought a bit about it since you mentioned it in the call yesterday
> and I think this approach makes sense. Even immediately after the
> conversion we should be in a better state than with Stefan's approach
> because I/O without metadata disk access won't be serialized.
>
> In the other thread you mentioned that you have written some code
> independently. Do you have it in some public git repository?
>
> Kevin
>
>
Avi Kivity Jan. 26, 2011, 4:08 p.m. UTC | #6
On 01/26/2011 05:50 PM, Kevin Wolf wrote:
> In the other thread you mentioned that you have written some code
> independently. Do you have it in some public git repository?

I've written it mostly in my mind... IIRC I have the basic coroutine 
equivalents (in my series they are simply threads, without an explicit 
yield_to, but basically the same thing).  I've started work on making 
aio emulation use coroutines/uthreads, so vmdk and related automatically 
become non-blocking (but all requests serialize against each other 
against a built in BlockDriverState::mutex).  I have done nothing on 
qcow2 beyond figuring out how to do it (which I outlined above).
Avi Kivity Jan. 26, 2011, 4:13 p.m. UTC | #7
On 01/26/2011 06:08 PM, Anthony Liguori wrote:
> On 01/26/2011 09:50 AM, Kevin Wolf wrote:
>> On 26.01.2011 16:40, Avi Kivity wrote:
>>> On 01/22/2011 11:29 AM, Stefan Hajnoczi wrote:
>>>> Converting qcow2 to use coroutines is fairly simple since most of 
>>>> qcow2
>>>> is synchronous.  The synchronous I/O functions like bdrv_pread() now
>>>> transparently work when called from a coroutine, so all the 
>>>> synchronous
>>>> code just works.
>>>>
>>>> The explicitly asynchronous code is adjusted to repeatedly call
>>>> qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request 
>>>> completes.
>>>> At that point the coroutine will return from its entry function and 
>>>> its
>>>> resources are freed.
>>>>
>>>> The bdrv_aio_readv() and bdrv_aio_writev() user callback is now 
>>>> invoked
>>>> from a BH.  This is necessary since the user callback code does not
>>>> expect to be executed from a coroutine.
>>>>
>>>> This conversion is not completely correct because the safety of the
>>>> synchronous code does not carry over to the coroutine version.
>>>> Previously, a synchronous code path could assume that it will never be
>>>> interleaved with another request executing.  This is no longer true
>>>> because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield 
>>>> and
>>>> other requests can be processed during that time.
>>>>
>>>> The solution is to carefully introduce checks so that pending requests
>>>> do not step on each other's toes.  That is left for a future patch...
>>> The way I thought of doing this is:
>>>
>>> qcow_aio_write(...)
>>> {
>>>       execute_in_coroutine {
>>>           co_mutex_lock(&bs->mutex);
>>>           do_qcow_aio_write(...); // original qcow code
>>>           co_mutex_release(&bs->mutex);
>
> The release has to be executed in the call back.
>
> I think it's a bit nicer to not do a mutex, but rather to have a 
> notion of freezing/unfreezing the block queue and instead do:
>
> completion() {
>    bdrv_unfreeze(bs);
> }
>
> coroutine {
>    bdrv_freeze(bs);
>    do_qcow_aio_write(completion);
> }
>
> Freeze/unfreeze is useful in a number of other places too (like 
> snapshotting).

Serializing against a global mutex has the advantage that it can be 
treated as a global lock that is decomposed into fine-grained locks.

For example, we can start the code conversion from an explicit async 
model to a threaded sync model by converting the mutex into a 
shared/exclusive lock.  Operations like read and write take the lock for 
shared access (and take a fine-grained mutex on the metadata cache 
entry), while operations like creating a snapshot take the lock for 
exclusive access.  That doesn't work with freeze/thaw.
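
A rough sketch of such a decomposition, assuming a hypothetical CoRwlock
with co_rwlock_rdlock()/co_rwlock_wrlock()/co_rwlock_unlock(), with the
do_qcow2_*() helpers standing in for the existing synchronous code:

/* Sketch only: CoRwlock and its functions are assumed primitives. */
static int coroutine_fn qcow2_co_writev(BlockDriverState *bs,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors)
{
    BDRVQcowState *s = bs->opaque;
    int ret;

    co_rwlock_rdlock(&s->lock);   /* shared: ordinary I/O may overlap */
    ret = do_qcow2_writev(bs, sector_num, qiov, nb_sectors);
    co_rwlock_unlock(&s->lock);
    return ret;
}

static int coroutine_fn qcow2_co_snapshot_create(BlockDriverState *bs,
        QEMUSnapshotInfo *sn_info)
{
    BDRVQcowState *s = bs->opaque;
    int ret;

    co_rwlock_wrlock(&s->lock);   /* exclusive: drains all other users */
    ret = do_qcow2_snapshot_create(bs, sn_info);
    co_rwlock_unlock(&s->lock);
    return ret;
}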
Anthony Liguori Jan. 26, 2011, 4:28 p.m. UTC | #8
On 01/26/2011 10:13 AM, Avi Kivity wrote:
> Serializing against a global mutex has the advantage that it can be 
> treated as a global lock that is decomposed into fine-grained locks.
>
> For example, we can start the code conversion from an explicit async 
> model to a threaded sync model by converting the mutex into a 
> shared/exclusive lock.  Operations like read and write take the lock 
> for shared access (and take a fine-grained mutex on the metadata cache 
> entry), while operations like creating a snapshot take the lock for 
> exclusive access.  That doesn't work with freeze/thaw.

The trouble with this is that you increase the amount of re-entrance 
whereas freeze/thaw doesn't.

The code from the beginning of the request to where the mutex is 
acquired will be executed for every single request even while requests 
are blocked at the mutex acquisition.

With freeze/thaw, you freeze the queue and prevent any request from 
starting until you thaw.  You only thaw and return control to allow 
another request to execute when you begin executing an asynchronous I/O 
callback.

I think my previous example was wrong, you really want to do:

qcow2_aio_writev() {
    coroutine {
        freeze();
        sync_io(); // existing qcow2 code
        thaw();
        // existing non I/O code
        bdrv_aio_writev(callback); // no explicit freeze/thaw needed
    }
}

This is equivalent to our existing code because no new re-entrance is 
introduced.  The only re-entrancy points are in the 
bdrv_aio_{readv,writev} calls.

Regards,

Anthony Liguori
Avi Kivity Jan. 26, 2011, 4:38 p.m. UTC | #9
On 01/26/2011 06:28 PM, Anthony Liguori wrote:
> On 01/26/2011 10:13 AM, Avi Kivity wrote:
>> Serializing against a global mutex has the advantage that it can be 
>> treated as a global lock that is decomposed into fine-grained locks.
>>
>> For example, we can start the code conversion from an explicit async 
>> model to a threaded sync model by converting the mutex into a 
>> shared/exclusive lock.  Operations like read and write take the lock 
>> for shared access (and take a fine-grained mutex on the metadata 
>> cache entry), while operations like creating a snapshot take the lock 
>> for exclusive access.  That doesn't work with freeze/thaw.
>
> The trouble with this is that you increase the amount of re-entrance 
> whereas freeze/thaw doesn't.
>
> The code from the beginning of the request to where the mutex is 
> acquired will be executed for every single request even while requests 
> are blocked at the mutex acquisition.

It's just a few instructions.

>
> With freeze/thaw, you freeze the queue and prevent any request from 
> starting until you thaw.  You only thaw and return control to allow 
> another request to execute when you begin executing an asynchronous 
> I/O callback.
>

What do you actually save?  The longjmp() to the coroutine code, linking 
in to the mutex wait queue, and another switch back to the main 
coroutine?  Given that we don't expect to block often, it seems hardly a 
cost worth optimizing.

> I think my previous example was wrong, you really want to do:
>
> qcow2_aio_writev() {
>    coroutine {
>        freeze();
>        sync_io(); // existing qcow2 code
>        thaw();
>        // existing non I/O code
>        bdrv_aio_writev(callback); // no explicit freeze/thaw needed
>    }
> }
>
> This is equivalent to our existing code because no new re-entrance is 
> introduced.  The only re-entrancy points are in the 
> bdrv_aio_{readv,writev} calls.

This requires you to know which code is sync, and which code is async.  
My conversion allows you to wrap the code blindly with a mutex, and have 
it do the right thing automatically.  This is most useful where the code 
can be sync or async depending on data (which is the case for qcow2).
Anthony Liguori Jan. 26, 2011, 5:12 p.m. UTC | #10
On 01/26/2011 10:38 AM, Avi Kivity wrote:
> On 01/26/2011 06:28 PM, Anthony Liguori wrote:
>> On 01/26/2011 10:13 AM, Avi Kivity wrote:
>>> Serializing against a global mutex has the advantage that it can be 
>>> treated as a global lock that is decomposed into fine-grained locks.
>>>
>>> For example, we can start the code conversion from an explicit async 
>>> model to a threaded sync model by converting the mutex into a 
>>> shared/exclusive lock.  Operations like read and write take the lock 
>>> for shared access (and take a fine-grained mutex on the metadata 
>>> cache entry), while operations like creating a snapshot take the lock 
>>> for exclusive access.  That doesn't work with freeze/thaw.
>>
>> The trouble with this is that you increase the amount of re-entrance 
>> whereas freeze/thaw doesn't.
>>
>> The code from the beginning of the request to where the mutex is 
>> acquired will be executed for every single request even while 
>> requests are blocked at the mutex acquisition.
>
> It's just a few instructions.
>
>>
>> With freeze/thaw, you freeze the queue and prevent any request from 
>> starting until you thaw.  You only thaw and return control to allow 
>> another request to execute when you begin executing an asynchronous 
>> I/O callback.
>>
>
> What do you actually save?  The longjmp() to the coroutine code, 
> linking in to the mutex wait queue, and another switch back to the 
> main coroutine?  Given that we don't expect to block often, it seems 
> hardly a cost worth optimizing.

It's a matter of correctness, not optimization.

Consider the following example:

coroutine {
    l2 = find_l2(offset);
    // allocates but does not increment max cluster offset
    l2[l2_offset(offset)] = alloc_cluster();

    co_mutex_lock(&lock);
    write_l2(l2);
    co_mutex_unlock(&lock);

    l1[l1_offset(offset)] = l2;

    co_mutex_lock(&lock);
    write_l1(l1);
    co_mutex_unlock(&lock);

    commit_cluster(l2[l2_offset(offset)]);
}

This code is incorrect.  The code before the first co_mutex_lock() may 
be called a dozen times before anyone finishes this routine.  
That means the same cluster is reused multiple times.

This code was correct before we introduced coroutines because we 
effectively had one big lock around the whole thing.

Regards,

Anthony Liguori

>
>> I think my previous example was wrong, you really want to do:
>>
>> qcow2_aio_writev() {
>>    coroutine {
>>        freeze();
>>        sync_io(); // existing qcow2 code
>>        thaw();
>>        // existing non I/O code
>>        bdrv_aio_writev(callback); // no explicit freeze/thaw needed
>>    }
>> }
>>
>> This is equivalent to our existing code because no new re-entrance is 
>> introduced.  The only re-entrancy points are in the 
>> bdrv_aio_{readv,writev} calls.
>
> This requires you to know which code is sync, and which code is 
> async.  My conversion allows you to wrap the code blindly with a 
> mutex, and have it do the right thing automatically.  This is most 
> useful where the code can be sync or async depending on data (which is 
> the case for qcow2).
>
Avi Kivity Jan. 27, 2011, 9:25 a.m. UTC | #11
On 01/26/2011 07:12 PM, Anthony Liguori wrote:
>> What do you actually save?  The longjmp() to the coroutine code, 
>> linking in to the mutex wait queue, and another switch back to the 
>> main coroutine?  Given that we don't expect to block often, it seems 
>> hardly a cost worth optimizing.
>
>
> It's a matter of correctness, not optimization.
>
> Consider the following example:
>
> coroutine {
>    l2 = find_l2(offset);
>    // allocates but does not increment max cluster offset
>    l2[l2_offset(offset)] = alloc_cluster();
>
>    co_mutex_lock(&lock);
>    write_l2(l2);
>    co_mutex_unlock(&lock);
>
>    l1[l1_offset(offset)] = l2;
>
>    co_mutex_lock(&lock);
>    write_l1(l1);
>    co_mutex_unlock(&lock);
>
>    commit_cluster(l2[l2_offset(offset)]);
> }
>
> This code is incorrect.


Yes it's incorrect, but it isn't what I'm proposing.
Kevin Wolf Jan. 27, 2011, 9:27 a.m. UTC | #12
On 26.01.2011 18:12, Anthony Liguori wrote:
> On 01/26/2011 10:38 AM, Avi Kivity wrote:
>> On 01/26/2011 06:28 PM, Anthony Liguori wrote:
>>> On 01/26/2011 10:13 AM, Avi Kivity wrote:
>>>> Serializing against a global mutex has the advantage that it can be 
>>>> treated as a global lock that is decomposed into fine-grained locks.
>>>>
>>>> For example, we can start the code conversion from an explicit async 
>>>> model to a threaded sync model by converting the mutex into a 
>>>> shared/exclusive lock.  Operations like read and write take the lock 
>>>> for shared access (and take a fine-grained mutex on the metadata 
>>>> cache entry), while operations like creating a snapshot take the lock 
>>>> for exclusive access.  That doesn't work with freeze/thaw.
>>>
>>> The trouble with this is that you increase the amount of re-entrance 
>>> whereas freeze/thaw doesn't.
>>>
>>> The code from the beginning of the request to where the mutex is 
>>> acquired will be executed for every single request even while 
>>> requests are blocked at the mutex acquisition.
>>
>> It's just a few instructions.
>>
>>>
>>> With freeze/thaw, you freeze the queue and prevent any request from 
>>> starting until you thaw.  You only thaw and return control to allow 
>>> another request to execute when you begin executing an asynchronous 
>>> I/O callback.
>>>
>>
>> What do you actually save?  The longjmp() to the coroutine code, 
>> linking in to the mutex wait queue, and another switch back to the 
>> main coroutine?  Given that we don't expect to block often, it seems 
>> hardly a cost worth optimizing.

And if you cared, you could probably optimize it in the mutex
implementation.

> It's a matter of correctness, not optimization.
> 
> Consider the following example:
> 
> coroutine {
>     l2 = find_l2(offset);
>     // allocates but does not increment max cluster offset
>     l2[l2_offset(offset)] = alloc_cluster();
> 
>     co_mutex_lock(&lock);
>     write_l2(l2);
>     co_mutex_unlock(&lock);
> 
>     l1[l1_offset(offset)] = l2;
> 
>     co_mutex_lock(&lock);
>     write_l1(l1);
>     co_mutex_unlock(&lock);
> 
>     commit_cluster(l2[l2_offset(offset)]);
> }
> 
> This code is incorrect.  The code before the first co_mutex_lock() may 
> be called a dozen times before anyone finishes this routine.  
> That means the same cluster is reused multiple times.

But this is not a new problem. In qcow2 we have this today:

cluster_offset = alloc_cluster();
bdrv_aio_write()
...
update_l2()

There is already code to handle this case, even though it could probably
be simplified for coroutines. I don't think we want to make the code
worse during the conversion by basically serializing everything.

> This code was correct before we introduced coroutines because we 
> effectively had one big lock around the whole thing.

So what's the lesson to learn? You need to take care when you convert a
big lock into smaller ones?

>>> I think my previous example was wrong, you really want to do:
>>>
>>> qcow2_aio_writev() {
>>>    coroutine {
>>>        freeze();
>>>        sync_io(); // existing qcow2 code
>>>        thaw();
>>>        // existing non I/O code
>>>        bdrv_aio_writev(callback); // no explicit freeze/thaw needed
>>>    }
>>> }
>>>
>>> This is equivalent to our existing code because no new re-entrance is 
>>> introduced.  The only re-entrancy points are in the 
>>> bdrv_aio_{readv,writev} calls.
>>
>> This requires you to know which code is sync, and which code is 
>> async.  My conversion allows you to wrap the code blindly with a 
>> mutex, and have it do the right thing automatically.  This is most 
>> useful where the code can be sync or async depending on data (which is 
>> the case for qcow2).

Well, but in the case of qcow2, you don't want to have a big mutex
around everything. We perfectly know which parts are asynchronous and
which are synchronous, so we'd want to do it finer grained from the
beginning.

Kevin
Avi Kivity Jan. 27, 2011, 9:49 a.m. UTC | #13
On 01/27/2011 11:27 AM, Kevin Wolf wrote:
> Well, but in the case of qcow2, you don't want to have a big mutex
> around everything. We perfectly know which parts are asynchronous and
> which are synchronous, so we'd want to do it finer grained from the
> beginning.

Yes we do.  And the way I proposed it, the new mutex does not introduce 
any new serialization.

To repeat, for every qcow2 callback or completion X (not qcow2 read or 
write operation), we transform it in the following manner:

1. Rename X into do_X.  If X is called directly from within qcow2, 
change the call to do_X.  Create a new X which simply calls do_X().  
This change is purely textual and doesn't affect runtime at all.

2. Change X to

X()
{
    create a coroutine
    pack arguments to X in a structure
    schedule the coroutine to execute a new function call_X with the 
structure as argument
    wait for the coroutine to complete
    unpack the result (if any)
    dispose of the coroutine
    return the result
}

call_X()
{
    unpack arguments
    co_mutex_lock(&bs->mutex)
    result = do_X()
    co_mutex_unlock(&bs->mutex)
    pack result
}

(in the case of aio_X callbacks, we return a fake AIOCB:

aio_X()
{
       allocate AIOCB
       allocate coroutine
       pack arguments to X, and fake AIOCB in a structure
       schedule the coroutine to execute call_X with the structure as 
argument
       return fake AIOCB
}

call_aio_X()
{
       unpack arguments
       co_mutex_lock(&bs->mutex)
       do_X()
       co_mutex_unlock(&bs->mutex)
}

and change the completion to complete the fake AIOCB
)

The result of this transformation is:

- if X1 executed without blocking, it still executes without blocking, 
except for the case below
- if X2 blocked, it will serialize any X1 which doesn't, but it will no 
longer block the vcpu thread

We could do this transformation in the block layer, as it doesn't 
involve qcow2 at all.  I don't think that's a good idea, though, as 
qcow2 would be the only beneficiary and it would be harder to further 
evolve qcow2.
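
Rendered as C for one aio entry point, the recipe might look like this
(sketch only; CoMutex, schedule_coroutine() and the do_X name are
illustrative, while qemu_aio_get()/qcow2_aio_pool are as in the patch):

/* Sketch: do_qcow2_aio_writev() is the renamed, unmodified entry point;
 * it completes the fake AIOCB when the request finishes. */
typedef struct CallAioWritevArgs {
    BlockDriverState *bs;
    int64_t sector_num;
    QEMUIOVector *qiov;
    int nb_sectors;
    BlockDriverAIOCB *fake_acb;
} CallAioWritevArgs;

static void coroutine_fn call_qcow2_aio_writev(void *opaque)
{
    CallAioWritevArgs *args = opaque;
    BDRVQcowState *s = args->bs->opaque;

    co_mutex_lock(&s->mutex);
    do_qcow2_aio_writev(args->bs, args->sector_num, args->qiov,
                        args->nb_sectors, args->fake_acb);
    co_mutex_unlock(&s->mutex);
    qemu_free(args);
}

static BlockDriverAIOCB *qcow2_aio_writev(BlockDriverState *bs,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    CallAioWritevArgs *args = qemu_mallocz(sizeof(*args));

    args->fake_acb = qemu_aio_get(&qcow2_aio_pool, bs, cb, opaque);
    args->bs = bs;
    args->sector_num = sector_num;
    args->qiov = qiov;
    args->nb_sectors = nb_sectors;

    schedule_coroutine(call_qcow2_aio_writev, args);
    return args->fake_acb;
}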
Stefan Hajnoczi Jan. 27, 2011, 10:09 a.m. UTC | #14
On Wed, Jan 26, 2011 at 05:40:27PM +0200, Avi Kivity wrote:
> On 01/22/2011 11:29 AM, Stefan Hajnoczi wrote:
> >Converting qcow2 to use coroutines is fairly simple since most of qcow2
> >is synchronous.  The synchronous I/O functions like bdrv_pread() now
> >transparently work when called from a coroutine, so all the synchronous
> >code just works.
> >
> >The explicitly asynchronous code is adjusted to repeatedly call
> >qcow2_aio_read_cb() or qcow2_aio_write_cb() until the request completes.
> >At that point the coroutine will return from its entry function and its
> >resources are freed.
> >
> >The bdrv_aio_readv() and bdrv_aio_writev() user callback is now invoked
> >from a BH.  This is necessary since the user callback code does not
> >expect to be executed from a coroutine.
> >
> >This conversion is not completely correct because the safety of the
> >synchronous code does not carry over to the coroutine version.
> >Previously, a synchronous code path could assume that it will never be
> >interleaved with another request executing.  This is no longer true
> >because bdrv_pread() and bdrv_pwrite() cause the coroutine to yield and
> >other requests can be processed during that time.
> >
> >The solution is to carefully introduce checks so that pending requests
> >do not step on each other's toes.  That is left for a future patch...
> >
> 
> 
> The way I thought of doing this is:
> 
> qcow_aio_write(...)
> {
>     execute_in_coroutine {
>         co_mutex_lock(&bs->mutex);
>         do_qcow_aio_write(...); // original qcow code
>         co_mutex_release(&bs->mutex);
>     }
> }

I think this approach makes sense as the next step towards a
fine-grained strategy.

Remember that qcow2 is not just:
qcow2_aio_write(...)
{
    sync_io(...);
    aio_write(...);
    complete_request(...); /* in callback */
}

It is actually:
qcow2_aio_write(...)
{
    sync_io(...);
    aio_write(...);
    more_sync_io(...); /* in callback */
    complete_request(...);
}

We need to make sure the synchronous I/O after aio write completion is
also guarded.
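
Roughly, again assuming a CoMutex, the completion side would have to
re-enter a coroutine and retake the lock before doing that I/O:

/* Sketch only: the aio_write completion re-enters a coroutine and
 * retakes the assumed per-device CoMutex before the remaining
 * synchronous I/O (e.g. the L2 update done with bdrv_pwrite());
 * qcow2_finish_write_sync_io() is an illustrative name. */
static void coroutine_fn qcow2_co_write_completion(void *opaque)
{
    QCowAIOCB *acb = opaque;
    BDRVQcowState *s = acb->common.bs->opaque;

    co_mutex_lock(&s->mutex);
    qcow2_finish_write_sync_io(acb);
    co_mutex_unlock(&s->mutex);

    acb->common.cb(acb->common.opaque, acb->ret);
    qemu_aio_release(acb);
}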

Stefan
Kevin Wolf Jan. 27, 2011, 10:34 a.m. UTC | #15
On 27.01.2011 10:49, Avi Kivity wrote:
> On 01/27/2011 11:27 AM, Kevin Wolf wrote:
>> Well, but in the case of qcow2, you don't want to have a big mutex
>> around everything. We perfectly know which parts are asynchronous and
>> which are synchronous, so we'd want to do it finer grained from the
>> beginning.
> 
> Yes we do.  And the way I proposed it, the new mutex does not introduce 
> any new serialization.
> 
> To repeat, for every qcow2 callback or completion X (not qcow2 read or 
> write operation), we transform it in the following manner:
> [...]

This works fine for code that is completely synchronous today (and you
can't serialize it more than it already is anyway).

It doesn't work for qemu_aio_readv/writev because these use AIO for
reading/writing the data. So you definitely need to rewrite that part,
or the AIO callback will cause the code to run outside its coroutine.
And during this rewrite you'll want to pay attention that you don't hold
the mutex for the bdrv_co_readv that was an AIO request before, or
you'll introduce additional serialization.

Kevin
Avi Kivity Jan. 27, 2011, 10:41 a.m. UTC | #16
On 01/27/2011 12:34 PM, Kevin Wolf wrote:
> On 27.01.2011 10:49, Avi Kivity wrote:
> >  On 01/27/2011 11:27 AM, Kevin Wolf wrote:
> >>  Well, but in the case of qcow2, you don't want to have a big mutex
> >>  around everything. We perfectly know which parts are asynchronous and
> >>  which are synchronous, so we'd want to do it finer grained from the
> >>  beginning.
> >
> >  Yes we do.  And the way I proposed it, the new mutex does not introduce
> >  any new serialization.
> >
> >  To repeat, for every qcow2 callback or completion X (not qcow2 read or
> >  write operation), we transform it in the following manner:
> >  [...]
>
> This works fine for code that is completely synchronous today (and you
> can't serialize it more than it already is anyway).
>
> It doesn't work for qemu_aio_readv/writev because these use AIO for
> reading/writing the data. So you definitely need to rewrite that part,
> or the AIO callback will cause the code to run outside its coroutine.

The callbacks need to be wrapped in the same way.  Schedule a coroutine 
to run the true callback.

> And during this rewrite you'll want to pay attention that you don't hold
> the mutex for the bdrv_co_readv that was an AIO request before, or
> you'll introduce additional serialization.

I don't follow.  Please elaborate.

The idea behind this is that we keep exactly the same serialization 
characteristics we had before.  If an aio_X callback or its intermediate 
completion executed without blocking, it will continue to do so.  
Holding a co_mutex over a non-blocking code sequence does not serialize, 
except if a blocking sequence is executing concurrently (in which case 
the serialization was present before).
Avi Kivity Jan. 27, 2011, 10:46 a.m. UTC | #17
On 01/27/2011 12:09 PM, Stefan Hajnoczi wrote:
> >
> >
> >  The way I thought of doing this is:
> >
> >  qcow_aio_write(...)
> >  {
> >      execute_in_coroutine {
> >          co_mutex_lock(&bs->mutex);
> >          do_qcow_aio_write(...); // original qcow code
> >          co_mutex_release(&bs->mutex);
> >      }
> >  }
>
> I think this approach makes sense as the next step towards a
> fine-grained strategy.
>
> Remember that qcow2 is not just:
> qcow2_aio_write(...)
> {
>      sync_io(...);
>      aio_write(...);
>      complete_request(...); /* in callback */
> }
>
> It is actually:
> qcow2_aio_write(...)
> {
>      sync_io(...);
>      aio_write(...);
>      more_sync_io(...); /* in callback */
>      complete_request(...);
> }
>
> We need to make sure the synchronous I/O after aio write completion is
> also guarded.

Correct.  Every entry point into qcow2 needs to be wrapped.

Maybe the best way forward is to implement it in the block layer.  While 
that makes additional hacking harder, it emphasises the fact that qcow2 
logic need not be changed in any way in order to remove vcpu blocking 
without compromising concurrency.

The block layer could examine BlockDriver::not_really_async, and if set, 
instead of calling BlockDriver::bdrv_aio_writev, it can call a wrapper 
which schedules a coroutine and returns a fake AIOCB.  The wrapper would 
also wrap the completion with a wrapper, so that it too would execute 
under a coroutine.  The mutex would be managed by the wrapper, and qcow2 
would be completely unchanged except for setting not_really_async.
Kevin Wolf Jan. 27, 2011, 11:27 a.m. UTC | #18
On 27.01.2011 11:41, Avi Kivity wrote:
> On 01/27/2011 12:34 PM, Kevin Wolf wrote:
>> On 27.01.2011 10:49, Avi Kivity wrote:
>>>  On 01/27/2011 11:27 AM, Kevin Wolf wrote:
>>>>  Well, but in the case of qcow2, you don't want to have a big mutex
>>>>  around everything. We perfectly know which parts are asynchronous and
>>>>  which are synchronous, so we'd want to do it finer grained from the
>>>>  beginning.
>>>
>>>  Yes we do.  And the way I proposed it, the new mutex does not introduce
>>>  any new serialization.
>>>
>>>  To repeat, for every qcow2 callback or completion X (not qcow2 read or
>>>  write operation), we transform it in the following manner:
>>>  [...]
>>
>> This works fine for code that is completely synchronous today (and you
>> can't serialize it more than it already is anyway).
>>
>> It doesn't work for qemu_aio_readv/writev because these use AIO for
>> reading/writing the data. So you definitely need to rewrite that part,
>> or the AIO callback will cause the code to run outside its coroutine.
> 
> The callbacks need to be wrapped in the same way.  Schedule a coroutine 
> to run the true callback.

Okay, I see what you're proposing. You could schedule a new coroutine
for callbacks indeed.

But I think it's actually easier to convert the bdrv_aio_readv into a
bdrv_co_readv (and by that removing the callback) and just make sure
that you don't hold the mutex during this call - basically what Stefan's
code does, just with mutexes instead of a request queue.

>> And during this rewrite you'll want to pay attention that you don't hold
>> the mutex for the bdrv_co_readv that was an AIO request before, or
>> you'll introduce additional serialization.
> 
> I don't follow.  Please elaborate.

We were thinking of different approaches. I hope it's clearer now.

Kevin
Avi Kivity Jan. 27, 2011, 12:21 p.m. UTC | #19
On 01/27/2011 01:27 PM, Kevin Wolf wrote:
> On 27.01.2011 11:41, Avi Kivity wrote:
> >  On 01/27/2011 12:34 PM, Kevin Wolf wrote:
> >>  On 27.01.2011 10:49, Avi Kivity wrote:
> >>>   On 01/27/2011 11:27 AM, Kevin Wolf wrote:
> >>>>   Well, but in the case of qcow2, you don't want to have a big mutex
> >>>>   around everything. We perfectly know which parts are asynchronous and
> >>>>   which are synchronous, so we'd want to do it finer grained from the
> >>>>   beginning.
> >>>
> >>>   Yes we do.  And the way I proposed it, the new mutex does not introduce
> >>>   any new serialization.
> >>>
> >>>   To repeat, for every qcow2 callback or completion X (not qcow2 read or
> >>>   write operation), we transform it in the following manner:
> >>>   [...]
> >>
> >>  This works fine for code that is completely synchronous today (and you
> >>  can't serialize it more than it already is anyway).
> >>
> >>  It doesn't work for qemu_aio_readv/writev because these use AIO for
> >>  reading/writing the data. So you definitely need to rewrite that part,
> >>  or the AIO callback will cause the code to run outside its coroutine.
> >
> >  The callbacks need to be wrapped in the same way.  Schedule a coroutine
> >  to run the true callback.
>
> Okay, I see what you're proposing. You could schedule a new coroutine
> for callbacks indeed.
>
> But I think it's actually easier to convert the bdrv_aio_readv into a
> bdrv_co_readv (and by that removing the callback) and just make sure
> that you don't hold the mutex during this call - basically what Stefan's
> code does, just with mutexes instead of a request queue.

My approach was for someone who isn't too familiar with qcow2 - a 
mindless conversion.  A more integrated approach is better, since it 
will lead to tighter code, and if Stefan or you are able to do it 
without impacting concurrency, I'm all for it.

> >>  And during this rewrite you'll want to pay attention that you don't hold
> >>  the mutex for the bdrv_co_readv that was an AIO request before, or
> >>  you'll introduce additional serialization.
> >
> >  I don't follow.  Please elaborate.
>
> We were thinking of different approaches. I hope it's clearer now.

I think so.

Patch

diff --git a/block/qcow2.c b/block/qcow2.c
index b6b094c..4b33ef3 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -361,19 +361,20 @@  typedef struct QCowAIOCB {
     uint64_t bytes_done;
     uint64_t cluster_offset;
     uint8_t *cluster_data;
-    BlockDriverAIOCB *hd_aiocb;
     QEMUIOVector hd_qiov;
     QEMUBH *bh;
     QCowL2Meta l2meta;
     QLIST_ENTRY(QCowAIOCB) next_depend;
+    Coroutine *coroutine;
+    int ret; /* return code for user callback */
 } QCowAIOCB;
 
 static void qcow2_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     QCowAIOCB *acb = container_of(blockacb, QCowAIOCB, common);
-    if (acb->hd_aiocb)
-        bdrv_aio_cancel(acb->hd_aiocb);
     qemu_aio_release(acb);
+    /* XXX This function looks broken, we could be in the middle of a request
+     * and releasing the acb is not a good idea */
 }
 
 static AIOPool qcow2_aio_pool = {
@@ -381,13 +382,14 @@  static AIOPool qcow2_aio_pool = {
     .cancel             = qcow2_aio_cancel,
 };
 
-static void qcow2_aio_read_cb(void *opaque, int ret);
-static void qcow2_aio_read_bh(void *opaque)
+static void qcow2_aio_bh(void *opaque)
 {
     QCowAIOCB *acb = opaque;
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
-    qcow2_aio_read_cb(opaque, 0);
+    acb->common.cb(acb->common.opaque, acb->ret);
+    qemu_iovec_destroy(&acb->hd_qiov);
+    qemu_aio_release(acb);
 }
 
 static int qcow2_schedule_bh(QEMUBHFunc *cb, QCowAIOCB *acb)
@@ -404,14 +406,13 @@  static int qcow2_schedule_bh(QEMUBHFunc *cb, QCowAIOCB *acb)
     return 0;
 }
 
-static void qcow2_aio_read_cb(void *opaque, int ret)
+static int coroutine_fn qcow2_aio_read_cb(void *opaque, int ret)
 {
     QCowAIOCB *acb = opaque;
     BlockDriverState *bs = acb->common.bs;
     BDRVQcowState *s = bs->opaque;
     int index_in_cluster, n1;
 
-    acb->hd_aiocb = NULL;
     if (ret < 0)
         goto done;
 
@@ -469,22 +470,13 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
                 acb->sector_num, acb->cur_nr_sectors);
             if (n1 > 0) {
                 BLKDBG_EVENT(bs->file, BLKDBG_READ_BACKING_AIO);
-                acb->hd_aiocb = bdrv_aio_readv(bs->backing_hd, acb->sector_num,
-                                    &acb->hd_qiov, acb->cur_nr_sectors,
-				    qcow2_aio_read_cb, acb);
-                if (acb->hd_aiocb == NULL)
-                    goto done;
-            } else {
-                ret = qcow2_schedule_bh(qcow2_aio_read_bh, acb);
-                if (ret < 0)
+                ret = bdrv_co_readv(bs->backing_hd, acb->sector_num, &acb->hd_qiov, acb->cur_nr_sectors);
+                if (ret < 0) {
                     goto done;
+                }
             }
         } else {
-            /* Note: in this case, no need to wait */
             qemu_iovec_memset(&acb->hd_qiov, 0, 512 * acb->cur_nr_sectors);
-            ret = qcow2_schedule_bh(qcow2_aio_read_bh, acb);
-            if (ret < 0)
-                goto done;
         }
     } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
         /* add AIO support for compressed blocks ? */
@@ -494,10 +486,6 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
         qemu_iovec_from_buffer(&acb->hd_qiov,
             s->cluster_cache + index_in_cluster * 512,
             512 * acb->cur_nr_sectors);
-
-        ret = qcow2_schedule_bh(qcow2_aio_read_bh, acb);
-        if (ret < 0)
-            goto done;
     } else {
         if ((acb->cluster_offset & 511) != 0) {
             ret = -EIO;
@@ -522,34 +510,50 @@  static void qcow2_aio_read_cb(void *opaque, int ret)
         }
 
         BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
-        acb->hd_aiocb = bdrv_aio_readv(bs->file,
+        ret = bdrv_co_readv(bs->file,
                             (acb->cluster_offset >> 9) + index_in_cluster,
-                            &acb->hd_qiov, acb->cur_nr_sectors,
-                            qcow2_aio_read_cb, acb);
-        if (acb->hd_aiocb == NULL) {
-            ret = -EIO;
+                            &acb->hd_qiov, acb->cur_nr_sectors);
+        if (ret < 0) {
             goto done;
         }
     }
 
-    return;
+    return 1;
 done:
-    acb->common.cb(acb->common.opaque, ret);
-    qemu_iovec_destroy(&acb->hd_qiov);
-    qemu_aio_release(acb);
+    acb->ret = ret;
+    qcow2_schedule_bh(qcow2_aio_bh, acb);
+    return 0;
+}
+
+static void * coroutine_fn qcow2_co_read(void *opaque)
+{
+    QCowAIOCB *acb = opaque;
+
+    while (qcow2_aio_read_cb(acb, 0)) {
+    }
+    return NULL;
+}
+
+static int coroutine_fn qcow2_aio_write_cb(void *opaque, int ret);
+static void * coroutine_fn qcow2_co_write(void *opaque)
+{
+    QCowAIOCB *acb = opaque;
+
+    while (qcow2_aio_write_cb(acb, 0)) {
+    }
+    return NULL;
 }
 
-static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
-                                  QEMUIOVector *qiov, int nb_sectors,
-                                  BlockDriverCompletionFunc *cb,
-                                  void *opaque, int is_write)
+static BlockDriverAIOCB *qcow2_aio_setup(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int is_write)
 {
     QCowAIOCB *acb;
+    Coroutine *coroutine;
 
     acb = qemu_aio_get(&qcow2_aio_pool, bs, cb, opaque);
     if (!acb)
         return NULL;
-    acb->hd_aiocb = NULL;
     acb->sector_num = sector_num;
     acb->qiov = qiov;
 
@@ -561,7 +565,12 @@  static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
     acb->cluster_offset = 0;
     acb->l2meta.nb_clusters = 0;
     QLIST_INIT(&acb->l2meta.dependent_requests);
-    return acb;
+
+    coroutine = qemu_coroutine_create(is_write ? qcow2_co_write
+                                               : qcow2_co_read);
+    acb->coroutine = coroutine;
+    qemu_coroutine_enter(coroutine, acb);
+    return &acb->common;
 }
 
 static BlockDriverAIOCB *qcow2_aio_readv(BlockDriverState *bs,
@@ -570,38 +579,48 @@  static BlockDriverAIOCB *qcow2_aio_readv(BlockDriverState *bs,
                                          BlockDriverCompletionFunc *cb,
                                          void *opaque)
 {
-    QCowAIOCB *acb;
-
-    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
-    if (!acb)
-        return NULL;
-
-    qcow2_aio_read_cb(acb, 0);
-    return &acb->common;
+    return qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
 }
 
-static void qcow2_aio_write_cb(void *opaque, int ret);
-
-static void run_dependent_requests(QCowL2Meta *m)
+static void qcow2_co_run_dependent_requests(void *opaque)
 {
+    QCowAIOCB *acb = opaque;
     QCowAIOCB *req;
     QCowAIOCB *next;
 
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    /* Restart all dependent requests */
+    QLIST_FOREACH_SAFE(req, &acb->l2meta.dependent_requests, next_depend, next) {
+        qemu_coroutine_enter(req->coroutine, NULL);
+    }
+
+    /* Reenter the original request */
+    qemu_coroutine_enter(acb->coroutine, NULL);
+}
+
+static void run_dependent_requests(QCowL2Meta *m)
+{
     /* Take the request off the list of running requests */
     if (m->nb_clusters != 0) {
         QLIST_REMOVE(m, next_in_flight);
     }
 
-    /* Restart all dependent requests */
-    QLIST_FOREACH_SAFE(req, &m->dependent_requests, next_depend, next) {
-        qcow2_aio_write_cb(req, 0);
+    if (!QLIST_EMPTY(&m->dependent_requests)) {
+        /* TODO This is a hack to get at the acb, may not be correct if called
+         * with a QCowL2Meta that is not part of a QCowAIOCB.
+         */
+        QCowAIOCB *acb = container_of(m, QCowAIOCB, l2meta);
+        qcow2_schedule_bh(qcow2_co_run_dependent_requests, acb);
+        qemu_coroutine_yield(NULL);
     }
 
     /* Empty the list for the next part of the request */
     QLIST_INIT(&m->dependent_requests);
 }
 
-static void qcow2_aio_write_cb(void *opaque, int ret)
+static int coroutine_fn qcow2_aio_write_cb(void *opaque, int ret)
 {
     QCowAIOCB *acb = opaque;
     BlockDriverState *bs = acb->common.bs;
@@ -609,8 +628,6 @@  static void qcow2_aio_write_cb(void *opaque, int ret)
     int index_in_cluster;
     int n_end;
 
-    acb->hd_aiocb = NULL;
-
     if (ret >= 0) {
         ret = qcow2_alloc_cluster_link_l2(bs, &acb->l2meta);
     }
@@ -648,7 +665,8 @@  static void qcow2_aio_write_cb(void *opaque, int ret)
     if (acb->l2meta.nb_clusters == 0 && acb->l2meta.depends_on != NULL) {
         QLIST_INSERT_HEAD(&acb->l2meta.depends_on->dependent_requests,
             acb, next_depend);
-        return;
+        qemu_coroutine_yield(NULL);
+        return 1;
     }
 
     assert((acb->cluster_offset & 511) == 0);
@@ -675,25 +693,22 @@  static void qcow2_aio_write_cb(void *opaque, int ret)
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
-    acb->hd_aiocb = bdrv_aio_writev(bs->file,
-                                    (acb->cluster_offset >> 9) + index_in_cluster,
-                                    &acb->hd_qiov, acb->cur_nr_sectors,
-                                    qcow2_aio_write_cb, acb);
-    if (acb->hd_aiocb == NULL) {
-        ret = -EIO;
+    ret = bdrv_co_writev(bs->file,
+                         (acb->cluster_offset >> 9) + index_in_cluster,
+                         &acb->hd_qiov, acb->cur_nr_sectors);
+    if (ret < 0) {
         goto fail;
     }
-
-    return;
+    return 1;
 
 fail:
     if (acb->l2meta.nb_clusters != 0) {
         QLIST_REMOVE(&acb->l2meta, next_in_flight);
     }
 done:
-    acb->common.cb(acb->common.opaque, ret);
-    qemu_iovec_destroy(&acb->hd_qiov);
-    qemu_aio_release(acb);
+    acb->ret = ret;
+    qcow2_schedule_bh(qcow2_aio_bh, acb);
+    return 0;
 }
 
 static BlockDriverAIOCB *qcow2_aio_writev(BlockDriverState *bs,
@@ -703,16 +718,10 @@  static BlockDriverAIOCB *qcow2_aio_writev(BlockDriverState *bs,
                                           void *opaque)
 {
     BDRVQcowState *s = bs->opaque;
-    QCowAIOCB *acb;
 
     s->cluster_cache_offset = -1; /* disable compressed cache */
 
-    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
-    if (!acb)
-        return NULL;
-
-    qcow2_aio_write_cb(acb, 0);
-    return &acb->common;
+    return qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
 }
 
 static void qcow2_close(BlockDriverState *bs)
@@ -824,6 +833,7 @@  static int qcow2_change_backing_file(BlockDriverState *bs,
     return qcow2_update_ext_header(bs, backing_file, backing_fmt);
 }
 
+/* TODO did we break this for coroutines? */
 static int preallocate(BlockDriverState *bs)
 {
     uint64_t nb_sectors;