Patchwork [5/8] virtio-serial-bus: Add support for buffering guest output, throttling guests

login
register
mail settings
Submitter Amit Shah
Date Jan. 14, 2010, 1:17 p.m.
Message ID <1263475063-15238-6-git-send-email-amit.shah@redhat.com>
Download mbox | patch
Permalink /patch/42887/
State New
Headers show

Comments

Amit Shah - Jan. 14, 2010, 1:17 p.m.
We have to buffer data from guest as ports may not consume all the data
in one go or the guest could be fast in sending data and the apps may
not consume at the same rate.

We keep caching data the guest sends us till a port accepts it. However,
this could lead to an OOM condition where a rogue process on the guest
could continue pumping in data while the host continues to cache it.

We introduce a per-port byte-limit property to alleviate this condition.
When this limit is reached, we send a control message to the guest
indicating it to not send us any more data till further indication. When
the number of bytes cached go lesser than the limit specified, we tell
the guest to restart sending data.

Signed-off-by: Amit Shah <amit.shah@redhat.com>
---
 hw/virtio-serial-bus.c |  215 +++++++++++++++++++++++++++++++++++++++---------
 hw/virtio-serial.c     |    6 --
 hw/virtio-serial.h     |   30 ++++++-
 3 files changed, 203 insertions(+), 48 deletions(-)

Patch

diff --git a/hw/virtio-serial-bus.c b/hw/virtio-serial-bus.c
index 2576140..1ec67d2 100644
--- a/hw/virtio-serial-bus.c
+++ b/hw/virtio-serial-bus.c
@@ -44,6 +44,18 @@  struct VirtIOSerial {
     struct virtio_console_config config;
 };
 
+/* This struct holds individual buffers received for each port */
+typedef struct VirtIOSerialPortBuffer {
+    QTAILQ_ENTRY(VirtIOSerialPortBuffer) next;
+
+    uint8_t *buf;
+
+    size_t len; /* length of the buffer */
+    size_t offset; /* offset from which to consume data in the buffer */
+
+    bool previous_failure; /* Did sending out this buffer fail previously? */
+} VirtIOSerialPortBuffer;
+
 static VirtIOSerialPort *find_port_by_id(VirtIOSerial *vser, uint32_t id)
 {
     VirtIOSerialPort *port;
@@ -75,11 +87,9 @@  static size_t write_to_port(VirtIOSerialPort *port,
                             const uint8_t *buf, size_t size)
 {
     VirtQueueElement elem;
-    struct virtio_console_header header;
     VirtQueue *vq;
     size_t offset = 0;
     size_t len = 0;
-    int header_len;
 
     vq = port->ivq;
     if (!virtio_queue_ready(vq)) {
@@ -88,8 +98,6 @@  static size_t write_to_port(VirtIOSerialPort *port,
     if (!size) {
         return 0;
     }
-    header.flags = 0;
-    header_len = use_multiport(port->vser) ? sizeof(header) : 0;
 
     while (offset < size) {
         int i;
@@ -97,26 +105,14 @@  static size_t write_to_port(VirtIOSerialPort *port,
         if (!virtqueue_pop(vq, &elem)) {
             break;
         }
-        if (elem.in_sg[0].iov_len < header_len) {
-            /* We can't even store our port number in this buffer. Bug? */
-            qemu_error("virtio-serial: size %zd less than expected\n",
-                    elem.in_sg[0].iov_len);
-            exit(1);
-        }
-        if (header_len) {
-            memcpy(elem.in_sg[0].iov_base, &header, header_len);
-        }
 
         for (i = 0; offset < size && i < elem.in_num; i++) {
-            /* Copy the header only in the first sg. */
-            len = MIN(elem.in_sg[i].iov_len - header_len, size - offset);
+            len = MIN(elem.in_sg[i].iov_len, size - offset);
 
-            memcpy(elem.in_sg[i].iov_base + header_len, buf + offset, len);
+            memcpy(elem.in_sg[i].iov_base, buf + offset, len);
             offset += len;
-            header_len = 0;
         }
-        header_len = use_multiport(port->vser) ? sizeof(header) : 0;
-        virtqueue_push(vq, &elem, len + header_len);
+        virtqueue_push(vq, &elem, len);
     }
 
     virtio_notify(&port->vser->vdev, vq);
@@ -157,6 +153,96 @@  static size_t send_control_event(VirtIOSerialPort *port, uint16_t event,
     return send_control_msg(port, &cpkt, sizeof(cpkt));
 }
 
+static void init_buf(VirtIOSerialPortBuffer *buf, uint8_t *buffer, size_t len)
+{
+    buf->buf = buffer;
+    buf->len = len;
+    buf->offset = 0;
+    buf->previous_failure = false;
+}
+
+static VirtIOSerialPortBuffer *alloc_buf(size_t len)
+{
+    VirtIOSerialPortBuffer *buf;
+
+    buf = qemu_malloc(sizeof(*buf));
+    buf->buf = qemu_malloc(len);
+
+    init_buf(buf, buf->buf, len);
+
+    return buf;
+}
+
+static void free_buf(VirtIOSerialPortBuffer *buf)
+{
+    qemu_free(buf->buf);
+    qemu_free(buf);
+}
+
+static void flush_queue(VirtIOSerialPort *port)
+{
+    VirtIOSerialPortBuffer *buf;
+    size_t out_size;
+    ssize_t ret;
+
+    while ((buf = QTAILQ_FIRST(&port->unflushed_buffers))) {
+        QTAILQ_REMOVE(&port->unflushed_buffers, buf, next);
+
+        out_size = buf->len - buf->offset;
+        if (!port->host_connected) {
+            port->nr_bytes -= buf->len + buf->offset;
+            free_buf(buf);
+            continue;
+        }
+
+        ret = port->info->have_data(port, buf->buf + buf->offset, out_size);
+        if (ret < out_size) {
+            QTAILQ_INSERT_HEAD(&port->unflushed_buffers, buf, next);
+        }
+        if (ret <= 0) {
+            /* We're not progressing at all */
+            if (buf->previous_failure) {
+                break;
+            }
+            buf->previous_failure = true;
+        } else {
+            buf->offset += ret;
+            port->nr_bytes -= ret;
+            buf->previous_failure = false;
+        }
+        if (!(buf->len - buf->offset)) {
+            free_buf(buf);
+        }
+    }
+
+    if (port->host_throttled && port->nr_bytes < port->byte_limit) {
+        port->host_throttled = false;
+        send_control_event(port, VIRTIO_CONSOLE_THROTTLE_PORT, 0);
+    }
+}
+
+static void flush_all_ports(VirtIOSerial *vser)
+{
+    struct VirtIOSerialPort *port;
+
+    QTAILQ_FOREACH(port, &vser->ports, next) {
+        if (port->has_activity) {
+            port->has_activity = false;
+            flush_queue(port);
+        }
+    }
+}
+
+static void remove_port_buffers(VirtIOSerialPort *port)
+{
+    struct VirtIOSerialPortBuffer *buf, *buf2;
+
+    QTAILQ_FOREACH_SAFE(buf, &port->unflushed_buffers, next, buf2) {
+        QTAILQ_REMOVE(&port->unflushed_buffers, buf, next);
+        free_buf(buf);
+    }
+}
+
 /* Functions for use inside qemu to open and read from/write to ports */
 int virtio_serial_open(VirtIOSerialPort *port)
 {
@@ -176,6 +262,7 @@  int virtio_serial_close(VirtIOSerialPort *port)
     port->host_connected = false;
     send_control_event(port, VIRTIO_CONSOLE_PORT_OPEN, 0);
 
+    remove_port_buffers(port);
     return 0;
 }
 
@@ -196,10 +283,6 @@  ssize_t virtio_serial_write(VirtIOSerialPort *port, const uint8_t *buf,
 size_t virtio_serial_guest_ready(VirtIOSerialPort *port)
 {
     VirtQueue *vq = port->ivq;
-    size_t size, header_len;
-
-    header_len = use_multiport(port->vser)
-        ? sizeof(struct virtio_console_header) : 0;
 
     if (!virtio_queue_ready(vq) ||
         !(port->vser->vdev.status & VIRTIO_CONFIG_S_DRIVER_OK) ||
@@ -210,13 +293,11 @@  size_t virtio_serial_guest_ready(VirtIOSerialPort *port)
         return 0;
     }
 
-    size = 4096;
-    if (virtqueue_avail_bytes(vq, size, 0)) {
-        return size - header_len;
+    if (virtqueue_avail_bytes(vq, 4096, 0)) {
+        return 4096;
     }
-    size = header_len + 1;
-    if (virtqueue_avail_bytes(vq, size, 0)) {
-        return size - header_len;
+    if (virtqueue_avail_bytes(vq, 1, 0)) {
+        return 1;
     }
     return 0;
 }
@@ -315,6 +396,10 @@  static void control_out(VirtIODevice *vdev, VirtQueue *vq)
 
 /*
  * Guest wrote something to some port.
+ *
+ * Flush the data in the entire chunk that we received rather than
+ * splitting it into multiple buffers. VNC clients don't consume split
+ * buffers
  */
 static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
 {
@@ -325,18 +410,13 @@  static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
 
     while (virtqueue_pop(vq, &elem)) {
         VirtIOSerialPort *port;
-        struct virtio_console_header header;
-        int header_len;
-
-        header_len = use_multiport(vser) ? sizeof(header) : 0;
+        VirtIOSerialPortBuffer *buf;
 
-        if (elem.out_sg[0].iov_len < header_len) {
-            goto next_buf;
-        }
         port = find_port_by_vq(vser, vq);
         if (!port) {
             goto next_buf;
         }
+
         /*
          * A port may not have any handler registered for consuming the
          * data that the guest sends or it may not have a chardev associated
@@ -347,13 +427,24 @@  static void handle_output(VirtIODevice *vdev, VirtQueue *vq)
         }
 
         /* The guest always sends only one sg */
-        port->info->have_data(port, elem.out_sg[0].iov_base + header_len,
-                              elem.out_sg[0].iov_len - header_len);
+        buf = alloc_buf(elem.out_sg[0].iov_len);
+        memcpy(buf->buf, elem.out_sg[0].iov_base, buf->len);
+
+        QTAILQ_INSERT_TAIL(&port->unflushed_buffers, buf, next);
+        port->nr_bytes += buf->len;
+        port->has_activity = true;
+
+        if (!port->host_throttled && port->byte_limit &&
+            port->nr_bytes >= port->byte_limit) {
 
+            port->host_throttled = true;
+            send_control_event(port, VIRTIO_CONSOLE_THROTTLE_PORT, 1);
+        }
     next_buf:
         virtqueue_push(vq, &elem, elem.out_sg[0].iov_len);
     }
     virtio_notify(vdev, vq);
+    flush_all_ports(vser);
 }
 
 static void handle_input(VirtIODevice *vdev, VirtQueue *vq)
@@ -388,6 +479,7 @@  static void virtio_serial_save(QEMUFile *f, void *opaque)
     VirtIOSerial *s = opaque;
     VirtIOSerialPort *port;
     uint32_t nr_active_ports;
+    unsigned int nr_bufs;
 
     /* The virtio device */
     virtio_save(&s->vdev, f);
@@ -410,14 +502,34 @@  static void virtio_serial_save(QEMUFile *f, void *opaque)
      * Items in struct VirtIOSerialPort.
      */
     QTAILQ_FOREACH(port, &s->ports, next) {
+        VirtIOSerialPortBuffer *buf;
+
         /*
          * We put the port number because we may not have an active
          * port at id 0 that's reserved for a console port, or in case
          * of ports that might have gotten unplugged
          */
         qemu_put_be32s(f, &port->id);
+        qemu_put_be64s(f, &port->byte_limit);
+        qemu_put_be64s(f, &port->nr_bytes);
         qemu_put_byte(f, port->guest_connected);
+        qemu_put_byte(f, port->host_throttled);
 
+        /* All the pending buffers from active ports */
+        nr_bufs = 0;
+        QTAILQ_FOREACH(buf, &port->unflushed_buffers, next) {
+            nr_bufs++;
+        }
+        qemu_put_be32s(f, &nr_bufs);
+        if (!nr_bufs) {
+            continue;
+        }
+
+        QTAILQ_FOREACH(buf, &port->unflushed_buffers, next) {
+            qemu_put_be64s(f, &buf->len);
+            qemu_put_be64s(f, &buf->offset);
+            qemu_put_buffer(f, buf->buf, buf->len);
+        }
     }
 }
 
@@ -450,13 +562,33 @@  static int virtio_serial_load(QEMUFile *f, void *opaque, int version_id)
 
     /* Items in struct VirtIOSerialPort */
     for (i = 0; i < nr_active_ports; i++) {
+        VirtIOSerialPortBuffer *buf;
         uint32_t id;
+        unsigned int nr_bufs;
 
         id = qemu_get_be32(f);
         port = find_port_by_id(s, id);
 
+        port->byte_limit = qemu_get_be64(f);
+        port->nr_bytes   = qemu_get_be64(f);
         port->guest_connected = qemu_get_byte(f);
+        port->host_throttled = qemu_get_byte(f);
+
+        /* All the pending buffers from active ports */
+        qemu_get_be32s(f, &nr_bufs);
+        if (!nr_bufs) {
+            continue;
+        }
+        for (; nr_bufs; nr_bufs--) {
+            size_t len;
 
+            qemu_get_be64s(f, &len);
+            buf = alloc_buf(len);
+
+            qemu_get_be64s(f, &buf->offset);
+            qemu_get_buffer(f, buf->buf, buf->len);
+            QTAILQ_INSERT_TAIL(&port->unflushed_buffers, buf, next);
+        }
     }
 
     return 0;
@@ -492,6 +624,10 @@  static void virtser_bus_dev_print(Monitor *mon, DeviceState *qdev, int indent)
                    indent, "", port->guest_connected);
     monitor_printf(mon, "%*s dev-prop-int: host_connected: %d\n",
                    indent, "", port->host_connected);
+    monitor_printf(mon, "%*s dev-prop-int: host_throttled: %d\n",
+                   indent, "", port->host_throttled);
+    monitor_printf(mon, "%*s dev-prop-int: nr_bytes: %zu\n",
+                   indent, "", port->nr_bytes);
 }
 
 static int virtser_port_qdev_init(DeviceState *qdev, DeviceInfo *base)
@@ -522,6 +658,7 @@  static int virtser_port_qdev_init(DeviceState *qdev, DeviceInfo *base)
     if (ret) {
         return ret;
     }
+    QTAILQ_INIT(&port->unflushed_buffers);
 
     port->id = plugging_port0 ? 0 : port->vser->config.nr_ports++;
 
@@ -572,6 +709,8 @@  static int virtser_port_qdev_exit(DeviceState *qdev)
     if (port->info->exit)
         port->info->exit(dev);
 
+    remove_port_buffers(port);
+
     return 0;
 }
 
diff --git a/hw/virtio-serial.c b/hw/virtio-serial.c
index 470446b..9bcf461 100644
--- a/hw/virtio-serial.c
+++ b/hw/virtio-serial.c
@@ -68,12 +68,6 @@  static int virtconsole_initfn(VirtIOSerialDevice *dev)
 
     port->is_console = true;
 
-    /*
-     * For console ports, just assume the guest is ready to accept our
-     * data.
-     */
-    port->guest_connected = true;
-
     if (vcon->chr) {
         qemu_chr_add_handlers(vcon->chr, chr_can_read, chr_read, chr_event,
                               vcon);
diff --git a/hw/virtio-serial.h b/hw/virtio-serial.h
index b669c7f..ea87b7d 100644
--- a/hw/virtio-serial.h
+++ b/hw/virtio-serial.h
@@ -45,16 +45,13 @@  struct virtio_console_control {
     uint16_t value;		/* Extra information for the key */
 };
 
-struct virtio_console_header {
-    uint32_t flags;		/* Some message between host and guest */
-};
-
 /* Some events for the internal messages (control packets) */
 #define VIRTIO_CONSOLE_PORT_READY	0
 #define VIRTIO_CONSOLE_CONSOLE_PORT	1
 #define VIRTIO_CONSOLE_RESIZE		2
 #define VIRTIO_CONSOLE_PORT_OPEN	3
 #define VIRTIO_CONSOLE_PORT_NAME	4
+#define VIRTIO_CONSOLE_THROTTLE_PORT	5
 
 /* == In-qemu interface == */
 
@@ -96,6 +93,13 @@  struct VirtIOSerialPort {
     char *name;
 
     /*
+     * This list holds buffers pushed by the guest in case the guest
+     * sent incomplete messages or the host connection was down and
+     * the device requested to cache the data.
+     */
+    QTAILQ_HEAD(, VirtIOSerialPortBuffer) unflushed_buffers;
+
+    /*
      * This id helps identify ports between the guest and the host.
      * The guest sends a "header" with this id with each data packet
      * that it sends and the host can then find out which associated
@@ -103,6 +107,19 @@  struct VirtIOSerialPort {
      */
     uint32_t id;
 
+    /*
+     * Each port can specify the limit on number of bytes that can be
+     * outstanding in the unread buffers. This is to prevent any OOM
+     * situtation if a rogue process on the guest keeps injecting
+     * data.
+     */
+    size_t byte_limit;
+
+    /*
+     * The number of bytes we have queued up in our unread queue
+     */
+    size_t nr_bytes;
+
     /* Identify if this is a port that binds with hvc in the guest */
     uint8_t is_console;
 
@@ -110,6 +127,11 @@  struct VirtIOSerialPort {
     bool guest_connected;
     /* Is this device open for IO on the host? */
     bool host_connected;
+    /* Have we sent a throttle message to the guest? */
+    bool host_throttled;
+
+    /* Did this port get data in the recent handle_output call? */
+    bool has_activity;
 };
 
 struct VirtIOSerialPortInfo {