
[RFC,v2] tests/vhost-user-bridge: add vhost-user bridge application

Message ID 1445879215-9320-1-git-send-email-victork@redhat.com
State New

Commit Message

Victor Kaplansky Oct. 26, 2015, 5:19 p.m. UTC
The existing vhost-user test in QEMU is good for testing the
management protocol, but does not allow actual traffic. This
patch proposes the Vhost-User Bridge application, which can
serve the QEMU community as a comprehensive test by running
real internet traffic over the vhost-user interface.

Essentially, the Vhost-User Bridge is a very basic vhost-user
backend for QEMU. It runs as a standalone user-level process.
For packet processing the Vhost-User Bridge uses an additional
QEMU instance with a backend configured by "-net socket" as a
shared VLAN.  This way the other QEMU virtual machine can
effectively serve as a shared bus over UDP.

For a simpler setup, the additional QEMU instance running the
SLiRP backend can be the same QEMU instance that runs the
vhost-user client.

This Vhost-User Bridge implementation is very preliminary.  It is
missing many features. I have been studying the vhost-user protocol
internals, so I've written vhost-user-bridge bit by bit as I
progressed through the protocol.  Most probably its internal
architecture will change significantly.

To run the Vhost-User Bridge application:

1. Build vhost-user-bridge with the regular procedure. This will
create a vhost-user-bridge executable under the tests directory:

    $ configure; make tests/vhost-user-bridge

2. Ensure the machine has hugepages enabled in the kernel with a
command line like:

    default_hugepagesz=2M hugepagesz=2M hugepages=2048

3. Run the Vhost-User Bridge with:

    $ tests/vhost-user-bridge

The above runs a vhost-user server listening for connections
on the UNIX domain socket /tmp/vubr.sock, and tries to connect
over UDP to the VLAN bridge at localhost:5555, while listening
on localhost:4444.

Run QEMU with a virtio-net device backed by vhost-user:

    $ qemu \
        -enable-kvm -m 512 -smp 2 \
        -object memory-backend-file,id=mem,size=512M,mem-path=/dev/hugepages,share=on \
        -numa node,memdev=mem -mem-prealloc \
        -chardev socket,id=char0,path=/tmp/vubr.sock \
        -netdev type=vhost-user,id=mynet1,chardev=char0,vhostforce \
        -device virtio-net-pci,netdev=mynet1 \
        -net none \
        -net socket,vlan=0,udp=localhost:4444,localaddr=localhost:5555 \
        -net user,vlan=0 \
        disk.img

vhost-user-bridge was tested very lightly: it is able to bring up
Linux in a client VM with the virtio-net driver, and to send and
receive internet traffic. I tested with "wget redhat.com" and
"dig redhat.com".

PS. I consulted DPDK's vhost-user code while implementing the
Vhost-User Bridge.

Signed-off-by: Victor Kaplansky <victork@redhat.com>
---
v2:
    Cosmetic changes:
    - Tabs expanded, trailing spaces removed.
    - Removed use of architecture specific definitions starting with _
    - Used header files available in qemu/includes.
    - Rearranged source into single file.
    - checkpatch.pl pacified.
    - Added copyright note.
    - Small spelling corrections.
    - Removed _ prefixes in function names.
    - Makefile incorporated into tests/Makefile.
    - Error handling code changed to use die().
    - Prefix "vubr" replaced by "vhost_user".
    - Structures, enums and function type names renamed to
      comply with CODING_STYLE doc.
    - Preprocessor tricks thrown away.
    - Lines are no longer than 80.

    Functional changes:
    - Added memory barriers.
    - Implemented SET_OWNER. (by doing nothing).

TODO:
    - move all declarations before code inside block.
    - main should get parameters from the command line.
    - clean up debug printouts.
    - implement all request handlers.
    - test for broken requests and virtqueue.
    - implement features defined by Virtio 1.0 spec.
    - support mergeable buffers and indirect descriptors.
    - implement RESET_DEVICE request.
    - implement clean shutdown.
    - implement non-blocking writes to UDP backend.
    - correctly handle blocking when there are no available
      descriptors on the RX virtqueue.
    - implement polling strategy.
---
 tests/vhost-user-bridge.c | 1138 +++++++++++++++++++++++++++++++++++++++++++++
 tests/Makefile            |    1 +
 2 files changed, 1139 insertions(+)
 create mode 100644 tests/vhost-user-bridge.c

Comments

Michael S. Tsirkin Oct. 27, 2015, 11:40 a.m. UTC | #1
On Mon, Oct 26, 2015 at 07:19:23PM +0200, Victor Kaplansky wrote:
> The existing vhost-user test in QEMU is good for testing the
> management protocol, but does not allow actual traffic. This
> patch proposes the Vhost-User Bridge application, which can
> serve the QEMU community as a comprehensive test by running
> real internet traffic over the vhost-user interface.
> 
> Essentially, the Vhost-User Bridge is a very basic vhost-user
> backend for QEMU. It runs as a standalone user-level process.
> For packet processing the Vhost-User Bridge uses an additional
> QEMU instance with a backend configured by "-net socket" as a
> shared VLAN.  This way the other QEMU virtual machine can
> effectively serve as a shared bus over UDP.
> 
> For a simpler setup, the additional QEMU instance running the
> SLiRP backend can be the same QEMU instance that runs the
> vhost-user client.
> 
> This Vhost-User Bridge implementation is very preliminary.  It is
> missing many features. I have been studying the vhost-user protocol
> internals, so I've written vhost-user-bridge bit by bit as I
> progressed through the protocol.  Most probably its internal
> architecture will change significantly.
> 
> To run the Vhost-User Bridge application:
> 
> 1. Build vhost-user-bridge with the regular procedure. This will
> create a vhost-user-bridge executable under the tests directory:
> 
>     $ configure; make tests/vhost-user-bridge
> 
> 2. Ensure the machine has hugepages enabled in the kernel with a
> command line like:
> 
>     default_hugepagesz=2M hugepagesz=2M hugepages=2048
> 
> 3. Run the Vhost-User Bridge with:
> 
>     $ tests/vhost-user-bridge
> 
> The above runs a vhost-user server listening for connections
> on the UNIX domain socket /tmp/vubr.sock, and tries to connect
> over UDP to the VLAN bridge at localhost:5555, while listening
> on localhost:4444.
> 
> Run QEMU with a virtio-net device backed by vhost-user:
> 
>     $ qemu \
>         -enable-kvm -m 512 -smp 2 \
>         -object memory-backend-file,id=mem,size=512M,mem-path=/dev/hugepages,share=on \
>         -numa node,memdev=mem -mem-prealloc \
>         -chardev socket,id=char0,path=/tmp/vubr.sock \
>         -netdev type=vhost-user,id=mynet1,chardev=char0,vhostforce \
>         -device virtio-net-pci,netdev=mynet1 \
>         -net none \
>         -net socket,vlan=0,udp=localhost:4444,localaddr=localhost:5555 \
>         -net user,vlan=0 \
>         disk.img
> 
> vhost-user-bridge was tested very lightly: it is able to bring up
> Linux in a client VM with the virtio-net driver, and to send and
> receive internet traffic. I tested with "wget redhat.com" and
> "dig redhat.com".
> 
> PS. I consulted DPDK's vhost-user code while implementing the
> Vhost-User Bridge.
> 
> Signed-off-by: Victor Kaplansky <victork@redhat.com>
> ---
> v2:
>     Cosmetic changes:
>     - Tabs expanded, trailing spaces removed.
>     - Removed use of architecture specific definitions starting with _
>     - Used header files available in qemu/includes.
>     - Rearranged source into single file.
>     - checkpatch.pl pacified.
>     - Added copyright note.
>     - Small spelling corrections.
>     - Removed _ prefixes in function names.
>     - Makefile incorporated into tests/Makefile.
>     - Error handling code changed to use die().
>     - Prefix "vubr" replaced by "vhost_user".
>     - Structures, enums and function type names renamed to
>       comply with CODING_STYLE doc.
>     - Preprocessor tricks thrown away.
>     - Lines are no longer than 80.
> 
>     Functional changes:
>     - Added memory barriers.
>     - Implemented SET_OWNER. (by doing nothing).
> 
> TODO:
>     - move all declarations before code inside block.
>     - main should get parameters from the command line.
Not required for merge.
>     - clean up debug printouts.
>     - implement all request handlers.
>     - test for broken requests and virtqueue.
Not required for merge.
>     - implement features defined by Virtio 1.0 spec.
Not required for merge.
>     - support mergeable buffers and indirect descriptors.

Not required for merge.

>     - implement RESET_DEVICE request.

No need for this one I think.

>     - implement clean shutdown.
>     - implement non-blocking writes to UDP backend.
Not required for merge.
>     - correctly handle blocking when there are no available
>       descriptors on the RX virtqueue.
>     - implement polling strategy.
Not required for merge.
> ---
>  tests/vhost-user-bridge.c | 1138 +++++++++++++++++++++++++++++++++++++++++++++
>  tests/Makefile            |    1 +
>  2 files changed, 1139 insertions(+)
>  create mode 100644 tests/vhost-user-bridge.c
> 
> diff --git a/tests/vhost-user-bridge.c b/tests/vhost-user-bridge.c
> new file mode 100644
> index 0000000..89f1e07
> --- /dev/null
> +++ b/tests/vhost-user-bridge.c
> @@ -0,0 +1,1138 @@
> +/*
> + * Vhost-User Bridge
> + *
> + * Copyright (c) 2015 Red Hat, Inc.
> + *
> + * Authors:
> + *  Victor Kaplansky <victork@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#include <stddef.h>
> +#include <assert.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <string.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <sys/un.h>
> +#include <sys/unistd.h>
> +#include <sys/mman.h>
> +#include <sys/eventfd.h>
> +#include <arpa/inet.h>
> +
> +#include <linux/vhost.h>
> +
> +#include "qemu/atomic.h"
> +#include "standard-headers/linux/virtio_net.h"
> +#include "standard-headers/linux/virtio_ring.h"
> +
> +#define DEBUG_VHOST_USER_BRIDGE
> +
> +typedef void (*CallbackFunc)(int sock, void *ctx);
> +
> +typedef struct Event {
> +    void *ctx;
> +    CallbackFunc callback;
> +} Event;
> +
> +typedef struct Dispatcher {
> +    int max_sock;
> +    fd_set fdset;
> +    Event events[FD_SETSIZE];
> +} Dispatcher;
> +
> +static void
> +vhost_user_die(const char *s)
> +{
> +    perror(s);
> +    exit(1);
> +}
> +
> +static int
> +dispatcher_init(Dispatcher *dispr)
> +{
> +    FD_ZERO(&dispr->fdset);
> +    dispr->max_sock = -1;
> +    return 0;
> +}
> +
> +static int
> +dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb)
> +{
> +    if (sock >= FD_SETSIZE) {
> +        fprintf(stderr,
> +                "Error: Failed to add new event. sock %d should be less than %d\n",
> +                sock, FD_SETSIZE);
> +        return -1;
> +    }
> +
> +    dispr->events[sock].ctx = ctx;
> +    dispr->events[sock].callback = cb;
> +
> +    FD_SET(sock, &dispr->fdset);
> +    if (sock > dispr->max_sock) {
> +        dispr->max_sock = sock;
> +    }
> +    printf("Added sock %d for watching. max_sock: %d\n",
> +           sock, dispr->max_sock);
> +    return 0;
> +}
> +
> +#if 0
> +/* dispatcher_remove() is not currently in use but may be useful
> + * in the future. */
> +static int
> +dispatcher_remove(Dispatcher *dispr, int sock)
> +{
> +    if (sock >= FD_SETSIZE) {
> +        fprintf(stderr,
> +                "Error: Failed to remove event. sock %d should be less than %d\n",
> +                sock, FD_SETSIZE);
> +        return -1;
> +    }
> +
> +    FD_CLR(sock, &dispr->fdset);
> +    return 0;
> +}
> +#endif
> +
> +/* timeout in us */
> +static int
> +dispatcher_wait(Dispatcher *dispr, uint32_t timeout)
> +{
> +    struct timeval tv;
> +    tv.tv_sec = timeout / 1000000;
> +    tv.tv_usec = timeout % 1000000;
> +
> +    fd_set fdset = dispr->fdset;
> +
> +    /* wait until some of the sockets become readable. */
> +    int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv);
> +
> +    if (rc == -1) {
> +        vhost_user_die("select");
> +    }
> +
> +    /* Timeout */
> +    if (rc == 0) {
> +        return 0;
> +    }
> +
> +    /* Now call callback for every ready socket. */
> +
> +    int sock;
> +    for (sock = 0; sock < dispr->max_sock + 1; sock++)
> +        if (FD_ISSET(sock, &fdset)) {
> +            Event *e = &dispr->events[sock];
> +            e->callback(sock, e->ctx);
> +        }
> +
> +    return 0;
> +}
> +
> +typedef struct VirtQueue {
> +    int call_fd;
> +    int kick_fd;
> +    uint32_t size;
> +    uint16_t last_avail_index;
> +    uint16_t last_used_index;
> +    struct vring_desc *desc;
> +    struct vring_avail *avail;
> +    struct vring_used *used;
> +} VirtQueue;
> +
> +/* Based on qemu/hw/virtio/vhost-user.c */
> +
> +#define VHOST_MEMORY_MAX_NREGIONS    8
> +#define VHOST_USER_F_PROTOCOL_FEATURES 30
> +
> +enum VhostUserProtocolFeature {
> +    VHOST_USER_PROTOCOL_F_MQ = 0,
> +    VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1,
> +    VHOST_USER_PROTOCOL_F_RARP = 2,
> +
> +    VHOST_USER_PROTOCOL_F_MAX
> +};
> +
> +#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)
> +
> +typedef enum VhostUserRequest {
> +    VHOST_USER_NONE = 0,
> +    VHOST_USER_GET_FEATURES = 1,
> +    VHOST_USER_SET_FEATURES = 2,
> +    VHOST_USER_SET_OWNER = 3,
> +    VHOST_USER_RESET_DEVICE = 4,
> +    VHOST_USER_SET_MEM_TABLE = 5,
> +    VHOST_USER_SET_LOG_BASE = 6,
> +    VHOST_USER_SET_LOG_FD = 7,
> +    VHOST_USER_SET_VRING_NUM = 8,
> +    VHOST_USER_SET_VRING_ADDR = 9,
> +    VHOST_USER_SET_VRING_BASE = 10,
> +    VHOST_USER_GET_VRING_BASE = 11,
> +    VHOST_USER_SET_VRING_KICK = 12,
> +    VHOST_USER_SET_VRING_CALL = 13,
> +    VHOST_USER_SET_VRING_ERR = 14,
> +    VHOST_USER_GET_PROTOCOL_FEATURES = 15,
> +    VHOST_USER_SET_PROTOCOL_FEATURES = 16,
> +    VHOST_USER_GET_QUEUE_NUM = 17,
> +    VHOST_USER_SET_VRING_ENABLE = 18,
> +    VHOST_USER_SEND_RARP = 19,
> +    VHOST_USER_MAX
> +} VhostUserRequest;


Maybe we need a common copy under tests/

> +
> +typedef struct VhostUserMemoryRegion {
> +    uint64_t guest_phys_addr;
> +    uint64_t memory_size;
> +    uint64_t userspace_addr;
> +    uint64_t mmap_offset;
> +} VhostUserMemoryRegion;
> +
> +typedef struct VhostUserMemory {
> +    uint32_t nregions;
> +    uint32_t padding;
> +    VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS];
> +} VhostUserMemory;
> +
> +typedef struct VhostUserMsg {
> +    VhostUserRequest request;
> +
> +#define VHOST_USER_VERSION_MASK     (0x3)
> +#define VHOST_USER_REPLY_MASK       (0x1<<2)
> +    uint32_t flags;
> +    uint32_t size; /* the following payload size */
> +    union {
> +#define VHOST_USER_VRING_IDX_MASK   (0xff)
> +#define VHOST_USER_VRING_NOFD_MASK  (0x1<<8)
> +        uint64_t u64;
> +        struct vhost_vring_state state;
> +        struct vhost_vring_addr addr;
> +        VhostUserMemory memory;
> +    } payload;
> +    int fds[VHOST_MEMORY_MAX_NREGIONS];
> +    int fd_num;
> +} QEMU_PACKED VhostUserMsg;
> +
> +#define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64)
> +
> +/* The version of the protocol we support */
> +#define VHOST_USER_VERSION    (0x1)
> +
> +#define MAX_NR_VIRTQUEUE (8)
> +
> +typedef struct VhostDevRegion {
> +    /* Guest Physical address. */
> +    uint64_t gpa;
> +    /* Memory region size. */
> +    uint64_t size;
> +    /* QEMU virtual address (userspace). */
> +    uint64_t qva;
> +    /* Starting offset in our mmaped space. */
> +    uint64_t mmap_offset;
> +    /* Start address of mmaped space. */
> +    uint64_t mmap_addr;
> +} VhostDevRegion;
> +
> +typedef struct VhostDev {
> +    int sock;
> +    Dispatcher dispatcher;
> +    uint32_t nregions;
> +    VhostDevRegion regions[VHOST_MEMORY_MAX_NREGIONS];
> +    VirtQueue virtqueue[MAX_NR_VIRTQUEUE];
> +    int backend_udp_sock;
> +    struct sockaddr_in backend_udp_dest;
> +} VhostDev;
> +
> +static const char *vhost_user_request_str[] = {
> +    [VHOST_USER_NONE]                   =  "VHOST_USER_NONE",
> +    [VHOST_USER_GET_FEATURES]           =  "VHOST_USER_GET_FEATURES",
> +    [VHOST_USER_SET_FEATURES]           =  "VHOST_USER_SET_FEATURES",
> +    [VHOST_USER_SET_OWNER]              =  "VHOST_USER_SET_OWNER",
> +    [VHOST_USER_RESET_DEVICE]           =  "VHOST_USER_RESET_DEVICE",
> +    [VHOST_USER_SET_MEM_TABLE]          =  "VHOST_USER_SET_MEM_TABLE",
> +    [VHOST_USER_SET_LOG_BASE]           =  "VHOST_USER_SET_LOG_BASE",
> +    [VHOST_USER_SET_LOG_FD]             =  "VHOST_USER_SET_LOG_FD",
> +    [VHOST_USER_SET_VRING_NUM]          =  "VHOST_USER_SET_VRING_NUM",
> +    [VHOST_USER_SET_VRING_ADDR]         =  "VHOST_USER_SET_VRING_ADDR",
> +    [VHOST_USER_SET_VRING_BASE]         =  "VHOST_USER_SET_VRING_BASE",
> +    [VHOST_USER_GET_VRING_BASE]         =  "VHOST_USER_GET_VRING_BASE",
> +    [VHOST_USER_SET_VRING_KICK]         =  "VHOST_USER_SET_VRING_KICK",
> +    [VHOST_USER_SET_VRING_CALL]         =  "VHOST_USER_SET_VRING_CALL",
> +    [VHOST_USER_SET_VRING_ERR]          =  "VHOST_USER_SET_VRING_ERR",
> +    [VHOST_USER_GET_PROTOCOL_FEATURES]  =  "VHOST_USER_GET_PROTOCOL_FEATURES",
> +    [VHOST_USER_SET_PROTOCOL_FEATURES]  =  "VHOST_USER_SET_PROTOCOL_FEATURES",
> +    [VHOST_USER_GET_QUEUE_NUM]          =  "VHOST_USER_GET_QUEUE_NUM",
> +    [VHOST_USER_SET_VRING_ENABLE]       =  "VHOST_USER_SET_VRING_ENABLE",
> +    [VHOST_USER_SEND_RARP]              =  "VHOST_USER_SEND_RARP",
> +    [VHOST_USER_MAX]                    =  "VHOST_USER_MAX",
> +};
> +
> +static void
> +print_buffer(uint8_t *buf, size_t len)
> +{
> +    int i;
> +    printf("Raw buffer:\n");
> +    for (i = 0; i < len; i++) {
> +        if (i % 16 == 0) {
> +            printf("\n");
> +        }
> +        if (i % 4 == 0) {
> +            printf("   ");
> +        }
> +        printf("%02x ", buf[i]);
> +    }
> +    printf("\n............................................................\n");
> +}
> +
> +/* Translate guest physical address to our virtual address.  */
> +static uint64_t
> +gpa_to_va(VhostDev *dev, uint64_t guest_addr)
> +{
> +    int i;
> +
> +    /* Find matching memory region.  */
> +    for (i = 0; i < dev->nregions; i++) {
> +        VhostDevRegion *r = &dev->regions[i];
> +
> +        if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) {
> +            return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset;
> +        }
> +    }
> +
> +    assert(!"address not found in regions");
> +    return 0;
> +}
> +
> +/* Translate qemu virtual address to our virtual address.  */
> +static uint64_t
> +qva_to_va(VhostDev *dev, uint64_t qemu_addr)
> +{
> +    int i;
> +
> +    /* Find matching memory region.  */
> +    for (i = 0; i < dev->nregions; i++) {
> +        VhostDevRegion *r = &dev->regions[i];
> +
> +        if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) {
> +            return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset;
> +        }
> +    }
> +
> +    assert(!"address not found in regions");
> +    return 0;
> +}
> +
> +static void
> +vhost_user_message_read(int conn_fd, VhostUserMsg *vmsg)
> +{
> +    char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { };
> +    struct iovec iov = {
> +        .iov_base = (char *)vmsg,
> +        .iov_len = VHOST_USER_HDR_SIZE,
> +    };
> +    struct msghdr msg = {
> +        .msg_iov = &iov,
> +        .msg_iovlen = 1,
> +        .msg_control = control,
> +        .msg_controllen = sizeof(control),
> +    };
> +    size_t fd_size;
> +    struct cmsghdr *cmsg;
> +    int rc;
> +
> +    rc = recvmsg(conn_fd, &msg, 0);
> +
> +    if (rc <= 0) {
> +        vhost_user_die("recvmsg");
> +    }
> +
> +    vmsg->fd_num = 0;
> +    for (cmsg = CMSG_FIRSTHDR(&msg);
> +         cmsg != NULL;
> +         cmsg = CMSG_NXTHDR(&msg, cmsg))
> +    {
> +        if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
> +            fd_size = cmsg->cmsg_len - CMSG_LEN(0);
> +            vmsg->fd_num = fd_size / sizeof(int);
> +            memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size);
> +            break;
> +        }
> +    }
> +
> +    if (vmsg->size > sizeof(vmsg->payload)) {
> +        fprintf(stderr,
> +                "Error: too big message request: %d, size: vmsg->size: %u, "
> +                "while sizeof(vmsg->payload) = %lu\n",
> +                vmsg->request, vmsg->size, sizeof(vmsg->payload));
> +        exit(1);
> +    }
> +
> +    if (vmsg->size) {
> +        rc = read(conn_fd, &vmsg->payload, vmsg->size);
> +        if (rc <= 0) {
> +            vhost_user_die("recvmsg");
> +        }
> +
> +        assert(rc == vmsg->size);
> +    }
> +}
> +
> +static void
> +vhost_user_message_write(int conn_fd, VhostUserMsg *vmsg)
> +{
> +    int rc;
> +    do {
> +        rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size);
> +    } while (rc < 0 && errno == EINTR);
> +
> +    if (rc < 0) {
> +        vhost_user_die("write");
> +    }
> +}
> +
> +static void
> +vhost_user_backend_udp_sendbuf(VhostDev *dev,
> +                                uint8_t *buf,
> +                                size_t len)
> +{
> +    int slen = sizeof(struct sockaddr_in);
> +
> +    if (sendto(dev->backend_udp_sock, buf, len, 0,
> +               (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) {
> +        vhost_user_die("sendto()");
> +    }
> +}
> +
> +static int
> +vhost_user_backend_udp_recvbuf(VhostDev *dev,
> +                                uint8_t *buf,
> +                                size_t buflen)
> +{
> +    int slen = sizeof(struct sockaddr_in);
> +    int rc;
> +
> +    rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0,
> +                  (struct sockaddr *) &dev->backend_udp_dest,
> +                  (socklen_t *)&slen);
> +    if (rc == -1) {
> +        vhost_user_die("recvfrom()");
> +    }
> +
> +    return rc;
> +}
> +
> +static void
> +vubr_consume_raw_packet(VhostDev *dev, uint8_t *buf, uint32_t len)
> +{
> +    int hdrlen = sizeof(struct virtio_net_hdr_v1);
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    print_buffer(buf, len);
> +#endif
> +    vhost_user_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen);
> +}
> +
> +/* Kick the guest if necessary. */
> +static void
> +virtqueue_kick(VirtQueue *vq)
> +{
> +    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
> +        printf("Kicking the guest...\n");
> +        eventfd_write(vq->call_fd, 1);
> +    }
> +}
> +
> +static void
> +vubr_post_buffer(VhostDev *dev,
> +                 VirtQueue *vq,
> +                 uint8_t *buf,
> +                 int32_t len)
> +{
> +    struct vring_desc *desc   = vq->desc;
> +    struct vring_avail *avail = vq->avail;
> +    struct vring_used *used   = vq->used;
> +
> +    unsigned int size = vq->size;
> +
> +    assert(vq->last_avail_index != avail->idx);

Why? How do you know there's anything there?

> +    /* Prevent accessing descriptors, buffers, avail->ring and used before
> +     * avail->idx */

smp_rmb then? Can be fixed later ...

> +    smp_mb();
> +
> +    uint16_t a_index = vq->last_avail_index % size;
> +    uint16_t u_index = vq->last_used_index % size;
> +    uint16_t d_index = avail->ring[a_index];
> +
> +    int i = d_index;
> +
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    printf("Post packet to guest on vq:\n");
> +    printf("    size             = %d\n", vq->size);
> +    printf("    last_avail_index = %d\n", vq->last_avail_index);
> +    printf("    last_used_index  = %d\n", vq->last_used_index);
> +    printf("    a_index = %d\n", a_index);
> +    printf("    u_index = %d\n", u_index);
> +    printf("    d_index = %d\n", d_index);
> +    printf("    desc[%d].addr    = 0x%016"PRIx64"\n", i, desc[i].addr);
> +    printf("    desc[%d].len     = %d\n", i, desc[i].len);
> +    printf("    desc[%d].flags   = %d\n", i, desc[i].flags);
> +    printf("    avail->idx = %d\n", avail->idx);
> +    printf("    used->idx  = %d\n", used->idx);
> +#endif
> +
> +    if (!(desc[i].flags & VRING_DESC_F_WRITE)) {
> +        /* FIXME: we should find writable descriptor. */
> +        fprintf(stderr, "Error: descriptor is not writable. Exiting.\n");
> +        exit(1);
> +    }
> +
> +    void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
> +    uint32_t chunk_len = desc[i].len;
> +
> +    if (len <= chunk_len) {
> +        memcpy(chunk_start, buf, len);
> +    } else {
> +        fprintf(stderr,
> +                "Received too long packet from the backend. Dropping...\n");
> +        return;
> +    }
> +
> +    /* Add descriptor to the used ring. */
> +    used->ring[u_index].id = d_index;
> +    used->ring[u_index].len = len;
> +
> +    vq->last_avail_index++;
> +    vq->last_used_index++;
> +
> +    /* Prevent accessing avail, descriptors, buffers and used->ring after
> +     * the store to used->idx */
> +    smp_mb();
> +    used->idx = vq->last_used_index;
> +
> +    /* Kick the guest if necessary. */
> +    virtqueue_kick(vq);
> +}
> +
> +static int
> +vubr_process_desc(VhostDev *dev, VirtQueue *vq)
> +{
> +    struct vring_desc *desc   = vq->desc;
> +    struct vring_avail *avail = vq->avail;
> +    struct vring_used *used   = vq->used;
> +
> +    unsigned int size = vq->size;
> +
> +    uint16_t a_index = vq->last_avail_index % size;
> +    uint16_t u_index = vq->last_used_index % size;
> +    uint16_t d_index = avail->ring[a_index];
> +
> +    uint32_t i, len = 0;
> +    size_t buf_size = 4096;
> +    uint8_t buf[4096];
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    printf("Chunks: ");
> +#endif
> +
> +    i = d_index;
> +    do {
> +        void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
> +        uint32_t chunk_len = desc[i].len;
> +
> +        if (len + chunk_len < buf_size) {
> +            memcpy(buf + len, chunk_start, chunk_len);
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +            printf("%d ", chunk_len);
> +#endif

Wrap these in a macro so you don't need ifdefs in code.
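
For instance, a minimal sketch of such a macro (the name DPRINT is
hypothetical; this is not part of the patch):

    /* Hypothetical debug-print macro: expands to printf() when
     * DEBUG_VHOST_USER_BRIDGE is defined and to a no-op otherwise. */
    #ifdef DEBUG_VHOST_USER_BRIDGE
    #define DPRINT(...) printf(__VA_ARGS__)
    #else
    #define DPRINT(...) do { } while (0)
    #endif

Call sites could then use DPRINT("%d ", chunk_len); unconditionally.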

> +        } else {
> +            fprintf(stderr, "Error: too long packet. Dropping...\n");
> +            break;
> +        }
> +
> +        len += chunk_len;
> +
> +        if (!(desc[i].flags & VRING_DESC_F_NEXT)) {
> +            break;
> +        }
> +
> +        i = desc[i].next;
> +    } while (1);
> +
> +    if (!len) {
> +        return -1;
> +    }
> +
> +    /* Add descriptor to the used ring. */
> +    used->ring[u_index].id = d_index;
> +    used->ring[u_index].len = len;
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    printf("\n");
> +#endif
> +
> +    vubr_consume_raw_packet(dev, buf, len);
> +
> +    return 0;
> +}
> +
> +static void
> +vubr_process_avail(VhostDev *dev, VirtQueue *vq)
> +{
> +    struct vring_avail *avail = vq->avail;
> +    struct vring_used *used = vq->used;
> +
> +    while (vq->last_avail_index != avail->idx) {
> +        /* Prevent accessing avail->ring, descriptors and buffers before
> +         * avail->idx */
> +        smp_mb();
> +
> +        vubr_process_desc(dev, vq);
> +        vq->last_avail_index++;
> +        vq->last_used_index++;
> +    }
> +
> +    /* Prevent accessing avail->ring, descriptors, buffers and used->ring,
> +     * after used->idx */
> +    smp_mb();
> +
> +    used->idx = vq->last_used_index;
> +}
> +
> +static void
> +vubr_backend_recv_cb(int sock, void *ctx)
> +{
> +    VhostDev *dev = (VhostDev *) ctx;
> +    VirtQueue *rx_vq = &dev->virtqueue[0];
> +    uint8_t buf[4096];
> +    struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf;
> +    int hdrlen = sizeof(struct virtio_net_hdr_v1);
> +    int buflen = sizeof(buf);
> +    int len;
> +
> +#ifdef DEBUG_VHOST_USER_BRIDGE
> +    printf("\n\n   ***   IN UDP RECEIVE CALLBACK    ***\n\n");
> +#endif
> +
> +    *hdr = (struct virtio_net_hdr_v1) { };
> +    hdr->num_buffers = 1;
> +
> +    len = vhost_user_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen);
> +    vubr_post_buffer(dev, rx_vq, buf, len + hdrlen);
> +}
> +
> +static void
> +vubr_kick_cb(int sock, void *ctx)
> +{
> +    VhostDev *dev = (VhostDev *) ctx;
> +    eventfd_t kick_data;
> +    ssize_t rc;
> +
> +    rc = eventfd_read(sock, &kick_data);
> +
> +    if (rc == -1) {
> +        vhost_user_die("eventfd_read()");
> +    } else {
> +        printf("Got kick_data: %016"PRIx64"\n", kick_data);
> +        vubr_process_avail(dev, &dev->virtqueue[1]);
> +    }
> +}
> +
> +static int
> +vhost_user_none_exec(VhostDev *dev,
> +                  VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_get_features_exec(VhostDev *dev,
> +                          VhostUserMsg *vmsg)

Please prefix everything with vubr_ consistently.
Same applies to types etc.


> +{
> +    vmsg->payload.u64 =
> +            ((1ULL << VIRTIO_NET_F_MRG_RXBUF) |
> +             (1ULL << VIRTIO_NET_F_CTRL_VQ) |
> +             (1ULL << VIRTIO_NET_F_CTRL_RX) |
> +             (1ULL << VHOST_F_LOG_ALL));
> +    vmsg->size = sizeof(vmsg->payload.u64);
> +
> +    printf("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +
> +    /* reply */
> +    return 1;
> +}
> +
> +static int
> +vhost_user_set_features_exec(VhostDev *dev,
> +                          VhostUserMsg *vmsg)
> +{
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_owner_exec(VhostDev *dev,
> +                       VhostUserMsg *vmsg)
> +{
> +    return 0;
> +}
> +
> +static int
> +vhost_user_reset_device_exec(VhostDev *dev,
> +                          VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_mem_table_exec(VhostDev *dev,
> +                           VhostUserMsg *vmsg)
> +{
> +    printf("Nregions: %d\n", vmsg->payload.memory.nregions);
> +
> +    VhostUserMemory *memory = &vmsg->payload.memory;
> +    dev->nregions = memory->nregions;
> +    int i;
> +    for (i = 0; i < dev->nregions; i++) {
> +        VhostUserMemoryRegion *msg_region = &memory->regions[i];
> +        VhostDevRegion *dev_region = &dev->regions[i];
> +
> +        printf("Region %d\n", i);
> +        printf("    guest_phys_addr: 0x%016"PRIx64"\n",
> +               msg_region->guest_phys_addr);
> +        printf("    memory_size:     0x%016"PRIx64"\n",
> +               msg_region->memory_size);
> +        printf("    userspace_addr   0x%016"PRIx64"\n",
> +               msg_region->userspace_addr);
> +        printf("    mmap_offset      0x%016"PRIx64"\n",
> +               msg_region->mmap_offset);
> +
> +        dev_region->gpa         = msg_region->guest_phys_addr;
> +        dev_region->size        = msg_region->memory_size;
> +        dev_region->qva         = msg_region->userspace_addr;
> +        dev_region->mmap_offset = msg_region->mmap_offset;
> +
> +        void *mmap_addr;
> +
> +        /* We don't use offset argument of mmap() since the
> +         * mapped address has to be page aligned, and we use huge
> +         * pages.  */
> +        mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset,
> +                         PROT_READ | PROT_WRITE, MAP_SHARED,
> +                         vmsg->fds[i], 0);
> +
> +        if (mmap_addr == MAP_FAILED) {
> +            vhost_user_die("mmap");
> +        }
> +
> +        dev_region->mmap_addr = (uint64_t) mmap_addr;
> +        printf("    mmap_addr:       0x%016"PRIx64"\n", dev_region->mmap_addr);
> +    }
> +
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_log_base_exec(VhostDev *dev,
> +                          VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_log_fd_exec(VhostDev *dev,
> +                        VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_num_exec(VhostDev *dev,
> +                           VhostUserMsg *vmsg)
> +{
> +    unsigned int index = vmsg->payload.state.index;
> +    unsigned int num = vmsg->payload.state.num;
> +
> +    printf("State.index: %d\n", index);
> +    printf("State.num:   %d\n", num);
> +    dev->virtqueue[index].size = num;
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_addr_exec(VhostDev *dev,
> +                            VhostUserMsg *vmsg)
> +{
> +    struct vhost_vring_addr *vra = &vmsg->payload.addr;
> +    printf("vhost_vring_addr:\n");
> +    printf("    index:  %d\n", vra->index);
> +    printf("    flags:  %d\n", vra->flags);
> +    printf("    desc_user_addr:   0x%016llx\n", vra->desc_user_addr);
> +    printf("    used_user_addr:   0x%016llx\n", vra->used_user_addr);
> +    printf("    avail_user_addr:  0x%016llx\n", vra->avail_user_addr);
> +    printf("    log_guest_addr:   0x%016llx\n", vra->log_guest_addr);
> +
> +    unsigned int index = vra->index;
> +    VirtQueue *vq = &dev->virtqueue[index];
> +
> +    vq->desc = (struct vring_desc *)qva_to_va(dev, vra->desc_user_addr);
> +    vq->used = (struct vring_used *)qva_to_va(dev, vra->used_user_addr);
> +    vq->avail = (struct vring_avail *)qva_to_va(dev, vra->avail_user_addr);
> +
> +    printf("Setting virtq addresses:\n");
> +    printf("    vring_desc  at %p\n", vq->desc);
> +    printf("    vring_used  at %p\n", vq->used);
> +    printf("    vring_avail at %p\n", vq->avail);
> +
> +    vq->last_used_index = vq->used->idx;
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_base_exec(VhostDev *dev,
> +                            VhostUserMsg *vmsg)
> +{
> +    unsigned int index = vmsg->payload.state.index;
> +    unsigned int num = vmsg->payload.state.num;
> +
> +    printf("State.index: %d\n", index);
> +    printf("State.num:   %d\n", num);
> +    dev->virtqueue[index].last_avail_index = num;
> +
> +    return 0;
> +}
> +
> +static int
> +vhost_user_get_vring_base_exec(VhostDev *dev,
> +                            VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_kick_exec(VhostDev *dev,
> +                            VhostUserMsg *vmsg)
> +{
> +    uint64_t u64_arg = vmsg->payload.u64;
> +    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
> +
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +
> +    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
> +    assert(vmsg->fd_num == 1);
> +
> +    dev->virtqueue[index].kick_fd = vmsg->fds[0];
> +    printf("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index);
> +
> +    if (index % 2 == 1) {
> +        /* TX queue. */
> +        dispatcher_add(&dev->dispatcher, dev->virtqueue[index].kick_fd,
> +                       dev, vubr_kick_cb);
> +
> +        printf("Waiting for kicks on fd: %d for vq: %d\n",
> +               dev->virtqueue[index].kick_fd, index);
> +    }
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_call_exec(VhostDev *dev,
> +                            VhostUserMsg *vmsg)
> +{
> +    uint64_t u64_arg = vmsg->payload.u64;
> +    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
> +
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
> +    assert(vmsg->fd_num == 1);
> +
> +    dev->virtqueue[index].call_fd = vmsg->fds[0];
> +    printf("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index);
> +
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_err_exec(VhostDev *dev,
> +                           VhostUserMsg *vmsg)
> +{
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_get_protocol_features_exec(VhostDev *dev,
> +                                   VhostUserMsg *vmsg)
> +{
> +    /* FIXME: unimplemented */
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_protocol_features_exec(VhostDev *dev,
> +                                   VhostUserMsg *vmsg)
> +{
> +    /* FIXME: unimplemented */
> +    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_get_queue_num_exec(VhostDev *dev,
> +                           VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_set_vring_enable_exec(VhostDev *dev,
> +                              VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_send_rarp_exec(VhostDev *dev,
> +                              VhostUserMsg *vmsg)
> +{
> +    printf("Function %s() not implemented yet.\n", __func__);
> +    return 0;
> +}
> +
> +static int
> +vhost_user_execute_request(VhostDev *dev,
> +                            VhostUserMsg *vmsg)
> +{
> +    /* Print out generic part of the request. */
> +    printf(
> +           "==================   Vhost user message from QEMU   ==================\n");
> +    printf("Request: %s (%d)\n", vhost_user_request_str[vmsg->request],
> +           vmsg->request);
> +    printf("Flags:   0x%x\n", vmsg->flags);
> +    printf("Size:    %d\n", vmsg->size);
> +
> +    if (vmsg->fd_num) {
> +        int i;
> +        printf("Fds:");
> +        for (i = 0; i < vmsg->fd_num; i++) {
> +            printf(" %d", vmsg->fds[i]);
> +        }
> +        printf("\n");
> +    }
> +
> +    switch (vmsg->request) {
> +    case VHOST_USER_NONE:
> +        return vhost_user_none_exec(dev, vmsg);
> +    case VHOST_USER_GET_FEATURES:
> +        return vhost_user_get_features_exec(dev, vmsg);
> +    case VHOST_USER_SET_FEATURES:
> +        return vhost_user_set_features_exec(dev, vmsg);
> +    case VHOST_USER_SET_OWNER:
> +        return vhost_user_set_owner_exec(dev, vmsg);
> +    case VHOST_USER_RESET_DEVICE:
> +        return vhost_user_reset_device_exec(dev, vmsg);
> +    case VHOST_USER_SET_MEM_TABLE:
> +        return vhost_user_set_mem_table_exec(dev, vmsg);
> +    case VHOST_USER_SET_LOG_BASE:
> +        return vhost_user_set_log_base_exec(dev, vmsg);
> +    case VHOST_USER_SET_LOG_FD:
> +        return vhost_user_set_log_fd_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_NUM:
> +        return vhost_user_set_vring_num_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_ADDR:
> +        return vhost_user_set_vring_addr_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_BASE:
> +        return vhost_user_set_vring_base_exec(dev, vmsg);
> +    case VHOST_USER_GET_VRING_BASE:
> +        return vhost_user_get_vring_base_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_KICK:
> +        return vhost_user_set_vring_kick_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_CALL:
> +        return vhost_user_set_vring_call_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_ERR:
> +        return vhost_user_set_vring_err_exec(dev, vmsg);
> +    case VHOST_USER_GET_PROTOCOL_FEATURES:
> +        return vhost_user_get_protocol_features_exec(dev, vmsg);
> +    case VHOST_USER_SET_PROTOCOL_FEATURES:
> +        return vhost_user_set_protocol_features_exec(dev, vmsg);
> +    case VHOST_USER_GET_QUEUE_NUM:
> +        return vhost_user_get_queue_num_exec(dev, vmsg);
> +    case VHOST_USER_SET_VRING_ENABLE:
> +        return vhost_user_set_vring_enable_exec(dev, vmsg);
> +    case VHOST_USER_SEND_RARP:
> +        return vhost_user_send_rarp_exec(dev, vmsg);
> +
> +
> +    case VHOST_USER_MAX:
> +        assert(vmsg->request != VHOST_USER_MAX);
> +    }
> +    return 0;
> +}
> +
> +static void
> +vhost_user_receive_cb(int sock, void *ctx)
> +{
> +    VhostDev *dev = (VhostDev *) ctx;
> +    VhostUserMsg vmsg;
> +
> +    vhost_user_message_read(sock, &vmsg);
> +
> +    int reply_requested = vhost_user_execute_request(dev, &vmsg);
> +
> +    if (reply_requested) {
> +        /* Set the version in the flags when sending the reply */
> +        vmsg.flags &= ~VHOST_USER_VERSION_MASK;
> +        vmsg.flags |= VHOST_USER_VERSION;
> +        vmsg.flags |= VHOST_USER_REPLY_MASK;
> +        vhost_user_message_write(sock, &vmsg);
> +    }
> +}
> +
> +static void
> +vhost_user_accept_cb(int sock, void *ctx)
> +{
> +    VhostDev *dev = (VhostDev *)ctx;
> +    int conn_fd;
> +    struct sockaddr_un un;
> +    socklen_t len = sizeof(un);
> +
> +    conn_fd = accept(sock, (struct sockaddr *) &un, &len);
> +    if (conn_fd  == -1) {
> +        vhost_user_die("accept()");
> +    }
> +    printf("Got connection from remote peer on sock %d\n", conn_fd);
> +    dispatcher_add(&dev->dispatcher, conn_fd, ctx, vhost_user_receive_cb);
> +}
> +
> +static VhostDev *
> +vhost_user_new(const char *path)
> +{
> +    VhostDev *dev =
> +        (VhostDev *) calloc(1, sizeof(VhostDev));
> +
> +    dev->nregions = 0;
> +
> +    int i;
> +    for (i = 0; i < MAX_NR_VIRTQUEUE; i++) {
> +        dev->virtqueue[i] = (VirtQueue) {
> +            .call_fd = -1, .kick_fd = -1,
> +            .size = 0,
> +            .last_avail_index = 0, .last_used_index = 0,
> +            .desc = 0, .avail = 0, .used = 0,
> +        };
> +    }
> +
> +    /* Get a UNIX socket. */
> +    dev->sock = socket(AF_UNIX, SOCK_STREAM, 0);
> +    if (dev->sock == -1) {
> +        vhost_user_die("socket");
> +    }
> +
> +    struct sockaddr_un un;
> +    un.sun_family = AF_UNIX;
> +    strcpy(un.sun_path, path);
> +
> +    size_t len = sizeof(un.sun_family) + strlen(path);
> +
> +    unlink(path);
> +
> +    if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) {
> +        vhost_user_die("bind");
> +    }
> +
> +    if (listen(dev->sock, 1) == -1) {
> +        vhost_user_die("listen");
> +    }
> +
> +    dispatcher_init(&dev->dispatcher);
> +    dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev,
> +                   vhost_user_accept_cb);
> +
> +    printf("Waiting for connections on UNIX socket %s ...\n", path);
> +    return dev;
> +}
> +
> +static void
> +vhost_user_backend_udp_setup(VhostDev *dev,
> +                             const char *local_host,
> +                             uint16_t local_port,
> +                             const char *dest_host,
> +                             uint16_t dest_port)
> +{
> +
> +    struct sockaddr_in si_local;
> +    int sock;
> +
> +    sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
> +    if (sock == -1) {
> +        vhost_user_die("socket");
> +    }
> +
> +    memset((char *) &si_local, 0, sizeof(struct sockaddr_in));
> +    si_local.sin_family = AF_INET;
> +    si_local.sin_port = htons(local_port);
> +    if (inet_aton(local_host, &si_local.sin_addr) == 0) {
> +        fprintf(stderr, "inet_aton() failed.\n");
> +        exit(1);
> +    }
> +
> +    if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) {
> +        vhost_user_die("bind");
> +    }
> +
> +    /* setup destination for sends */
> +    struct sockaddr_in *si_remote = &dev->backend_udp_dest;
> +    memset((char *) si_remote, 0, sizeof(struct sockaddr_in));
> +    si_remote->sin_family = AF_INET;
> +    si_remote->sin_port = htons(dest_port);
> +    if (inet_aton(dest_host, &si_remote->sin_addr) == 0) {
> +        fprintf(stderr, "inet_aton() failed.\n");
> +        exit(1);
> +    }
> +
> +    dev->backend_udp_sock = sock;
> +    dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb);
> +    printf("Waiting for data from udp backend on %s:%d...\n",
> +           local_host, local_port);
> +}
> +
> +static void
> +vhost_user_run(VhostDev *dev)
> +{
> +    while (1) {
> +        /* timeout 200ms */
> +        dispatcher_wait(&dev->dispatcher, 200000);
> +        /* Here one can try polling strategy. */
> +    }
> +}
> +
> +int
> +main(int argc, char *argv[])
> +{
> +    VhostDev *dev;
> +
> +    dev = vhost_user_new("/tmp/vubr.sock");
> +    if (!dev) {
> +        return 1;
> +    }
> +
> +    vhost_user_backend_udp_setup(dev,
> +                                 "127.0.0.1", 4444,
> +                                 "127.0.0.1", 5555);
> +    vhost_user_run(dev);
> +    return 0;
> +}
> diff --git a/tests/Makefile b/tests/Makefile
> index 9341498..0811c68 100644
> --- a/tests/Makefile
> +++ b/tests/Makefile
> @@ -522,6 +522,7 @@ tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_hel
>  tests/test-qemu-opts$(EXESUF): tests/test-qemu-opts.o $(test-util-obj-y)
>  tests/test-write-threshold$(EXESUF): tests/test-write-threshold.o $(test-block-obj-y)
>  tests/test-netfilter$(EXESUF): tests/test-netfilter.o $(qtest-obj-y)
> +tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o

Needs to be limited to when there's actual vhost-user support.
Same as existing vhost-user-test I guess.

>  
>  ifeq ($(CONFIG_POSIX),y)
>  LIBS += -lutil
> -- 
> --Victor
Victor Kaplansky Oct. 27, 2015, 1:29 p.m. UTC | #2
On Tue, Oct 27, 2015 at 01:40:03PM +0200, Michael S. Tsirkin wrote:
> On Mon, Oct 26, 2015 at 07:19:23PM +0200, Victor Kaplansky wrote:
> > +
> > +#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)
> > +
> > +typedef enum VhostUserRequest {
> > +    VHOST_USER_NONE = 0,
> > +    VHOST_USER_GET_FEATURES = 1,
> > +    VHOST_USER_SET_FEATURES = 2,
> > +    VHOST_USER_SET_OWNER = 3,
> > +    VHOST_USER_RESET_DEVICE = 4,
> > +    VHOST_USER_SET_MEM_TABLE = 5,
> > +    VHOST_USER_SET_LOG_BASE = 6,
> > +    VHOST_USER_SET_LOG_FD = 7,
> > +    VHOST_USER_SET_VRING_NUM = 8,
> > +    VHOST_USER_SET_VRING_ADDR = 9,
> > +    VHOST_USER_SET_VRING_BASE = 10,
> > +    VHOST_USER_GET_VRING_BASE = 11,
> > +    VHOST_USER_SET_VRING_KICK = 12,
> > +    VHOST_USER_SET_VRING_CALL = 13,
> > +    VHOST_USER_SET_VRING_ERR = 14,
> > +    VHOST_USER_GET_PROTOCOL_FEATURES = 15,
> > +    VHOST_USER_SET_PROTOCOL_FEATURES = 16,
> > +    VHOST_USER_GET_QUEUE_NUM = 17,
> > +    VHOST_USER_SET_VRING_ENABLE = 18,
> > +    VHOST_USER_SEND_RARP = 19,
> > +    VHOST_USER_MAX
> > +} VhostUserRequest;
> 
> 
> Maybe we need a common copy under tests/

Definitely. Even better to have a common header with linux.
Can be done as a separate patch?

> > +static void
> > +vubr_post_buffer(VhostDev *dev,
> > +                 VirtQueue *vq,
> > +                 uint8_t *buf,
> > +                 int32_t len)
> > +{
> > +    struct vring_desc *desc   = vq->desc;
> > +    struct vring_avail *avail = vq->avail;
> > +    struct vring_used *used   = vq->used;
> > +
> > +    unsigned int size = vq->size;
> > +
> > +    assert(vq->last_avail_index != avail->idx);
> 
> Why? How do you know there's anything there?

Right now post_buffer() is unable to cope with a full (or,
rather, empty) RX queue, hence the assumption that RX always has
available descriptors. I'll add an explanation, and replace the
assert() with {fprintf(stderr, ...); exit(1)}.
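
A sketch of that intended replacement (the message text is assumed;
this is not yet in the patch):

    /* Sketch: replacement for the assert() at the top of
     * vubr_post_buffer(); exits if the RX ring has no descriptors. */
    if (vq->last_avail_index == avail->idx) {
        fprintf(stderr, "Error: RX virtqueue has no available descriptors.\n");
        exit(1);
    }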

> 
> > +    /* Prevent accessing descriptors, buffers, avail->ring and used before
> > +     * avail->idx */
> 
> smp_rmb then? Can be fixed later ...

'used' has been accessed by stores; consequently, at least in theory, if you
take the conservative side, rmb() is not enough. IMO, the best here would be
rmb() + a compiler memory barrier to prevent the compiler from hoisting stores
past this point.
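
As an illustration of that proposal (using the existing macros from
"qemu/atomic.h"; this is a sketch, not the code in the patch):

    /* Sketch of the proposed ordering in vubr_post_buffer(). */
    smp_rmb();  /* read avail->idx before avail->ring[] and descriptors */
    barrier();  /* keep the compiler from hoisting the used-ring stores */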

> > +    smp_mb();
> > +
> > +    uint16_t a_index = vq->last_avail_index % size;
> > +    uint16_t u_index = vq->last_used_index % size;
> > +    uint16_t d_index = avail->ring[a_index];
> > +
> > +    int i = d_index;
> > +
> > +

> > +static int
> > +vhost_user_none_exec(VhostDev *dev,
> > +                  VhostUserMsg *vmsg)
> > +{
> > +    printf("Function %s() not implemented yet.\n", __func__);
> > +    return 0;
> > +}
> > +
> > +static int
> > +vhost_user_get_features_exec(VhostDev *dev,
> > +                          VhostUserMsg *vmsg)
> 
> Please prefix everything with vubr_ consistently.
> Same applies to types etc.

I tried to change everything to use vhost_user_ prefix, but some
functions have slipped my attention and still use vubr_ prefix.
I'll fix this. Thanks for noticing this.

-- Victor
Michael S. Tsirkin Oct. 27, 2015, 3:48 p.m. UTC | #3
On Tue, Oct 27, 2015 at 03:29:21PM +0200, Victor Kaplansky wrote:
> On Tue, Oct 27, 2015 at 01:40:03PM +0200, Michael S. Tsirkin wrote:
> > On Mon, Oct 26, 2015 at 07:19:23PM +0200, Victor Kaplansky wrote:
> > > +
> > > +#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)
> > > +
> > > +typedef enum VhostUserRequest {
> > > +    VHOST_USER_NONE = 0,
> > > +    VHOST_USER_GET_FEATURES = 1,
> > > +    VHOST_USER_SET_FEATURES = 2,
> > > +    VHOST_USER_SET_OWNER = 3,
> > > +    VHOST_USER_RESET_DEVICE = 4,
> > > +    VHOST_USER_SET_MEM_TABLE = 5,
> > > +    VHOST_USER_SET_LOG_BASE = 6,
> > > +    VHOST_USER_SET_LOG_FD = 7,
> > > +    VHOST_USER_SET_VRING_NUM = 8,
> > > +    VHOST_USER_SET_VRING_ADDR = 9,
> > > +    VHOST_USER_SET_VRING_BASE = 10,
> > > +    VHOST_USER_GET_VRING_BASE = 11,
> > > +    VHOST_USER_SET_VRING_KICK = 12,
> > > +    VHOST_USER_SET_VRING_CALL = 13,
> > > +    VHOST_USER_SET_VRING_ERR = 14,
> > > +    VHOST_USER_GET_PROTOCOL_FEATURES = 15,
> > > +    VHOST_USER_SET_PROTOCOL_FEATURES = 16,
> > > +    VHOST_USER_GET_QUEUE_NUM = 17,
> > > +    VHOST_USER_SET_VRING_ENABLE = 18,
> > > +    VHOST_USER_SEND_RARP = 19,
> > > +    VHOST_USER_MAX
> > > +} VhostUserRequest;
> > 
> > 
> > Maybe we need a common copy under tests/
> 
> Definitely. Even better to have a common header with linux.

With QEMU? It's a matter of deciding whether it's a good idea.
If we keep a separate copy in tests, tests will break if we
change the ABI. Otherwise they will be rebuilt and pass
by mistake.

> Can be done as a separate patch?
> 
> > > +static void
> > > +vubr_post_buffer(VhostDev *dev,
> > > +                 VirtQueue *vq,
> > > +                 uint8_t *buf,
> > > +                 int32_t len)
> > > +{
> > > +    struct vring_desc *desc   = vq->desc;
> > > +    struct vring_avail *avail = vq->avail;
> > > +    struct vring_used *used   = vq->used;
> > > +
> > > +    unsigned int size = vq->size;
> > > +
> > > +    assert(vq->last_avail_index != avail->idx);
> > 
> > Why? How do you know there's anything there?
> 
> Right now post_buffer() is unable to cope with a full (or,
> rather, empty) RX queue, hence the assumption that RX always has
> available descriptors. I'll add an explanation, and replace the
> assert() with {fprintf(stderr, ...); exit(1)}.


This doesn't matter - but what makes sure it's only invoked
if buffer is not empty?
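
To illustrate the concern: the UDP receive callback currently posts the
buffer unconditionally, so a guard along these lines would be needed
somewhere (hypothetical, not code from the patch):

    /* Hypothetical guard, e.g. in vubr_backend_recv_cb(): drop the
     * packet if the RX virtqueue has no available descriptors. */
    if (rx_vq->last_avail_index == rx_vq->avail->idx) {
        fprintf(stderr, "RX virtqueue empty, dropping packet.\n");
        return;
    }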

> > 
> > > +    /* Prevent accessing descriptors, buffers, avail->ring and used before
> > > +     * avail->idx */
> > 
> > smp_rmb then? Can be fixed later ...
> 
> 'used' has been accessed by stores; consequently, at least in theory, if you
> take the conservative side, rmb() is not enough.


I don't think there's a race there. vhost in kernel has an rmb here
so if there's a problem I'd like to know what it is.

But let's make it correct first.

> IMO, the best here would be rmb()
> + a compiler memory barrier to prevent the compiler from hoisting stores past
> this point.

AFAIK it's implicit in the fact we have asm with no inputs or outputs.
IOW all barriers include the compiler barrier.

> > > +    smp_mb();
> > > +
> > > +    uint16_t a_index = vq->last_avail_index % size;
> > > +    uint16_t u_index = vq->last_used_index % size;
> > > +    uint16_t d_index = avail->ring[a_index];
> > > +
> > > +    int i = d_index;
> > > +
> > > +
> 
> > > +static int
> > > +vhost_user_none_exec(VhostDev *dev,
> > > +                  VhostUserMsg *vmsg)
> > > +{
> > > +    printf("Function %s() not implemented yet.\n", __func__);
> > > +    return 0;
> > > +}
> > > +
> > > +static int
> > > +vhost_user_get_features_exec(VhostDev *dev,
> > > +                          VhostUserMsg *vmsg)
> > 
> > Please prefix everything with vubr_ consistently.
> > Same applies to types etc.
> 
> I tried to change everything to use vhost_user_ prefix, but some
> functions have slipped my attention and still use vubr_ prefix.
> I'll fix this. Thanks for noticing this.
> 
> -- Victor

Please don't use vhost_user_ prefix! That's used by vhost user code in
QEMU.  Either vhost_user_bridge_ or vubr_ or whatever.

Patch

diff --git a/tests/vhost-user-bridge.c b/tests/vhost-user-bridge.c
new file mode 100644
index 0000000..89f1e07
--- /dev/null
+++ b/tests/vhost-user-bridge.c
@@ -0,0 +1,1138 @@ 
+/*
+ * Vhost-User Bridge
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * Authors:
+ *  Victor Kaplansky <victork@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include <stddef.h>
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/unistd.h>
+#include <sys/mman.h>
+#include <sys/eventfd.h>
+#include <arpa/inet.h>
+
+#include <linux/vhost.h>
+
+#include "qemu/atomic.h"
+#include "standard-headers/linux/virtio_net.h"
+#include "standard-headers/linux/virtio_ring.h"
+
+#define DEBUG_VHOST_USER_BRIDGE
+
+typedef void (*CallbackFunc)(int sock, void *ctx);
+
+typedef struct Event {
+    void *ctx;
+    CallbackFunc callback;
+} Event;
+
+typedef struct Dispatcher {
+    int max_sock;
+    fd_set fdset;
+    Event events[FD_SETSIZE];
+} Dispatcher;
+
+static void
+vhost_user_die(const char *s)
+{
+    perror(s);
+    exit(1);
+}
+
+static int
+dispatcher_init(Dispatcher *dispr)
+{
+    FD_ZERO(&dispr->fdset);
+    dispr->max_sock = -1;
+    return 0;
+}
+
+static int
+dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb)
+{
+    if (sock >= FD_SETSIZE) {
+        fprintf(stderr,
+                "Error: Failed to add new event. sock %d should be less than %d\n",
+                sock, FD_SETSIZE);
+        return -1;
+    }
+
+    dispr->events[sock].ctx = ctx;
+    dispr->events[sock].callback = cb;
+
+    FD_SET(sock, &dispr->fdset);
+    if (sock > dispr->max_sock) {
+        dispr->max_sock = sock;
+    }
+    printf("Added sock %d for watching. max_sock: %d\n",
+           sock, dispr->max_sock);
+    return 0;
+}
+
+#if 0
+/* dispatcher_remove() is not currently in use but may be useful
+ * in the future. */
+static int
+dispatcher_remove(Dispatcher *dispr, int sock)
+{
+    if (sock >= FD_SETSIZE) {
+        fprintf(stderr,
+                "Error: Failed to remove event. sock %d should be less than %d\n",
+                sock, FD_SETSIZE);
+        return -1;
+    }
+
+    FD_CLR(sock, &dispr->fdset);
+    return 0;
+}
+#endif
+
+/* timeout in us */
+static int
+dispatcher_wait(Dispatcher *dispr, uint32_t timeout)
+{
+    struct timeval tv;
+    tv.tv_sec = timeout / 1000000;
+    tv.tv_usec = timeout % 1000000;
+
+    fd_set fdset = dispr->fdset;
+
+    /* wait until some of the sockets become readable. */
+    int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv);
+
+    if (rc == -1) {
+        vhost_user_die("select");
+    }
+
+    /* Timeout */
+    if (rc == 0) {
+        return 0;
+    }
+
+    /* Now call callback for every ready socket. */
+
+    int sock;
+    for (sock = 0; sock < dispr->max_sock + 1; sock++)
+        if (FD_ISSET(sock, &fdset)) {
+            Event *e = &dispr->events[sock];
+            e->callback(sock, e->ctx);
+        }
+
+    return 0;
+}
+
+typedef struct VirtQueue {
+    int call_fd;
+    int kick_fd;
+    uint32_t size;
+    uint16_t last_avail_index;
+    uint16_t last_used_index;
+    struct vring_desc *desc;
+    struct vring_avail *avail;
+    struct vring_used *used;
+} VirtQueue;
+
+/* Based on qemu/hw/virtio/vhost-user.c */
+
+#define VHOST_MEMORY_MAX_NREGIONS    8
+#define VHOST_USER_F_PROTOCOL_FEATURES 30
+
+enum VhostUserProtocolFeature {
+    VHOST_USER_PROTOCOL_F_MQ = 0,
+    VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1,
+    VHOST_USER_PROTOCOL_F_RARP = 2,
+
+    VHOST_USER_PROTOCOL_F_MAX
+};
+
+#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)
+
+typedef enum VhostUserRequest {
+    VHOST_USER_NONE = 0,
+    VHOST_USER_GET_FEATURES = 1,
+    VHOST_USER_SET_FEATURES = 2,
+    VHOST_USER_SET_OWNER = 3,
+    VHOST_USER_RESET_DEVICE = 4,
+    VHOST_USER_SET_MEM_TABLE = 5,
+    VHOST_USER_SET_LOG_BASE = 6,
+    VHOST_USER_SET_LOG_FD = 7,
+    VHOST_USER_SET_VRING_NUM = 8,
+    VHOST_USER_SET_VRING_ADDR = 9,
+    VHOST_USER_SET_VRING_BASE = 10,
+    VHOST_USER_GET_VRING_BASE = 11,
+    VHOST_USER_SET_VRING_KICK = 12,
+    VHOST_USER_SET_VRING_CALL = 13,
+    VHOST_USER_SET_VRING_ERR = 14,
+    VHOST_USER_GET_PROTOCOL_FEATURES = 15,
+    VHOST_USER_SET_PROTOCOL_FEATURES = 16,
+    VHOST_USER_GET_QUEUE_NUM = 17,
+    VHOST_USER_SET_VRING_ENABLE = 18,
+    VHOST_USER_SEND_RARP = 19,
+    VHOST_USER_MAX
+} VhostUserRequest;
+
+typedef struct VhostUserMemoryRegion {
+    uint64_t guest_phys_addr;
+    uint64_t memory_size;
+    uint64_t userspace_addr;
+    uint64_t mmap_offset;
+} VhostUserMemoryRegion;
+
+typedef struct VhostUserMemory {
+    uint32_t nregions;
+    uint32_t padding;
+    VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS];
+} VhostUserMemory;
+
+typedef struct VhostUserMsg {
+    VhostUserRequest request;
+
+#define VHOST_USER_VERSION_MASK     (0x3)
+#define VHOST_USER_REPLY_MASK       (0x1<<2)
+    uint32_t flags;
+    uint32_t size; /* the following payload size */
+    union {
+#define VHOST_USER_VRING_IDX_MASK   (0xff)
+#define VHOST_USER_VRING_NOFD_MASK  (0x1<<8)
+        uint64_t u64;
+        struct vhost_vring_state state;
+        struct vhost_vring_addr addr;
+        VhostUserMemory memory;
+    } payload;
+    int fds[VHOST_MEMORY_MAX_NREGIONS];
+    int fd_num;
+} QEMU_PACKED VhostUserMsg;
+
+#define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64)
+
+/* The version of the protocol we support */
+#define VHOST_USER_VERSION    (0x1)
+
+#define MAX_NR_VIRTQUEUE (8)
+
+typedef struct VhostDevRegion {
+    /* Guest physical address. */
+    uint64_t gpa;
+    /* Memory region size. */
+    uint64_t size;
+    /* QEMU virtual address (userspace). */
+    uint64_t qva;
+    /* Starting offset in our mmaped space. */
+    uint64_t mmap_offset;
+    /* Start address of mmaped space. */
+    uint64_t mmap_addr;
+} VhostDevRegion;
+
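+/* In this bridge virtqueue[0] serves as the guest's RX queue and
+ * virtqueue[1] as its TX queue (see vubr_backend_recv_cb() and
+ * vubr_kick_cb()). */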
+typedef struct VhostDev {
+    int sock;
+    Dispatcher dispatcher;
+    uint32_t nregions;
+    VhostDevRegion regions[VHOST_MEMORY_MAX_NREGIONS];
+    VirtQueue virtqueue[MAX_NR_VIRTQUEUE];
+    int backend_udp_sock;
+    struct sockaddr_in backend_udp_dest;
+} VhostDev;
+
+static const char *vhost_user_request_str[] = {
+    [VHOST_USER_NONE]                   =  "VHOST_USER_NONE",
+    [VHOST_USER_GET_FEATURES]           =  "VHOST_USER_GET_FEATURES",
+    [VHOST_USER_SET_FEATURES]           =  "VHOST_USER_SET_FEATURES",
+    [VHOST_USER_SET_OWNER]              =  "VHOST_USER_SET_OWNER",
+    [VHOST_USER_RESET_DEVICE]           =  "VHOST_USER_RESET_DEVICE",
+    [VHOST_USER_SET_MEM_TABLE]          =  "VHOST_USER_SET_MEM_TABLE",
+    [VHOST_USER_SET_LOG_BASE]           =  "VHOST_USER_SET_LOG_BASE",
+    [VHOST_USER_SET_LOG_FD]             =  "VHOST_USER_SET_LOG_FD",
+    [VHOST_USER_SET_VRING_NUM]          =  "VHOST_USER_SET_VRING_NUM",
+    [VHOST_USER_SET_VRING_ADDR]         =  "VHOST_USER_SET_VRING_ADDR",
+    [VHOST_USER_SET_VRING_BASE]         =  "VHOST_USER_SET_VRING_BASE",
+    [VHOST_USER_GET_VRING_BASE]         =  "VHOST_USER_GET_VRING_BASE",
+    [VHOST_USER_SET_VRING_KICK]         =  "VHOST_USER_SET_VRING_KICK",
+    [VHOST_USER_SET_VRING_CALL]         =  "VHOST_USER_SET_VRING_CALL",
+    [VHOST_USER_SET_VRING_ERR]          =  "VHOST_USER_SET_VRING_ERR",
+    [VHOST_USER_GET_PROTOCOL_FEATURES]  =  "VHOST_USER_GET_PROTOCOL_FEATURES",
+    [VHOST_USER_SET_PROTOCOL_FEATURES]  =  "VHOST_USER_SET_PROTOCOL_FEATURES",
+    [VHOST_USER_GET_QUEUE_NUM]          =  "VHOST_USER_GET_QUEUE_NUM",
+    [VHOST_USER_SET_VRING_ENABLE]       =  "VHOST_USER_SET_VRING_ENABLE",
+    [VHOST_USER_SEND_RARP]              =  "VHOST_USER_SEND_RARP",
+    [VHOST_USER_MAX]                    =  "VHOST_USER_MAX",
+};
+
+static void
+print_buffer(uint8_t *buf, size_t len)
+{
+    int i;
+    printf("Raw buffer:\n");
+    for (i = 0; i < len; i++) {
+        if (i % 16 == 0) {
+            printf("\n");
+        }
+        if (i % 4 == 0) {
+            printf("   ");
+        }
+        printf("%02x ", buf[i]);
+    }
+    printf("\n............................................................\n");
+}
+
+/* Translate guest physical address to our virtual address.  */
+static uint64_t
+gpa_to_va(VhostDev *dev, uint64_t guest_addr)
+{
+    int i;
+
+    /* Find matching memory region.  */
+    for (i = 0; i < dev->nregions; i++) {
+        VhostDevRegion *r = &dev->regions[i];
+
+        if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) {
+            return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset;
+        }
+    }
+
+    assert(!"address not found in regions");
+    return 0;
+}
+
+/* Translate qemu virtual address to our virtual address.  */
+static uint64_t
+qva_to_va(VhostDev *dev, uint64_t qemu_addr)
+{
+    int i;
+
+    /* Find matching memory region.  */
+    for (i = 0; i < dev->nregions; i++) {
+        VhostDevRegion *r = &dev->regions[i];
+
+        if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) {
+            return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset;
+        }
+    }
+
+    assert(!"address not found in regions");
+    return 0;
+}
+
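+/* Read one vhost-user message: the fixed-size header together with any
+ * file descriptors passed as SCM_RIGHTS ancillary data, followed by a
+ * payload of vmsg->size bytes. */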
+static void
+vhost_user_message_read(int conn_fd, VhostUserMsg *vmsg)
+{
+    char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { };
+    struct iovec iov = {
+        .iov_base = (char *)vmsg,
+        .iov_len = VHOST_USER_HDR_SIZE,
+    };
+    struct msghdr msg = {
+        .msg_iov = &iov,
+        .msg_iovlen = 1,
+        .msg_control = control,
+        .msg_controllen = sizeof(control),
+    };
+    size_t fd_size;
+    struct cmsghdr *cmsg;
+    int rc;
+
+    rc = recvmsg(conn_fd, &msg, 0);
+
+    if (rc <= 0) {
+        vhost_user_die("recvmsg");
+    }
+
+    vmsg->fd_num = 0;
+    for (cmsg = CMSG_FIRSTHDR(&msg);
+         cmsg != NULL;
+         cmsg = CMSG_NXTHDR(&msg, cmsg))
+    {
+        if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+            fd_size = cmsg->cmsg_len - CMSG_LEN(0);
+            vmsg->fd_num = fd_size / sizeof(int);
+            memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size);
+            break;
+        }
+    }
+
+    if (vmsg->size > sizeof(vmsg->payload)) {
+        fprintf(stderr,
+                "Error: message for request %d is too big: vmsg->size = %u, "
+                "but sizeof(vmsg->payload) = %zu\n",
+                vmsg->request, vmsg->size, sizeof(vmsg->payload));
+        exit(1);
+    }
+
+    if (vmsg->size) {
+        rc = read(conn_fd, &vmsg->payload, vmsg->size);
+        if (rc <= 0) {
+            vhost_user_die("read");
+        }
+
+        assert(rc == vmsg->size);
+    }
+}
+
+static void
+vhost_user_message_write(int conn_fd, VhostUserMsg *vmsg)
+{
+    int rc;
+    do {
+        rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size);
+    } while (rc < 0 && errno == EINTR);
+
+    if (rc < 0) {
+        vhost_user_die("write");
+    }
+}
+
+static void
+vhost_user_backend_udp_sendbuf(VhostDev *dev,
+                                uint8_t *buf,
+                                size_t len)
+{
+    socklen_t slen = sizeof(struct sockaddr_in);
+
+    if (sendto(dev->backend_udp_sock, buf, len, 0,
+               (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) {
+        vhost_user_die("sendto()");
+    }
+}
+
+static int
+vhost_user_backend_udp_recvbuf(VhostDev *dev,
+                                uint8_t *buf,
+                                size_t buflen)
+{
+    socklen_t slen = sizeof(struct sockaddr_in);
+    int rc;
+
+    rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0,
+                  (struct sockaddr *) &dev->backend_udp_dest,
+                  &slen);
+    if (rc == -1) {
+        vhost_user_die("recvfrom()");
+    }
+
+    return rc;
+}
+
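+/* Strip the virtio-net header and forward the packet to the UDP backend. */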
+static void
+vubr_consume_raw_packet(VhostDev *dev, uint8_t *buf, uint32_t len)
+{
+    int hdrlen = sizeof(struct virtio_net_hdr_v1);
+
+#ifdef DEBUG_VHOST_USER_BRIDGE
+    print_buffer(buf, len);
+#endif
+    vhost_user_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen);
+}
+
+/* Kick the guest if necessary. */
+static void
+virtqueue_kick(VirtQueue *vq)
+{
+    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
+        printf("Kicking the guest...\n");
+        eventfd_write(vq->call_fd, 1);
+    }
+}
+
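+/* Post one packet to the guest: copy it into the buffer of the next
+ * available (device-writable) descriptor of the RX ring, record it on the
+ * used ring and publish used->idx. */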
+static void
+vubr_post_buffer(VhostDev *dev,
+                 VirtQueue *vq,
+                 uint8_t *buf,
+                 int32_t len)
+{
+    struct vring_desc *desc   = vq->desc;
+    struct vring_avail *avail = vq->avail;
+    struct vring_used *used   = vq->used;
+
+    unsigned int size = vq->size;
+
+    assert(vq->last_avail_index != avail->idx);
+    /* Prevent accessing descriptors, buffers, avail->ring and used before
+     * avail->idx */
+    smp_mb();
+
+    uint16_t a_index = vq->last_avail_index % size;
+    uint16_t u_index = vq->last_used_index % size;
+    uint16_t d_index = avail->ring[a_index];
+
+    int i = d_index;
+
+#ifdef DEBUG_VHOST_USER_BRIDGE
+    printf("Post packet to guest on vq:\n");
+    printf("    size             = %d\n", vq->size);
+    printf("    last_avail_index = %d\n", vq->last_avail_index);
+    printf("    last_used_index  = %d\n", vq->last_used_index);
+    printf("    a_index = %d\n", a_index);
+    printf("    u_index = %d\n", u_index);
+    printf("    d_index = %d\n", d_index);
+    printf("    desc[%d].addr    = 0x%016"PRIx64"\n", i, desc[i].addr);
+    printf("    desc[%d].len     = %d\n", i, desc[i].len);
+    printf("    desc[%d].flags   = %d\n", i, desc[i].flags);
+    printf("    avail->idx = %d\n", avail->idx);
+    printf("    used->idx  = %d\n", used->idx);
+#endif
+
+    if (!(desc[i].flags & VRING_DESC_F_WRITE)) {
+        /* FIXME: we should find a writable descriptor. */
+        fprintf(stderr, "Error: descriptor is not writable. Exiting.\n");
+        exit(1);
+    }
+
+    void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
+    uint32_t chunk_len = desc[i].len;
+
+    if (len <= chunk_len) {
+        memcpy(chunk_start, buf, len);
+    } else {
+        fprintf(stderr,
+                "Received too long packet from the backend. Dropping...\n");
+        return;
+    }
+
+    /* Add descriptor to the used ring. */
+    used->ring[u_index].id = d_index;
+    used->ring[u_index].len = len;
+
+    vq->last_avail_index++;
+    vq->last_used_index++;
+
+    /* Prevent accessing avail, descriptors, buffers and used->ring after
+     * the store to used->idx */
+    smp_mb();
+    used->idx = vq->last_used_index;
+
+    /* Kick the guest if necessary. */
+    virtqueue_kick(vq);
+}
+
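+/* Consume one descriptor chain from the TX ring: gather the chain into a
+ * local buffer, record it on the used ring and hand the packet to the
+ * backend.  The caller advances the indices and publishes used->idx. */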
+static int
+vubr_process_desc(VhostDev *dev, VirtQueue *vq)
+{
+    struct vring_desc *desc   = vq->desc;
+    struct vring_avail *avail = vq->avail;
+    struct vring_used *used   = vq->used;
+
+    unsigned int size = vq->size;
+
+    uint16_t a_index = vq->last_avail_index % size;
+    uint16_t u_index = vq->last_used_index % size;
+    uint16_t d_index = avail->ring[a_index];
+
+    uint32_t i, len = 0;
+    uint8_t buf[4096];
+    size_t buf_size = sizeof(buf);
+
+#ifdef DEBUG_VHOST_USER_BRIDGE
+    printf("Chunks: ");
+#endif
+
+    i = d_index;
+    do {
+        void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
+        uint32_t chunk_len = desc[i].len;
+
+        if (len + chunk_len < buf_size) {
+            memcpy(buf + len, chunk_start, chunk_len);
+#ifdef DEBUG_VHOST_USER_BRIDGE
+            printf("%d ", chunk_len);
+#endif
+        } else {
+            fprintf(stderr, "Error: too long packet. Dropping...\n");
+            break;
+        }
+
+        len += chunk_len;
+
+        if (!(desc[i].flags & VRING_DESC_F_NEXT)) {
+            break;
+        }
+
+        i = desc[i].next;
+    } while (1);
+
+    if (!len) {
+        return -1;
+    }
+
+    /* Add descriptor to the used ring. */
+    used->ring[u_index].id = d_index;
+    used->ring[u_index].len = len;
+
+#ifdef DEBUG_VHOST_USER_BRIDGE
+    printf("\n");
+#endif
+
+    vubr_consume_raw_packet(dev, buf, len);
+
+    return 0;
+}
+
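+/* Process every descriptor chain the guest has made available on the TX
+ * ring, then publish the new used index. */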
+static void
+vubr_process_avail(VhostDev *dev, VirtQueue *vq)
+{
+    struct vring_avail *avail = vq->avail;
+    struct vring_used *used = vq->used;
+
+    while (vq->last_avail_index != avail->idx) {
+        /* Prevent accessing avail->ring, descriptors and buffers before
+         * avail->idx */
+        smp_mb();
+
+        vubr_process_desc(dev, vq);
+        vq->last_avail_index++;
+        vq->last_used_index++;
+    }
+
+    /* Prevent accessing avail->ring, descriptors, buffers and used->ring
+     * after the store to used->idx */
+    smp_mb();
+
+    used->idx = vq->last_used_index;
+}
+
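+/* Called when the backend UDP socket becomes readable: prepend a
+ * virtio-net header (num_buffers = 1) to the received packet and post it
+ * on the guest's RX ring. */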
+static void
+vubr_backend_recv_cb(int sock, void *ctx)
+{
+    VhostDev *dev = (VhostDev *) ctx;
+    VirtQueue *rx_vq = &dev->virtqueue[0];
+    uint8_t buf[4096];
+    struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf;
+    int hdrlen = sizeof(struct virtio_net_hdr_v1);
+    int buflen = sizeof(buf);
+    int len;
+
+#ifdef DEBUG_VHOST_USER_BRIDGE
+    printf("\n\n   ***   IN UDP RECEIVE CALLBACK    ***\n\n");
+#endif
+
+    *hdr = (struct virtio_net_hdr_v1) { };
+    hdr->num_buffers = 1;
+
+    len = vhost_user_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen);
+    vubr_post_buffer(dev, rx_vq, buf, len + hdrlen);
+}
+
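+/* Called when the guest kicks the TX queue: drain the eventfd counter and
+ * process the newly available descriptors of virtqueue[1]. */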
+static void
+vubr_kick_cb(int sock, void *ctx)
+{
+    VhostDev *dev = (VhostDev *) ctx;
+    eventfd_t kick_data;
+    ssize_t rc;
+
+    rc = eventfd_read(sock, &kick_data);
+
+    if (rc == -1) {
+        vhost_user_die("eventfd_read()");
+    } else {
+        printf("Got kick_data: %016"PRIx64"\n", kick_data);
+        vubr_process_avail(dev, &dev->virtqueue[1]);
+    }
+}
+
+static int
+vhost_user_none_exec(VhostDev *dev,
+                  VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
+static int
+vhost_user_get_features_exec(VhostDev *dev,
+                          VhostUserMsg *vmsg)
+{
+    vmsg->payload.u64 =
+            ((1ULL << VIRTIO_NET_F_MRG_RXBUF) |
+             (1ULL << VIRTIO_NET_F_CTRL_VQ) |
+             (1ULL << VIRTIO_NET_F_CTRL_RX) |
+             (1ULL << VHOST_F_LOG_ALL));
+    vmsg->size = sizeof(vmsg->payload.u64);
+
+    printf("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+
+    /* reply */
+    return 1;
+}
+
+static int
+vhost_user_set_features_exec(VhostDev *dev,
+                          VhostUserMsg *vmsg)
+{
+    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+    return 0;
+}
+
+static int
+vhost_user_set_owner_exec(VhostDev *dev,
+                       VhostUserMsg *vmsg)
+{
+    return 0;
+}
+
+static int
+vhost_user_reset_device_exec(VhostDev *dev,
+                          VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
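+/* Record each memory region announced by QEMU and mmap() the file
+ * descriptor that accompanies it, so guest physical and QEMU virtual
+ * addresses can later be translated by gpa_to_va() and qva_to_va(). */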
+static int
+vhost_user_set_mem_table_exec(VhostDev *dev,
+                           VhostUserMsg *vmsg)
+{
+    printf("Nregions: %d\n", vmsg->payload.memory.nregions);
+
+    VhostUserMemory *memory = &vmsg->payload.memory;
+    dev->nregions = memory->nregions;
+    int i;
+    for (i = 0; i < dev->nregions; i++) {
+        VhostUserMemoryRegion *msg_region = &memory->regions[i];
+        VhostDevRegion *dev_region = &dev->regions[i];
+
+        printf("Region %d\n", i);
+        printf("    guest_phys_addr: 0x%016"PRIx64"\n",
+               msg_region->guest_phys_addr);
+        printf("    memory_size:     0x%016"PRIx64"\n",
+               msg_region->memory_size);
+        printf("    userspace_addr   0x%016"PRIx64"\n",
+               msg_region->userspace_addr);
+        printf("    mmap_offset      0x%016"PRIx64"\n",
+               msg_region->mmap_offset);
+
+        dev_region->gpa         = msg_region->guest_phys_addr;
+        dev_region->size        = msg_region->memory_size;
+        dev_region->qva         = msg_region->userspace_addr;
+        dev_region->mmap_offset = msg_region->mmap_offset;
+
+        void *mmap_addr;
+
+        /* We don't use the offset argument of mmap() since the
+         * mapped address has to be page aligned, and we use huge
+         * pages.  Instead, the whole region is mapped from offset 0
+         * and mmap_offset is added during address translation. */
+        mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset,
+                         PROT_READ | PROT_WRITE, MAP_SHARED,
+                         vmsg->fds[i], 0);
+
+        if (mmap_addr == MAP_FAILED) {
+            vhost_user_die("mmap");
+        }
+
+        dev_region->mmap_addr = (uint64_t) mmap_addr;
+        printf("    mmap_addr:       0x%016"PRIx64"\n", dev_region->mmap_addr);
+    }
+
+    return 0;
+}
+
+static int
+vhost_user_set_log_base_exec(VhostDev *dev,
+                          VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
+static int
+vhost_user_set_log_fd_exec(VhostDev *dev,
+                        VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
+static int
+vhost_user_set_vring_num_exec(VhostDev *dev,
+                           VhostUserMsg *vmsg)
+{
+    unsigned int index = vmsg->payload.state.index;
+    unsigned int num = vmsg->payload.state.num;
+
+    printf("State.index: %d\n", index);
+    printf("State.num:   %d\n", num);
+    dev->virtqueue[index].size = num;
+    return 0;
+}
+
+static int
+vhost_user_set_vring_addr_exec(VhostDev *dev,
+                            VhostUserMsg *vmsg)
+{
+    struct vhost_vring_addr *vra = &vmsg->payload.addr;
+    printf("vhost_vring_addr:\n");
+    printf("    index:  %d\n", vra->index);
+    printf("    flags:  %d\n", vra->flags);
+    printf("    desc_user_addr:   0x%016llx\n", vra->desc_user_addr);
+    printf("    used_user_addr:   0x%016llx\n", vra->used_user_addr);
+    printf("    avail_user_addr:  0x%016llx\n", vra->avail_user_addr);
+    printf("    log_guest_addr:   0x%016llx\n", vra->log_guest_addr);
+
+    unsigned int index = vra->index;
+    VirtQueue *vq = &dev->virtqueue[index];
+
+    vq->desc = (struct vring_desc *)qva_to_va(dev, vra->desc_user_addr);
+    vq->used = (struct vring_used *)qva_to_va(dev, vra->used_user_addr);
+    vq->avail = (struct vring_avail *)qva_to_va(dev, vra->avail_user_addr);
+
+    printf("Setting virtq addresses:\n");
+    printf("    vring_desc  at %p\n", vq->desc);
+    printf("    vring_used  at %p\n", vq->used);
+    printf("    vring_avail at %p\n", vq->avail);
+
+    vq->last_used_index = vq->used->idx;
+    return 0;
+}
+
+static int
+vhost_user_set_vring_base_exec(VhostDev *dev,
+                            VhostUserMsg *vmsg)
+{
+    unsigned int index = vmsg->payload.state.index;
+    unsigned int num = vmsg->payload.state.num;
+
+    printf("State.index: %d\n", index);
+    printf("State.num:   %d\n", num);
+    dev->virtqueue[index].last_avail_index = num;
+
+    return 0;
+}
+
+static int
+vhost_user_get_vring_base_exec(VhostDev *dev,
+                            VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
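+/* Store the kick eventfd of the given virtqueue.  Only TX queues (odd
+ * indices in the virtio-net layout) are watched by the dispatcher; the RX
+ * queue is driven by data arriving on the backend UDP socket instead. */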
+static int
+vhost_user_set_vring_kick_exec(VhostDev *dev,
+                            VhostUserMsg *vmsg)
+{
+    uint64_t u64_arg = vmsg->payload.u64;
+    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
+
+    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+
+    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
+    assert(vmsg->fd_num == 1);
+
+    dev->virtqueue[index].kick_fd = vmsg->fds[0];
+    printf("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index);
+
+    if (index % 2 == 1) {
+        /* TX queue. */
+        dispatcher_add(&dev->dispatcher, dev->virtqueue[index].kick_fd,
+                       dev, vubr_kick_cb);
+
+        printf("Waiting for kicks on fd: %d for vq: %d\n",
+               dev->virtqueue[index].kick_fd, index);
+    }
+    return 0;
+}
+
+static int
+vhost_user_set_vring_call_exec(VhostDev *dev,
+                            VhostUserMsg *vmsg)
+{
+    uint64_t u64_arg = vmsg->payload.u64;
+    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;
+
+    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
+    assert(vmsg->fd_num == 1);
+
+    dev->virtqueue[index].call_fd = vmsg->fds[0];
+    printf("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index);
+
+    return 0;
+}
+
+static int
+vhost_user_set_vring_err_exec(VhostDev *dev,
+                           VhostUserMsg *vmsg)
+{
+    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+    return 0;
+}
+
+static int
+vhost_user_get_protocol_features_exec(VhostDev *dev,
+                                   VhostUserMsg *vmsg)
+{
+    /* FIXME: unimplemented */
+    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+    return 0;
+}
+
+static int
+vhost_user_set_protocol_features_exec(VhostDev *dev,
+                                   VhostUserMsg *vmsg)
+{
+    /* FIXME: unimplemented */
+    printf("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
+    return 0;
+}
+
+static int
+vhost_user_get_queue_num_exec(VhostDev *dev,
+                           VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
+static int
+vhost_user_set_vring_enable_exec(VhostDev *dev,
+                              VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
+static int
+vhost_user_send_rarp_exec(VhostDev *dev,
+                              VhostUserMsg *vmsg)
+{
+    printf("Function %s() not implemented yet.\n", __func__);
+    return 0;
+}
+
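+/* Decode and dispatch one vhost-user request.  A handler returns non-zero
+ * when a reply, carried back in vmsg, must be sent to the master. */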
+static int
+vhost_user_execute_request(VhostDev *dev,
+                            VhostUserMsg *vmsg)
+{
+    /* Print out generic part of the request. */
+    printf(
+           "==================   Vhost user message from QEMU   ==================\n");
+    printf("Request: %s (%d)\n", vhost_user_request_str[vmsg->request],
+           vmsg->request);
+    printf("Flags:   0x%x\n", vmsg->flags);
+    printf("Size:    %d\n", vmsg->size);
+
+    if (vmsg->fd_num) {
+        int i;
+        printf("Fds:");
+        for (i = 0; i < vmsg->fd_num; i++) {
+            printf(" %d", vmsg->fds[i]);
+        }
+        printf("\n");
+    }
+
+    switch (vmsg->request) {
+    case VHOST_USER_NONE:
+        return vhost_user_none_exec(dev, vmsg);
+    case VHOST_USER_GET_FEATURES:
+        return vhost_user_get_features_exec(dev, vmsg);
+    case VHOST_USER_SET_FEATURES:
+        return vhost_user_set_features_exec(dev, vmsg);
+    case VHOST_USER_SET_OWNER:
+        return vhost_user_set_owner_exec(dev, vmsg);
+    case VHOST_USER_RESET_DEVICE:
+        return vhost_user_reset_device_exec(dev, vmsg);
+    case VHOST_USER_SET_MEM_TABLE:
+        return vhost_user_set_mem_table_exec(dev, vmsg);
+    case VHOST_USER_SET_LOG_BASE:
+        return vhost_user_set_log_base_exec(dev, vmsg);
+    case VHOST_USER_SET_LOG_FD:
+        return vhost_user_set_log_fd_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_NUM:
+        return vhost_user_set_vring_num_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_ADDR:
+        return vhost_user_set_vring_addr_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_BASE:
+        return vhost_user_set_vring_base_exec(dev, vmsg);
+    case VHOST_USER_GET_VRING_BASE:
+        return vhost_user_get_vring_base_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_KICK:
+        return vhost_user_set_vring_kick_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_CALL:
+        return vhost_user_set_vring_call_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_ERR:
+        return vhost_user_set_vring_err_exec(dev, vmsg);
+    case VHOST_USER_GET_PROTOCOL_FEATURES:
+        return vhost_user_get_protocol_features_exec(dev, vmsg);
+    case VHOST_USER_SET_PROTOCOL_FEATURES:
+        return vhost_user_set_protocol_features_exec(dev, vmsg);
+    case VHOST_USER_GET_QUEUE_NUM:
+        return vhost_user_get_queue_num_exec(dev, vmsg);
+    case VHOST_USER_SET_VRING_ENABLE:
+        return vhost_user_set_vring_enable_exec(dev, vmsg);
+    case VHOST_USER_SEND_RARP:
+        return vhost_user_send_rarp_exec(dev, vmsg);
+
+    case VHOST_USER_MAX:
+        assert(vmsg->request != VHOST_USER_MAX);
+    }
+    return 0;
+}
+
+static void
+vhost_user_receive_cb(int sock, void *ctx)
+{
+    VhostDev *dev = (VhostDev *) ctx;
+    VhostUserMsg vmsg;
+
+    vhost_user_message_read(sock, &vmsg);
+
+    int reply_requested = vhost_user_execute_request(dev, &vmsg);
+
+    if (reply_requested) {
+        /* Set the version in the flags when sending the reply */
+        vmsg.flags &= ~VHOST_USER_VERSION_MASK;
+        vmsg.flags |= VHOST_USER_VERSION;
+        vmsg.flags |= VHOST_USER_REPLY_MASK;
+        vhost_user_message_write(sock, &vmsg);
+    }
+}
+
+static void
+vhost_user_accept_cb(int sock, void *ctx)
+{
+    VhostDev *dev = (VhostDev *)ctx;
+    int conn_fd;
+    struct sockaddr_un un;
+    socklen_t len = sizeof(un);
+
+    conn_fd = accept(sock, (struct sockaddr *) &un, &len);
+    if (conn_fd == -1) {
+        vhost_user_die("accept()");
+    }
+    printf("Got connection from remote peer on sock %d\n", conn_fd);
+    dispatcher_add(&dev->dispatcher, conn_fd, ctx, vhost_user_receive_cb);
+}
+
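+/* Allocate and initialize the device: reset the virtqueues, bind and
+ * listen on the given UNIX domain socket and register the accept callback
+ * with the dispatcher. */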
+static VhostDev *
+vhost_user_new(const char *path)
+{
+    VhostDev *dev = (VhostDev *) calloc(1, sizeof(VhostDev));
+
+    if (!dev) {
+        vhost_user_die("calloc");
+    }
+
+    dev->nregions = 0;
+
+    int i;
+    for (i = 0; i < MAX_NR_VIRTQUEUE; i++) {
+        dev->virtqueue[i] = (VirtQueue) {
+            .call_fd = -1, .kick_fd = -1,
+            .size = 0,
+            .last_avail_index = 0, .last_used_index = 0,
+            .desc = 0, .avail = 0, .used = 0,
+        };
+    }
+
+    /* Get a UNIX socket. */
+    dev->sock = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (dev->sock == -1) {
+        vhost_user_die("socket");
+    }
+
+    struct sockaddr_un un;
+    un.sun_family = AF_UNIX;
+    strcpy(un.sun_path, path);
+
+    size_t len = sizeof(un.sun_family) + strlen(path);
+
+    unlink(path);
+
+    if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) {
+        vhost_user_die("bind");
+    }
+
+    if (listen(dev->sock, 1) == -1) {
+        vhost_user_die("listen");
+    }
+
+    dispatcher_init(&dev->dispatcher);
+    dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev,
+                   vhost_user_accept_cb);
+
+    printf("Waiting for connections on UNIX socket %s ...\n", path);
+    return dev;
+}
+
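+/* Set up the UDP socket used to exchange packets with the backend: bind it
+ * to local_host:local_port, remember dest_host:dest_port as the destination
+ * for sends and register the receive callback with the dispatcher. */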
+static void
+vhost_user_backend_udp_setup(VhostDev *dev,
+                             const char *local_host,
+                             uint16_t local_port,
+                             const char *dest_host,
+                             uint16_t dest_port)
+{
+    struct sockaddr_in si_local;
+    int sock;
+
+    sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+    if (sock == -1) {
+        vhost_user_die("socket");
+    }
+
+    memset((char *) &si_local, 0, sizeof(struct sockaddr_in));
+    si_local.sin_family = AF_INET;
+    si_local.sin_port = htons(local_port);
+    if (inet_aton(local_host, &si_local.sin_addr) == 0) {
+        fprintf(stderr, "inet_aton() failed.\n");
+        exit(1);
+    }
+
+    if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) {
+        vhost_user_die("bind");
+    }
+
+    /* setup destination for sends */
+    struct sockaddr_in *si_remote = &dev->backend_udp_dest;
+    memset((char *) si_remote, 0, sizeof(struct sockaddr_in));
+    si_remote->sin_family = AF_INET;
+    si_remote->sin_port = htons(dest_port);
+    if (inet_aton(dest_host, &si_remote->sin_addr) == 0) {
+        fprintf(stderr, "inet_aton() failed.\n");
+        exit(1);
+    }
+
+    dev->backend_udp_sock = sock;
+    dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb);
+    printf("Waiting for data from udp backend on %s:%d...\n",
+           local_host, local_port);
+}
+
+static void
+vhost_user_run(VhostDev *dev)
+{
+    while (1) {
+        /* timeout 200ms */
+        dispatcher_wait(&dev->dispatcher, 200000);
+        /* Here one can try polling strategy. */
+    }
+}
+
+int
+main(int argc, char *argv[])
+{
+    VhostDev *dev;
+
+    dev = vhost_user_new("/tmp/vubr.sock");
+    if (!dev) {
+        return 1;
+    }
+
+    vhost_user_backend_udp_setup(dev,
+                                 "127.0.0.1", 4444,
+                                 "127.0.0.1", 5555);
+    vhost_user_run(dev);
+    return 0;
+}
diff --git a/tests/Makefile b/tests/Makefile
index 9341498..0811c68 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -522,6 +522,7 @@  tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_hel
 tests/test-qemu-opts$(EXESUF): tests/test-qemu-opts.o $(test-util-obj-y)
 tests/test-write-threshold$(EXESUF): tests/test-write-threshold.o $(test-block-obj-y)
 tests/test-netfilter$(EXESUF): tests/test-netfilter.o $(qtest-obj-y)
+tests/vhost-user-bridge$(EXESUF): tests/vhost-user-bridge.o
 
 ifeq ($(CONFIG_POSIX),y)
 LIBS += -lutil