Patchwork [v4,04/11] dataplane: add virtqueue vring code

login
register
mail settings
Submitter Stefan Hajnoczi
Date Nov. 22, 2012, 3:16 p.m.
Message ID <1353597412-12232-5-git-send-email-stefanha@redhat.com>
Download mbox | patch
Permalink /patch/201065/
State New
Headers show

Comments

Stefan Hajnoczi - Nov. 22, 2012, 3:16 p.m.
The virtio-blk-data-plane cannot access memory using the usual QEMU
functions since it executes outside the global mutex and the memory APIs
are not thread-safe at this time.

This patch introduces a virtqueue module based on the kernel's vhost
vring code.  The trick is that we map guest memory ahead of time and
access it cheaply outside the global mutex.

Once the hardware emulation code can execute outside the global mutex it
will be possible to drop this code.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 hw/Makefile.objs           |   2 +-
 hw/dataplane/Makefile.objs |   2 +-
 hw/dataplane/vring.c       | 344 +++++++++++++++++++++++++++++++++++++++++++++
 hw/dataplane/vring.h       |  62 ++++++++
 trace-events               |   3 +
 5 files changed, 411 insertions(+), 2 deletions(-)
 create mode 100644 hw/dataplane/vring.c
 create mode 100644 hw/dataplane/vring.h
Michael S. Tsirkin - Nov. 29, 2012, 12:50 p.m.
On Thu, Nov 22, 2012 at 04:16:45PM +0100, Stefan Hajnoczi wrote:
> The virtio-blk-data-plane cannot access memory using the usual QEMU
> functions since it executes outside the global mutex and the memory APIs
> are this time are not thread-safe.
> 
> This patch introduces a virtqueue module based on the kernel's vhost
> vring code.  The trick is that we map guest memory ahead of time and
> access it cheaply outside the global mutex.
> 
> Once the hardware emulation code can execute outside the global mutex it
> will be possible to drop this code.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>

Is there no way to factor out common code and share it with virtio.c?

> ---
>  hw/Makefile.objs           |   2 +-
>  hw/dataplane/Makefile.objs |   2 +-
>  hw/dataplane/vring.c       | 344 +++++++++++++++++++++++++++++++++++++++++++++
>  hw/dataplane/vring.h       |  62 ++++++++
>  trace-events               |   3 +
>  5 files changed, 411 insertions(+), 2 deletions(-)
>  create mode 100644 hw/dataplane/vring.c
>  create mode 100644 hw/dataplane/vring.h
> 
> diff --git a/hw/Makefile.objs b/hw/Makefile.objs
> index ea46f81..db87fbf 100644
> --- a/hw/Makefile.objs
> +++ b/hw/Makefile.objs
> @@ -1,4 +1,4 @@
> -common-obj-y = usb/ ide/
> +common-obj-y = usb/ ide/ dataplane/
>  common-obj-y += loader.o
>  common-obj-$(CONFIG_VIRTIO) += virtio-console.o
>  common-obj-$(CONFIG_VIRTIO) += virtio-rng.o
> diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
> index 8c8dea1..34e6d57 100644
> --- a/hw/dataplane/Makefile.objs
> +++ b/hw/dataplane/Makefile.objs
> @@ -1,3 +1,3 @@
>  ifeq ($(CONFIG_VIRTIO), y)
> -common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o
> +common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o
>  endif
> diff --git a/hw/dataplane/vring.c b/hw/dataplane/vring.c
> new file mode 100644
> index 0000000..2632fbd
> --- /dev/null
> +++ b/hw/dataplane/vring.c
> @@ -0,0 +1,344 @@
> +/* Copyright 2012 Red Hat, Inc.
> + * Copyright IBM, Corp. 2012
> + *
> + * Based on Linux vhost code:
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2006 Rusty Russell IBM Corporation
> + *
> + * Author: Michael S. Tsirkin <mst@redhat.com>
> + *         Stefan Hajnoczi <stefanha@redhat.com>
> + *
> + * Inspiration, some code, and most witty comments come from
> + * Documentation/virtual/lguest/lguest.c, by Rusty Russell
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.
> + */
> +
> +#include "trace.h"
> +#include "hw/dataplane/vring.h"
> +
> +/* Map the guest's vring to host memory */
> +bool vring_setup(Vring *vring, VirtIODevice *vdev, int n)
> +{
> +    hwaddr vring_addr = virtio_queue_get_ring_addr(vdev, n);
> +    hwaddr vring_size = virtio_queue_get_ring_size(vdev, n);
> +    void *vring_ptr;
> +
> +    vring->broken = false;
> +
> +    hostmem_init(&vring->hostmem);
> +    vring_ptr = hostmem_lookup(&vring->hostmem, vring_addr, vring_size, true);
> +    if (!vring_ptr) {
> +        error_report("Failed to map vring "
> +                     "addr %#" HWADDR_PRIx " size %" HWADDR_PRIu,
> +                     vring_addr, vring_size);
> +        vring->broken = true;
> +        return false;
> +    }
> +
> +    vring_init(&vring->vr, virtio_queue_get_num(vdev, n), vring_ptr, 4096);
> +
> +    vring->last_avail_idx = 0;
> +    vring->last_used_idx = 0;
> +    vring->signalled_used = 0;
> +    vring->signalled_used_valid = false;
> +
> +    trace_vring_setup(virtio_queue_get_ring_addr(vdev, n),
> +                      vring->vr.desc, vring->vr.avail, vring->vr.used);
> +    return true;
> +}
> +
> +void vring_teardown(Vring *vring)
> +{
> +    hostmem_finalize(&vring->hostmem);
> +}
> +
> +/* Toggle guest->host notifies */
> +void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool enable)
> +{
> +    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
> +        if (enable) {
> +            vring_avail_event(&vring->vr) = vring->vr.avail->idx;
> +        }
> +    } else if (enable) {
> +        vring->vr.used->flags &= ~VRING_USED_F_NO_NOTIFY;
> +    } else {
> +        vring->vr.used->flags |= VRING_USED_F_NO_NOTIFY;
> +    }
> +}
> +
> +/* This is stolen from linux/drivers/vhost/vhost.c:vhost_notify() */
> +bool vring_should_notify(VirtIODevice *vdev, Vring *vring)
> +{
> +    uint16_t old, new;
> +    bool v;
> +    /* Flush out used index updates. This is paired
> +     * with the barrier that the Guest executes when enabling
> +     * interrupts. */
> +    smp_mb();
> +
> +    if ((vdev->guest_features & VIRTIO_F_NOTIFY_ON_EMPTY) &&
> +        unlikely(vring->vr.avail->idx == vring->last_avail_idx)) {
> +        return true;
> +    }
> +
> +    if (!(vdev->guest_features & VIRTIO_RING_F_EVENT_IDX)) {
> +        return !(vring->vr.avail->flags & VRING_AVAIL_F_NO_INTERRUPT);
> +    }
> +    old = vring->signalled_used;
> +    v = vring->signalled_used_valid;
> +    new = vring->signalled_used = vring->last_used_idx;
> +    vring->signalled_used_valid = true;
> +
> +    if (unlikely(!v)) {
> +        return true;
> +    }
> +
> +    return vring_need_event(vring_used_event(&vring->vr), new, old);
> +}
> +
> +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */

Probably should document the version you based this on.
Surely not really 2.6?

> +static int get_indirect(Vring *vring,
> +                        struct iovec iov[], struct iovec *iov_end,
> +                        unsigned int *out_num, unsigned int *in_num,
> +                        struct vring_desc *indirect)
> +{
> +    struct vring_desc desc;
> +    unsigned int i = 0, count, found = 0;
> +
> +    /* Sanity check */
> +    if (unlikely(indirect->len % sizeof desc)) {
> +        error_report("Invalid length in indirect descriptor: "
> +                     "len %#x not multiple of %#zx",
> +                     indirect->len, sizeof desc);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    count = indirect->len / sizeof desc;
> +    /* Buffers are chained via a 16 bit next field, so
> +     * we can have at most 2^16 of these. */
> +    if (unlikely(count > USHRT_MAX + 1)) {
> +        error_report("Indirect buffer length too big: %d", indirect->len);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    /* Point to translate indirect desc chain */
> +    indirect = hostmem_lookup(&vring->hostmem, indirect->addr, indirect->len,
> +                              false);

This assumes an indirect buffer is contiguous in qemu memory
which seems wrong since, unlike the vring itself,
there are no alignment requirements.

Overriding indirect here also seems unnecessarily tricky.

> +    if (!indirect) {
> +        error_report("Failed to map indirect desc chain "
> +                     "addr %#" PRIx64 " len %u",
> +                     (uint64_t)indirect->addr, indirect->len);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    /* We will use the result as an address to read from, so most
> +     * architectures only need a compiler barrier here. */
> +    barrier(); /* read_barrier_depends(); */
> +
> +    do {
> +        if (unlikely(++found > count)) {
> +            error_report("Loop detected: last one at %u "
> +                         "indirect size %u", i, count);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +
> +        desc = *indirect++;
> +        if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
> +            error_report("Nested indirect descriptor");
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +
> +        /* Stop for now if there are not enough iovecs available. */
> +        if (iov >= iov_end) {
> +            return -ENOBUFS;
> +        }
> +
> +        iov->iov_base = hostmem_lookup(&vring->hostmem, desc.addr, desc.len,
> +                                       desc.flags & VRING_DESC_F_WRITE);
> +        if (!iov->iov_base) {
> +            error_report("Failed to map indirect descriptor"
> +                         "addr %#" PRIx64 " len %u",
> +                         (uint64_t)desc.addr, desc.len);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        iov->iov_len = desc.len;
> +        iov++;
> +
> +        /* If this is an input descriptor, increment that count. */
> +        if (desc.flags & VRING_DESC_F_WRITE) {
> +            *in_num += 1;
> +        } else {
> +            /* If it's an output descriptor, they're all supposed
> +             * to come before any input descriptors. */
> +            if (unlikely(*in_num)) {
> +                error_report("Indirect descriptor "
> +                             "has out after in: idx %d", i);
> +                vring->broken = true;
> +                return -EFAULT;
> +            }
> +            *out_num += 1;
> +        }
> +        i = desc.next;
> +    } while (desc.flags & VRING_DESC_F_NEXT);
> +    return 0;
> +}
> +
> +/* This looks in the virtqueue and for the first available buffer, and converts
> + * it to an iovec for convenient access.  Since descriptors consist of some
> + * number of output then some number of input descriptors, it's actually two
> + * iovecs, but we pack them into one and note how many of each there were.
> + *
> + * This function returns the descriptor number found, or vq->num (which is
> + * never a valid descriptor number) if none was found.  A negative code is
> + * returned on error.
> + *
> + * Stolen from linux-2.6/drivers/vhost/vhost.c.
> + */
> +int vring_pop(VirtIODevice *vdev, Vring *vring,
> +              struct iovec iov[], struct iovec *iov_end,
> +              unsigned int *out_num, unsigned int *in_num)
> +{
> +    struct vring_desc desc;
> +    unsigned int i, head, found = 0, num = vring->vr.num;
> +    uint16_t avail_idx, last_avail_idx;
> +
> +    /* If there was a fatal error then refuse operation */
> +    if (vring->broken) {
> +        return -EFAULT;
> +    }
> +
> +    /* Check it isn't doing very strange things with descriptor numbers. */
> +    last_avail_idx = vring->last_avail_idx;
> +    avail_idx = vring->vr.avail->idx;

I think something needs to be done here to force
a read otherwise two accesses to avail_idx
below can cause two reads from the ring and
could return inconsistent results.

> +
> +    if (unlikely((uint16_t)(avail_idx - last_avail_idx) > num)) {
> +        error_report("Guest moved used index from %u to %u",
> +                     last_avail_idx, avail_idx);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    /* If there's nothing new since last we looked. */
> +    if (avail_idx == last_avail_idx) {
> +        return -EAGAIN;
> +    }
> +
> +    /* Only get avail ring entries after they have been exposed by guest. */
> +    smp_rmb();
> +
> +    /* Grab the next descriptor number they're advertising, and increment
> +     * the index we've seen. */
> +    head = vring->vr.avail->ring[last_avail_idx % num];
> +
> +    /* If their number is silly, that's an error. */
> +    if (unlikely(head >= num)) {
> +        error_report("Guest says index %u > %u is available", head, num);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
> +        vring_avail_event(&vring->vr) = vring->vr.avail->idx;

No barrier here?
I also don't see similar code in vhost - why is it a good idea?

> +    }
> +
> +    /* When we start there are none of either input nor output. */
> +    *out_num = *in_num = 0;
> +
> +    i = head;
> +    do {
> +        if (unlikely(i >= num)) {
> +            error_report("Desc index is %u > %u, head = %u", i, num, head);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        if (unlikely(++found > num)) {
> +            error_report("Loop detected: last one at %u vq size %u head %u",
> +                         i, num, head);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        desc = vring->vr.desc[i];
> +        if (desc.flags & VRING_DESC_F_INDIRECT) {
> +            int ret = get_indirect(vring, iov, iov_end, out_num, in_num, &desc);
> +            if (ret < 0) {
> +                return ret;
> +            }
> +            continue;
> +        }
> +
> +        /* If there are not enough iovecs left, stop for now.  The caller
> +         * should check if there are more descs available once they have dealt
> +         * with the current set.
> +         */
> +        if (iov >= iov_end) {
> +            return -ENOBUFS;
> +        }
> +
> +        iov->iov_base = hostmem_lookup(&vring->hostmem, desc.addr, desc.len,
> +                                       desc.flags & VRING_DESC_F_WRITE);
> +        if (!iov->iov_base) {
> +            error_report("Failed to map vring desc addr %#" PRIx64 " len %u",
> +                         (uint64_t)desc.addr, desc.len);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        iov->iov_len  = desc.len;
> +        iov++;
> +
> +        if (desc.flags & VRING_DESC_F_WRITE) {
> +            /* If this is an input descriptor,
> +             * increment that count. */
> +            *in_num += 1;
> +        } else {
> +            /* If it's an output descriptor, they're all supposed
> +             * to come before any input descriptors. */
> +            if (unlikely(*in_num)) {
> +                error_report("Descriptor has out after in: idx %d", i);
> +                vring->broken = true;
> +                return -EFAULT;
> +            }
> +            *out_num += 1;
> +        }
> +        i = desc.next;
> +    } while (desc.flags & VRING_DESC_F_NEXT);
> +
> +    /* On success, increment avail index. */
> +    vring->last_avail_idx++;
> +    return head;
> +}
> +
> +/* After we've used one of their buffers, we tell them about it.
> + *
> + * Stolen from linux-2.6/drivers/vhost/vhost.c.
> + */
> +void vring_push(Vring *vring, unsigned int head, int len)
> +{
> +    struct vring_used_elem *used;
> +    uint16_t new;
> +
> +    /* Don't touch vring if a fatal error occurred */
> +    if (vring->broken) {
> +        return;
> +    }
> +
> +    /* The virtqueue contains a ring of used buffers.  Get a pointer to the
> +     * next entry in that used ring. */
> +    used = &vring->vr.used->ring[vring->last_used_idx % vring->vr.num];
> +    used->id = head;
> +    used->len = len;
> +
> +    /* Make sure buffer is written before we update index. */
> +    smp_wmb();
> +
> +    new = vring->vr.used->idx = ++vring->last_used_idx;
> +    if (unlikely((int16_t)(new - vring->signalled_used) < (uint16_t)1)) {
> +        vring->signalled_used_valid = false;
> +    }
> +}
> diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
> new file mode 100644
> index 0000000..7245d99
> --- /dev/null
> +++ b/hw/dataplane/vring.h
> @@ -0,0 +1,62 @@
> +/* Copyright 2012 Red Hat, Inc. and/or its affiliates
> + * Copyright IBM, Corp. 2012
> + *
> + * Based on Linux vhost code:
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2006 Rusty Russell IBM Corporation
> + *
> + * Author: Michael S. Tsirkin <mst@redhat.com>
> + *         Stefan Hajnoczi <stefanha@redhat.com>
> + *
> + * Inspiration, some code, and most witty comments come from
> + * Documentation/virtual/lguest/lguest.c, by Rusty Russell
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.
> + */
> +
> +#ifndef VRING_H
> +#define VRING_H
> +
> +#include <linux/virtio_ring.h>
> +#include "qemu-common.h"
> +#include "qemu-barrier.h"
> +#include "hw/dataplane/hostmem.h"
> +#include "hw/virtio.h"
> +
> +typedef struct {
> +    Hostmem hostmem;                /* guest memory mapper */
> +    struct vring vr;                /* virtqueue vring mapped to host memory */
> +    uint16_t last_avail_idx;        /* last processed avail ring index */
> +    uint16_t last_used_idx;         /* last processed used ring index */
> +    uint16_t signalled_used;        /* EVENT_IDX state */
> +    bool signalled_used_valid;
> +    bool broken;                    /* was there a fatal error? */
> +} Vring;
> +
> +static inline unsigned int vring_get_num(Vring *vring)
> +{
> +    return vring->vr.num;
> +}
> +
> +/* Are there more descriptors available? */
> +static inline bool vring_more_avail(Vring *vring)
> +{
> +    return vring->vr.avail->idx != vring->last_avail_idx;
> +}
> +
> +/* Fail future vring_pop() and vring_push() calls until reset */
> +static inline void vring_set_broken(Vring *vring)
> +{
> +    vring->broken = true;
> +}
> +
> +bool vring_setup(Vring *vring, VirtIODevice *vdev, int n);
> +void vring_teardown(Vring *vring);
> +void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool enable);
> +bool vring_should_notify(VirtIODevice *vdev, Vring *vring);
> +int vring_pop(VirtIODevice *vdev, Vring *vring,
> +              struct iovec iov[], struct iovec *iov_end,
> +              unsigned int *out_num, unsigned int *in_num);
> +void vring_push(Vring *vring, unsigned int head, int len);
> +
> +#endif /* VRING_H */
> diff --git a/trace-events b/trace-events
> index 6c6cbf1..a9a791b 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -98,6 +98,9 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"
>  virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
>  virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
>  
> +# hw/dataplane/vring.c
> +vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
> +
>  # thread-pool.c
>  thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
>  thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
> -- 
> 1.8.0
Michael S. Tsirkin - Nov. 29, 2012, 1:48 p.m.
On Thu, Nov 22, 2012 at 04:16:45PM +0100, Stefan Hajnoczi wrote:
> The virtio-blk-data-plane cannot access memory using the usual QEMU
> functions since it executes outside the global mutex and the memory APIs
> are this time are not thread-safe.
> 
> This patch introduces a virtqueue module based on the kernel's vhost
> vring code.  The trick is that we map guest memory ahead of time and
> access it cheaply outside the global mutex.
> 
> Once the hardware emulation code can execute outside the global mutex it
> will be possible to drop this code.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  hw/Makefile.objs           |   2 +-
>  hw/dataplane/Makefile.objs |   2 +-
>  hw/dataplane/vring.c       | 344 +++++++++++++++++++++++++++++++++++++++++++++
>  hw/dataplane/vring.h       |  62 ++++++++
>  trace-events               |   3 +
>  5 files changed, 411 insertions(+), 2 deletions(-)
>  create mode 100644 hw/dataplane/vring.c
>  create mode 100644 hw/dataplane/vring.h
> 
> diff --git a/hw/Makefile.objs b/hw/Makefile.objs
> index ea46f81..db87fbf 100644
> --- a/hw/Makefile.objs
> +++ b/hw/Makefile.objs
> @@ -1,4 +1,4 @@
> -common-obj-y = usb/ ide/
> +common-obj-y = usb/ ide/ dataplane/
>  common-obj-y += loader.o
>  common-obj-$(CONFIG_VIRTIO) += virtio-console.o
>  common-obj-$(CONFIG_VIRTIO) += virtio-rng.o
> diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
> index 8c8dea1..34e6d57 100644
> --- a/hw/dataplane/Makefile.objs
> +++ b/hw/dataplane/Makefile.objs
> @@ -1,3 +1,3 @@
>  ifeq ($(CONFIG_VIRTIO), y)
> -common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o
> +common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o
>  endif
> diff --git a/hw/dataplane/vring.c b/hw/dataplane/vring.c
> new file mode 100644
> index 0000000..2632fbd
> --- /dev/null
> +++ b/hw/dataplane/vring.c
> @@ -0,0 +1,344 @@
> +/* Copyright 2012 Red Hat, Inc.
> + * Copyright IBM, Corp. 2012
> + *
> + * Based on Linux vhost code:
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2006 Rusty Russell IBM Corporation
> + *
> + * Author: Michael S. Tsirkin <mst@redhat.com>
> + *         Stefan Hajnoczi <stefanha@redhat.com>
> + *
> + * Inspiration, some code, and most witty comments come from
> + * Documentation/virtual/lguest/lguest.c, by Rusty Russell
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.
> + */
> +
> +#include "trace.h"
> +#include "hw/dataplane/vring.h"
> +
> +/* Map the guest's vring to host memory */
> +bool vring_setup(Vring *vring, VirtIODevice *vdev, int n)
> +{
> +    hwaddr vring_addr = virtio_queue_get_ring_addr(vdev, n);
> +    hwaddr vring_size = virtio_queue_get_ring_size(vdev, n);
> +    void *vring_ptr;
> +
> +    vring->broken = false;
> +
> +    hostmem_init(&vring->hostmem);
> +    vring_ptr = hostmem_lookup(&vring->hostmem, vring_addr, vring_size, true);
> +    if (!vring_ptr) {
> +        error_report("Failed to map vring "
> +                     "addr %#" HWADDR_PRIx " size %" HWADDR_PRIu,
> +                     vring_addr, vring_size);
> +        vring->broken = true;
> +        return false;
> +    }
> +
> +    vring_init(&vring->vr, virtio_queue_get_num(vdev, n), vring_ptr, 4096);
> +
> +    vring->last_avail_idx = 0;
> +    vring->last_used_idx = 0;
> +    vring->signalled_used = 0;
> +    vring->signalled_used_valid = false;
> +
> +    trace_vring_setup(virtio_queue_get_ring_addr(vdev, n),
> +                      vring->vr.desc, vring->vr.avail, vring->vr.used);
> +    return true;
> +}
> +
> +void vring_teardown(Vring *vring)
> +{
> +    hostmem_finalize(&vring->hostmem);
> +}
> +
> +/* Toggle guest->host notifies */
> +void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool enable)
> +{
> +    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
> +        if (enable) {
> +            vring_avail_event(&vring->vr) = vring->vr.avail->idx;
> +        }
> +    } else if (enable) {
> +        vring->vr.used->flags &= ~VRING_USED_F_NO_NOTIFY;
> +    } else {
> +        vring->vr.used->flags |= VRING_USED_F_NO_NOTIFY;
> +    }
> +}
> +
> +/* This is stolen from linux/drivers/vhost/vhost.c:vhost_notify() */
> +bool vring_should_notify(VirtIODevice *vdev, Vring *vring)
> +{
> +    uint16_t old, new;
> +    bool v;
> +    /* Flush out used index updates. This is paired
> +     * with the barrier that the Guest executes when enabling
> +     * interrupts. */
> +    smp_mb();
> +
> +    if ((vdev->guest_features & VIRTIO_F_NOTIFY_ON_EMPTY) &&
> +        unlikely(vring->vr.avail->idx == vring->last_avail_idx)) {
> +        return true;
> +    }
> +
> +    if (!(vdev->guest_features & VIRTIO_RING_F_EVENT_IDX)) {
> +        return !(vring->vr.avail->flags & VRING_AVAIL_F_NO_INTERRUPT);
> +    }
> +    old = vring->signalled_used;
> +    v = vring->signalled_used_valid;
> +    new = vring->signalled_used = vring->last_used_idx;
> +    vring->signalled_used_valid = true;
> +
> +    if (unlikely(!v)) {
> +        return true;
> +    }
> +
> +    return vring_need_event(vring_used_event(&vring->vr), new, old);
> +}
> +
> +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
> +static int get_indirect(Vring *vring,
> +                        struct iovec iov[], struct iovec *iov_end,
> +                        unsigned int *out_num, unsigned int *in_num,
> +                        struct vring_desc *indirect)
> +{
> +    struct vring_desc desc;
> +    unsigned int i = 0, count, found = 0;
> +
> +    /* Sanity check */
> +    if (unlikely(indirect->len % sizeof desc)) {
> +        error_report("Invalid length in indirect descriptor: "
> +                     "len %#x not multiple of %#zx",
> +                     indirect->len, sizeof desc);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    count = indirect->len / sizeof desc;
> +    /* Buffers are chained via a 16 bit next field, so
> +     * we can have at most 2^16 of these. */
> +    if (unlikely(count > USHRT_MAX + 1)) {
> +        error_report("Indirect buffer length too big: %d", indirect->len);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    /* Point to translate indirect desc chain */
> +    indirect = hostmem_lookup(&vring->hostmem, indirect->addr, indirect->len,
> +                              false);
> +    if (!indirect) {
> +        error_report("Failed to map indirect desc chain "
> +                     "addr %#" PRIx64 " len %u",
> +                     (uint64_t)indirect->addr, indirect->len);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    /* We will use the result as an address to read from, so most
> +     * architectures only need a compiler barrier here. */
> +    barrier(); /* read_barrier_depends(); */
> +
> +    do {
> +        if (unlikely(++found > count)) {
> +            error_report("Loop detected: last one at %u "
> +                         "indirect size %u", i, count);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +
> +        desc = *indirect++;

One other thing - I am guessing that in practice this and other accesses
get compiled to a memcpy call so you operate on stack from this point
on so it's fine.
But to really guarantee this, you have to use something like ACCESS_ONCE
macro in linux for all memory accesses or even (gasp) volatile.

Otherwise, e.g. desc.len could get modified after it's
validated with bad results.

> +        if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
> +            error_report("Nested indirect descriptor");
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +
> +        /* Stop for now if there are not enough iovecs available. */
> +        if (iov >= iov_end) {
> +            return -ENOBUFS;
> +        }
> +
> +        iov->iov_base = hostmem_lookup(&vring->hostmem, desc.addr, desc.len,
> +                                       desc.flags & VRING_DESC_F_WRITE);
> +        if (!iov->iov_base) {
> +            error_report("Failed to map indirect descriptor"
> +                         "addr %#" PRIx64 " len %u",
> +                         (uint64_t)desc.addr, desc.len);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        iov->iov_len = desc.len;
> +        iov++;
> +
> +        /* If this is an input descriptor, increment that count. */
> +        if (desc.flags & VRING_DESC_F_WRITE) {
> +            *in_num += 1;
> +        } else {
> +            /* If it's an output descriptor, they're all supposed
> +             * to come before any input descriptors. */
> +            if (unlikely(*in_num)) {
> +                error_report("Indirect descriptor "
> +                             "has out after in: idx %d", i);
> +                vring->broken = true;
> +                return -EFAULT;
> +            }
> +            *out_num += 1;
> +        }
> +        i = desc.next;
> +    } while (desc.flags & VRING_DESC_F_NEXT);
> +    return 0;
> +}
> +
> +/* This looks in the virtqueue and for the first available buffer, and converts
> + * it to an iovec for convenient access.  Since descriptors consist of some
> + * number of output then some number of input descriptors, it's actually two
> + * iovecs, but we pack them into one and note how many of each there were.
> + *
> + * This function returns the descriptor number found, or vq->num (which is
> + * never a valid descriptor number) if none was found.  A negative code is
> + * returned on error.
> + *
> + * Stolen from linux-2.6/drivers/vhost/vhost.c.
> + */
> +int vring_pop(VirtIODevice *vdev, Vring *vring,
> +              struct iovec iov[], struct iovec *iov_end,
> +              unsigned int *out_num, unsigned int *in_num)
> +{
> +    struct vring_desc desc;
> +    unsigned int i, head, found = 0, num = vring->vr.num;
> +    uint16_t avail_idx, last_avail_idx;
> +
> +    /* If there was a fatal error then refuse operation */
> +    if (vring->broken) {
> +        return -EFAULT;
> +    }
> +
> +    /* Check it isn't doing very strange things with descriptor numbers. */
> +    last_avail_idx = vring->last_avail_idx;
> +    avail_idx = vring->vr.avail->idx;
> +
> +    if (unlikely((uint16_t)(avail_idx - last_avail_idx) > num)) {
> +        error_report("Guest moved used index from %u to %u",
> +                     last_avail_idx, avail_idx);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    /* If there's nothing new since last we looked. */
> +    if (avail_idx == last_avail_idx) {
> +        return -EAGAIN;
> +    }
> +
> +    /* Only get avail ring entries after they have been exposed by guest. */
> +    smp_rmb();
> +
> +    /* Grab the next descriptor number they're advertising, and increment
> +     * the index we've seen. */
> +    head = vring->vr.avail->ring[last_avail_idx % num];
> +
> +    /* If their number is silly, that's an error. */
> +    if (unlikely(head >= num)) {
> +        error_report("Guest says index %u > %u is available", head, num);
> +        vring->broken = true;
> +        return -EFAULT;
> +    }
> +
> +    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
> +        vring_avail_event(&vring->vr) = vring->vr.avail->idx;
> +    }
> +
> +    /* When we start there are none of either input nor output. */
> +    *out_num = *in_num = 0;
> +
> +    i = head;
> +    do {
> +        if (unlikely(i >= num)) {
> +            error_report("Desc index is %u > %u, head = %u", i, num, head);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        if (unlikely(++found > num)) {
> +            error_report("Loop detected: last one at %u vq size %u head %u",
> +                         i, num, head);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        desc = vring->vr.desc[i];
> +        if (desc.flags & VRING_DESC_F_INDIRECT) {
> +            int ret = get_indirect(vring, iov, iov_end, out_num, in_num, &desc);
> +            if (ret < 0) {
> +                return ret;
> +            }
> +            continue;
> +        }
> +
> +        /* If there are not enough iovecs left, stop for now.  The caller
> +         * should check if there are more descs available once they have dealt
> +         * with the current set.
> +         */
> +        if (iov >= iov_end) {
> +            return -ENOBUFS;
> +        }
> +
> +        iov->iov_base = hostmem_lookup(&vring->hostmem, desc.addr, desc.len,
> +                                       desc.flags & VRING_DESC_F_WRITE);
> +        if (!iov->iov_base) {
> +            error_report("Failed to map vring desc addr %#" PRIx64 " len %u",
> +                         (uint64_t)desc.addr, desc.len);
> +            vring->broken = true;
> +            return -EFAULT;
> +        }
> +        iov->iov_len  = desc.len;
> +        iov++;
> +
> +        if (desc.flags & VRING_DESC_F_WRITE) {
> +            /* If this is an input descriptor,
> +             * increment that count. */
> +            *in_num += 1;
> +        } else {
> +            /* If it's an output descriptor, they're all supposed
> +             * to come before any input descriptors. */
> +            if (unlikely(*in_num)) {
> +                error_report("Descriptor has out after in: idx %d", i);
> +                vring->broken = true;
> +                return -EFAULT;
> +            }
> +            *out_num += 1;
> +        }
> +        i = desc.next;
> +    } while (desc.flags & VRING_DESC_F_NEXT);
> +
> +    /* On success, increment avail index. */
> +    vring->last_avail_idx++;
> +    return head;
> +}
> +
> +/* After we've used one of their buffers, we tell them about it.
> + *
> + * Stolen from linux-2.6/drivers/vhost/vhost.c.
> + */
> +void vring_push(Vring *vring, unsigned int head, int len)
> +{
> +    struct vring_used_elem *used;
> +    uint16_t new;
> +
> +    /* Don't touch vring if a fatal error occurred */
> +    if (vring->broken) {
> +        return;
> +    }
> +
> +    /* The virtqueue contains a ring of used buffers.  Get a pointer to the
> +     * next entry in that used ring. */
> +    used = &vring->vr.used->ring[vring->last_used_idx % vring->vr.num];
> +    used->id = head;
> +    used->len = len;
> +
> +    /* Make sure buffer is written before we update index. */
> +    smp_wmb();
> +
> +    new = vring->vr.used->idx = ++vring->last_used_idx;
> +    if (unlikely((int16_t)(new - vring->signalled_used) < (uint16_t)1)) {
> +        vring->signalled_used_valid = false;
> +    }
> +}
> diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
> new file mode 100644
> index 0000000..7245d99
> --- /dev/null
> +++ b/hw/dataplane/vring.h
> @@ -0,0 +1,62 @@
> +/* Copyright 2012 Red Hat, Inc. and/or its affiliates
> + * Copyright IBM, Corp. 2012
> + *
> + * Based on Linux vhost code:
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2006 Rusty Russell IBM Corporation
> + *
> + * Author: Michael S. Tsirkin <mst@redhat.com>
> + *         Stefan Hajnoczi <stefanha@redhat.com>
> + *
> + * Inspiration, some code, and most witty comments come from
> + * Documentation/virtual/lguest/lguest.c, by Rusty Russell
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.
> + */
> +
> +#ifndef VRING_H
> +#define VRING_H
> +
> +#include <linux/virtio_ring.h>
> +#include "qemu-common.h"
> +#include "qemu-barrier.h"
> +#include "hw/dataplane/hostmem.h"
> +#include "hw/virtio.h"
> +
> +typedef struct {
> +    Hostmem hostmem;                /* guest memory mapper */
> +    struct vring vr;                /* virtqueue vring mapped to host memory */
> +    uint16_t last_avail_idx;        /* last processed avail ring index */
> +    uint16_t last_used_idx;         /* last processed used ring index */
> +    uint16_t signalled_used;        /* EVENT_IDX state */
> +    bool signalled_used_valid;
> +    bool broken;                    /* was there a fatal error? */
> +} Vring;
> +
> +static inline unsigned int vring_get_num(Vring *vring)
> +{
> +    return vring->vr.num;
> +}
> +
> +/* Are there more descriptors available? */
> +static inline bool vring_more_avail(Vring *vring)
> +{
> +    return vring->vr.avail->idx != vring->last_avail_idx;
> +}
> +
> +/* Fail future vring_pop() and vring_push() calls until reset */
> +static inline void vring_set_broken(Vring *vring)
> +{
> +    vring->broken = true;
> +}
> +
> +bool vring_setup(Vring *vring, VirtIODevice *vdev, int n);
> +void vring_teardown(Vring *vring);
> +void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool enable);
> +bool vring_should_notify(VirtIODevice *vdev, Vring *vring);
> +int vring_pop(VirtIODevice *vdev, Vring *vring,
> +              struct iovec iov[], struct iovec *iov_end,
> +              unsigned int *out_num, unsigned int *in_num);
> +void vring_push(Vring *vring, unsigned int head, int len);
> +
> +#endif /* VRING_H */
> diff --git a/trace-events b/trace-events
> index 6c6cbf1..a9a791b 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -98,6 +98,9 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"
>  virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
>  virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
>  
> +# hw/dataplane/vring.c
> +vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
> +
>  # thread-pool.c
>  thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
>  thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
> -- 
> 1.8.0
Paolo Bonzini - Nov. 29, 2012, 3:17 p.m.
> > +/* Toggle guest->host notifies */
> > +void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool
> > enable)
> > +{
> > +    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
> > +        if (enable) {
> > +            vring_avail_event(&vring->vr) = vring->vr.avail->idx;
> > +        }
> > +    } else if (enable) {
> > +        vring->vr.used->flags &= ~VRING_USED_F_NO_NOTIFY;
> > +    } else {
> > +        vring->vr.used->flags |= VRING_USED_F_NO_NOTIFY;
> > +    }
> > +}

This is similar to the (guest-side) virtqueue_disable_cb/virtqueue_enable_cb.

Perhaps having two functions will be easier to use, because from your
other code it looks like you'd benefit from a return value when
enable == true (again similar to virtqueue_enable_cb).

Paolo
Stefan Hajnoczi - Dec. 5, 2012, 12:57 p.m.
On Thu, Nov 29, 2012 at 02:50:01PM +0200, Michael S. Tsirkin wrote:
> On Thu, Nov 22, 2012 at 04:16:45PM +0100, Stefan Hajnoczi wrote:
> > The virtio-blk-data-plane cannot access memory using the usual QEMU
> > functions since it executes outside the global mutex and the memory APIs
> > are at this time not thread-safe.
> > 
> > This patch introduces a virtqueue module based on the kernel's vhost
> > vring code.  The trick is that we map guest memory ahead of time and
> > access it cheaply outside the global mutex.
> > 
> > Once the hardware emulation code can execute outside the global mutex it
> > will be possible to drop this code.
> > 
> > Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> 
> Is there no way to factor out common code and share it with virtio.c?

I think we have touched on this in other sub-threads but for reference:
this code implements vring access outside the global mutex, which means
no QEMU memory API functions.  Therefore it's hard to share the virtio.c
code which uses QEMU memory API functions.

The current work that Ping Fan Liu is doing will lead to thread-safe
memory accesses from device emulation code.  At that point we can ditch
this and unify with virtio.c.

> > +/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
> 
> Probably should document the version you based this on.
> Surely not really 2.6?

linux-2.6.git is still mirrored from linux.git :).  I'll try to dig up
the specific Linux version that this code is based on.

> > +static int get_indirect(Vring *vring,
> > +                        struct iovec iov[], struct iovec *iov_end,
> > +                        unsigned int *out_num, unsigned int *in_num,
> > +                        struct vring_desc *indirect)
> > +{
> > +    struct vring_desc desc;
> > +    unsigned int i = 0, count, found = 0;
> > +
> > +    /* Sanity check */
> > +    if (unlikely(indirect->len % sizeof desc)) {
> > +        error_report("Invalid length in indirect descriptor: "
> > +                     "len %#x not multiple of %#zx",
> > +                     indirect->len, sizeof desc);
> > +        vring->broken = true;
> > +        return -EFAULT;
> > +    }
> > +
> > +    count = indirect->len / sizeof desc;
> > +    /* Buffers are chained via a 16 bit next field, so
> > +     * we can have at most 2^16 of these. */
> > +    if (unlikely(count > USHRT_MAX + 1)) {
> > +        error_report("Indirect buffer length too big: %d", indirect->len);
> > +        vring->broken = true;
> > +        return -EFAULT;
> > +    }
> > +
> > +    /* Point to translate indirect desc chain */
> > +    indirect = hostmem_lookup(&vring->hostmem, indirect->addr, indirect->len,
> > +                              false);
> 
> This assumes an indirect buffer is contiguous in qemu memory
> which seems wrong since unlike vring itself
> there are no alignment requirements.

Let's break this up into one hostmem_lookup() per descriptor.  In other
words, don't try to look up the entire indirect buffer but copy in one
descriptor at a time.

> Overriding indirect here also seems unnecessarily tricky.

You are right, let's use a separate local variable to make the code
clearer.

> > +int vring_pop(VirtIODevice *vdev, Vring *vring,
> > +              struct iovec iov[], struct iovec *iov_end,
> > +              unsigned int *out_num, unsigned int *in_num)
> > +{
> > +    struct vring_desc desc;
> > +    unsigned int i, head, found = 0, num = vring->vr.num;
> > +    uint16_t avail_idx, last_avail_idx;
> > +
> > +    /* If there was a fatal error then refuse operation */
> > +    if (vring->broken) {
> > +        return -EFAULT;
> > +    }
> > +
> > +    /* Check it isn't doing very strange things with descriptor numbers. */
> > +    last_avail_idx = vring->last_avail_idx;
> > +    avail_idx = vring->vr.avail->idx;
> 
> I think something needs to be done here to force
> a read otherwise two accesses to avail_idx
> below can cause two reads from the ring and
> could return inconsistent results.

There is no function call or anything in between that forces the
compiler to load the value of avail_idx and reuse it.

So I think you're right.  I'm not 100% sure a read barrier forces the
compiler to load here since the following code just manipulates
last_avail_idx and avail_idx.

Any ideas?

> > +    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
> > +        vring_avail_event(&vring->vr) = vring->vr.avail->idx;
> 
> No barrier here?
> I also don't see similar code in vhost - why is it a good idea?

This is from hw/virtio.c:virtqueue_pop().  We know there is at least one
request available, we're hinting to the guest to not to bother
notifying any requests up to the latest.

However, setting avail_event to the current vring avail_idx only helps
if we get lucky and process the vring *before* the guest decides to
notify a whole bunch of requests it has just enqueued.

So this doesn't seem incorrect but the performance benefit is
questionable.

Do you remember why you wrote this code?  The commit is:

commit bcbabae8ff7f7ec114da9fe2aa7f25f420f35306
Author: Michael S. Tsirkin <mst@redhat.com>
Date:   Sun Jun 12 16:21:57 2011 +0300

    virtio: event index support

    Add support for event_idx feature, and utilize it to
    reduce the number of interrupts and exits for the guest.

Stefan

Patch

diff --git a/hw/Makefile.objs b/hw/Makefile.objs
index ea46f81..db87fbf 100644
--- a/hw/Makefile.objs
+++ b/hw/Makefile.objs
@@ -1,4 +1,4 @@ 
-common-obj-y = usb/ ide/
+common-obj-y = usb/ ide/ dataplane/
 common-obj-y += loader.o
 common-obj-$(CONFIG_VIRTIO) += virtio-console.o
 common-obj-$(CONFIG_VIRTIO) += virtio-rng.o
diff --git a/hw/dataplane/Makefile.objs b/hw/dataplane/Makefile.objs
index 8c8dea1..34e6d57 100644
--- a/hw/dataplane/Makefile.objs
+++ b/hw/dataplane/Makefile.objs
@@ -1,3 +1,3 @@ 
 ifeq ($(CONFIG_VIRTIO), y)
-common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o
+common-obj-$(CONFIG_VIRTIO_BLK_DATA_PLANE) += hostmem.o vring.o
 endif
diff --git a/hw/dataplane/vring.c b/hw/dataplane/vring.c
new file mode 100644
index 0000000..2632fbd
--- /dev/null
+++ b/hw/dataplane/vring.c
@@ -0,0 +1,344 @@ 
+/* Copyright 2012 Red Hat, Inc.
+ * Copyright IBM, Corp. 2012
+ *
+ * Based on Linux vhost code:
+ * Copyright (C) 2009 Red Hat, Inc.
+ * Copyright (C) 2006 Rusty Russell IBM Corporation
+ *
+ * Author: Michael S. Tsirkin <mst@redhat.com>
+ *         Stefan Hajnoczi <stefanha@redhat.com>
+ *
+ * Inspiration, some code, and most witty comments come from
+ * Documentation/virtual/lguest/lguest.c, by Rusty Russell
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ */
+
+#include "trace.h"
+#include "hw/dataplane/vring.h"
+
+/* Map the guest's vring to host memory */
+bool vring_setup(Vring *vring, VirtIODevice *vdev, int n)
+{
+    hwaddr vring_addr = virtio_queue_get_ring_addr(vdev, n);
+    hwaddr vring_size = virtio_queue_get_ring_size(vdev, n);
+    void *vring_ptr;
+
+    vring->broken = false;
+
+    hostmem_init(&vring->hostmem);
+    vring_ptr = hostmem_lookup(&vring->hostmem, vring_addr, vring_size, true);
+    if (!vring_ptr) {
+        error_report("Failed to map vring "
+                     "addr %#" HWADDR_PRIx " size %" HWADDR_PRIu,
+                     vring_addr, vring_size);
+        vring->broken = true;
+        return false;
+    }
+
+    vring_init(&vring->vr, virtio_queue_get_num(vdev, n), vring_ptr, 4096);
+
+    vring->last_avail_idx = 0;
+    vring->last_used_idx = 0;
+    vring->signalled_used = 0;
+    vring->signalled_used_valid = false;
+
+    trace_vring_setup(virtio_queue_get_ring_addr(vdev, n),
+                      vring->vr.desc, vring->vr.avail, vring->vr.used);
+    return true;
+}
+
+void vring_teardown(Vring *vring)
+{
+    hostmem_finalize(&vring->hostmem);
+}
+
+/* Toggle guest->host notifies */
+void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool enable)
+{
+    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
+        if (enable) {
+            vring_avail_event(&vring->vr) = vring->vr.avail->idx;
+        }
+    } else if (enable) {
+        vring->vr.used->flags &= ~VRING_USED_F_NO_NOTIFY;
+    } else {
+        vring->vr.used->flags |= VRING_USED_F_NO_NOTIFY;
+    }
+}
+
+/* This is stolen from linux/drivers/vhost/vhost.c:vhost_notify() */
+bool vring_should_notify(VirtIODevice *vdev, Vring *vring)
+{
+    uint16_t old, new;
+    bool v;
+    /* Flush out used index updates. This is paired
+     * with the barrier that the Guest executes when enabling
+     * interrupts. */
+    smp_mb();
+
+    if ((vdev->guest_features & VIRTIO_F_NOTIFY_ON_EMPTY) &&
+        unlikely(vring->vr.avail->idx == vring->last_avail_idx)) {
+        return true;
+    }
+
+    if (!(vdev->guest_features & VIRTIO_RING_F_EVENT_IDX)) {
+        return !(vring->vr.avail->flags & VRING_AVAIL_F_NO_INTERRUPT);
+    }
+    old = vring->signalled_used;
+    v = vring->signalled_used_valid;
+    new = vring->signalled_used = vring->last_used_idx;
+    vring->signalled_used_valid = true;
+
+    if (unlikely(!v)) {
+        return true;
+    }
+
+    return vring_need_event(vring_used_event(&vring->vr), new, old);
+}
+
+/* This is stolen from linux-2.6/drivers/vhost/vhost.c. */
+static int get_indirect(Vring *vring,
+                        struct iovec iov[], struct iovec *iov_end,
+                        unsigned int *out_num, unsigned int *in_num,
+                        struct vring_desc *indirect)
+{
+    struct vring_desc desc;
+    unsigned int i = 0, count, found = 0;
+
+    /* Sanity check */
+    if (unlikely(indirect->len % sizeof desc)) {
+        error_report("Invalid length in indirect descriptor: "
+                     "len %#x not multiple of %#zx",
+                     indirect->len, sizeof desc);
+        vring->broken = true;
+        return -EFAULT;
+    }
+
+    count = indirect->len / sizeof desc;
+    /* Buffers are chained via a 16 bit next field, so
+     * we can have at most 2^16 of these. */
+    if (unlikely(count > USHRT_MAX + 1)) {
+        error_report("Indirect buffer length too big: %d", indirect->len);
+        vring->broken = true;
+        return -EFAULT;
+    }
+
+    /* Point to translate indirect desc chain */
+    indirect = hostmem_lookup(&vring->hostmem, indirect->addr, indirect->len,
+                              false);
+    if (!indirect) {
+        error_report("Failed to map indirect desc chain "
+                     "addr %#" PRIx64 " len %u",
+                     (uint64_t)indirect->addr, indirect->len);
+        vring->broken = true;
+        return -EFAULT;
+    }
+
+    /* We will use the result as an address to read from, so most
+     * architectures only need a compiler barrier here. */
+    barrier(); /* read_barrier_depends(); */
+
+    do {
+        if (unlikely(++found > count)) {
+            error_report("Loop detected: last one at %u "
+                         "indirect size %u", i, count);
+            vring->broken = true;
+            return -EFAULT;
+        }
+
+        desc = *indirect++;
+        if (unlikely(desc.flags & VRING_DESC_F_INDIRECT)) {
+            error_report("Nested indirect descriptor");
+            vring->broken = true;
+            return -EFAULT;
+        }
+
+        /* Stop for now if there are not enough iovecs available. */
+        if (iov >= iov_end) {
+            return -ENOBUFS;
+        }
+
+        iov->iov_base = hostmem_lookup(&vring->hostmem, desc.addr, desc.len,
+                                       desc.flags & VRING_DESC_F_WRITE);
+        if (!iov->iov_base) {
+            error_report("Failed to map indirect descriptor"
+                         "addr %#" PRIx64 " len %u",
+                         (uint64_t)desc.addr, desc.len);
+            vring->broken = true;
+            return -EFAULT;
+        }
+        iov->iov_len = desc.len;
+        iov++;
+
+        /* If this is an input descriptor, increment that count. */
+        if (desc.flags & VRING_DESC_F_WRITE) {
+            *in_num += 1;
+        } else {
+            /* If it's an output descriptor, they're all supposed
+             * to come before any input descriptors. */
+            if (unlikely(*in_num)) {
+                error_report("Indirect descriptor "
+                             "has out after in: idx %d", i);
+                vring->broken = true;
+                return -EFAULT;
+            }
+            *out_num += 1;
+        }
+        i = desc.next;
+    } while (desc.flags & VRING_DESC_F_NEXT);
+    return 0;
+}
+
+/* This looks in the virtqueue and for the first available buffer, and converts
+ * it to an iovec for convenient access.  Since descriptors consist of some
+ * number of output then some number of input descriptors, it's actually two
+ * iovecs, but we pack them into one and note how many of each there were.
+ *
+ * This function returns the descriptor number found, or vq->num (which is
+ * never a valid descriptor number) if none was found.  A negative code is
+ * returned on error.
+ *
+ * Stolen from linux-2.6/drivers/vhost/vhost.c.
+ */
+int vring_pop(VirtIODevice *vdev, Vring *vring,
+              struct iovec iov[], struct iovec *iov_end,
+              unsigned int *out_num, unsigned int *in_num)
+{
+    struct vring_desc desc;
+    unsigned int i, head, found = 0, num = vring->vr.num;
+    uint16_t avail_idx, last_avail_idx;
+
+    /* If there was a fatal error then refuse operation */
+    if (vring->broken) {
+        return -EFAULT;
+    }
+
+    /* Check it isn't doing very strange things with descriptor numbers. */
+    last_avail_idx = vring->last_avail_idx;
+    avail_idx = vring->vr.avail->idx;
+
+    if (unlikely((uint16_t)(avail_idx - last_avail_idx) > num)) {
+        error_report("Guest moved used index from %u to %u",
+                     last_avail_idx, avail_idx);
+        vring->broken = true;
+        return -EFAULT;
+    }
+
+    /* If there's nothing new since last we looked. */
+    if (avail_idx == last_avail_idx) {
+        return -EAGAIN;
+    }
+
+    /* Only get avail ring entries after they have been exposed by guest. */
+    smp_rmb();
+
+    /* Grab the next descriptor number they're advertising, and increment
+     * the index we've seen. */
+    head = vring->vr.avail->ring[last_avail_idx % num];
+
+    /* If their number is silly, that's an error. */
+    if (unlikely(head >= num)) {
+        error_report("Guest says index %u > %u is available", head, num);
+        vring->broken = true;
+        return -EFAULT;
+    }
+
+    if (vdev->guest_features & (1 << VIRTIO_RING_F_EVENT_IDX)) {
+        vring_avail_event(&vring->vr) = vring->vr.avail->idx;
+    }
+
+    /* When we start there are none of either input nor output. */
+    *out_num = *in_num = 0;
+
+    i = head;
+    do {
+        if (unlikely(i >= num)) {
+            error_report("Desc index is %u > %u, head = %u", i, num, head);
+            vring->broken = true;
+            return -EFAULT;
+        }
+        if (unlikely(++found > num)) {
+            error_report("Loop detected: last one at %u vq size %u head %u",
+                         i, num, head);
+            vring->broken = true;
+            return -EFAULT;
+        }
+        desc = vring->vr.desc[i];
+        if (desc.flags & VRING_DESC_F_INDIRECT) {
+            int ret = get_indirect(vring, iov, iov_end, out_num, in_num, &desc);
+            if (ret < 0) {
+                return ret;
+            }
+            continue;
+        }
+
+        /* If there are not enough iovecs left, stop for now.  The caller
+         * should check if there are more descs available once they have dealt
+         * with the current set.
+         */
+        if (iov >= iov_end) {
+            return -ENOBUFS;
+        }
+
+        iov->iov_base = hostmem_lookup(&vring->hostmem, desc.addr, desc.len,
+                                       desc.flags & VRING_DESC_F_WRITE);
+        if (!iov->iov_base) {
+            error_report("Failed to map vring desc addr %#" PRIx64 " len %u",
+                         (uint64_t)desc.addr, desc.len);
+            vring->broken = true;
+            return -EFAULT;
+        }
+        iov->iov_len  = desc.len;
+        iov++;
+
+        if (desc.flags & VRING_DESC_F_WRITE) {
+            /* If this is an input descriptor,
+             * increment that count. */
+            *in_num += 1;
+        } else {
+            /* If it's an output descriptor, they're all supposed
+             * to come before any input descriptors. */
+            if (unlikely(*in_num)) {
+                error_report("Descriptor has out after in: idx %d", i);
+                vring->broken = true;
+                return -EFAULT;
+            }
+            *out_num += 1;
+        }
+        i = desc.next;
+    } while (desc.flags & VRING_DESC_F_NEXT);
+
+    /* On success, increment avail index. */
+    vring->last_avail_idx++;
+    return head;
+}
+
+/* After we've used one of their buffers, we tell them about it.
+ *
+ * Stolen from linux-2.6/drivers/vhost/vhost.c.
+ */
+void vring_push(Vring *vring, unsigned int head, int len)
+{
+    struct vring_used_elem *used;
+    uint16_t new;
+
+    /* Don't touch vring if a fatal error occurred */
+    if (vring->broken) {
+        return;
+    }
+
+    /* The virtqueue contains a ring of used buffers.  Get a pointer to the
+     * next entry in that used ring. */
+    used = &vring->vr.used->ring[vring->last_used_idx % vring->vr.num];
+    used->id = head;
+    used->len = len;
+
+    /* Make sure buffer is written before we update index. */
+    smp_wmb();
+
+    new = vring->vr.used->idx = ++vring->last_used_idx;
+    if (unlikely((int16_t)(new - vring->signalled_used) < (uint16_t)1)) {
+        vring->signalled_used_valid = false;
+    }
+}
diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
new file mode 100644
index 0000000..7245d99
--- /dev/null
+++ b/hw/dataplane/vring.h
@@ -0,0 +1,62 @@ 
+/* Copyright 2012 Red Hat, Inc. and/or its affiliates
+ * Copyright IBM, Corp. 2012
+ *
+ * Based on Linux vhost code:
+ * Copyright (C) 2009 Red Hat, Inc.
+ * Copyright (C) 2006 Rusty Russell IBM Corporation
+ *
+ * Author: Michael S. Tsirkin <mst@redhat.com>
+ *         Stefan Hajnoczi <stefanha@redhat.com>
+ *
+ * Inspiration, some code, and most witty comments come from
+ * Documentation/virtual/lguest/lguest.c, by Rusty Russell
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ */
+
+#ifndef VRING_H
+#define VRING_H
+
+#include <linux/virtio_ring.h>
+#include "qemu-common.h"
+#include "qemu-barrier.h"
+#include "hw/dataplane/hostmem.h"
+#include "hw/virtio.h"
+
+typedef struct {
+    Hostmem hostmem;                /* guest memory mapper */
+    struct vring vr;                /* virtqueue vring mapped to host memory */
+    uint16_t last_avail_idx;        /* last processed avail ring index */
+    uint16_t last_used_idx;         /* last processed used ring index */
+    uint16_t signalled_used;        /* EVENT_IDX state */
+    bool signalled_used_valid;
+    bool broken;                    /* was there a fatal error? */
+} Vring;
+
+static inline unsigned int vring_get_num(Vring *vring)
+{
+    return vring->vr.num;
+}
+
+/* Are there more descriptors available? */
+static inline bool vring_more_avail(Vring *vring)
+{
+    return vring->vr.avail->idx != vring->last_avail_idx;
+}
+
+/* Fail future vring_pop() and vring_push() calls until reset */
+static inline void vring_set_broken(Vring *vring)
+{
+    vring->broken = true;
+}
+
+bool vring_setup(Vring *vring, VirtIODevice *vdev, int n);
+void vring_teardown(Vring *vring);
+void vring_set_notification(VirtIODevice *vdev, Vring *vring, bool enable);
+bool vring_should_notify(VirtIODevice *vdev, Vring *vring);
+int vring_pop(VirtIODevice *vdev, Vring *vring,
+              struct iovec iov[], struct iovec *iov_end,
+              unsigned int *out_num, unsigned int *in_num);
+void vring_push(Vring *vring, unsigned int head, int len);
+
+#endif /* VRING_H */
diff --git a/trace-events b/trace-events
index 6c6cbf1..a9a791b 100644
--- a/trace-events
+++ b/trace-events
@@ -98,6 +98,9 @@  virtio_blk_rw_complete(void *req, int ret) "req %p ret %d"
 virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
 virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu"
 
+# hw/dataplane/vring.c
+vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
+
 # thread-pool.c
 thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
 thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"