Patchwork [RFC,v9,11/27] virtio-blk: Indirect vring and flush support

login
register
mail settings
Submitter Stefan Hajnoczi
Date July 18, 2012, 3:07 p.m.
Message ID <1342624074-24650-12-git-send-email-stefanha@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/171719/
State New
Headers show

Comments

Stefan Hajnoczi - July 18, 2012, 3:07 p.m.
RHEL6 and other new guest kernels use indirect vring descriptors to
increase the number of requests that can be batched.  This fundamentally
changes vring from a scheme that requires fixed resources to something
more dynamic (although there is still an absolute maximum number of
descriptors).  Cope with indirect vrings by taking on as many requests
as we can in one go and then postponing the remaining requests until the
first batch completes.

It would be possible to switch to dynamic resource management so iovec
and iocb structs are malloced.  This would allow the entire ring to be
processed even with indirect descriptors, but would probably hit a
bottleneck when io_submit refuses to queue more requests.  Therefore,
stick with the simpler scheme for now.

Unfortunately Linux AIO does not support asynchronous fsync/fdatasync on
all files.  In particular, an O_DIRECT opened file on ext4 does not
support Linux AIO fdsync.  Work around this by performing fdatasync()
synchronously for now.

Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 hw/dataplane/ioq.h   |   18 ++++-----
 hw/dataplane/vring.h |  103 +++++++++++++++++++++++++++++++++++++++++++-------
 hw/virtio-blk.c      |   75 ++++++++++++++++++++++--------------
 3 files changed, 144 insertions(+), 52 deletions(-)
Michael S. Tsirkin - July 18, 2012, 6:28 p.m.
On Wed, Jul 18, 2012 at 04:07:38PM +0100, Stefan Hajnoczi wrote:
> RHEL6 and other new guest kernels use indirect vring descriptors to
> increase the number of requests that can be batched.  This fundamentally
> changes vring from a scheme that requires fixed resources to something
> more dynamic (although there is still an absolute maximum number of
> descriptors).  Cope with indirect vrings by taking on as many requests
> as we can in one go and then postponing the remaining requests until the
> first batch completes.
> 
> It would be possible to switch to dynamic resource management so iovec
> and iocb structs are malloced.  This would allow the entire ring to be
> processed even with indirect descriptors, but would probably hit a
> bottleneck when io_submit refuses to queue more requests.  Therefore,
> stick with the simpler scheme for now.
> 
> Unfortunately Linux AIO does not support asynchronous fsync/fdatasync on
> all files.  In particular, an O_DIRECT opened file on ext4 does not
> support Linux AIO fdsync.  Work around this by performing fdatasync()
> synchronously for now.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>  hw/dataplane/ioq.h   |   18 ++++-----
>  hw/dataplane/vring.h |  103 +++++++++++++++++++++++++++++++++++++++++++-------
>  hw/virtio-blk.c      |   75 ++++++++++++++++++++++--------------
>  3 files changed, 144 insertions(+), 52 deletions(-)
> 
> diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
> index 7200e87..d1545d6 100644
> --- a/hw/dataplane/ioq.h
> +++ b/hw/dataplane/ioq.h
> @@ -3,7 +3,7 @@
>  
>  typedef struct {
>      int fd;                         /* file descriptor */
> -    unsigned int max_reqs;           /* max length of freelist and queue */
> +    unsigned int max_reqs;          /* max length of freelist and queue */
>  
>      io_context_t io_ctx;            /* Linux AIO context */
>      EventNotifier io_notifier;      /* Linux AIO eventfd */
> @@ -91,18 +91,16 @@ static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigne
>      return iocb;
>  }
>  
> -static struct iocb *ioq_fdsync(IOQueue *ioq)
> -{
> -    struct iocb *iocb = ioq_get_iocb(ioq);
> -
> -    io_prep_fdsync(iocb, ioq->fd);
> -    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
> -    return iocb;
> -}
> -
>  static int ioq_submit(IOQueue *ioq)
>  {
>      int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
> +    if (unlikely(rc < 0)) {
> +        unsigned int i;
> +        fprintf(stderr, "io_submit io_ctx=%#lx nr=%d iovecs=%p\n", (uint64_t)ioq->io_ctx, ioq->queue_idx, ioq->queue);
> +        for (i = 0; i < ioq->queue_idx; i++) {
> +            fprintf(stderr, "[%u] type=%#x fd=%d\n", i, ioq->queue[i]->aio_lio_opcode, ioq->queue[i]->aio_fildes);
> +        }
> +    }
>      ioq->queue_idx = 0; /* reset */
>      return rc;
>  }
> diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
> index 70675e5..3eab4b4 100644
> --- a/hw/dataplane/vring.h
> +++ b/hw/dataplane/vring.h
> @@ -64,6 +64,86 @@ static void vring_setup(Vring *vring, VirtIODevice *vdev, int n)
>              vring->vr.desc, vring->vr.avail, vring->vr.used);
>  }
>  
> +static bool vring_more_avail(Vring *vring)
> +{
> +	return vring->vr.avail->idx != vring->last_avail_idx;
> +}
> +
> +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
> +static bool get_indirect(Vring *vring,
> +			struct iovec iov[], struct iovec *iov_end,
> +			unsigned int *out_num, unsigned int *in_num,
> +			struct vring_desc *indirect)
> +{
> +	struct vring_desc desc;
> +	unsigned int i = 0, count, found = 0;
> +
> +	/* Sanity check */
> +	if (unlikely(indirect->len % sizeof desc)) {
> +		fprintf(stderr, "Invalid length in indirect descriptor: "
> +		       "len 0x%llx not multiple of 0x%zx\n",
> +		       (unsigned long long)indirect->len,
> +		       sizeof desc);
> +		exit(1);
> +	}
> +
> +	count = indirect->len / sizeof desc;
> +	/* Buffers are chained via a 16 bit next field, so
> +	 * we can have at most 2^16 of these. */
> +	if (unlikely(count > USHRT_MAX + 1)) {
> +		fprintf(stderr, "Indirect buffer length too big: %d\n",
> +		       indirect->len);
> +        exit(1);
> +	}
> +
> +    /* Point to translate indirect desc chain */
> +    indirect = phys_to_host(vring, indirect->addr);
> +
> +	/* We will use the result as an address to read from, so most
> +	 * architectures only need a compiler barrier here. */
> +	__sync_synchronize(); /* read_barrier_depends(); */


qemu has its own barriers now, pls use them.

> +
> +	do {
> +		if (unlikely(++found > count)) {
> +			fprintf(stderr, "Loop detected: last one at %u "
> +			       "indirect size %u\n",
> +			       i, count);
> +			exit(1);
> +		}
> +
> +        desc = *indirect++;
> +		if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
> +			fprintf(stderr, "Nested indirect descriptor\n");
> +            exit(1);
> +		}
> +
> +        /* Stop for now if there are not enough iovecs available. */
> +        if (iov >= iov_end) {
> +            return false;
> +        }
> +
> +        iov->iov_base = phys_to_host(vring, desc.addr);
> +        iov->iov_len  = desc.len;
> +        iov++;
> +
> +		/* If this is an input descriptor, increment that count. */
> +		if (desc.flags & VRING_DESC_F_WRITE) {
> +			*in_num += 1;
> +		} else {
> +			/* If it's an output descriptor, they're all supposed
> +			 * to come before any input descriptors. */
> +			if (unlikely(*in_num)) {
> +				fprintf(stderr, "Indirect descriptor "
> +				       "has out after in: idx %d\n", i);
> +                exit(1);
> +			}
> +			*out_num += 1;
> +		}
> +        i = desc.next;
> +	} while (desc.flags & VRING_DESC_F_NEXT);
> +    return true;
> +}
> +
>  /* This looks in the virtqueue and for the first available buffer, and converts
>   * it to an iovec for convenient access.  Since descriptors consist of some
>   * number of output then some number of input descriptors, it's actually two
> @@ -129,23 +209,20 @@ static unsigned int vring_pop(Vring *vring,
>  		}
>          desc = vring->vr.desc[i];
>  		if (desc.flags & VRING_DESC_F_INDIRECT) {
> -/*			ret = get_indirect(dev, vq, iov, iov_size,
> -					   out_num, in_num,
> -					   log, log_num, &desc);
> -			if (unlikely(ret < 0)) {
> -				vq_err(vq, "Failure detected "
> -				       "in indirect descriptor at idx %d\n", i);
> -				return ret;
> -			}
> -			continue; */
> -            fprintf(stderr, "Indirect vring not supported\n");
> -            exit(1);
> +			if (!get_indirect(vring, iov, iov_end, out_num, in_num, &desc)) {
> +                return num; /* not enough iovecs, stop for now */
> +            }
> +            continue;
>  		}
>  
> +        /* If there are not enough iovecs left, stop for now.  The caller
> +         * should check if there are more descs available once they have dealt
> +         * with the current set.
> +         */
>          if (iov >= iov_end) {
> -            fprintf(stderr, "Not enough vring iovecs\n");
> -            exit(1);
> +            return num;
>          }
> +
>          iov->iov_base = phys_to_host(vring, desc.addr);
>          iov->iov_len  = desc.len;
>          iov++;
> diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c
> index 52ea601..591eace 100644
> --- a/hw/virtio-blk.c
> +++ b/hw/virtio-blk.c
> @@ -62,6 +62,14 @@ static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
>      return (VirtIOBlock *)vdev;
>  }
>  
> +/* Normally the block driver passes down the fd, there's no way to get it from
> + * above.
> + */
> +static int get_raw_posix_fd_hack(VirtIOBlock *s)
> +{
> +    return *(int*)s->bs->file->opaque;
> +}
> +
>  static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
>  {
>      VirtIOBlock *s = opaque;
> @@ -83,18 +91,6 @@ static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
>      vring_push(&s->vring, req->head, len + sizeof req->status);
>  }
>  
> -static bool handle_io(EventHandler *handler)
> -{
> -    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
> -
> -    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> -        /* TODO is this thread-safe and can it be done faster? */
> -        virtio_irq(s->vq);
> -    }
> -
> -    return true;
> -}
> -
>  static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_num, unsigned int in_num, unsigned int head)
>  {
>      /* Virtio block requests look like this: */
> @@ -117,13 +113,16 @@ static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
>              outhdr->type, outhdr->sector);
>      */
>  
> -    if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
> +    /* TODO Linux sets the barrier bit even when not advertised! */
> +    uint32_t type = outhdr->type & ~VIRTIO_BLK_T_BARRIER;
> +
> +    if (unlikely(type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
>          fprintf(stderr, "virtio-blk unsupported request type %#x\n", outhdr->type);
>          exit(1);
>      }
>  
>      struct iocb *iocb;
> -    switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
> +    switch (type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
>      case VIRTIO_BLK_T_IN:
>          if (unlikely(out_num != 1)) {
>              fprintf(stderr, "virtio-blk invalid read request\n");
> @@ -145,8 +144,16 @@ static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
>              fprintf(stderr, "virtio-blk invalid flush request\n");
>              exit(1);
>          }
> -        iocb = ioq_fdsync(ioq);
> -        break;
> +
> +        /* TODO fdsync is not supported by all backends, do it synchronously here! */
> +        {
> +            VirtIOBlock *s = container_of(ioq, VirtIOBlock, ioqueue);
> +            fdatasync(get_raw_posix_fd_hack(s));
> +            inhdr->status = VIRTIO_BLK_S_OK;
> +            vring_push(&s->vring, head, sizeof *inhdr);
> +            virtio_irq(s->vq);
> +        }
> +        return;
>  
>      default:
>          fprintf(stderr, "virtio-blk multiple request type bits set\n");
> @@ -199,11 +206,29 @@ static bool handle_notify(EventHandler *handler)
>      }
>  
>      /* Submit requests, if any */
> -    if (likely(iov != iovec)) {
> -        if (unlikely(ioq_submit(&s->ioqueue) < 0)) {
> -            fprintf(stderr, "ioq_submit failed\n");
> -            exit(1);
> -        }
> +    int rc = ioq_submit(&s->ioqueue);
> +    if (unlikely(rc < 0)) {
> +        fprintf(stderr, "ioq_submit failed %d\n", rc);
> +        exit(1);
> +    }
> +    return true;
> +}
> +
> +static bool handle_io(EventHandler *handler)
> +{
> +    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
> +
> +    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> +        /* TODO is this thread-safe and can it be done faster? */
> +        virtio_irq(s->vq);
> +    }
> +
> +    /* If there were more requests than iovecs, the vring will not be empty yet
> +     * so check again.  There should now be enough resources to process more
> +     * requests.
> +     */
> +    if (vring_more_avail(&s->vring)) {
> +        return handle_notify(&s->notify_handler);
>      }
>  
>      return true;
> @@ -217,14 +242,6 @@ static void *data_plane_thread(void *opaque)
>      return NULL;
>  }
>  
> -/* Normally the block driver passes down the fd, there's no way to get it from
> - * above.
> - */
> -static int get_raw_posix_fd_hack(VirtIOBlock *s)
> -{
> -    return *(int*)s->bs->file->opaque;
> -}
> -
>  static void data_plane_start(VirtIOBlock *s)
>  {
>      int i;
> -- 
> 1.7.10.4
Michael S. Tsirkin - July 18, 2012, 7:02 p.m.
On Wed, Jul 18, 2012 at 04:07:38PM +0100, Stefan Hajnoczi wrote:
> RHEL6 and other new guest kernels use indirect vring descriptors to
> increase the number of requests that can be batched.  This fundamentally
> changes vring from a scheme that requires fixed resources to something
> more dynamic (although there is still an absolute maximum number of
> descriptors).  Cope with indirect vrings by taking on as many requests
> as we can in one go and then postponing the remaining requests until the
> first batch completes.
> 
> It would be possible to switch to dynamic resource management so iovec
> and iocb structs are malloced.  This would allow the entire ring to be
> processed even with indirect descriptors, but would probably hit a
> bottleneck when io_submit refuses to queue more requests.  Therefore,
> stick with the simpler scheme for now.
> 
> Unfortunately Linux AIO does not support asynchronous fsync/fdatasync on
> all files.  In particular, an O_DIRECT opened file on ext4 does not
> support Linux AIO fdsync.  Work around this by performing fdatasync()
> synchronously for now.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>  hw/dataplane/ioq.h   |   18 ++++-----
>  hw/dataplane/vring.h |  103 +++++++++++++++++++++++++++++++++++++++++++-------
>  hw/virtio-blk.c      |   75 ++++++++++++++++++++++--------------
>  3 files changed, 144 insertions(+), 52 deletions(-)
> 
> diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
> index 7200e87..d1545d6 100644
> --- a/hw/dataplane/ioq.h
> +++ b/hw/dataplane/ioq.h
> @@ -3,7 +3,7 @@
>  
>  typedef struct {
>      int fd;                         /* file descriptor */
> -    unsigned int max_reqs;           /* max length of freelist and queue */
> +    unsigned int max_reqs;          /* max length of freelist and queue */
>  
>      io_context_t io_ctx;            /* Linux AIO context */
>      EventNotifier io_notifier;      /* Linux AIO eventfd */
> @@ -91,18 +91,16 @@ static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigne
>      return iocb;
>  }
>  
> -static struct iocb *ioq_fdsync(IOQueue *ioq)
> -{
> -    struct iocb *iocb = ioq_get_iocb(ioq);
> -
> -    io_prep_fdsync(iocb, ioq->fd);
> -    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
> -    return iocb;
> -}
> -
>  static int ioq_submit(IOQueue *ioq)
>  {
>      int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
> +    if (unlikely(rc < 0)) {
> +        unsigned int i;
> +        fprintf(stderr, "io_submit io_ctx=%#lx nr=%d iovecs=%p\n", (uint64_t)ioq->io_ctx, ioq->queue_idx, ioq->queue);
> +        for (i = 0; i < ioq->queue_idx; i++) {
> +            fprintf(stderr, "[%u] type=%#x fd=%d\n", i, ioq->queue[i]->aio_lio_opcode, ioq->queue[i]->aio_fildes);
> +        }
> +    }
>      ioq->queue_idx = 0; /* reset */
>      return rc;
>  }
> diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
> index 70675e5..3eab4b4 100644
> --- a/hw/dataplane/vring.h
> +++ b/hw/dataplane/vring.h
> @@ -64,6 +64,86 @@ static void vring_setup(Vring *vring, VirtIODevice *vdev, int n)
>              vring->vr.desc, vring->vr.avail, vring->vr.used);
>  }
>  
> +static bool vring_more_avail(Vring *vring)
> +{
> +	return vring->vr.avail->idx != vring->last_avail_idx;
> +}
> +
> +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */

So add a Red Hat copyright pls.

> +static bool get_indirect(Vring *vring,
> +			struct iovec iov[], struct iovec *iov_end,
> +			unsigned int *out_num, unsigned int *in_num,
> +			struct vring_desc *indirect)
> +{
> +	struct vring_desc desc;
> +	unsigned int i = 0, count, found = 0;
> +
> +	/* Sanity check */
> +	if (unlikely(indirect->len % sizeof desc)) {
> +		fprintf(stderr, "Invalid length in indirect descriptor: "
> +		       "len 0x%llx not multiple of 0x%zx\n",
> +		       (unsigned long long)indirect->len,
> +		       sizeof desc);
> +		exit(1);
> +	}
> +
> +	count = indirect->len / sizeof desc;
> +	/* Buffers are chained via a 16 bit next field, so
> +	 * we can have at most 2^16 of these. */
> +	if (unlikely(count > USHRT_MAX + 1)) {
> +		fprintf(stderr, "Indirect buffer length too big: %d\n",
> +		       indirect->len);
> +        exit(1);
> +	}
> +
> +    /* Point to translate indirect desc chain */
> +    indirect = phys_to_host(vring, indirect->addr);
> +
> +	/* We will use the result as an address to read from, so most
> +	 * architectures only need a compiler barrier here. */
> +	__sync_synchronize(); /* read_barrier_depends(); */
> +
> +	do {
> +		if (unlikely(++found > count)) {
> +			fprintf(stderr, "Loop detected: last one at %u "
> +			       "indirect size %u\n",
> +			       i, count);
> +			exit(1);
> +		}
> +
> +        desc = *indirect++;
> +		if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
> +			fprintf(stderr, "Nested indirect descriptor\n");
> +            exit(1);
> +		}
> +
> +        /* Stop for now if there are not enough iovecs available. */
> +        if (iov >= iov_end) {
> +            return false;
> +        }
> +
> +        iov->iov_base = phys_to_host(vring, desc.addr);
> +        iov->iov_len  = desc.len;
> +        iov++;
> +
> +		/* If this is an input descriptor, increment that count. */
> +		if (desc.flags & VRING_DESC_F_WRITE) {
> +			*in_num += 1;
> +		} else {
> +			/* If it's an output descriptor, they're all supposed
> +			 * to come before any input descriptors. */
> +			if (unlikely(*in_num)) {
> +				fprintf(stderr, "Indirect descriptor "
> +				       "has out after in: idx %d\n", i);
> +                exit(1);
> +			}
> +			*out_num += 1;
> +		}
> +        i = desc.next;
> +	} while (desc.flags & VRING_DESC_F_NEXT);
> +    return true;
> +}
> +
>  /* This looks in the virtqueue and for the first available buffer, and converts
>   * it to an iovec for convenient access.  Since descriptors consist of some
>   * number of output then some number of input descriptors, it's actually two
> @@ -129,23 +209,20 @@ static unsigned int vring_pop(Vring *vring,
>  		}
>          desc = vring->vr.desc[i];
>  		if (desc.flags & VRING_DESC_F_INDIRECT) {
> -/*			ret = get_indirect(dev, vq, iov, iov_size,
> -					   out_num, in_num,
> -					   log, log_num, &desc);
> -			if (unlikely(ret < 0)) {
> -				vq_err(vq, "Failure detected "
> -				       "in indirect descriptor at idx %d\n", i);
> -				return ret;
> -			}
> -			continue; */
> -            fprintf(stderr, "Indirect vring not supported\n");
> -            exit(1);
> +			if (!get_indirect(vring, iov, iov_end, out_num, in_num, &desc)) {
> +                return num; /* not enough iovecs, stop for now */
> +            }
> +            continue;
>  		}
>  
> +        /* If there are not enough iovecs left, stop for now.  The caller
> +         * should check if there are more descs available once they have dealt
> +         * with the current set.
> +         */
>          if (iov >= iov_end) {
> -            fprintf(stderr, "Not enough vring iovecs\n");
> -            exit(1);
> +            return num;
>          }
> +
>          iov->iov_base = phys_to_host(vring, desc.addr);
>          iov->iov_len  = desc.len;
>          iov++;
> diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c
> index 52ea601..591eace 100644
> --- a/hw/virtio-blk.c
> +++ b/hw/virtio-blk.c
> @@ -62,6 +62,14 @@ static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
>      return (VirtIOBlock *)vdev;
>  }
>  
> +/* Normally the block driver passes down the fd, there's no way to get it from
> + * above.
> + */
> +static int get_raw_posix_fd_hack(VirtIOBlock *s)
> +{
> +    return *(int*)s->bs->file->opaque;
> +}
> +
>  static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
>  {
>      VirtIOBlock *s = opaque;
> @@ -83,18 +91,6 @@ static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
>      vring_push(&s->vring, req->head, len + sizeof req->status);
>  }
>  
> -static bool handle_io(EventHandler *handler)
> -{
> -    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
> -
> -    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> -        /* TODO is this thread-safe and can it be done faster? */
> -        virtio_irq(s->vq);
> -    }
> -
> -    return true;
> -}
> -
>  static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_num, unsigned int in_num, unsigned int head)
>  {
>      /* Virtio block requests look like this: */
> @@ -117,13 +113,16 @@ static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
>              outhdr->type, outhdr->sector);
>      */
>  
> -    if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
> +    /* TODO Linux sets the barrier bit even when not advertised! */
> +    uint32_t type = outhdr->type & ~VIRTIO_BLK_T_BARRIER;
> +
> +    if (unlikely(type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
>          fprintf(stderr, "virtio-blk unsupported request type %#x\n", outhdr->type);
>          exit(1);
>      }
>  
>      struct iocb *iocb;
> -    switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
> +    switch (type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
>      case VIRTIO_BLK_T_IN:
>          if (unlikely(out_num != 1)) {
>              fprintf(stderr, "virtio-blk invalid read request\n");
> @@ -145,8 +144,16 @@ static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
>              fprintf(stderr, "virtio-blk invalid flush request\n");
>              exit(1);
>          }
> -        iocb = ioq_fdsync(ioq);
> -        break;
> +
> +        /* TODO fdsync is not supported by all backends, do it synchronously here! */
> +        {
> +            VirtIOBlock *s = container_of(ioq, VirtIOBlock, ioqueue);
> +            fdatasync(get_raw_posix_fd_hack(s));
> +            inhdr->status = VIRTIO_BLK_S_OK;
> +            vring_push(&s->vring, head, sizeof *inhdr);
> +            virtio_irq(s->vq);
> +        }
> +        return;
>  
>      default:
>          fprintf(stderr, "virtio-blk multiple request type bits set\n");
> @@ -199,11 +206,29 @@ static bool handle_notify(EventHandler *handler)
>      }
>  
>      /* Submit requests, if any */
> -    if (likely(iov != iovec)) {
> -        if (unlikely(ioq_submit(&s->ioqueue) < 0)) {
> -            fprintf(stderr, "ioq_submit failed\n");
> -            exit(1);
> -        }
> +    int rc = ioq_submit(&s->ioqueue);
> +    if (unlikely(rc < 0)) {
> +        fprintf(stderr, "ioq_submit failed %d\n", rc);
> +        exit(1);
> +    }
> +    return true;
> +}
> +
> +static bool handle_io(EventHandler *handler)
> +{
> +    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
> +
> +    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
> +        /* TODO is this thread-safe and can it be done faster? */
> +        virtio_irq(s->vq);
> +    }
> +
> +    /* If there were more requests than iovecs, the vring will not be empty yet
> +     * so check again.  There should now be enough resources to process more
> +     * requests.
> +     */
> +    if (vring_more_avail(&s->vring)) {
> +        return handle_notify(&s->notify_handler);
>      }
>  
>      return true;
> @@ -217,14 +242,6 @@ static void *data_plane_thread(void *opaque)
>      return NULL;
>  }
>  
> -/* Normally the block driver passes down the fd, there's no way to get it from
> - * above.
> - */
> -static int get_raw_posix_fd_hack(VirtIOBlock *s)
> -{
> -    return *(int*)s->bs->file->opaque;
> -}
> -
>  static void data_plane_start(VirtIOBlock *s)
>  {
>      int i;
> -- 
> 1.7.10.4

Patch

diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
index 7200e87..d1545d6 100644
--- a/hw/dataplane/ioq.h
+++ b/hw/dataplane/ioq.h
@@ -3,7 +3,7 @@ 
 
 typedef struct {
     int fd;                         /* file descriptor */
-    unsigned int max_reqs;           /* max length of freelist and queue */
+    unsigned int max_reqs;          /* max length of freelist and queue */
 
     io_context_t io_ctx;            /* Linux AIO context */
     EventNotifier io_notifier;      /* Linux AIO eventfd */
@@ -91,18 +91,16 @@  static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigne
     return iocb;
 }
 
-static struct iocb *ioq_fdsync(IOQueue *ioq)
-{
-    struct iocb *iocb = ioq_get_iocb(ioq);
-
-    io_prep_fdsync(iocb, ioq->fd);
-    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
-    return iocb;
-}
-
 static int ioq_submit(IOQueue *ioq)
 {
     int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
+    if (unlikely(rc < 0)) {
+        unsigned int i;
+        fprintf(stderr, "io_submit io_ctx=%#lx nr=%d iovecs=%p\n", (uint64_t)ioq->io_ctx, ioq->queue_idx, ioq->queue);
+        for (i = 0; i < ioq->queue_idx; i++) {
+            fprintf(stderr, "[%u] type=%#x fd=%d\n", i, ioq->queue[i]->aio_lio_opcode, ioq->queue[i]->aio_fildes);
+        }
+    }
     ioq->queue_idx = 0; /* reset */
     return rc;
 }
diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
index 70675e5..3eab4b4 100644
--- a/hw/dataplane/vring.h
+++ b/hw/dataplane/vring.h
@@ -64,6 +64,86 @@  static void vring_setup(Vring *vring, VirtIODevice *vdev, int n)
             vring->vr.desc, vring->vr.avail, vring->vr.used);
 }
 
+static bool vring_more_avail(Vring *vring)
+{
+	return vring->vr.avail->idx != vring->last_avail_idx;
+}
+
+/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
+static bool get_indirect(Vring *vring,
+			struct iovec iov[], struct iovec *iov_end,
+			unsigned int *out_num, unsigned int *in_num,
+			struct vring_desc *indirect)
+{
+	struct vring_desc desc;
+	unsigned int i = 0, count, found = 0;
+
+	/* Sanity check */
+	if (unlikely(indirect->len % sizeof desc)) {
+		fprintf(stderr, "Invalid length in indirect descriptor: "
+		       "len 0x%llx not multiple of 0x%zx\n",
+		       (unsigned long long)indirect->len,
+		       sizeof desc);
+		exit(1);
+	}
+
+	count = indirect->len / sizeof desc;
+	/* Buffers are chained via a 16 bit next field, so
+	 * we can have at most 2^16 of these. */
+	if (unlikely(count > USHRT_MAX + 1)) {
+		fprintf(stderr, "Indirect buffer length too big: %d\n",
+		       indirect->len);
+        exit(1);
+	}
+
+    /* Point to translate indirect desc chain */
+    indirect = phys_to_host(vring, indirect->addr);
+
+	/* We will use the result as an address to read from, so most
+	 * architectures only need a compiler barrier here. */
+	__sync_synchronize(); /* read_barrier_depends(); */
+
+	do {
+		if (unlikely(++found > count)) {
+			fprintf(stderr, "Loop detected: last one at %u "
+			       "indirect size %u\n",
+			       i, count);
+			exit(1);
+		}
+
+        desc = *indirect++;
+		if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
+			fprintf(stderr, "Nested indirect descriptor\n");
+            exit(1);
+		}
+
+        /* Stop for now if there are not enough iovecs available. */
+        if (iov >= iov_end) {
+            return false;
+        }
+
+        iov->iov_base = phys_to_host(vring, desc.addr);
+        iov->iov_len  = desc.len;
+        iov++;
+
+		/* If this is an input descriptor, increment that count. */
+		if (desc.flags & VRING_DESC_F_WRITE) {
+			*in_num += 1;
+		} else {
+			/* If it's an output descriptor, they're all supposed
+			 * to come before any input descriptors. */
+			if (unlikely(*in_num)) {
+				fprintf(stderr, "Indirect descriptor "
+				       "has out after in: idx %d\n", i);
+                exit(1);
+			}
+			*out_num += 1;
+		}
+        i = desc.next;
+	} while (desc.flags & VRING_DESC_F_NEXT);
+    return true;
+}
+
 /* This looks in the virtqueue and for the first available buffer, and converts
  * it to an iovec for convenient access.  Since descriptors consist of some
  * number of output then some number of input descriptors, it's actually two
@@ -129,23 +209,20 @@  static unsigned int vring_pop(Vring *vring,
 		}
         desc = vring->vr.desc[i];
 		if (desc.flags & VRING_DESC_F_INDIRECT) {
-/*			ret = get_indirect(dev, vq, iov, iov_size,
-					   out_num, in_num,
-					   log, log_num, &desc);
-			if (unlikely(ret < 0)) {
-				vq_err(vq, "Failure detected "
-				       "in indirect descriptor at idx %d\n", i);
-				return ret;
-			}
-			continue; */
-            fprintf(stderr, "Indirect vring not supported\n");
-            exit(1);
+			if (!get_indirect(vring, iov, iov_end, out_num, in_num, &desc)) {
+                return num; /* not enough iovecs, stop for now */
+            }
+            continue;
 		}
 
+        /* If there are not enough iovecs left, stop for now.  The caller
+         * should check if there are more descs available once they have dealt
+         * with the current set.
+         */
         if (iov >= iov_end) {
-            fprintf(stderr, "Not enough vring iovecs\n");
-            exit(1);
+            return num;
         }
+
         iov->iov_base = phys_to_host(vring, desc.addr);
         iov->iov_len  = desc.len;
         iov++;
diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c
index 52ea601..591eace 100644
--- a/hw/virtio-blk.c
+++ b/hw/virtio-blk.c
@@ -62,6 +62,14 @@  static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
     return (VirtIOBlock *)vdev;
 }
 
+/* Normally the block driver passes down the fd, there's no way to get it from
+ * above.
+ */
+static int get_raw_posix_fd_hack(VirtIOBlock *s)
+{
+    return *(int*)s->bs->file->opaque;
+}
+
 static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
 {
     VirtIOBlock *s = opaque;
@@ -83,18 +91,6 @@  static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
     vring_push(&s->vring, req->head, len + sizeof req->status);
 }
 
-static bool handle_io(EventHandler *handler)
-{
-    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
-
-    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
-        /* TODO is this thread-safe and can it be done faster? */
-        virtio_irq(s->vq);
-    }
-
-    return true;
-}
-
 static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_num, unsigned int in_num, unsigned int head)
 {
     /* Virtio block requests look like this: */
@@ -117,13 +113,16 @@  static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
             outhdr->type, outhdr->sector);
     */
 
-    if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
+    /* TODO Linux sets the barrier bit even when not advertised! */
+    uint32_t type = outhdr->type & ~VIRTIO_BLK_T_BARRIER;
+
+    if (unlikely(type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
         fprintf(stderr, "virtio-blk unsupported request type %#x\n", outhdr->type);
         exit(1);
     }
 
     struct iocb *iocb;
-    switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
+    switch (type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
     case VIRTIO_BLK_T_IN:
         if (unlikely(out_num != 1)) {
             fprintf(stderr, "virtio-blk invalid read request\n");
@@ -145,8 +144,16 @@  static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_n
             fprintf(stderr, "virtio-blk invalid flush request\n");
             exit(1);
         }
-        iocb = ioq_fdsync(ioq);
-        break;
+
+        /* TODO fdsync is not supported by all backends, do it synchronously here! */
+        {
+            VirtIOBlock *s = container_of(ioq, VirtIOBlock, ioqueue);
+            fdatasync(get_raw_posix_fd_hack(s));
+            inhdr->status = VIRTIO_BLK_S_OK;
+            vring_push(&s->vring, head, sizeof *inhdr);
+            virtio_irq(s->vq);
+        }
+        return;
 
     default:
         fprintf(stderr, "virtio-blk multiple request type bits set\n");
@@ -199,11 +206,29 @@  static bool handle_notify(EventHandler *handler)
     }
 
     /* Submit requests, if any */
-    if (likely(iov != iovec)) {
-        if (unlikely(ioq_submit(&s->ioqueue) < 0)) {
-            fprintf(stderr, "ioq_submit failed\n");
-            exit(1);
-        }
+    int rc = ioq_submit(&s->ioqueue);
+    if (unlikely(rc < 0)) {
+        fprintf(stderr, "ioq_submit failed %d\n", rc);
+        exit(1);
+    }
+    return true;
+}
+
+static bool handle_io(EventHandler *handler)
+{
+    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
+
+    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
+        /* TODO is this thread-safe and can it be done faster? */
+        virtio_irq(s->vq);
+    }
+
+    /* If there were more requests than iovecs, the vring will not be empty yet
+     * so check again.  There should now be enough resources to process more
+     * requests.
+     */
+    if (vring_more_avail(&s->vring)) {
+        return handle_notify(&s->notify_handler);
     }
 
     return true;
@@ -217,14 +242,6 @@  static void *data_plane_thread(void *opaque)
     return NULL;
 }
 
-/* Normally the block driver passes down the fd, there's no way to get it from
- * above.
- */
-static int get_raw_posix_fd_hack(VirtIOBlock *s)
-{
-    return *(int*)s->bs->file->opaque;
-}
-
 static void data_plane_start(VirtIOBlock *s)
 {
     int i;