Patchwork [5/8] virtio-serial-bus: Add support for buffering guest output, throttling guests

login
register
mail settings
Submitter Amit Shah
Date Dec. 23, 2009, 7:52 p.m.
Message ID <1261597948-24293-6-git-send-email-amit.shah@redhat.com>
Download mbox | patch
Permalink /patch/41696/
State New
Headers show

Comments

Amit Shah - Dec. 23, 2009, 7:52 p.m.
Guests send us one buffer at a time. Current guests send buffers sized
4K bytes. If guest userspace applications sent out > 4K bytes in one
write() syscall, the write request actually sends out multiple buffers,
each of 4K in size.

This usually isn't a problem but for some apps, like VNC, the entire
data has to be sent in one go to make copy/paste work fine. So if an app
on the guest sends out guest clipboard contents, it has to be sent to
the vnc server in one go as the guest app sent it.

For this to be done, we need the guest to send us START and END markers
for each write request so that we can find out complete buffers and send
them off to ports.

This needs us to buffer all the data that comes in from the guests, hold
it off till we see all the data corresponding to one write request,
merge it all in one buffer and then send it to the port the data was
destined for.

Also, we add support for caching of these buffers till a port indicates
it's ready to receive data.

We keep caching data the guest sends us till a port accepts it. However,
this could lead to an OOM condition where a rogue process on the guest
could continue pumping in data while the host continues to cache it.

We introduce a per-port byte-limit property to alleviate this condition.
When this limit is reached, we send a control message to the guest
indicating it to not send us any more data till further indication. When
the number of bytes cached go lesser than the limit specified, we open
tell the guest to restart sending data.

Signed-off-by: Amit Shah <amit.shah@redhat.com>
---
 hw/virtio-serial-bus.c |  315 +++++++++++++++++++++++++++++++++++++++++++++++-
 hw/virtio-serial.c     |    7 +
 hw/virtio-serial.h     |   44 +++++++
 3 files changed, 364 insertions(+), 2 deletions(-)
Anthony Liguori - Dec. 23, 2009, 11:17 p.m.
On 12/23/2009 01:52 PM, Amit Shah wrote:
> Guests send us one buffer at a time. Current guests send buffers sized
> 4K bytes. If guest userspace applications sent out>  4K bytes in one
> write() syscall, the write request actually sends out multiple buffers,
> each of 4K in size.
>
> This usually isn't a problem but for some apps, like VNC, the entire
> data has to be sent in one go to make copy/paste work fine. So if an app
> on the guest sends out guest clipboard contents, it has to be sent to
> the vnc server in one go as the guest app sent it.
>
> For this to be done, we need the guest to send us START and END markers
> for each write request so that we can find out complete buffers and send
> them off to ports.
>
> This needs us to buffer all the data that comes in from the guests, hold
> it off till we see all the data corresponding to one write request,
> merge it all in one buffer and then send it to the port the data was
> destined for.
>
> Also, we add support for caching of these buffers till a port indicates
> it's ready to receive data.
>
> We keep caching data the guest sends us till a port accepts it. However,
> this could lead to an OOM condition where a rogue process on the guest
> could continue pumping in data while the host continues to cache it.
>
> We introduce a per-port byte-limit property to alleviate this condition.
> When this limit is reached, we send a control message to the guest
> indicating it to not send us any more data till further indication. When
> the number of bytes cached go lesser than the limit specified, we open
> tell the guest to restart sending data.
>
> Signed-off-by: Amit Shah<amit.shah@redhat.com>
> ---
>   hw/virtio-serial-bus.c |  315 +++++++++++++++++++++++++++++++++++++++++++++++-
>   hw/virtio-serial.c     |    7 +
>   hw/virtio-serial.h     |   44 +++++++
>   3 files changed, 364 insertions(+), 2 deletions(-)
>

> +static void flush_all_ports(VirtIOSerial *vser)
> +{
> +    struct VirtIOSerialPort *port;
> +
> +    QTAILQ_FOREACH(port,&vser->ports, next) {
> +        pthread_mutex_lock(&port->unflushed_buffers_lock);
> +        if (port->has_activity) {
> +            port->has_activity = false;
> +            flush_queue(port);
> +        }
> +        pthread_mutex_unlock(&port->unflushed_buffers_lock);
> +    }
> +}
> +
> +static void remove_port_buffers(VirtIOSerialPort *port)
> +{
> +    struct VirtIOSerialPortBuffer *buf, *buf2;
> +
> +    pthread_mutex_lock(&port->unflushed_buffers_lock);
> +    QTAILQ_FOREACH_SAFE(buf,&port->unflushed_buffers, next, buf2) {
> +        QTAILQ_REMOVE(&port->unflushed_buffers, buf, next);
> +        free_buf(buf);
> +    }
> +    pthread_mutex_unlock(&port->unflushed_buffers_lock);
> +}

The locking here is unnecessary.

Regards,

Anthony Liguori

Patch

diff --git a/hw/virtio-serial-bus.c b/hw/virtio-serial-bus.c
index b683109..12317ba 100644
--- a/hw/virtio-serial-bus.c
+++ b/hw/virtio-serial-bus.c
@@ -52,6 +52,20 @@  struct VirtIOSerial {
     uint32_t guest_features;
 };
 
+/* This struct holds individual buffers received for each port */
+typedef struct VirtIOSerialPortBuffer {
+    QTAILQ_ENTRY(VirtIOSerialPortBuffer) next;
+
+    uint8_t *buf;
+
+    size_t len; /* length of the buffer */
+    size_t offset; /* offset from which to consume data in the buffer */
+
+    uint32_t flags; /* Sent by guest (start of data stream, end of stream) */
+
+    bool previous_failure; /* Did sending out this buffer fail previously? */
+} VirtIOSerialPortBuffer;
+
 static VirtIOSerialPort *find_port_by_id(VirtIOSerial *vser, uint32_t id)
 {
     VirtIOSerialPort *port;
@@ -167,6 +181,201 @@  static size_t send_control_event(VirtIOSerialPort *port, uint16_t event,
     return send_control_msg(port, &cpkt, sizeof(cpkt));
 }
 
+static void init_buf(VirtIOSerialPortBuffer *buf, uint8_t *buffer, size_t len)
+{
+    buf->buf = buffer;
+    buf->len = len;
+    buf->offset = 0;
+    buf->flags = 0;
+    buf->previous_failure = false;
+}
+
+static VirtIOSerialPortBuffer *alloc_buf(size_t len)
+{
+    VirtIOSerialPortBuffer *buf;
+
+    buf = qemu_malloc(sizeof(*buf));
+    buf->buf = qemu_malloc(len);
+
+    init_buf(buf, buf->buf, len);
+
+    return buf;
+}
+
+static void free_buf(VirtIOSerialPortBuffer *buf)
+{
+    qemu_free(buf->buf);
+    qemu_free(buf);
+}
+
+static size_t get_complete_data_size(VirtIOSerialPort *port)
+{
+    VirtIOSerialPortBuffer *buf;
+    size_t size;
+    bool is_complete, start_seen;
+
+    size = 0;
+    is_complete = false;
+    start_seen = false;
+    QTAILQ_FOREACH(buf, &port->unflushed_buffers, next) {
+        size += buf->len - buf->offset;
+
+        if (buf->flags & VIRTIO_CONSOLE_HDR_END_DATA) {
+            is_complete = true;
+            break;
+        }
+        if (buf == QTAILQ_FIRST(&port->unflushed_buffers)
+            && !(buf->flags & VIRTIO_CONSOLE_HDR_START_DATA)) {
+
+            /* There's some data that arrived without a START flag. Flush it. */
+            is_complete = true;
+            break;
+        }
+
+        if (buf->flags & VIRTIO_CONSOLE_HDR_START_DATA) {
+            if (start_seen) {
+                /*
+                 * There's some data that arrived without an END
+                 * flag. Flush it.
+                 */
+                size -= buf->len + buf->offset;
+                is_complete = true;
+                break;
+            }
+            start_seen = true;
+        }
+    }
+    return is_complete ? size : 0;
+}
+
+/*
+ * The guest could have sent the data corresponding to one write
+ * request split up in multiple buffers. The first buffer has the
+ * VIRTIO_CONSOLE_HDR_START_DATA flag set and the last buffer has the
+ * VIRTIO_CONSOLE_HDR_END_DATA flag set. Using this information, merge
+ * the parts into one buf here to process it for output.
+ */
+static VirtIOSerialPortBuffer *get_complete_buf(VirtIOSerialPort *port)
+{
+    VirtIOSerialPortBuffer *buf, *buf2;
+    uint8_t *outbuf;
+    size_t out_offset, out_size;
+
+    out_size = get_complete_data_size(port);
+    if (!out_size)
+        return NULL;
+
+    buf = QTAILQ_FIRST(&port->unflushed_buffers);
+    if (buf->len - buf->offset == out_size) {
+        QTAILQ_REMOVE(&port->unflushed_buffers, buf, next);
+        return buf;
+    }
+    out_offset = 0;
+    outbuf = qemu_malloc(out_size);
+
+    QTAILQ_FOREACH_SAFE(buf, &port->unflushed_buffers, next, buf2) {
+        size_t copy_size;
+
+        copy_size = buf->len - buf->offset;
+        memcpy(outbuf + out_offset, buf->buf + buf->offset, copy_size);
+        out_offset += copy_size;
+
+        QTAILQ_REMOVE(&port->unflushed_buffers, buf, next);
+        qemu_free(buf->buf);
+
+        if (out_offset == out_size) {
+            break;
+        }
+        qemu_free(buf);
+    }
+    init_buf(buf, outbuf, out_size);
+    buf->flags = VIRTIO_CONSOLE_HDR_START_DATA | VIRTIO_CONSOLE_HDR_END_DATA;
+
+    return buf;
+}
+
+/* Call with the unflushed_buffers_lock held */
+static void flush_queue(VirtIOSerialPort *port)
+{
+    VirtIOSerialPortBuffer *buf;
+    size_t out_size;
+    ssize_t ret;
+
+    /*
+     * If a device is interested in buffering packets till it's
+     * opened, cache the data the guest sends us till a connection is
+     * established.
+     */
+    if (!port->host_connected && port->cache_buffers) {
+        return;
+    }
+    while ((buf = get_complete_buf(port))) {
+        out_size = buf->len - buf->offset;
+        if (!port->host_connected) {
+            /*
+             * Caching is disabled and host is not connected, so
+             * discard the buffer. Do this only after merging the
+             * buffer as a port can get connected in the middle of
+             * dropping buffers and the port will end up getting the
+             * incomplete output.
+             */
+            port->nr_bytes -= buf->len + buf->offset;
+            free_buf(buf);
+            continue;
+        }
+
+        ret = port->info->have_data(port, buf->buf + buf->offset, out_size);
+        if (ret < out_size) {
+            QTAILQ_INSERT_HEAD(&port->unflushed_buffers, buf, next);
+        }
+        if (ret <= 0) {
+            /* We're not progressing at all */
+            if (buf->previous_failure) {
+                break;
+            }
+            buf->previous_failure = true;
+        } else {
+            buf->offset += ret;
+            port->nr_bytes -= ret;
+            buf->previous_failure = false;
+        }
+        if (!(buf->len - buf->offset)) {
+            free_buf(buf);
+        }
+    }
+
+    if (port->host_throttled && port->nr_bytes < port->byte_limit) {
+        port->host_throttled = false;
+        send_control_event(port, VIRTIO_CONSOLE_THROTTLE_PORT, 0);
+    }
+}
+
+static void flush_all_ports(VirtIOSerial *vser)
+{
+    struct VirtIOSerialPort *port;
+
+    QTAILQ_FOREACH(port, &vser->ports, next) {
+        pthread_mutex_lock(&port->unflushed_buffers_lock);
+        if (port->has_activity) {
+            port->has_activity = false;
+            flush_queue(port);
+        }
+        pthread_mutex_unlock(&port->unflushed_buffers_lock);
+    }
+}
+
+static void remove_port_buffers(VirtIOSerialPort *port)
+{
+    struct VirtIOSerialPortBuffer *buf, *buf2;
+
+    pthread_mutex_lock(&port->unflushed_buffers_lock);
+    QTAILQ_FOREACH_SAFE(buf, &port->unflushed_buffers, next, buf2) {
+        QTAILQ_REMOVE(&port->unflushed_buffers, buf, next);
+        free_buf(buf);
+    }
+    pthread_mutex_unlock(&port->unflushed_buffers_lock);
+}
+
 /* Functions for use inside qemu to open and read from/write to ports */
 int virtio_serial_open(VirtIOSerialPort *port)
 {
@@ -178,6 +387,12 @@  int virtio_serial_open(VirtIOSerialPort *port)
     port->host_connected = true;
     send_control_event(port, VIRTIO_CONSOLE_PORT_OPEN, 1);
 
+    /* Flush any buffers that were cached while the port was closed */
+    if (port->cache_buffers && port->info->have_data) {
+        pthread_mutex_lock(&port->unflushed_buffers_lock);
+        flush_queue(port);
+        pthread_mutex_unlock(&port->unflushed_buffers_lock);
+    }
     return 0;
 }
 
@@ -186,6 +401,9 @@  int virtio_serial_close(VirtIOSerialPort *port)
     port->host_connected = false;
     send_control_event(port, VIRTIO_CONSOLE_PORT_OPEN, 0);
 
+    if (!port->cache_buffers) {
+        remove_port_buffers(port);
+    }
     return 0;
 }
 
@@ -270,6 +488,13 @@  static void handle_control_message(VirtIOSerial *vser, void *buf)
             send_control_msg(port, buffer, buffer_len);
             qemu_free(buffer);
         }
+        /*
+         * We also want to signal to the guest whether or not the port
+         * is set to caching the buffers when disconnected.
+         */
+        if (port->cache_buffers) {
+            send_control_event(port, VIRTIO_CONSOLE_CACHE_BUFFERS, 1);
+        }
         if (port->host_connected) {
             send_control_event(port, VIRTIO_CONSOLE_PORT_OPEN, 1);
         }
@@ -317,6 +542,10 @@  static void control_out(VirtIODevice *vdev, VirtQueue *vq)
 
 /*
  * Guest wrote something to some port.
+ *
+ * Flush the data in the entire chunk that we received rather than
+ * splitting it into multiple buffers. VNC clients don't consume split
+ * buffers
  */
 static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
 {
@@ -327,6 +556,7 @@  static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
 
     while (virtqueue_pop(vq, &elem)) {
         VirtIOSerialPort *port;
+        VirtIOSerialPortBuffer *buf;
         struct virtio_console_header header;
         int header_len;
 
@@ -335,6 +565,9 @@  static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
         if (elem.out_sg[0].iov_len < header_len) {
             goto next_buf;
         }
+        if (header_len) {
+            memcpy(&header, elem.out_sg[0].iov_base, header_len);
+        }
         port = find_port_by_vq(vser, vq);
         if (!port) {
             goto next_buf;
@@ -349,13 +582,40 @@  static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
         }
 
         /* The guest always sends only one sg */
-        port->info->have_data(port, elem.out_sg[0].iov_base + header_len,
-                              elem.out_sg[0].iov_len - header_len);
+        buf = alloc_buf(elem.out_sg[0].iov_len - header_len);
+        memcpy(buf->buf, elem.out_sg[0].iov_base + header_len, buf->len);
+
+        if (header_len) {
+            /*
+             * Only the first buffer in a stream will have this
+             * set. This will help us identify the first buffer and
+             * the remaining buffers in the stream based on length
+             */
+            buf->flags = header.flags & (VIRTIO_CONSOLE_HDR_START_DATA
+                                         | VIRTIO_CONSOLE_HDR_END_DATA);
+        } else {
+            /* We always want to flush all the buffers in this case */
+            buf->flags = VIRTIO_CONSOLE_HDR_START_DATA
+                | VIRTIO_CONSOLE_HDR_END_DATA;
+        }
 
+        pthread_mutex_lock(&port->unflushed_buffers_lock);
+        QTAILQ_INSERT_TAIL(&port->unflushed_buffers, buf, next);
+        port->nr_bytes += buf->len;
+        port->has_activity = true;
+        pthread_mutex_unlock(&port->unflushed_buffers_lock);
+
+        if (!port->host_throttled && port->byte_limit &&
+            port->nr_bytes >= port->byte_limit) {
+
+            port->host_throttled = true;
+            send_control_event(port, VIRTIO_CONSOLE_THROTTLE_PORT, 1);
+        }
     next_buf:
         virtqueue_push(vq, &elem, elem.out_sg[0].iov_len);
     }
     virtio_notify(vdev, vq);
+    flush_all_ports(vser);
 }
 
 static void handle_input(VirtIODevice *vdev, VirtQueue *vq)
@@ -396,6 +656,7 @@  static void virtio_serial_save(QEMUFile *f, void *opaque)
     VirtIOSerial *s = opaque;
     VirtIOSerialPort *port;
     uint32_t nr_active_ports;
+    unsigned int nr_bufs;
 
     /* The virtio device */
     virtio_save(&s->vdev, f);
@@ -418,14 +679,34 @@  static void virtio_serial_save(QEMUFile *f, void *opaque)
      * Items in struct VirtIOSerialPort.
      */
     QTAILQ_FOREACH(port, &s->ports, next) {
+        VirtIOSerialPortBuffer *buf;
+
         /*
          * We put the port number because we may not have an active
          * port at id 0 that's reserved for a console port, or in case
          * of ports that might have gotten unplugged
          */
         qemu_put_be32s(f, &port->id);
+        qemu_put_be64s(f, &port->byte_limit);
+        qemu_put_be64s(f, &port->nr_bytes);
         qemu_put_byte(f, port->guest_connected);
+        qemu_put_byte(f, port->host_throttled);
 
+        /* All the pending buffers from active ports */
+        nr_bufs = 0;
+        QTAILQ_FOREACH(buf, &port->unflushed_buffers, next) {
+            nr_bufs++;
+        }
+        qemu_put_be32s(f, &nr_bufs);
+        if (!nr_bufs) {
+            continue;
+        }
+        QTAILQ_FOREACH(buf, &port->unflushed_buffers, next) {
+            qemu_put_be64s(f, &buf->len);
+            qemu_put_be64s(f, &buf->offset);
+            qemu_put_be32s(f, &buf->flags);
+            qemu_put_buffer(f, buf->buf, buf->len);
+        }
     }
 }
 
@@ -457,13 +738,34 @@  static int virtio_serial_load(QEMUFile *f, void *opaque, int version_id)
 
     /* Items in struct VirtIOSerialPort */
     for (i = 0; i < nr_active_ports; i++) {
+        VirtIOSerialPortBuffer *buf;
         uint32_t id;
+        unsigned int nr_bufs;
 
         id = qemu_get_be32(f);
         port = find_port_by_id(s, id);
 
+        port->byte_limit = qemu_get_be64(f);
+        port->nr_bytes   = qemu_get_be64(f);
         port->guest_connected = qemu_get_byte(f);
+        port->host_throttled = qemu_get_byte(f);
+
+        /* All the pending buffers from active ports */
+        qemu_get_be32s(f, &nr_bufs);
+        if (!nr_bufs) {
+            continue;
+        }
+        for (; nr_bufs; nr_bufs--) {
+            size_t len;
 
+            qemu_get_be64s(f, &len);
+            buf = alloc_buf(len);
+
+            qemu_get_be64s(f, &buf->offset);
+            qemu_get_be32s(f, &buf->flags);
+            qemu_get_buffer(f, buf->buf, buf->len);
+            QTAILQ_INSERT_TAIL(&port->unflushed_buffers, buf, next);
+        }
     }
     return 0;
 }
@@ -499,6 +801,10 @@  static void virtser_bus_dev_print(Monitor *mon, DeviceState *qdev, int indent)
                    indent, "", port->guest_connected);
     monitor_printf(mon, "%*s dev-prop-int: host_connected: %d\n",
                    indent, "", port->host_connected);
+    monitor_printf(mon, "%*s dev-prop-int: host_throttled: %d\n",
+                   indent, "", port->host_throttled);
+    monitor_printf(mon, "%*s dev-prop-int: nr_bytes: %zu\n",
+                   indent, "", port->nr_bytes);
 }
 
 static int virtser_port_qdev_init(DeviceState *qdev, DeviceInfo *base)
@@ -529,6 +835,8 @@  static int virtser_port_qdev_init(DeviceState *qdev, DeviceInfo *base)
     if (ret) {
         return ret;
     }
+    QTAILQ_INIT(&port->unflushed_buffers);
+    pthread_mutex_init(&port->unflushed_buffers_lock, NULL);
 
     port->id = plugging_port0 ? 0 : port->vser->config.nr_ports++;
 
@@ -571,6 +879,9 @@  static int virtser_port_qdev_exit(DeviceState *qdev)
     if (port->info->exit)
         port->info->exit(dev);
 
+    remove_port_buffers(port);
+    pthread_mutex_destroy(&port->unflushed_buffers_lock);
+
     return 0;
 }
 
diff --git a/hw/virtio-serial.c b/hw/virtio-serial.c
index 2cc51c5..94eeed0 100644
--- a/hw/virtio-serial.c
+++ b/hw/virtio-serial.c
@@ -66,6 +66,13 @@  static int virtconsole_initfn(VirtIOSerialDevice *dev)
 
     port->info = dev->info;
 
+    /*
+     * We're not interested in data the guest sends while nothing is
+     * connected on the host side. Just ignore it instead of saving it
+     * for later consumption.
+     */
+    port->cache_buffers = 0;
+
     /* Tell the guest we're a console so it attaches us to an hvc console */
     port->is_console = true;
 
diff --git a/hw/virtio-serial.h b/hw/virtio-serial.h
index 5505841..a64ff4d 100644
--- a/hw/virtio-serial.h
+++ b/hw/virtio-serial.h
@@ -49,12 +49,18 @@  struct virtio_console_header {
     uint32_t flags;		/* Some message between host and guest */
 };
 
+/* Messages between host and guest */
+#define VIRTIO_CONSOLE_HDR_START_DATA	(1 << 0)
+#define VIRTIO_CONSOLE_HDR_END_DATA	(1 << 1)
+
 /* Some events for the internal messages (control packets) */
 #define VIRTIO_CONSOLE_PORT_READY	0
 #define VIRTIO_CONSOLE_CONSOLE_PORT	1
 #define VIRTIO_CONSOLE_RESIZE		2
 #define VIRTIO_CONSOLE_PORT_OPEN	3
 #define VIRTIO_CONSOLE_PORT_NAME	4
+#define VIRTIO_CONSOLE_THROTTLE_PORT	5
+#define VIRTIO_CONSOLE_CACHE_BUFFERS	6
 
 /* == In-qemu interface == */
 
@@ -96,6 +102,18 @@  struct VirtIOSerialPort {
     char *name;
 
     /*
+     * This list holds buffers pushed by the guest in case the guest
+     * sent incomplete messages or the host connection was down and
+     * the device requested to cache the data.
+     */
+    QTAILQ_HEAD(, VirtIOSerialPortBuffer) unflushed_buffers;
+
+    /*
+     * This lock protects the unflushed_buffer_head list
+     */
+    pthread_mutex_t unflushed_buffers_lock;
+
+    /*
      * This id helps identify ports between the guest and the host.
      * The guest sends a "header" with this id with each data packet
      * that it sends and the host can then find out which associated
@@ -103,6 +121,27 @@  struct VirtIOSerialPort {
      */
     uint32_t id;
 
+    /*
+     * Each port can specify the limit on number of bytes that can be
+     * outstanding in the unread buffers. This is to prevent any OOM
+     * situtation if a rogue process on the guest keeps injecting
+     * data.
+     */
+    size_t byte_limit;
+
+    /*
+     * The number of bytes we have queued up in our unread queue
+     */
+    size_t nr_bytes;
+
+    /*
+     * This boolean, when set, means "queue data that gets sent to
+     * this port when the host is not connected". The queued data, if
+     * any, is then sent out to the port when the host connection is
+     * opened.
+     */
+    uint8_t cache_buffers;
+
     /* Identify if this is a port that binds with hvc in the guest */
     uint8_t is_console;
 
@@ -110,6 +149,11 @@  struct VirtIOSerialPort {
     bool guest_connected;
     /* Is this device open for IO on the host? */
     bool host_connected;
+    /* Have we sent a throttle message to the guest? */
+    bool host_throttled;
+
+    /* Did this port get data in the recent handle_output call? */
+    bool has_activity;
 };
 
 struct VirtIOSerialPortInfo {