diff mbox

[RFC,v3,1/4] block: Implement bdrv_aio_pwrite

Message ID 1291121332-10588-2-git-send-email-kwolf@redhat.com
State New
Headers show

Commit Message

Kevin Wolf Nov. 30, 2010, 12:48 p.m. UTC
This implements an asynchronous version of bdrv_pwrite.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block.c |  167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 block.h |    2 +
 2 files changed, 169 insertions(+), 0 deletions(-)

Comments

Stefan Hajnoczi Dec. 2, 2010, 12:07 p.m. UTC | #1
On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> This implements an asynchronous version of bdrv_pwrite.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  block.c |  167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block.h |    2 +
>  2 files changed, 169 insertions(+), 0 deletions(-)

Is this function is necessary?

Current synchronous code uses pwrite() so this function makes it easy
to convert existing code.  But if that code took the block-based
nature of storage into account then this read-modify-write helper
isn't needed.

I guess what I'm saying is that this function should only be used when
you really need rmw (in many cases with image metadata it can be
avoided because you have enough metadata cached in memory to do full
sector writes).  If it turns out we don't need rmw then we can
eliminate this function.

> +    switch (acb->state) {
> +    case 0: {
> +        /* Read first sector if needed */

Please use an enum instead of int literals with comments.  Or you
could try separate functions and see if the switch statement really
saves that many lines of code.

> +    case 3: {
> +        /* Read last sector if needed */
> +        if (acb->bytes == 0) {
> +            goto done;
> +        }
> +
> +        acb->state = 4;
> +        acb->iov.iov_base = acb->tmp_buf;

acb->tmp_buf may be NULL here if we took the state transition to 2
instead of doing 1.

> +done:
> +    qemu_free(acb->tmp_buf);
> +    acb->common.cb(acb->common.opaque, ret);

Callback not invoked from a BH.  In an error case we might have made
no blocking calls, i.e. never returned and this callback can cause
reentrancy.

> +BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
> +    void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    PwriteAIOCB *acb;
> +
> +    acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
> +    acb->state      = 0;
> +    acb->offset     = offset;
> +    acb->buf        = buf;
> +    acb->bytes      = bytes;
> +    acb->tmp_buf    = NULL;
> +
> +    bdrv_aio_pwrite_cb(acb, 0);

We're missing the usual !bs->drv, bs->read_only, bdrv_check_request()
checks here.  Are we okay to wait until calling
bdrv_aio_readv/bdrv_aio_writev for these checks?

Stefan
Kevin Wolf Dec. 2, 2010, 12:30 p.m. UTC | #2
Am 02.12.2010 13:07, schrieb Stefan Hajnoczi:
> On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>> This implements an asynchronous version of bdrv_pwrite.
>>
>> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
>> ---
>>  block.c |  167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  block.h |    2 +
>>  2 files changed, 169 insertions(+), 0 deletions(-)
> 
> Is this function is necessary?
> 
> Current synchronous code uses pwrite() so this function makes it easy
> to convert existing code.  But if that code took the block-based
> nature of storage into account then this read-modify-write helper
> isn't needed.

For qcow2, most writes (refcount tables, L2 tables, etc.) are aligned to
512 byte sectors, but there are still some left that use pwrite with an
unaligned count. I'm not completely sure which data, but qemu-iotests
crashed with tmp_buf == NULL, so there are some ;-) Probably things like
header and snapshot table writes.

I'm not sure what other image formats do (we might want to use
block-queue for them, too, eventually), but usually that means that they
do strange things.

> I guess what I'm saying is that this function should only be used when
> you really need rmw (in many cases with image metadata it can be
> avoided because you have enough metadata cached in memory to do full
> sector writes).  If it turns out we don't need rmw then we can
> eliminate this function.

Maybe what we really should do is completely change the block layer
functions to use bytes as their unit and do any RMW in posix-aio-compat
and linux-aio. Other backends don't need it and without O_DIRECT we
don't even need to do it with files.

Also, using units of 512 bytes is completely arbitrary and may still
involve RMW if the host uses a different sector size.

>> +    switch (acb->state) {
>> +    case 0: {
>> +        /* Read first sector if needed */
> 
> Please use an enum instead of int literals with comments.  Or you
> could try separate functions and see if the switch statement really
> saves that many lines of code.

Okay, will use an enum.

I think the switch may not save that many lines of code, but it improves
readability because with chained functions (and no forward declarations)
you have to read backwards.

>> +    case 3: {
>> +        /* Read last sector if needed */
>> +        if (acb->bytes == 0) {
>> +            goto done;
>> +        }
>> +
>> +        acb->state = 4;
>> +        acb->iov.iov_base = acb->tmp_buf;
> 
> acb->tmp_buf may be NULL here if we took the state transition to 2
> instead of doing 1.

Yup, is already fixed.

>> +done:
>> +    qemu_free(acb->tmp_buf);
>> +    acb->common.cb(acb->common.opaque, ret);
> 
> Callback not invoked from a BH.  In an error case we might have made
> no blocking calls, i.e. never returned and this callback can cause
> reentrancy.

Good point.

>> +BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
>> +    void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
>> +{
>> +    PwriteAIOCB *acb;
>> +
>> +    acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
>> +    acb->state      = 0;
>> +    acb->offset     = offset;
>> +    acb->buf        = buf;
>> +    acb->bytes      = bytes;
>> +    acb->tmp_buf    = NULL;
>> +
>> +    bdrv_aio_pwrite_cb(acb, 0);
> 
> We're missing the usual !bs->drv, bs->read_only, bdrv_check_request()
> checks here.  Are we okay to wait until calling
> bdrv_aio_readv/bdrv_aio_writev for these checks?

I think we are, but if you prefer, I can copy them here.

Kevin
Stefan Hajnoczi Dec. 2, 2010, 1:04 p.m. UTC | #3
On Thu, Dec 2, 2010 at 12:30 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 02.12.2010 13:07, schrieb Stefan Hajnoczi:
>> On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>> I guess what I'm saying is that this function should only be used when
>> you really need rmw (in many cases with image metadata it can be
>> avoided because you have enough metadata cached in memory to do full
>> sector writes).  If it turns out we don't need rmw then we can
>> eliminate this function.
>
> Maybe what we really should do is completely change the block layer
> functions to use bytes as their unit and do any RMW in posix-aio-compat
> and linux-aio. Other backends don't need it and without O_DIRECT we
> don't even need to do it with files.

Yeah that sounds like something worth exploring more.  Perhaps
together with some input from Christoph on moving QEMU to the native
block size (e.g. 4k on some devices).

>>> +BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
>>> +    void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
>>> +{
>>> +    PwriteAIOCB *acb;
>>> +
>>> +    acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
>>> +    acb->state      = 0;
>>> +    acb->offset     = offset;
>>> +    acb->buf        = buf;
>>> +    acb->bytes      = bytes;
>>> +    acb->tmp_buf    = NULL;
>>> +
>>> +    bdrv_aio_pwrite_cb(acb, 0);
>>
>> We're missing the usual !bs->drv, bs->read_only, bdrv_check_request()
>> checks here.  Are we okay to wait until calling
>> bdrv_aio_readv/bdrv_aio_writev for these checks?
>
> I think we are, but if you prefer, I can copy them here.

No, I just wanted to make sure you took them into account.  In theory
those error cases won't affect your code and it's fine to wait for
bdrv_aio_readv/bdrv_aio_writev to catch them.  I haven't thought
through the cases in detail though.

Stefan
diff mbox

Patch

diff --git a/block.c b/block.c
index 63effd8..f10066e 100644
--- a/block.c
+++ b/block.c
@@ -2106,6 +2106,173 @@  BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
     return ret;
 }
 
+typedef struct PwriteAIOCB {
+    BlockDriverAIOCB    common;
+    int                 state;
+    int64_t             offset;
+    size_t              bytes;
+    uint8_t*            buf;
+    uint8_t*            tmp_buf;
+    struct iovec        iov;
+    QEMUIOVector        qiov;
+} PwriteAIOCB;
+
+static void pwrite_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    qemu_aio_flush();
+}
+
+static AIOPool blkqueue_aio_pool = {
+    .aiocb_size         = sizeof(PwriteAIOCB),
+    .cancel             = pwrite_aio_cancel,
+};
+
+static void bdrv_aio_pwrite_cb(void *opaque, int ret)
+{
+    PwriteAIOCB *acb = opaque;
+    BlockDriverAIOCB *tmp_acb;
+    int64_t sector_num;
+
+    if (ret < 0) {
+        goto done;
+    }
+
+    sector_num = acb->offset >> BDRV_SECTOR_BITS;
+
+    switch (acb->state) {
+    case 0: {
+        /* Read first sector if needed */
+        int len;
+
+        len = (BDRV_SECTOR_SIZE - acb->offset) & (BDRV_SECTOR_SIZE - 1);
+
+        if (len > 0) {
+            acb->state = 1;
+            acb->tmp_buf = qemu_blockalign(acb->common.bs, BDRV_SECTOR_SIZE);
+            acb->iov.iov_base = acb->tmp_buf;
+            acb->iov.iov_len = BDRV_SECTOR_SIZE;
+            qemu_iovec_init_external(&acb->qiov, &acb->iov, 1);
+            tmp_acb = bdrv_aio_readv(acb->common.bs, sector_num, &acb->qiov, 1,
+                bdrv_aio_pwrite_cb, acb);
+            if (tmp_acb == NULL) {
+                bdrv_aio_pwrite_cb(acb, -EIO);
+            }
+        } else {
+            acb->state = 2;
+            bdrv_aio_pwrite_cb(acb, 0);
+        }
+        break;
+    }
+
+    case 1: {
+        /* Modify first cluster and write it back */
+        int len;
+
+        len = (BDRV_SECTOR_SIZE - acb->offset) & (BDRV_SECTOR_SIZE - 1);
+        if (len > acb->bytes) {
+            len = acb->bytes;
+        }
+
+        memcpy(acb->tmp_buf + (acb->offset & (BDRV_SECTOR_SIZE - 1)),
+            acb->buf, len);
+
+        acb->state = 2;
+        acb->offset += len;
+        acb->buf += len;
+        acb->bytes -= len;
+
+        tmp_acb = bdrv_aio_writev(acb->common.bs, sector_num, &acb->qiov, 1,
+            bdrv_aio_pwrite_cb, acb);
+        if (tmp_acb == NULL) {
+            bdrv_aio_pwrite_cb(acb, -EIO);
+        }
+        break;
+    }
+
+    case 2: {
+        /* Write the sectors "in place" */
+        int nb_sectors = acb->bytes >> BDRV_SECTOR_BITS;
+
+        acb->state = 3;
+        if (nb_sectors > 0) {
+            int len = nb_sectors << BDRV_SECTOR_BITS;
+
+            acb->iov.iov_base = acb->buf;
+            acb->iov.iov_len = len;
+            qemu_iovec_init_external(&acb->qiov, &acb->iov, 1);
+
+            acb->offset += len;
+            acb->buf += len;
+            acb->bytes -= len;
+
+            tmp_acb = bdrv_aio_writev(acb->common.bs, sector_num, &acb->qiov,
+                nb_sectors, bdrv_aio_pwrite_cb, acb);
+            if (tmp_acb == NULL) {
+                bdrv_aio_pwrite_cb(acb, -EIO);
+            }
+        } else {
+            bdrv_aio_pwrite_cb(acb, 0);
+        }
+        break;
+    }
+
+    case 3: {
+        /* Read last sector if needed */
+        if (acb->bytes == 0) {
+            goto done;
+        }
+
+        acb->state = 4;
+        acb->iov.iov_base = acb->tmp_buf;
+        acb->iov.iov_len = BDRV_SECTOR_SIZE;
+        qemu_iovec_init_external(&acb->qiov, &acb->iov, 1);
+        tmp_acb = bdrv_aio_readv(acb->common.bs, sector_num, &acb->qiov, 1,
+            bdrv_aio_pwrite_cb, acb);
+        if (tmp_acb == NULL) {
+            bdrv_aio_pwrite_cb(acb, -EIO);
+        }
+        break;
+    }
+
+    case 4:
+        /* Modify and write last sector */
+        acb->state = 5;
+        memcpy(acb->tmp_buf, acb->buf, acb->bytes);
+        tmp_acb = bdrv_aio_writev(acb->common.bs, sector_num, &acb->qiov, 1,
+            bdrv_aio_pwrite_cb, acb);
+        if (tmp_acb == NULL) {
+            bdrv_aio_pwrite_cb(acb, -EIO);
+        }
+        break;
+
+    case 5:
+        goto done;
+    }
+    return;
+
+done:
+    qemu_free(acb->tmp_buf);
+    acb->common.cb(acb->common.opaque, ret);
+    qemu_aio_release(acb);
+}
+
+BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
+    void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
+{
+    PwriteAIOCB *acb;
+
+    acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
+    acb->state      = 0;
+    acb->offset     = offset;
+    acb->buf        = buf;
+    acb->bytes      = bytes;
+    acb->tmp_buf    = NULL;
+
+    bdrv_aio_pwrite_cb(acb, 0);
+
+    return &acb->common;
+}
+
 
 typedef struct MultiwriteCB {
     int error;
diff --git a/block.h b/block.h
index 78ecfac..c6e4d90 100644
--- a/block.h
+++ b/block.h
@@ -116,6 +116,8 @@  BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
 BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
                                   QEMUIOVector *iov, int nb_sectors,
                                   BlockDriverCompletionFunc *cb, void *opaque);
+BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset, void* buf,
+    size_t bytes, BlockDriverCompletionFunc *cb, void *opaque);
 BlockDriverAIOCB *bdrv_aio_flush(BlockDriverState *bs,
 				 BlockDriverCompletionFunc *cb, void *opaque);
 void bdrv_aio_cancel(BlockDriverAIOCB *acb);