diff mbox series

[v5,07/50] multi-process: define mpqemu-link object

Message ID 20ec6f6666cc8adb211642156f5230e478143b81.1582576372.git.jag.raman@oracle.com
State New
Headers show
Series Initial support for multi-process qemu | expand

Commit Message

Jag Raman Feb. 24, 2020, 8:54 p.m. UTC
Defines mpqemu-link object which forms the communication link between
QEMU & emulation program.
Adds functions to configure members of mpqemu-link object instance.
Adds functions to send and receive messages over the communication
channel.
Adds GMainLoop to handle events received on the communication channel.

Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
---
 v4 -> v5:
  - Cleaned up included header files

 include/io/mpqemu-link.h | 138 ++++++++++++++++++++++
 io/Makefile.objs         |   2 +
 io/mpqemu-link.c         | 294 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 434 insertions(+)
 create mode 100644 include/io/mpqemu-link.h
 create mode 100644 io/mpqemu-link.c

Comments

Stefan Hajnoczi March 10, 2020, 4:09 p.m. UTC | #1
On Mon, Feb 24, 2020 at 03:54:58PM -0500, Jagannathan Raman wrote:
> +/*
> + * TODO: Dont use mpqemu link object since it is
> + * not needed to be created via -object.
> + */

Please investigate and resolve this TODO.

> +struct conf_data_msg {
> +    uint32_t addr;
> +    uint32_t val;
> +    int l;

Please use a self-explanatory field name.  I'm not sure what 'l' is.

conf_data_msg is not used in this patch.  Please introduce things when
they are needed to make the patch series easier to review in a linear
fashion.

> +/*
> + * TODO: make all communications asynchronous and run in the main
> + * loop or existing IOThread.
> + */

Please investigate and decide how to resolve this TODO.

> +void mpqemu_msg_send(MPQemuMsg *msg, MPQemuChannel *chan)
> +{
> +    int rc;
> +    uint8_t *data;
> +    union {
> +        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
> +        struct cmsghdr align;
> +    } u;
> +    struct msghdr hdr;
> +    struct cmsghdr *chdr;
> +    int sock = chan->sock;
> +    QemuMutex *lock = &chan->send_lock;
> +
> +    struct iovec iov = {
> +        .iov_base = (char *) msg,
> +        .iov_len = MPQEMU_MSG_HDR_SIZE,
> +    };
> +
> +    memset(&hdr, 0, sizeof(hdr));
> +    memset(&u, 0, sizeof(u));
> +
> +    hdr.msg_iov = &iov;
> +    hdr.msg_iovlen = 1;
> +
> +    if (msg->num_fds > REMOTE_MAX_FDS) {
> +        qemu_log_mask(LOG_REMOTE_DEBUG, "%s: Max FDs exceeded\n", __func__);
> +        return;
> +    }
> +
> +    if (msg->num_fds > 0) {
> +        size_t fdsize = msg->num_fds * sizeof(int);
> +
> +        hdr.msg_control = &u;
> +        hdr.msg_controllen = sizeof(u);
> +
> +        chdr = CMSG_FIRSTHDR(&hdr);
> +        chdr->cmsg_len = CMSG_LEN(fdsize);
> +        chdr->cmsg_level = SOL_SOCKET;
> +        chdr->cmsg_type = SCM_RIGHTS;
> +        memcpy(CMSG_DATA(chdr), msg->fds, fdsize);
> +        hdr.msg_controllen = CMSG_SPACE(fdsize);
> +    }
> +
> +    qemu_mutex_lock(lock);
> +
> +    do {
> +        rc = sendmsg(sock, &hdr, 0);
> +    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
> +
> +    if (rc < 0) {
> +        qemu_log_mask(LOG_REMOTE_DEBUG, "%s - sendmsg rc is %d, errno is %d,"
> +                      " sock %d\n", __func__, rc, errno, sock);
> +        qemu_mutex_unlock(lock);
> +        return;
> +    }
> +
> +    if (msg->bytestream) {
> +        data = msg->data2;
> +    } else {
> +        data = (uint8_t *)msg + MPQEMU_MSG_HDR_SIZE;
> +    }
> +
> +    do {
> +        rc = write(sock, data, msg->size);
> +    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
> +
> +    qemu_mutex_unlock(lock);

Can this lock be avoided by using a single sendmsg(2) syscall instead of
sendmsg() + write()?  I feel deja vu here, like I maybe have raised this
in a previous revision of this patch series.

> +    msg->num_fds = 0;
> +    for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
> +         chdr = CMSG_NXTHDR(&hdr, chdr)) {
> +        if ((chdr->cmsg_level == SOL_SOCKET) &&
> +            (chdr->cmsg_type == SCM_RIGHTS)) {
> +            fdsize = chdr->cmsg_len - CMSG_LEN(0);
> +            msg->num_fds = fdsize / sizeof(int);
> +            if (msg->num_fds > REMOTE_MAX_FDS) {
> +                /*
> +                 * TODO: Security issue detected. Sender never sends more
> +                 * than REMOTE_MAX_FDS. This condition should be signaled to
> +                 * the admin
> +                 */

This TODO doesn't seem actionable.  The error is already handled.

> +                qemu_log_mask(LOG_REMOTE_DEBUG,
> +                              "%s: Max FDs exceeded\n", __func__);
> +                return -ERANGE;

The mutex must be released.
Elena Ufimtseva March 10, 2020, 6:26 p.m. UTC | #2
On Tue, Mar 10, 2020 at 04:09:41PM +0000, Stefan Hajnoczi wrote:
> On Mon, Feb 24, 2020 at 03:54:58PM -0500, Jagannathan Raman wrote:
> > +/*
> > + * TODO: Dont use mpqemu link object since it is
> > + * not needed to be created via -object.
> > + */
> 
> Please investigate and resolve this TODO.
>
Thank you Stefan for reviewing more patches.
This particular TODO have to be removed and I am guessing
followed us from the earlier code.
 
> > +struct conf_data_msg {
> > +    uint32_t addr;
> > +    uint32_t val;
> > +    int l;
> 
> Please use a self-explanatory field name.  I'm not sure what 'l' is.
> 
> conf_data_msg is not used in this patch.  Please introduce things when
> they are needed to make the patch series easier to review in a linear
> fashion.

Will do.
> 
> > +/*
> > + * TODO: make all communications asynchronous and run in the main
> > + * loop or existing IOThread.
> > + */
> 
> Please investigate and decide how to resolve this TODO.
> 
> > +void mpqemu_msg_send(MPQemuMsg *msg, MPQemuChannel *chan)
> > +{
> > +    int rc;
> > +    uint8_t *data;
> > +    union {
> > +        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
> > +        struct cmsghdr align;
> > +    } u;
> > +    struct msghdr hdr;
> > +    struct cmsghdr *chdr;
> > +    int sock = chan->sock;
> > +    QemuMutex *lock = &chan->send_lock;
> > +
> > +    struct iovec iov = {
> > +        .iov_base = (char *) msg,
> > +        .iov_len = MPQEMU_MSG_HDR_SIZE,
> > +    };
> > +
> > +    memset(&hdr, 0, sizeof(hdr));
> > +    memset(&u, 0, sizeof(u));
> > +
> > +    hdr.msg_iov = &iov;
> > +    hdr.msg_iovlen = 1;
> > +
> > +    if (msg->num_fds > REMOTE_MAX_FDS) {
> > +        qemu_log_mask(LOG_REMOTE_DEBUG, "%s: Max FDs exceeded\n", __func__);
> > +        return;
> > +    }
> > +
> > +    if (msg->num_fds > 0) {
> > +        size_t fdsize = msg->num_fds * sizeof(int);
> > +
> > +        hdr.msg_control = &u;
> > +        hdr.msg_controllen = sizeof(u);
> > +
> > +        chdr = CMSG_FIRSTHDR(&hdr);
> > +        chdr->cmsg_len = CMSG_LEN(fdsize);
> > +        chdr->cmsg_level = SOL_SOCKET;
> > +        chdr->cmsg_type = SCM_RIGHTS;
> > +        memcpy(CMSG_DATA(chdr), msg->fds, fdsize);
> > +        hdr.msg_controllen = CMSG_SPACE(fdsize);
> > +    }
> > +
> > +    qemu_mutex_lock(lock);
> > +
> > +    do {
> > +        rc = sendmsg(sock, &hdr, 0);
> > +    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
> > +
> > +    if (rc < 0) {
> > +        qemu_log_mask(LOG_REMOTE_DEBUG, "%s - sendmsg rc is %d, errno is %d,"
> > +                      " sock %d\n", __func__, rc, errno, sock);
> > +        qemu_mutex_unlock(lock);
> > +        return;
> > +    }
> > +
> > +    if (msg->bytestream) {
> > +        data = msg->data2;
> > +    } else {
> > +        data = (uint8_t *)msg + MPQEMU_MSG_HDR_SIZE;
> > +    }
> > +
> > +    do {
> > +        rc = write(sock, data, msg->size);
> > +    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
> > +
> > +    qemu_mutex_unlock(lock);
> 
> Can this lock be avoided by using a single sendmsg(2) syscall instead of
> sendmsg() + write()?  I feel deja vu here, like I maybe have raised this
> in a previous revision of this patch series.
> 

Indeed, you did mention this. Sorry, it got forgotten.
It seems to be possible, we will investigate further and include in the
next version.

> > +    msg->num_fds = 0;
> > +    for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
> > +         chdr = CMSG_NXTHDR(&hdr, chdr)) {
> > +        if ((chdr->cmsg_level == SOL_SOCKET) &&
> > +            (chdr->cmsg_type == SCM_RIGHTS)) {
> > +            fdsize = chdr->cmsg_len - CMSG_LEN(0);
> > +            msg->num_fds = fdsize / sizeof(int);
> > +            if (msg->num_fds > REMOTE_MAX_FDS) {
> > +                /*
> > +                 * TODO: Security issue detected. Sender never sends more
> > +                 * than REMOTE_MAX_FDS. This condition should be signaled to
> > +                 * the admin
> > +                 */
> 
> This TODO doesn't seem actionable.  The error is already handled.
> 
> > +                qemu_log_mask(LOG_REMOTE_DEBUG,
> > +                              "%s: Max FDs exceeded\n", __func__);
> > +                return -ERANGE;
> 
> The mutex must be released.

Thank you! Will fix this and above.


Elena
Stefan Hajnoczi March 16, 2020, 11:26 a.m. UTC | #3
On Tue, Mar 10, 2020 at 11:26:23AM -0700, Elena Ufimtseva wrote:
> On Tue, Mar 10, 2020 at 04:09:41PM +0000, Stefan Hajnoczi wrote:
> > On Mon, Feb 24, 2020 at 03:54:58PM -0500, Jagannathan Raman wrote:
> > > +    msg->num_fds = 0;
> > > +    for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
> > > +         chdr = CMSG_NXTHDR(&hdr, chdr)) {
> > > +        if ((chdr->cmsg_level == SOL_SOCKET) &&
> > > +            (chdr->cmsg_type == SCM_RIGHTS)) {
> > > +            fdsize = chdr->cmsg_len - CMSG_LEN(0);
> > > +            msg->num_fds = fdsize / sizeof(int);
> > > +            if (msg->num_fds > REMOTE_MAX_FDS) {
> > > +                /*
> > > +                 * TODO: Security issue detected. Sender never sends more
> > > +                 * than REMOTE_MAX_FDS. This condition should be signaled to
> > > +                 * the admin
> > > +                 */
> > 
> > This TODO doesn't seem actionable.  The error is already handled.
> > 
> > > +                qemu_log_mask(LOG_REMOTE_DEBUG,
> > > +                              "%s: Max FDs exceeded\n", __func__);
> > > +                return -ERANGE;
> > 
> > The mutex must be released.
> 
> Thank you! Will fix this and above.

I have posted a patch series that adds lock guards (automatic unlocking)
to prevent cases like this in the future:
https://lists.gnu.org/archive/html/qemu-devel/2020-03/msg04628.html

You can use the QEMU_LOCK_GUARD() and/or WITH_QEMU_LOCK_GUARD() macros
to avoid the need for manual qemu_mutex_unlock() calls.

Stefan
diff mbox series

Patch

diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
new file mode 100644
index 0000000..2f2dd83
--- /dev/null
+++ b/include/io/mpqemu-link.h
@@ -0,0 +1,138 @@ 
+/*
+ * Communication channel between QEMU and remote device process
+ *
+ * Copyright © 2018, 2020 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef MPQEMU_LINK_H
+#define MPQEMU_LINK_H
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+
+#include "qom/object.h"
+#include "qemu/thread.h"
+
+#define TYPE_MPQEMU_LINK "mpqemu-link"
+#define MPQEMU_LINK(obj) \
+    OBJECT_CHECK(MPQemuLinkState, (obj), TYPE_MPQEMU_LINK)
+
+#define REMOTE_MAX_FDS 8
+
+#define MPQEMU_MSG_HDR_SIZE offsetof(MPQemuMsg, data1.u64)
+
+/*
+ * TODO: Dont use mpqemu link object since it is
+ * not needed to be created via -object.
+ */
+
+/**
+ * mpqemu_cmd_t:
+ * PCI_CONFIG_READ        PCI configuration space read
+ * PCI_CONFIG_WRITE       PCI configuration space write
+ *
+ * proc_cmd_t enum type to specify the command to be executed on the remote
+ * device.
+ */
+typedef enum {
+    INIT = 0,
+    PCI_CONFIG_READ,
+    PCI_CONFIG_WRITE,
+    MAX,
+} mpqemu_cmd_t;
+
+/**
+ * MPQemuMsg:
+ * @cmd: The remote command
+ * @bytestream: Indicates if the data to be shared is structured (data1)
+ *              or unstructured (data2)
+ * @size: Size of the data to be shared
+ * @data1: Structured data
+ * @fds: File descriptors to be shared with remote device
+ * @data2: Unstructured data
+ *
+ * MPQemuMsg Format of the message sent to the remote device from QEMU.
+ *
+ */
+typedef struct {
+    mpqemu_cmd_t cmd;
+    int bytestream;
+    size_t size;
+
+    union {
+        uint64_t u64;
+    } data1;
+
+    int fds[REMOTE_MAX_FDS];
+    int num_fds;
+
+    uint8_t *data2;
+} MPQemuMsg;
+
+struct conf_data_msg {
+    uint32_t addr;
+    uint32_t val;
+    int l;
+};
+
+/**
+ * MPQemuChannel:
+ * @gsrc: GSource object to be used by loop
+ * @gpfd: GPollFD object containing the socket & events to monitor
+ * @sock: Socket to send/receive communication, same as the one in gpfd
+ * @send_lock: Mutex to synchronize access to the send stream
+ * @recv_lock: Mutex to synchronize access to the recv stream
+ *
+ * Defines the channel that make up the communication link
+ * between QEMU and remote process
+ */
+
+typedef struct MPQemuChannel {
+    GSource gsrc;
+    GPollFD gpfd;
+    int sock;
+    QemuMutex send_lock;
+    QemuMutex recv_lock;
+} MPQemuChannel;
+
+typedef void (*mpqemu_link_callback)(GIOCondition cond, MPQemuChannel *chan);
+
+/*
+ * MPQemuLinkState Instance info. of the communication
+ * link between QEMU and remote process. The Link could
+ * be made up of multiple channels.
+ *
+ * ctx        GMainContext to be used for communication
+ * loop       Main loop that would be used to poll for incoming data
+ * com        Communication channel to transport control messages
+ *
+ */
+
+typedef struct MPQemuLinkState {
+    Object obj;
+
+    GMainContext *ctx;
+    GMainLoop *loop;
+
+    MPQemuChannel *com;
+
+    mpqemu_link_callback callback;
+} MPQemuLinkState;
+
+MPQemuLinkState *mpqemu_link_create(void);
+void mpqemu_link_finalize(MPQemuLinkState *s);
+
+void mpqemu_msg_send(MPQemuMsg *msg, MPQemuChannel *chan);
+int mpqemu_msg_recv(MPQemuMsg *msg, MPQemuChannel *chan);
+
+void mpqemu_init_channel(MPQemuLinkState *s, MPQemuChannel **chan, int fd);
+void mpqemu_destroy_channel(MPQemuChannel *chan);
+void mpqemu_link_set_callback(MPQemuLinkState *s,
+                              mpqemu_link_callback callback);
+void mpqemu_start_coms(MPQemuLinkState *s);
+
+#endif
diff --git a/io/Makefile.objs b/io/Makefile.objs
index 9a20fce..5875ab0 100644
--- a/io/Makefile.objs
+++ b/io/Makefile.objs
@@ -10,3 +10,5 @@  io-obj-y += channel-util.o
 io-obj-y += dns-resolver.o
 io-obj-y += net-listener.o
 io-obj-y += task.o
+
+io-obj-$(CONFIG_MPQEMU) += mpqemu-link.o
diff --git a/io/mpqemu-link.c b/io/mpqemu-link.c
new file mode 100644
index 0000000..bac120b
--- /dev/null
+++ b/io/mpqemu-link.c
@@ -0,0 +1,294 @@ 
+/*
+ * Communication channel between QEMU and remote device process
+ *
+ * Copyright © 2018, 2020 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+
+#include "qemu/module.h"
+#include "io/mpqemu-link.h"
+#include "qemu/log.h"
+
+GSourceFuncs gsrc_funcs;
+
+/*
+ * TODO: make all communications asynchronous and run in the main
+ * loop or existing IOThread.
+ */
+
+static void mpqemu_link_inst_init(Object *obj)
+{
+    MPQemuLinkState *s = MPQEMU_LINK(obj);
+
+    s->ctx = g_main_context_default();
+    s->loop = g_main_loop_new(s->ctx, FALSE);
+}
+
+static const TypeInfo mpqemu_link_info = {
+    .name = TYPE_MPQEMU_LINK,
+    .parent = TYPE_OBJECT,
+    .instance_size = sizeof(MPQemuLinkState),
+    .instance_init = mpqemu_link_inst_init,
+};
+
+static void mpqemu_link_register_types(void)
+{
+    type_register_static(&mpqemu_link_info);
+}
+
+type_init(mpqemu_link_register_types)
+
+MPQemuLinkState *mpqemu_link_create(void)
+{
+    return MPQEMU_LINK(object_new(TYPE_MPQEMU_LINK));
+}
+
+void mpqemu_link_finalize(MPQemuLinkState *s)
+{
+    g_main_loop_unref(s->loop);
+    g_main_context_unref(s->ctx);
+    g_main_loop_quit(s->loop);
+
+    mpqemu_destroy_channel(s->com);
+
+    object_unref(OBJECT(s));
+}
+
+void mpqemu_msg_send(MPQemuMsg *msg, MPQemuChannel *chan)
+{
+    int rc;
+    uint8_t *data;
+    union {
+        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
+        struct cmsghdr align;
+    } u;
+    struct msghdr hdr;
+    struct cmsghdr *chdr;
+    int sock = chan->sock;
+    QemuMutex *lock = &chan->send_lock;
+
+    struct iovec iov = {
+        .iov_base = (char *) msg,
+        .iov_len = MPQEMU_MSG_HDR_SIZE,
+    };
+
+    memset(&hdr, 0, sizeof(hdr));
+    memset(&u, 0, sizeof(u));
+
+    hdr.msg_iov = &iov;
+    hdr.msg_iovlen = 1;
+
+    if (msg->num_fds > REMOTE_MAX_FDS) {
+        qemu_log_mask(LOG_REMOTE_DEBUG, "%s: Max FDs exceeded\n", __func__);
+        return;
+    }
+
+    if (msg->num_fds > 0) {
+        size_t fdsize = msg->num_fds * sizeof(int);
+
+        hdr.msg_control = &u;
+        hdr.msg_controllen = sizeof(u);
+
+        chdr = CMSG_FIRSTHDR(&hdr);
+        chdr->cmsg_len = CMSG_LEN(fdsize);
+        chdr->cmsg_level = SOL_SOCKET;
+        chdr->cmsg_type = SCM_RIGHTS;
+        memcpy(CMSG_DATA(chdr), msg->fds, fdsize);
+        hdr.msg_controllen = CMSG_SPACE(fdsize);
+    }
+
+    qemu_mutex_lock(lock);
+
+    do {
+        rc = sendmsg(sock, &hdr, 0);
+    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+
+    if (rc < 0) {
+        qemu_log_mask(LOG_REMOTE_DEBUG, "%s - sendmsg rc is %d, errno is %d,"
+                      " sock %d\n", __func__, rc, errno, sock);
+        qemu_mutex_unlock(lock);
+        return;
+    }
+
+    if (msg->bytestream) {
+        data = msg->data2;
+    } else {
+        data = (uint8_t *)msg + MPQEMU_MSG_HDR_SIZE;
+    }
+
+    do {
+        rc = write(sock, data, msg->size);
+    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+
+    qemu_mutex_unlock(lock);
+}
+
+
+int mpqemu_msg_recv(MPQemuMsg *msg, MPQemuChannel *chan)
+{
+    int rc;
+    uint8_t *data;
+    union {
+        char control[CMSG_SPACE(REMOTE_MAX_FDS * sizeof(int))];
+        struct cmsghdr align;
+    } u;
+    struct msghdr hdr;
+    struct cmsghdr *chdr;
+    size_t fdsize;
+    int sock = chan->sock;
+    QemuMutex *lock = &chan->recv_lock;
+
+    struct iovec iov = {
+        .iov_base = (char *) msg,
+        .iov_len = MPQEMU_MSG_HDR_SIZE,
+    };
+
+    memset(&hdr, 0, sizeof(hdr));
+    memset(&u, 0, sizeof(u));
+
+    hdr.msg_iov = &iov;
+    hdr.msg_iovlen = 1;
+    hdr.msg_control = &u;
+    hdr.msg_controllen = sizeof(u);
+
+    qemu_mutex_lock(lock);
+
+    do {
+        rc = recvmsg(sock, &hdr, 0);
+    } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+
+    if (rc < 0) {
+        qemu_log_mask(LOG_REMOTE_DEBUG, "%s - recvmsg rc is %d, errno is %d,"
+                      " sock %d\n", __func__, rc, errno, sock);
+        qemu_mutex_unlock(lock);
+        return rc;
+    }
+
+    msg->num_fds = 0;
+    for (chdr = CMSG_FIRSTHDR(&hdr); chdr != NULL;
+         chdr = CMSG_NXTHDR(&hdr, chdr)) {
+        if ((chdr->cmsg_level == SOL_SOCKET) &&
+            (chdr->cmsg_type == SCM_RIGHTS)) {
+            fdsize = chdr->cmsg_len - CMSG_LEN(0);
+            msg->num_fds = fdsize / sizeof(int);
+            if (msg->num_fds > REMOTE_MAX_FDS) {
+                /*
+                 * TODO: Security issue detected. Sender never sends more
+                 * than REMOTE_MAX_FDS. This condition should be signaled to
+                 * the admin
+                 */
+                qemu_log_mask(LOG_REMOTE_DEBUG,
+                              "%s: Max FDs exceeded\n", __func__);
+                return -ERANGE;
+            }
+
+            memcpy(msg->fds, CMSG_DATA(chdr), fdsize);
+            break;
+        }
+    }
+
+    if (msg->bytestream) {
+        if (!msg->size) {
+            qemu_mutex_unlock(lock);
+            return -EINVAL;
+        }
+
+        msg->data2 = calloc(1, msg->size);
+        data = msg->data2;
+    } else {
+        data = (uint8_t *)&msg->data1;
+    }
+
+    if (msg->size) {
+        do {
+            rc = read(sock, data, msg->size);
+        } while (rc < 0 && (errno == EINTR || errno == EAGAIN));
+    }
+
+    qemu_mutex_unlock(lock);
+
+    return rc;
+}
+
+static gboolean mpqemu_link_handler_prepare(GSource *gsrc, gint *timeout)
+{
+    g_assert(timeout);
+
+    *timeout = -1;
+
+    return FALSE;
+}
+
+static gboolean mpqemu_link_handler_check(GSource *gsrc)
+{
+    MPQemuChannel *chan = (MPQemuChannel *)gsrc;
+
+    return chan->gpfd.events & chan->gpfd.revents;
+}
+
+static gboolean mpqemu_link_handler_dispatch(GSource *gsrc, GSourceFunc func,
+                                             gpointer data)
+{
+    MPQemuLinkState *s = (MPQemuLinkState *)data;
+    MPQemuChannel *chan = (MPQemuChannel *)gsrc;
+
+    s->callback(chan->gpfd.revents, chan);
+
+    if ((chan->gpfd.revents & G_IO_HUP) || (chan->gpfd.revents & G_IO_ERR)) {
+        return G_SOURCE_REMOVE;
+    }
+
+    return G_SOURCE_CONTINUE;
+}
+
+void mpqemu_link_set_callback(MPQemuLinkState *s, mpqemu_link_callback callback)
+{
+    s->callback = callback;
+}
+
+void mpqemu_init_channel(MPQemuLinkState *s, MPQemuChannel **chan, int fd)
+{
+    MPQemuChannel *src;
+
+    gsrc_funcs = (GSourceFuncs){
+        .prepare = mpqemu_link_handler_prepare,
+        .check = mpqemu_link_handler_check,
+        .dispatch = mpqemu_link_handler_dispatch,
+        .finalize = NULL,
+    };
+
+    src = (MPQemuChannel *)g_source_new(&gsrc_funcs, sizeof(MPQemuChannel));
+
+    src->sock = fd;
+    qemu_mutex_init(&src->send_lock);
+    qemu_mutex_init(&src->recv_lock);
+
+    g_source_set_callback(&src->gsrc, NULL, (gpointer)s, NULL);
+    src->gpfd.fd = fd;
+    src->gpfd.events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+    g_source_add_poll(&src->gsrc, &src->gpfd);
+
+    *chan = src;
+}
+
+void mpqemu_destroy_channel(MPQemuChannel *chan)
+{
+    g_source_unref(&chan->gsrc);
+    close(chan->sock);
+    qemu_mutex_destroy(&chan->send_lock);
+    qemu_mutex_destroy(&chan->recv_lock);
+}
+
+void mpqemu_start_coms(MPQemuLinkState *s)
+{
+
+    g_assert(g_source_attach(&s->com->gsrc, s->ctx));
+
+    g_main_loop_run(s->loop);
+}