
[v9,2/5] generic vhost user server

Message ID 20200614183907.514282-3-coiby.xu@gmail.com
State New
Series vhost-user block device backend implementation

Commit Message

Coiby Xu June 14, 2020, 6:39 p.m. UTC
Sharing QEMU devices via vhost-user protocol.

Only one vhost-user client can connect to the server at a time.

Signed-off-by: Coiby Xu <coiby.xu@gmail.com>
---
 util/Makefile.objs       |   1 +
 util/vhost-user-server.c | 400 +++++++++++++++++++++++++++++++++++++++
 util/vhost-user-server.h |  61 ++++++
 3 files changed, 462 insertions(+)
 create mode 100644 util/vhost-user-server.c
 create mode 100644 util/vhost-user-server.h

Comments

Kevin Wolf June 18, 2020, 1:29 p.m. UTC | #1
Am 14.06.2020 um 20:39 hat Coiby Xu geschrieben:
> Sharing QEMU devices via vhost-user protocol.
> 
> Only one vhost-user client can connect to the server one time.
> 
> Signed-off-by: Coiby Xu <coiby.xu@gmail.com>
> ---
>  util/Makefile.objs       |   1 +
>  util/vhost-user-server.c | 400 +++++++++++++++++++++++++++++++++++++++
>  util/vhost-user-server.h |  61 ++++++
>  3 files changed, 462 insertions(+)
>  create mode 100644 util/vhost-user-server.c
>  create mode 100644 util/vhost-user-server.h
> 
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index cc5e37177a..b4d4af06dc 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -66,6 +66,7 @@ util-obj-y += hbitmap.o
>  util-obj-y += main-loop.o
>  util-obj-y += nvdimm-utils.o
>  util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
> +util-obj-$(CONFIG_LINUX) += vhost-user-server.o
>  util-obj-y += qemu-coroutine-sleep.o
>  util-obj-y += qemu-co-shared-resource.o
>  util-obj-y += qemu-sockets.o
> diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c
> new file mode 100644
> index 0000000000..393beeb6b9
> --- /dev/null
> +++ b/util/vhost-user-server.c
> @@ -0,0 +1,400 @@
> +/*
> + * Sharing QEMU devices via vhost-user protocol
> + *
> + * Author: Coiby Xu <coiby.xu@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +#include "qemu/osdep.h"
> +#include <sys/eventfd.h>
> +#include "qemu/main-loop.h"
> +#include "vhost-user-server.h"
> +
> +static void vmsg_close_fds(VhostUserMsg *vmsg)
> +{
> +    int i;
> +    for (i = 0; i < vmsg->fd_num; i++) {
> +        close(vmsg->fds[i]);
> +    }
> +}
> +
> +static void vmsg_unblock_fds(VhostUserMsg *vmsg)
> +{
> +    int i;
> +    for (i = 0; i < vmsg->fd_num; i++) {
> +        qemu_set_nonblock(vmsg->fds[i]);
> +    }
> +}
> +
> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
> +                      gpointer opaque);
> +
> +static void close_client(VuServer *server)
> +{
> +    vu_deinit(&server->vu_dev);
> +    object_unref(OBJECT(server->sioc));
> +    object_unref(OBJECT(server->ioc));
> +    server->sioc_slave = NULL;

Where is sioc_slave closed/freed?

> +    object_unref(OBJECT(server->ioc_slave));
> +    /*
> +     * Set the callback function for network listener so another
> +     * vhost-user client can connect to this server
> +     */
> +    qio_net_listener_set_client_func(server->listener,
> +                                     vu_accept,
> +                                     server,
> +                                     NULL);

If connecting another client to the server should work, don't we have to
set at least server->sioc = NULL so that vu_accept() won't error out?
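
For illustration, the idea would be roughly (just a sketch, slave channel
cleanup omitted):

    static void close_client(VuServer *server)
    {
        vu_deinit(&server->vu_dev);
        object_unref(OBJECT(server->sioc));
        object_unref(OBJECT(server->ioc));
        /* allow vu_accept() to accept a new connection */
        server->sioc = NULL;
        server->ioc = NULL;
        ...
    }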

> +}
> +
> +static void panic_cb(VuDev *vu_dev, const char *buf)
> +{
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +
> +    if (buf) {
> +        error_report("vu_panic: %s", buf);
> +    }
> +
> +    if (server->sioc) {
> +        close_client(server);
> +        server->sioc = NULL;
> +    }
> +
> +    if (server->device_panic_notifier) {
> +        server->device_panic_notifier(server);
> +    }
> +}
> +
> +static QIOChannel *slave_io_channel(VuServer *server, int fd,
> +                                    Error **local_err)
> +{
> +    if (server->sioc_slave) {
> +        if (fd == server->sioc_slave->fd) {
> +            return server->ioc_slave;
> +        }
> +    } else {
> +        server->sioc_slave = qio_channel_socket_new_fd(fd, local_err);
> +        if (!*local_err) {
> +            server->ioc_slave = QIO_CHANNEL(server->sioc_slave);
> +            return server->ioc_slave;
> +        }
> +    }
> +
> +    return NULL;
> +}
> +
> +static bool coroutine_fn
> +vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
> +{
> +    struct iovec iov = {
> +        .iov_base = (char *)vmsg,
> +        .iov_len = VHOST_USER_HDR_SIZE,
> +    };
> +    int rc, read_bytes = 0;
> +    Error *local_err = NULL;
> +    /*
> +     * Store fds/nfds returned from qio_channel_readv_full into
> +     * temporary variables.
> +     *
> +     * VhostUserMsg is a packed structure, gcc will complain about passing
> +     * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
> +     * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
> +     * thus two temporary variables nfds and fds are used here.
> +     */
> +    size_t nfds = 0, nfds_t = 0;
> +    int *fds_t = NULL;
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    QIOChannel *ioc = NULL;
> +
> +    if (conn_fd == server->sioc->fd) {
> +        ioc = server->ioc;
> +    } else {
> +        /* Slave communication will also use this function to read msg */
> +        ioc = slave_io_channel(server, conn_fd, &local_err);
> +    }
> +
> +    if (!ioc) {
> +        error_report_err(local_err);
> +        goto fail;
> +    }
> +
> +    assert(qemu_in_coroutine());
> +    do {
> +        /*
> +         * qio_channel_readv_full may have short reads, keeping calling it
> +         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
> +         */
> +        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &local_err);
> +        if (rc < 0) {
> +            if (rc == QIO_CHANNEL_ERR_BLOCK) {
> +                qio_channel_yield(ioc, G_IO_IN);
> +                continue;
> +            } else {
> +                error_report_err(local_err);
> +                return false;
> +            }
> +        }
> +        read_bytes += rc;
> +        if (nfds_t > 0) {
> +            if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
> +                error_report("A maximum of %d fds are allowed, "
> +                             "however got %lu fds now",
> +                             VHOST_MEMORY_MAX_NREGIONS, nfds + nfds_t);
> +                goto fail;
> +            }
> +            memcpy(vmsg->fds + nfds, fds_t,
> +                   nfds_t *sizeof(vmsg->fds[0]));
> +            nfds += nfds_t;
> +            g_free(fds_t);
> +        }
> +        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
> +            break;
> +        }
> +        iov.iov_base = (char *)vmsg + read_bytes;
> +        iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
> +    } while (true);
> +
> +    vmsg->fd_num = nfds;
> +    /* qio_channel_readv_full will make socket fds blocking, unblock them */
> +    vmsg_unblock_fds(vmsg);
> +    if (vmsg->size > sizeof(vmsg->payload)) {
> +        error_report("Error: too big message request: %d, "
> +                     "size: vmsg->size: %u, "
> +                     "while sizeof(vmsg->payload) = %zu",
> +                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
> +        goto fail;
> +    }
> +
> +    struct iovec iov_payload = {
> +        .iov_base = (char *)&vmsg->payload,
> +        .iov_len = vmsg->size,
> +    };
> +    if (vmsg->size) {
> +        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
> +        if (rc == -1) {
> +            error_report_err(local_err);
> +            goto fail;
> +        }
> +    }
> +
> +    return true;
> +
> +fail:
> +    vmsg_close_fds(vmsg);
> +
> +    return false;
> +}
> +
> +
> +static void vu_client_start(VuServer *server);
> +static coroutine_fn void vu_client_trip(void *opaque)
> +{
> +    VuServer *server = opaque;
> +
> +    while (!server->aio_context_changed && server->sioc) {
> +        vu_dispatch(&server->vu_dev);
> +    }
> +
> +    if (server->aio_context_changed && server->sioc) {
> +        server->aio_context_changed = false;
> +        vu_client_start(server);
> +    }
> +}

This is somewhat convoluted, but ok. As soon as my patch "util/async:
Add aio_co_reschedule_self()" is merged, we can use it to simplify this
a bit.
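
For reference, a rough sketch of what vu_client_trip() could then look like
(assuming the coroutine_fn aio_co_reschedule_self(AioContext *new_ctx)
interface from that patch):

    static coroutine_fn void vu_client_trip(void *opaque)
    {
        VuServer *server = opaque;

        while (server->sioc) {
            if (server->aio_context_changed) {
                server->aio_context_changed = false;
                /* move this coroutine to the new AioContext and go on */
                aio_co_reschedule_self(server->ctx);
            }
            vu_dispatch(&server->vu_dev);
        }
    }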

> +static void vu_client_start(VuServer *server)
> +{
> +    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
> +    aio_co_enter(server->ctx, server->co_trip);
> +}
> +
> +/*
> + * a wrapper for vu_kick_cb
> + *
> + * since aio_dispatch can only pass one user data pointer to the
> + * callback function, pack VuDev and pvt into a struct. Then unpack it
> + * and pass them to vu_kick_cb
> + */
> +static void kick_handler(void *opaque)
> +{
> +    KickInfo *kick_info = opaque;
> +    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
> +}
> +
> +
> +static void
> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
> +          vu_watch_cb cb, void *pvt)
> +{
> +
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +    long index = (intptr_t) pvt;
> +    g_assert(cb);
> +    KickInfo *kick_info = &server->kick_info[index];
> +    if (!kick_info->cb) {
> +        kick_info->fd = fd;
> +        kick_info->cb = cb;
> +        qemu_set_nonblock(fd);
> +        aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
> +                           NULL, NULL, kick_info);
> +        kick_info->vu_dev = vu_dev;
> +    }
> +}
> +
> +
> +static void remove_watch(VuDev *vu_dev, int fd)
> +{
> +    VuServer *server;
> +    int i;
> +    int index = -1;
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +
> +    server = container_of(vu_dev, VuServer, vu_dev);
> +    for (i = 0; i < vu_dev->max_queues; i++) {
> +        if (server->kick_info[i].fd == fd) {
> +            index = i;
> +            break;
> +        }
> +    }
> +
> +    if (index == -1) {
> +        return;
> +    }
> +    server->kick_info[i].cb = NULL;
> +    aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
> +}
> +
> +
> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
> +                      gpointer opaque)
> +{
> +    VuServer *server = opaque;
> +
> +    if (server->sioc) {
> +        warn_report("Only one vhost-user client is allowed to "
> +                    "connect the server one time");
> +        return;
> +    }
> +
> +    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
> +                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
> +        error_report("Failed to initialized libvhost-user");
> +        return;
> +    }
> +
> +    /*
> +     * Unset the callback function for network listener to make another
> +     * vhost-user client keeping waiting until this client disconnects
> +     */
> +    qio_net_listener_set_client_func(server->listener,
> +                                     NULL,
> +                                     NULL,
> +                                     NULL);
> +    server->sioc = sioc;
> +    server->kick_info = g_new0(KickInfo, server->max_queues);
> +    /*
> +     * Increase the object reference, so sioc will not freed by
> +     * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
> +     */
> +    object_ref(OBJECT(server->sioc));
> +    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
> +    server->ioc = QIO_CHANNEL(sioc);
> +    object_ref(OBJECT(server->ioc));
> +    qio_channel_attach_aio_context(server->ioc, server->ctx);
> +    qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
> +    vu_client_start(server);
> +}
> +
> +
> +void vhost_user_server_stop(VuServer *server)
> +{
> +    if (!server) {
> +        return;
> +    }

There is no reason why the caller should even pass NULL.

> +    if (server->sioc) {
> +        close_client(server);
> +        object_unref(OBJECT(server->sioc));

close_client() already unrefs it. Do we really hold two references? If
so, why?

I can see that vu_accept() takes an extra reference, but the comment
there says this is because QIOChannel takes ownership.

> +    }
> +
> +    if (server->listener) {
> +        qio_net_listener_disconnect(server->listener);
> +        object_unref(OBJECT(server->listener));
> +    }
> +
> +    g_free(server->kick_info);

Don't we need to wait for co_trip to terminate somewhere? Probably
before freeing any objects because it could still use them.

I assume vhost_user_server_stop() is always called from the main thread
whereas co_trip runs in the server AioContext, so extra care is
necessary.
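
One possible shape, assuming vu_client_trip() sets server->co_trip to NULL
(and calls aio_wait_kick()) just before it returns, and glossing over the
AioContext locking details:

    void vhost_user_server_stop(VuServer *server)
    {
        if (server->sioc) {
            close_client(server);
        }

        /* block the main thread until the coroutine has terminated */
        AIO_WAIT_WHILE(server->ctx, server->co_trip != NULL);

        /* only now is it safe to free the remaining objects */
        ...
    }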

> +}
> +
> +static void detach_context(VuServer *server)
> +{
> +    int i;
> +    AioContext *ctx = server->ioc->ctx;
> +    qio_channel_detach_aio_context(server->ioc);
> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
> +        if (server->kick_info[i].cb) {
> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
> +                               NULL, NULL, NULL);
> +        }
> +    }
> +}
> +
> +static void attach_context(VuServer *server, AioContext *ctx)
> +{
> +    int i;
> +    qio_channel_attach_aio_context(server->ioc, ctx);
> +    server->aio_context_changed = true;
> +    if (server->co_trip) {
> +        aio_co_schedule(ctx, server->co_trip);
> +    }
> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
> +        if (server->kick_info[i].cb) {
> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
> +                               kick_handler, NULL, NULL,
> +                               &server->kick_info[i]);
> +        }
> +    }
> +}

There is a lot of duplication between detach_context() and
attach_context(). I think implementing this directly in
vhost_user_server_set_aio_context() for both cases at once would result
in simpler code.
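
Roughly like this, for example (just a sketch, keeping the existing
KickInfo array):

    void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
    {
        bool attach = ctx != NULL;
        int i;

        server->ctx = attach ? ctx : qemu_get_aio_context();
        if (!server->sioc) {
            return;
        }

        if (attach) {
            qio_channel_attach_aio_context(server->ioc, ctx);
            server->aio_context_changed = true;
            if (server->co_trip) {
                aio_co_schedule(ctx, server->co_trip);
            }
        } else {
            /* remember the old context before detaching from it */
            ctx = server->ioc->ctx;
            qio_channel_detach_aio_context(server->ioc);
        }

        for (i = 0; i < server->vu_dev.max_queues; i++) {
            if (server->kick_info[i].cb) {
                aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
                                   attach ? kick_handler : NULL, NULL, NULL,
                                   attach ? &server->kick_info[i] : NULL);
            }
        }
    }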

> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
> +{
> +    server->ctx = ctx ? ctx : qemu_get_aio_context();
> +    if (!server->sioc) {
> +        return;
> +    }
> +    if (ctx) {
> +        attach_context(server, ctx);
> +    } else {
> +        detach_context(server);
> +    }
> +}

What happens if the VuServer is already attached to an AioContext and
you change it to another AioContext? Shouldn't it be detached from the
old context and attached to the new one instead of only doing the
latter?

> +
> +bool vhost_user_server_start(VuServer *server,
> +                             SocketAddress *socket_addr,
> +                             AioContext *ctx,
> +                             uint16_t max_queues,
> +                             DevicePanicNotifierFn *device_panic_notifier,
> +                             const VuDevIface *vu_iface,
> +                             Error **errp)
> +{

I think this is the function that is supposed to initialise the VuServer
object, so would it be better to first zero it out completely?

Or alternatively assign it completely like this (which automatically
zeroes any unspecified field):

    *server = (VuServer) {
        .vu_iface       = vu_iface,
        .max_queues     = max_queues,
        ...
    }

> +    server->listener = qio_net_listener_new();
> +    if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
> +                                   errp) < 0) {
> +        return false;
> +    }
> +
> +    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
> +
> +    server->vu_iface = vu_iface;
> +    server->max_queues = max_queues;
> +    server->ctx = ctx;
> +    server->device_panic_notifier = device_panic_notifier;
> +    qio_net_listener_set_client_func(server->listener,
> +                                     vu_accept,
> +                                     server,
> +                                     NULL);
> +
> +    return true;
> +}
> diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
> new file mode 100644
> index 0000000000..5baf58f96a
> --- /dev/null
> +++ b/util/vhost-user-server.h
> @@ -0,0 +1,61 @@
> +/*
> + * Sharing QEMU devices via vhost-user protocol
> + *
> + * Author: Coiby Xu <coiby.xu@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef VHOST_USER_SERVER_H
> +#define VHOST_USER_SERVER_H
> +
> +#include "contrib/libvhost-user/libvhost-user.h"
> +#include "io/channel-socket.h"
> +#include "io/channel-file.h"
> +#include "io/net-listener.h"
> +#include "qemu/error-report.h"
> +#include "qapi/error.h"
> +#include "standard-headers/linux/virtio_blk.h"
> +
> +typedef struct KickInfo {
> +    VuDev *vu_dev;
> +    int fd; /*kick fd*/
> +    long index; /*queue index*/
> +    vu_watch_cb cb;
> +} KickInfo;
> +
> +typedef struct VuServer {
> +    QIONetListener *listener;
> +    AioContext *ctx;
> +    void (*device_panic_notifier)(struct VuServer *server) ;

Extra space before the semicolon.

> +    int max_queues;
> +    const VuDevIface *vu_iface;
> +    VuDev vu_dev;
> +    QIOChannel *ioc; /* The I/O channel with the client */
> +    QIOChannelSocket *sioc; /* The underlying data channel with the client */
> +    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
> +    QIOChannel *ioc_slave;
> +    QIOChannelSocket *sioc_slave;
> +    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
> +    KickInfo *kick_info; /* an array with the length of the queue number */

"an array with @max_queues elements"?

> +    /* restart coroutine co_trip if AIOContext is changed */
> +    bool aio_context_changed;
> +} VuServer;
> +
> +
> +typedef void DevicePanicNotifierFn(struct VuServer *server);
> +
> +bool vhost_user_server_start(VuServer *server,
> +                             SocketAddress *unix_socket,
> +                             AioContext *ctx,
> +                             uint16_t max_queues,
> +                             DevicePanicNotifierFn *device_panic_notifier,
> +                             const VuDevIface *vu_iface,
> +                             Error **errp);
> +
> +void vhost_user_server_stop(VuServer *server);
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);
> +
> +#endif /* VHOST_USER_SERVER_H */

Kevin
Stefan Hajnoczi June 19, 2020, 12:13 p.m. UTC | #2
On Mon, Jun 15, 2020 at 02:39:04AM +0800, Coiby Xu wrote:
> +/*
> + * a wrapper for vu_kick_cb
> + *
> + * since aio_dispatch can only pass one user data pointer to the
> + * callback function, pack VuDev and pvt into a struct. Then unpack it
> + * and pass them to vu_kick_cb
> + */
> +static void kick_handler(void *opaque)
> +{
> +    KickInfo *kick_info = opaque;
> +    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);

Where is kick_info->index assigned? It appears to be NULL in all cases.

> +}
> +
> +
> +static void
> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
> +          vu_watch_cb cb, void *pvt)
> +{
> +
> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> +    g_assert(vu_dev);
> +    g_assert(fd >= 0);
> +    long index = (intptr_t) pvt;

The meaning of the pvt argument is not defined in the library interface.
set_watch() callbacks shouldn't interpret pvt.

You could modify libvhost-user to explicitly pass the virtqueue index
(or -1 if the fd is not associated with a virtqueue), but it's nice to
avoid libvhost-user API changes so that existing libvhost-user
applications don't require modifications.

What I would do here is to change the ->kick_info[] data struct. How
about a linked list of VuFdWatch objects? That way the code can handle
any number of fd watches and doesn't make assumptions about virtqueues.
set_watch() is a generic fd monitoring interface and doesn't need to be
tied to virtqueues.
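
Something along these lines, perhaps (only a sketch; VuFdWatch is the name
suggested above, QTAILQ comes from "qemu/queue.h" and the fields are
illustrative):

    typedef struct VuFdWatch {
        VuDev *vu_dev;
        int fd;         /* monitored fd (kick fd or slave channel fd) */
        void *pvt;      /* passed back to cb unmodified */
        vu_watch_cb cb;
        QTAILQ_ENTRY(VuFdWatch) next;
    } VuFdWatch;

    /* in VuServer, replacing KickInfo *kick_info */
    QTAILQ_HEAD(, VuFdWatch) vu_fd_watches;

set_watch() would then append a new VuFdWatch (or update an existing entry
found by fd), remove_watch() would look up the entry by fd and delete it,
and pvt would no longer have to be interpreted as a virtqueue index.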
Coiby Xu Aug. 17, 2020, 8:24 a.m. UTC | #3
On Fri, Jun 19, 2020 at 01:13:00PM +0100, Stefan Hajnoczi wrote:
>On Mon, Jun 15, 2020 at 02:39:04AM +0800, Coiby Xu wrote:
>> +/*
>> + * a wrapper for vu_kick_cb
>> + *
>> + * since aio_dispatch can only pass one user data pointer to the
>> + * callback function, pack VuDev and pvt into a struct. Then unpack it
>> + * and pass them to vu_kick_cb
>> + */
>> +static void kick_handler(void *opaque)
>> +{
>> +    KickInfo *kick_info = opaque;
>> +    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
>
>Where is kick_info->index assigned? It appears to be NULL in all cases.
>
>> +}
>> +
>> +
>> +static void
>> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
>> +          vu_watch_cb cb, void *pvt)
>> +{
>> +
>> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
>> +    g_assert(vu_dev);
>> +    g_assert(fd >= 0);
>> +    long index = (intptr_t) pvt;
>
>The meaning of the pvt argument is not defined in the library interface.
>set_watch() callbacks shouldn't interpret pvt.
>
>You could modify libvhost-user to explicitly pass the virtqueue index
>(or -1 if the fd is not associated with a virtqueue), but it's nice to
>avoid libvhost-user API changes so that existing libvhost-user
>applications don't require modifications.
>
>What I would do here is to change the ->kick_info[] data struct. How
>about a linked list of VuFdWatch objects? That way the code can handle
>any number of fd watches and doesn't make assumptions about virtqueues.
>set_watch() is a generic fd monitoring interface and doesn't need to be
>tied to virtqueues.

A linked list of VuFdWatch objects has been adopted in v10. Thank you!

--
Best regards,
Coiby
Coiby Xu Aug. 17, 2020, 8:59 a.m. UTC | #4
On Thu, Jun 18, 2020 at 03:29:26PM +0200, Kevin Wolf wrote:
>Am 14.06.2020 um 20:39 hat Coiby Xu geschrieben:
>> Sharing QEMU devices via vhost-user protocol.
>>
>> Only one vhost-user client can connect to the server one time.
>>
>> Signed-off-by: Coiby Xu <coiby.xu@gmail.com>
>> ---
>>  util/Makefile.objs       |   1 +
>>  util/vhost-user-server.c | 400 +++++++++++++++++++++++++++++++++++++++
>>  util/vhost-user-server.h |  61 ++++++
>>  3 files changed, 462 insertions(+)
>>  create mode 100644 util/vhost-user-server.c
>>  create mode 100644 util/vhost-user-server.h
>>
>> diff --git a/util/Makefile.objs b/util/Makefile.objs
>> index cc5e37177a..b4d4af06dc 100644
>> --- a/util/Makefile.objs
>> +++ b/util/Makefile.objs
>> @@ -66,6 +66,7 @@ util-obj-y += hbitmap.o
>>  util-obj-y += main-loop.o
>>  util-obj-y += nvdimm-utils.o
>>  util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
>> +util-obj-$(CONFIG_LINUX) += vhost-user-server.o
>>  util-obj-y += qemu-coroutine-sleep.o
>>  util-obj-y += qemu-co-shared-resource.o
>>  util-obj-y += qemu-sockets.o
>> diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c
>> new file mode 100644
>> index 0000000000..393beeb6b9
>> --- /dev/null
>> +++ b/util/vhost-user-server.c
>> @@ -0,0 +1,400 @@
>> +/*
>> + * Sharing QEMU devices via vhost-user protocol
>> + *
>> + * Author: Coiby Xu <coiby.xu@gmail.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +#include "qemu/osdep.h"
>> +#include <sys/eventfd.h>
>> +#include "qemu/main-loop.h"
>> +#include "vhost-user-server.h"
>> +
>> +static void vmsg_close_fds(VhostUserMsg *vmsg)
>> +{
>> +    int i;
>> +    for (i = 0; i < vmsg->fd_num; i++) {
>> +        close(vmsg->fds[i]);
>> +    }
>> +}
>> +
>> +static void vmsg_unblock_fds(VhostUserMsg *vmsg)
>> +{
>> +    int i;
>> +    for (i = 0; i < vmsg->fd_num; i++) {
>> +        qemu_set_nonblock(vmsg->fds[i]);
>> +    }
>> +}
>> +
>> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
>> +                      gpointer opaque);
>> +
>> +static void close_client(VuServer *server)
>> +{
>> +    vu_deinit(&server->vu_dev);
>> +    object_unref(OBJECT(server->sioc));
>> +    object_unref(OBJECT(server->ioc));
>> +    server->sioc_slave = NULL;
>
>Where is sioc_slave closed/freed?

Thank you for pointing out this oversight! When working on v10, I realized
communication on the slave channel can't easily be done in a coroutine,
so I simply dropped the support.

>> +    object_unref(OBJECT(server->ioc_slave));
>> +    /*
>> +     * Set the callback function for network listener so another
>> +     * vhost-user client can connect to this server
>> +     */
>> +    qio_net_listener_set_client_func(server->listener,
>> +                                     vu_accept,
>> +                                     server,
>> +                                     NULL);
>
>If connecting another client to the server should work, don't we have to
>set at least server->sioc = NULL so that vu_accept() won't error out?

Previously I set `server->sioc = NULL` in panic_cb, i.e. only when the
client disconnects, because I thought it was different from the case where
the server is shut down. But this distinction is not necessary. In v10, I
have moved `server->sioc = NULL` into `close_client`.

>
>> +}
>> +
>> +static void panic_cb(VuDev *vu_dev, const char *buf)
>> +{
>> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
>> +
>> +    if (buf) {
>> +        error_report("vu_panic: %s", buf);
>> +    }
>> +
>> +    if (server->sioc) {
>> +        close_client(server);
>> +        server->sioc = NULL;
>> +    }
>> +
>> +    if (server->device_panic_notifier) {
>> +        server->device_panic_notifier(server);
>> +    }
>> +}
>> +
>> +static QIOChannel *slave_io_channel(VuServer *server, int fd,
>> +                                    Error **local_err)
>> +{
>> +    if (server->sioc_slave) {
>> +        if (fd == server->sioc_slave->fd) {
>> +            return server->ioc_slave;
>> +        }
>> +    } else {
>> +        server->sioc_slave = qio_channel_socket_new_fd(fd, local_err);
>> +        if (!*local_err) {
>> +            server->ioc_slave = QIO_CHANNEL(server->sioc_slave);
>> +            return server->ioc_slave;
>> +        }
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +static bool coroutine_fn
>> +vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
>> +{
>> +    struct iovec iov = {
>> +        .iov_base = (char *)vmsg,
>> +        .iov_len = VHOST_USER_HDR_SIZE,
>> +    };
>> +    int rc, read_bytes = 0;
>> +    Error *local_err = NULL;
>> +    /*
>> +     * Store fds/nfds returned from qio_channel_readv_full into
>> +     * temporary variables.
>> +     *
>> +     * VhostUserMsg is a packed structure, gcc will complain about passing
>> +     * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
>> +     * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
>> +     * thus two temporary variables nfds and fds are used here.
>> +     */
>> +    size_t nfds = 0, nfds_t = 0;
>> +    int *fds_t = NULL;
>> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
>> +    QIOChannel *ioc = NULL;
>> +
>> +    if (conn_fd == server->sioc->fd) {
>> +        ioc = server->ioc;
>> +    } else {
>> +        /* Slave communication will also use this function to read msg */
>> +        ioc = slave_io_channel(server, conn_fd, &local_err);
>> +    }
>> +
>> +    if (!ioc) {
>> +        error_report_err(local_err);
>> +        goto fail;
>> +    }
>> +
>> +    assert(qemu_in_coroutine());
>> +    do {
>> +        /*
>> +         * qio_channel_readv_full may have short reads, keeping calling it
>> +         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
>> +         */
>> +        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &local_err);
>> +        if (rc < 0) {
>> +            if (rc == QIO_CHANNEL_ERR_BLOCK) {
>> +                qio_channel_yield(ioc, G_IO_IN);
>> +                continue;
>> +            } else {
>> +                error_report_err(local_err);
>> +                return false;
>> +            }
>> +        }
>> +        read_bytes += rc;
>> +        if (nfds_t > 0) {
>> +            if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
>> +                error_report("A maximum of %d fds are allowed, "
>> +                             "however got %lu fds now",
>> +                             VHOST_MEMORY_MAX_NREGIONS, nfds + nfds_t);
>> +                goto fail;
>> +            }
>> +            memcpy(vmsg->fds + nfds, fds_t,
>> +                   nfds_t *sizeof(vmsg->fds[0]));
>> +            nfds += nfds_t;
>> +            g_free(fds_t);
>> +        }
>> +        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
>> +            break;
>> +        }
>> +        iov.iov_base = (char *)vmsg + read_bytes;
>> +        iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
>> +    } while (true);
>> +
>> +    vmsg->fd_num = nfds;
>> +    /* qio_channel_readv_full will make socket fds blocking, unblock them */
>> +    vmsg_unblock_fds(vmsg);
>> +    if (vmsg->size > sizeof(vmsg->payload)) {
>> +        error_report("Error: too big message request: %d, "
>> +                     "size: vmsg->size: %u, "
>> +                     "while sizeof(vmsg->payload) = %zu",
>> +                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
>> +        goto fail;
>> +    }
>> +
>> +    struct iovec iov_payload = {
>> +        .iov_base = (char *)&vmsg->payload,
>> +        .iov_len = vmsg->size,
>> +    };
>> +    if (vmsg->size) {
>> +        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
>> +        if (rc == -1) {
>> +            error_report_err(local_err);
>> +            goto fail;
>> +        }
>> +    }
>> +
>> +    return true;
>> +
>> +fail:
>> +    vmsg_close_fds(vmsg);
>> +
>> +    return false;
>> +}
>> +
>> +
>> +static void vu_client_start(VuServer *server);
>> +static coroutine_fn void vu_client_trip(void *opaque)
>> +{
>> +    VuServer *server = opaque;
>> +
>> +    while (!server->aio_context_changed && server->sioc) {
>> +        vu_dispatch(&server->vu_dev);
>> +    }
>> +
>> +    if (server->aio_context_changed && server->sioc) {
>> +        server->aio_context_changed = false;
>> +        vu_client_start(server);
>> +    }
>> +}
>
>This is somewhat convoluted, but ok. As soon as my patch "util/async:
>Add aio_co_reschedule_self()" is merged, we can use it to simplify this
>a bit.

I will simplify this when your patch is merged.

>
>> +static void vu_client_start(VuServer *server)
>> +{
>> +    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
>> +    aio_co_enter(server->ctx, server->co_trip);
>> +}
>> +
>> +/*
>> + * a wrapper for vu_kick_cb
>> + *
>> + * since aio_dispatch can only pass one user data pointer to the
>> + * callback function, pack VuDev and pvt into a struct. Then unpack it
>> + * and pass them to vu_kick_cb
>> + */
>> +static void kick_handler(void *opaque)
>> +{
>> +    KickInfo *kick_info = opaque;
>> +    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
>> +}
>> +
>> +
>> +static void
>> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
>> +          vu_watch_cb cb, void *pvt)
>> +{
>> +
>> +    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
>> +    g_assert(vu_dev);
>> +    g_assert(fd >= 0);
>> +    long index = (intptr_t) pvt;
>> +    g_assert(cb);
>> +    KickInfo *kick_info = &server->kick_info[index];
>> +    if (!kick_info->cb) {
>> +        kick_info->fd = fd;
>> +        kick_info->cb = cb;
>> +        qemu_set_nonblock(fd);
>> +        aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
>> +                           NULL, NULL, kick_info);
>> +        kick_info->vu_dev = vu_dev;
>> +    }
>> +}
>> +
>> +
>> +static void remove_watch(VuDev *vu_dev, int fd)
>> +{
>> +    VuServer *server;
>> +    int i;
>> +    int index = -1;
>> +    g_assert(vu_dev);
>> +    g_assert(fd >= 0);
>> +
>> +    server = container_of(vu_dev, VuServer, vu_dev);
>> +    for (i = 0; i < vu_dev->max_queues; i++) {
>> +        if (server->kick_info[i].fd == fd) {
>> +            index = i;
>> +            break;
>> +        }
>> +    }
>> +
>> +    if (index == -1) {
>> +        return;
>> +    }
>> +    server->kick_info[i].cb = NULL;
>> +    aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
>> +}
>> +
>> +
>> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
>> +                      gpointer opaque)
>> +{
>> +    VuServer *server = opaque;
>> +
>> +    if (server->sioc) {
>> +        warn_report("Only one vhost-user client is allowed to "
>> +                    "connect the server one time");
>> +        return;
>> +    }
>> +
>> +    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
>> +                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
>> +        error_report("Failed to initialized libvhost-user");
>> +        return;
>> +    }
>> +
>> +    /*
>> +     * Unset the callback function for network listener to make another
>> +     * vhost-user client keeping waiting until this client disconnects
>> +     */
>> +    qio_net_listener_set_client_func(server->listener,
>> +                                     NULL,
>> +                                     NULL,
>> +                                     NULL);
>> +    server->sioc = sioc;
>> +    server->kick_info = g_new0(KickInfo, server->max_queues);
>> +    /*
>> +     * Increase the object reference, so sioc will not freed by
>> +     * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
>> +     */
>> +    object_ref(OBJECT(server->sioc));
>> +    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
>> +    server->ioc = QIO_CHANNEL(sioc);
>> +    object_ref(OBJECT(server->ioc));
>> +    qio_channel_attach_aio_context(server->ioc, server->ctx);
>> +    qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
>> +    vu_client_start(server);
>> +}
>> +
>> +
>> +void vhost_user_server_stop(VuServer *server)
>> +{
>> +    if (!server) {
>> +        return;
>> +    }
>
>There is no reason why the caller should even pass NULL.

Removed in v10.

>> +    if (server->sioc) {
>> +        close_client(server);
>> +        object_unref(OBJECT(server->sioc));
>
>close_client() already unrefs it. Do we really hold two references? If
>so, why?
>
>I can see that vu_accept() takes an extra reference, but the comment
>there says this is because QIOChannel takes ownership.

This was an oversight on my part. Thank you!

>> +    }
>> +
>> +    if (server->listener) {
>> +        qio_net_listener_disconnect(server->listener);
>> +        object_unref(OBJECT(server->listener));
>> +    }
>> +
>> +    g_free(server->kick_info);
>
>Don't we need to wait for co_trip to terminate somewhere? Probably
>before freeing any objects because it could still use them.
>
>I assume vhost_user_server_stop() is always called from the main thread
>whereas co_trip runs in the server AioContext, so extra care is
>necessary.
>
>> +}
>> +
>> +static void detach_context(VuServer *server)
>> +{
>> +    int i;
>> +    AioContext *ctx = server->ioc->ctx;
>> +    qio_channel_detach_aio_context(server->ioc);
>> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
>> +        if (server->kick_info[i].cb) {
>> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
>> +                               NULL, NULL, NULL);
>> +        }
>> +    }
>> +}
>> +
>> +static void attach_context(VuServer *server, AioContext *ctx)
>> +{
>> +    int i;
>> +    qio_channel_attach_aio_context(server->ioc, ctx);
>> +    server->aio_context_changed = true;
>> +    if (server->co_trip) {
>> +        aio_co_schedule(ctx, server->co_trip);
>> +    }
>> +    for (i = 0; i < server->vu_dev.max_queues; i++) {
>> +        if (server->kick_info[i].cb) {
>> +            aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
>> +                               kick_handler, NULL, NULL,
>> +                               &server->kick_info[i]);
>> +        }
>> +    }
>> +}
>
>There is a lot of duplication between detach_context() and
>attach_context(). I think implementing this directly in
>vhost_user_server_set_aio_context() for both cases at once would result
>in simpler code.

Thank you for the advice! In v10, both cases have been dealt with in
vhost_user_server_set_aio_context since in both cases we need to iterate
over the kick handlers.

>
>> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
>> +{
>> +    server->ctx = ctx ? ctx : qemu_get_aio_context();
>> +    if (!server->sioc) {
>> +        return;
>> +    }
>> +    if (ctx) {
>> +        attach_context(server, ctx);
>> +    } else {
>> +        detach_context(server);
>> +    }
>> +}
>
>What happens if the VuServer is already attached to an AioContext and
>you change it to another AioContext? Shouldn't it be detached from the
>old context and attached to the new one instead of only doing the
>latter?

Based on my understanding, when a block drive's AioContext changes, the
block layer first calls the context detach hook and then the context
attach hook. So this is not an issue.
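
For context, a sketch of how the export side is expected to wire this up
through the block backend's AioContext notifiers (the callback names here
are illustrative):

    static void blk_aio_attached(AioContext *ctx, void *opaque)
    {
        VuServer *server = opaque;
        vhost_user_server_set_aio_context(ctx, server);
    }

    static void blk_aio_detach(void *opaque)
    {
        VuServer *server = opaque;
        vhost_user_server_set_aio_context(NULL, server);
    }

    ...
    blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach,
                                 server);

The block layer runs all detach notifiers on the old context before the
attach notifiers on the new one, so the detach case above always happens
first.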

>
>> +
>> +bool vhost_user_server_start(VuServer *server,
>> +                             SocketAddress *socket_addr,
>> +                             AioContext *ctx,
>> +                             uint16_t max_queues,
>> +                             DevicePanicNotifierFn *device_panic_notifier,
>> +                             const VuDevIface *vu_iface,
>> +                             Error **errp)
>> +{
>
>I think this is the function that is supposed to initialise the VuServer
>object, so would it be better to first zero it out completely?
>
>Or alternatively assign it completely like this (which automatically
>zeroes any unspecified field):
>
>    *server = (VuServer) {
>        .vu_iface       = vu_iface,
>        .max_queues     = max_queues,
>        ...
>    }

Thank you for the suggestion!

>
>> +    server->listener = qio_net_listener_new();
>> +    if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
>> +                                   errp) < 0) {
>> +        return false;
>> +    }
>> +
>> +    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
>> +
>> +    server->vu_iface = vu_iface;
>> +    server->max_queues = max_queues;
>> +    server->ctx = ctx;
>> +    server->device_panic_notifier = device_panic_notifier;
>> +    qio_net_listener_set_client_func(server->listener,
>> +                                     vu_accept,
>> +                                     server,
>> +                                     NULL);
>> +
>> +    return true;
>> +}
>> diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
>> new file mode 100644
>> index 0000000000..5baf58f96a
>> --- /dev/null
>> +++ b/util/vhost-user-server.h
>> @@ -0,0 +1,61 @@
>> +/*
>> + * Sharing QEMU devices via vhost-user protocol
>> + *
>> + * Author: Coiby Xu <coiby.xu@gmail.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef VHOST_USER_SERVER_H
>> +#define VHOST_USER_SERVER_H
>> +
>> +#include "contrib/libvhost-user/libvhost-user.h"
>> +#include "io/channel-socket.h"
>> +#include "io/channel-file.h"
>> +#include "io/net-listener.h"
>> +#include "qemu/error-report.h"
>> +#include "qapi/error.h"
>> +#include "standard-headers/linux/virtio_blk.h"
>> +
>> +typedef struct KickInfo {
>> +    VuDev *vu_dev;
>> +    int fd; /*kick fd*/
>> +    long index; /*queue index*/
>> +    vu_watch_cb cb;
>> +} KickInfo;
>> +
>> +typedef struct VuServer {
>> +    QIONetListener *listener;
>> +    AioContext *ctx;
>> +    void (*device_panic_notifier)(struct VuServer *server) ;
>
>Extra space before the semicolon.
>
>> +    int max_queues;
>> +    const VuDevIface *vu_iface;
>> +    VuDev vu_dev;
>> +    QIOChannel *ioc; /* The I/O channel with the client */
>> +    QIOChannelSocket *sioc; /* The underlying data channel with the client */
>> +    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
>> +    QIOChannel *ioc_slave;
>> +    QIOChannelSocket *sioc_slave;
>> +    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
>> +    KickInfo *kick_info; /* an array with the length of the queue number */
>
>"an array with @max_queues elements"?

Following Stefan's advice, a linked list is now used in v10, so this
problem disappears.

>> +    /* restart coroutine co_trip if AIOContext is changed */
>> +    bool aio_context_changed;
>> +} VuServer;
>> +
>> +
>> +typedef void DevicePanicNotifierFn(struct VuServer *server);
>> +
>> +bool vhost_user_server_start(VuServer *server,
>> +                             SocketAddress *unix_socket,
>> +                             AioContext *ctx,
>> +                             uint16_t max_queues,
>> +                             DevicePanicNotifierFn *device_panic_notifier,
>> +                             const VuDevIface *vu_iface,
>> +                             Error **errp);
>> +
>> +void vhost_user_server_stop(VuServer *server);
>> +
>> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);
>> +
>> +#endif /* VHOST_USER_SERVER_H */
>
>Kevin
>

Thank you for reviewing the code!

--
Best regards,
Coiby

Patch

diff --git a/util/Makefile.objs b/util/Makefile.objs
index cc5e37177a..b4d4af06dc 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -66,6 +66,7 @@  util-obj-y += hbitmap.o
 util-obj-y += main-loop.o
 util-obj-y += nvdimm-utils.o
 util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
+util-obj-$(CONFIG_LINUX) += vhost-user-server.o
 util-obj-y += qemu-coroutine-sleep.o
 util-obj-y += qemu-co-shared-resource.o
 util-obj-y += qemu-sockets.o
diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c
new file mode 100644
index 0000000000..393beeb6b9
--- /dev/null
+++ b/util/vhost-user-server.c
@@ -0,0 +1,400 @@ 
+/*
+ * Sharing QEMU devices via vhost-user protocol
+ *
+ * Author: Coiby Xu <coiby.xu@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include <sys/eventfd.h>
+#include "qemu/main-loop.h"
+#include "vhost-user-server.h"
+
+static void vmsg_close_fds(VhostUserMsg *vmsg)
+{
+    int i;
+    for (i = 0; i < vmsg->fd_num; i++) {
+        close(vmsg->fds[i]);
+    }
+}
+
+static void vmsg_unblock_fds(VhostUserMsg *vmsg)
+{
+    int i;
+    for (i = 0; i < vmsg->fd_num; i++) {
+        qemu_set_nonblock(vmsg->fds[i]);
+    }
+}
+
+static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
+                      gpointer opaque);
+
+static void close_client(VuServer *server)
+{
+    vu_deinit(&server->vu_dev);
+    object_unref(OBJECT(server->sioc));
+    object_unref(OBJECT(server->ioc));
+    server->sioc_slave = NULL;
+    object_unref(OBJECT(server->ioc_slave));
+    /*
+     * Set the callback function for network listener so another
+     * vhost-user client can connect to this server
+     */
+    qio_net_listener_set_client_func(server->listener,
+                                     vu_accept,
+                                     server,
+                                     NULL);
+}
+
+static void panic_cb(VuDev *vu_dev, const char *buf)
+{
+    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+
+    if (buf) {
+        error_report("vu_panic: %s", buf);
+    }
+
+    if (server->sioc) {
+        close_client(server);
+        server->sioc = NULL;
+    }
+
+    if (server->device_panic_notifier) {
+        server->device_panic_notifier(server);
+    }
+}
+
+static QIOChannel *slave_io_channel(VuServer *server, int fd,
+                                    Error **local_err)
+{
+    if (server->sioc_slave) {
+        if (fd == server->sioc_slave->fd) {
+            return server->ioc_slave;
+        }
+    } else {
+        server->sioc_slave = qio_channel_socket_new_fd(fd, local_err);
+        if (!*local_err) {
+            server->ioc_slave = QIO_CHANNEL(server->sioc_slave);
+            return server->ioc_slave;
+        }
+    }
+
+    return NULL;
+}
+
+static bool coroutine_fn
+vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
+{
+    struct iovec iov = {
+        .iov_base = (char *)vmsg,
+        .iov_len = VHOST_USER_HDR_SIZE,
+    };
+    int rc, read_bytes = 0;
+    Error *local_err = NULL;
+    /*
+     * Store fds/nfds returned from qio_channel_readv_full into
+     * temporary variables.
+     *
+     * VhostUserMsg is a packed structure, gcc will complain about passing
+     * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
+     * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
+     * thus two temporary variables nfds and fds are used here.
+     */
+    size_t nfds = 0, nfds_t = 0;
+    int *fds_t = NULL;
+    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+    QIOChannel *ioc = NULL;
+
+    if (conn_fd == server->sioc->fd) {
+        ioc = server->ioc;
+    } else {
+        /* Slave communication will also use this function to read msg */
+        ioc = slave_io_channel(server, conn_fd, &local_err);
+    }
+
+    if (!ioc) {
+        error_report_err(local_err);
+        goto fail;
+    }
+
+    assert(qemu_in_coroutine());
+    do {
+        /*
+         * qio_channel_readv_full may have short reads, keeping calling it
+         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
+         */
+        rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t, &local_err);
+        if (rc < 0) {
+            if (rc == QIO_CHANNEL_ERR_BLOCK) {
+                qio_channel_yield(ioc, G_IO_IN);
+                continue;
+            } else {
+                error_report_err(local_err);
+                return false;
+            }
+        }
+        read_bytes += rc;
+        if (nfds_t > 0) {
+            if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
+                error_report("A maximum of %d fds are allowed, "
+                             "however got %lu fds now",
+                             VHOST_MEMORY_MAX_NREGIONS, nfds + nfds_t);
+                goto fail;
+            }
+            memcpy(vmsg->fds + nfds, fds_t,
+                   nfds_t *sizeof(vmsg->fds[0]));
+            nfds += nfds_t;
+            g_free(fds_t);
+        }
+        if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
+            break;
+        }
+        iov.iov_base = (char *)vmsg + read_bytes;
+        iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
+    } while (true);
+
+    vmsg->fd_num = nfds;
+    /* qio_channel_readv_full will make socket fds blocking, unblock them */
+    vmsg_unblock_fds(vmsg);
+    if (vmsg->size > sizeof(vmsg->payload)) {
+        error_report("Error: too big message request: %d, "
+                     "size: vmsg->size: %u, "
+                     "while sizeof(vmsg->payload) = %zu",
+                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
+        goto fail;
+    }
+
+    struct iovec iov_payload = {
+        .iov_base = (char *)&vmsg->payload,
+        .iov_len = vmsg->size,
+    };
+    if (vmsg->size) {
+        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
+        if (rc == -1) {
+            error_report_err(local_err);
+            goto fail;
+        }
+    }
+
+    return true;
+
+fail:
+    vmsg_close_fds(vmsg);
+
+    return false;
+}
+
+
+static void vu_client_start(VuServer *server);
+static coroutine_fn void vu_client_trip(void *opaque)
+{
+    VuServer *server = opaque;
+
+    while (!server->aio_context_changed && server->sioc) {
+        vu_dispatch(&server->vu_dev);
+    }
+
+    if (server->aio_context_changed && server->sioc) {
+        server->aio_context_changed = false;
+        vu_client_start(server);
+    }
+}
+
+static void vu_client_start(VuServer *server)
+{
+    server->co_trip = qemu_coroutine_create(vu_client_trip, server);
+    aio_co_enter(server->ctx, server->co_trip);
+}
+
+/*
+ * a wrapper for vu_kick_cb
+ *
+ * since aio_dispatch can only pass one user data pointer to the
+ * callback function, pack VuDev and pvt into a struct. Then unpack it
+ * and pass them to vu_kick_cb
+ */
+static void kick_handler(void *opaque)
+{
+    KickInfo *kick_info = opaque;
+    kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
+}
+
+
+static void
+set_watch(VuDev *vu_dev, int fd, int vu_evt,
+          vu_watch_cb cb, void *pvt)
+{
+
+    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
+    g_assert(vu_dev);
+    g_assert(fd >= 0);
+    long index = (intptr_t) pvt;
+    g_assert(cb);
+    KickInfo *kick_info = &server->kick_info[index];
+    if (!kick_info->cb) {
+        kick_info->fd = fd;
+        kick_info->cb = cb;
+        qemu_set_nonblock(fd);
+        aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
+                           NULL, NULL, kick_info);
+        kick_info->vu_dev = vu_dev;
+    }
+}
+
+
+static void remove_watch(VuDev *vu_dev, int fd)
+{
+    VuServer *server;
+    int i;
+    int index = -1;
+    g_assert(vu_dev);
+    g_assert(fd >= 0);
+
+    server = container_of(vu_dev, VuServer, vu_dev);
+    for (i = 0; i < vu_dev->max_queues; i++) {
+        if (server->kick_info[i].fd == fd) {
+            index = i;
+            break;
+        }
+    }
+
+    if (index == -1) {
+        return;
+    }
+    server->kick_info[i].cb = NULL;
+    aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
+}
+
+
+static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
+                      gpointer opaque)
+{
+    VuServer *server = opaque;
+
+    if (server->sioc) {
+        warn_report("Only one vhost-user client is allowed to "
+                    "connect the server one time");
+        return;
+    }
+
+    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
+                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
+        error_report("Failed to initialized libvhost-user");
+        return;
+    }
+
+    /*
+     * Unset the callback function for network listener to make another
+     * vhost-user client keeping waiting until this client disconnects
+     */
+    qio_net_listener_set_client_func(server->listener,
+                                     NULL,
+                                     NULL,
+                                     NULL);
+    server->sioc = sioc;
+    server->kick_info = g_new0(KickInfo, server->max_queues);
+    /*
+     * Increase the object reference, so sioc will not freed by
+     * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
+     */
+    object_ref(OBJECT(server->sioc));
+    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
+    server->ioc = QIO_CHANNEL(sioc);
+    object_ref(OBJECT(server->ioc));
+    qio_channel_attach_aio_context(server->ioc, server->ctx);
+    qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
+    vu_client_start(server);
+}
+
+
+void vhost_user_server_stop(VuServer *server)
+{
+    if (!server) {
+        return;
+    }
+
+    if (server->sioc) {
+        close_client(server);
+        object_unref(OBJECT(server->sioc));
+    }
+
+    if (server->listener) {
+        qio_net_listener_disconnect(server->listener);
+        object_unref(OBJECT(server->listener));
+    }
+
+    g_free(server->kick_info);
+}
+
+static void detach_context(VuServer *server)
+{
+    int i;
+    AioContext *ctx = server->ioc->ctx;
+    qio_channel_detach_aio_context(server->ioc);
+    for (i = 0; i < server->vu_dev.max_queues; i++) {
+        if (server->kick_info[i].cb) {
+            aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
+                               NULL, NULL, NULL);
+        }
+    }
+}
+
+static void attach_context(VuServer *server, AioContext *ctx)
+{
+    int i;
+    qio_channel_attach_aio_context(server->ioc, ctx);
+    server->aio_context_changed = true;
+    if (server->co_trip) {
+        aio_co_schedule(ctx, server->co_trip);
+    }
+    for (i = 0; i < server->vu_dev.max_queues; i++) {
+        if (server->kick_info[i].cb) {
+            aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
+                               kick_handler, NULL, NULL,
+                               &server->kick_info[i]);
+        }
+    }
+}
+
+void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
+{
+    server->ctx = ctx ? ctx : qemu_get_aio_context();
+    if (!server->sioc) {
+        return;
+    }
+    if (ctx) {
+        attach_context(server, ctx);
+    } else {
+        detach_context(server);
+    }
+}
+
+
+bool vhost_user_server_start(VuServer *server,
+                             SocketAddress *socket_addr,
+                             AioContext *ctx,
+                             uint16_t max_queues,
+                             DevicePanicNotifierFn *device_panic_notifier,
+                             const VuDevIface *vu_iface,
+                             Error **errp)
+{
+    server->listener = qio_net_listener_new();
+    if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
+                                   errp) < 0) {
+        return false;
+    }
+
+    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
+
+    server->vu_iface = vu_iface;
+    server->max_queues = max_queues;
+    server->ctx = ctx;
+    server->device_panic_notifier = device_panic_notifier;
+    qio_net_listener_set_client_func(server->listener,
+                                     vu_accept,
+                                     server,
+                                     NULL);
+
+    return true;
+}
diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
new file mode 100644
index 0000000000..5baf58f96a
--- /dev/null
+++ b/util/vhost-user-server.h
@@ -0,0 +1,61 @@ 
+/*
+ * Sharing QEMU devices via vhost-user protocol
+ *
+ * Author: Coiby Xu <coiby.xu@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef VHOST_USER_SERVER_H
+#define VHOST_USER_SERVER_H
+
+#include "contrib/libvhost-user/libvhost-user.h"
+#include "io/channel-socket.h"
+#include "io/channel-file.h"
+#include "io/net-listener.h"
+#include "qemu/error-report.h"
+#include "qapi/error.h"
+#include "standard-headers/linux/virtio_blk.h"
+
+typedef struct KickInfo {
+    VuDev *vu_dev;
+    int fd; /*kick fd*/
+    long index; /*queue index*/
+    vu_watch_cb cb;
+} KickInfo;
+
+typedef struct VuServer {
+    QIONetListener *listener;
+    AioContext *ctx;
+    void (*device_panic_notifier)(struct VuServer *server) ;
+    int max_queues;
+    const VuDevIface *vu_iface;
+    VuDev vu_dev;
+    QIOChannel *ioc; /* The I/O channel with the client */
+    QIOChannelSocket *sioc; /* The underlying data channel with the client */
+    /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
+    QIOChannel *ioc_slave;
+    QIOChannelSocket *sioc_slave;
+    Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
+    KickInfo *kick_info; /* an array with the length of the queue number */
+    /* restart coroutine co_trip if AIOContext is changed */
+    bool aio_context_changed;
+} VuServer;
+
+
+typedef void DevicePanicNotifierFn(struct VuServer *server);
+
+bool vhost_user_server_start(VuServer *server,
+                             SocketAddress *unix_socket,
+                             AioContext *ctx,
+                             uint16_t max_queues,
+                             DevicePanicNotifierFn *device_panic_notifier,
+                             const VuDevIface *vu_iface,
+                             Error **errp);
+
+void vhost_user_server_stop(VuServer *server);
+
+void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);
+
+#endif /* VHOST_USER_SERVER_H */