
vhost: locking/rcu cleanup

Message ID 20100729122325.GA24337@redhat.com
State Not Applicable, archived

Commit Message

Michael S. Tsirkin July 29, 2010, 12:23 p.m. UTC
I saw WARN_ON(!list_empty(&dev->work_list)) trigger
so our custom flush is not as airtight as need be.

This patch switches to a simple atomic counter + srcu instead of
the custom locked queue + flush implementation.

This will slow down the setup ioctls, which should not matter -
it's slow path anyway. We use the expedited flush to at least
make sure it has a sane time bound.

Works fine for me. I got reports that with many guests,
work lock is highly contended, and this patch should in theory
fix this as well - but I haven't tested this yet.

Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
---
 drivers/vhost/net.c   |   55 +++++--------------
 drivers/vhost/vhost.c |  140 ++++++++++++++++++++++---------------------------
 drivers/vhost/vhost.h |   47 +++++++++-------
 3 files changed, 103 insertions(+), 139 deletions(-)
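
For illustration, the scheme the patch switches to can be modelled in
plain user-space C. The sketch below is a hedged model, not the kernel
code: the names, the pthread scaffolding and the condition variable
standing in for wake_up_process() are invented, and the worker scans the
work items linearly where the real loop rotates its starting point
across virtqueues. What it keeps is the core idea: queuers bump a
per-work atomic counter, and the worker runs a handler whenever that
counter differs from the sequence it last completed.

/* Hypothetical user-space model of the patch's queueing scheme. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

#define NWORKS 2

struct work_item {
	void (*fn)(struct work_item *w);   /* handler to run */
	atomic_int queue_seq;              /* bumped by queuers from any thread */
	int done_seq;                      /* worker-private, so no locking */
};

static struct work_item works[NWORKS];
static pthread_mutex_t wake_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wake_cond = PTHREAD_COND_INITIALIZER;
static bool kicked, stop;

/* Rough stand-in for vhost_poll_queue(): request execution, wake the worker. */
static void queue_work(struct work_item *w)
{
	atomic_fetch_add(&w->queue_seq, 1);
	pthread_mutex_lock(&wake_lock);
	kicked = true;                     /* stands in for wake_up_process() */
	pthread_cond_signal(&wake_cond);
	pthread_mutex_unlock(&wake_lock);
}

static void handler(struct work_item *w)
{
	printf("handler for work %td ran\n", w - works);
}

/* Rough stand-in for vhost_worker(): run any work whose queue_seq moved,
 * otherwise sleep until kicked again. */
static void *worker_main(void *arg)
{
	(void)arg;
	for (;;) {
		struct work_item *w = NULL;
		for (int i = 0; i < NWORKS; i++) {
			int seq = atomic_load(&works[i].queue_seq);
			if (seq != works[i].done_seq) {
				/* Snapshot before running fn(): a re-queue from
				 * inside the handler bumps queue_seq again and is
				 * noticed on the next scan. */
				works[i].done_seq = seq;
				w = &works[i];
				break;
			}
		}
		if (w) {
			w->fn(w);
			continue;
		}
		pthread_mutex_lock(&wake_lock);
		while (!kicked && !stop)
			pthread_cond_wait(&wake_cond, &wake_lock);
		kicked = false;
		if (stop) {
			pthread_mutex_unlock(&wake_lock);
			return NULL;
		}
		pthread_mutex_unlock(&wake_lock);
	}
}

int main(void)
{
	pthread_t worker;
	for (int i = 0; i < NWORKS; i++)
		works[i].fn = handler;
	pthread_create(&worker, NULL, worker_main, NULL);
	queue_work(&works[0]);
	queue_work(&works[1]);
	queue_work(&works[1]);             /* may coalesce with the previous kick */
	sleep(1);
	pthread_mutex_lock(&wake_lock);
	stop = true;
	pthread_cond_signal(&wake_cond);
	pthread_mutex_unlock(&wake_lock);
	pthread_join(worker, NULL);
	return 0;
}

Build with cc -pthread; note how back-to-back kicks on the same work may
coalesce into a single handler run, which is exactly what the counter
comparison is meant to allow.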

Comments

Tejun Heo July 30, 2010, 2:49 p.m. UTC | #1
Hello,

On 07/29/2010 02:23 PM, Michael S. Tsirkin wrote:
> I saw WARN_ON(!list_empty(&dev->work_list)) trigger
> so our custom flush is not as airtight as need be.

Could be, but it's also possible that something queued more work after
the last flush.  Is the problem reproducible?

> This patch switches to a simple atomic counter + srcu instead of
> the custom locked queue + flush implementation.
> 
> This will slow down the setup ioctls, which should not matter -
> it's slow path anyway. We use the expedited flush to at least
> make sure it has a sane time bound.
> 
> Works fine for me. I got reports that with many guests,
> work lock is highly contended, and this patch should in theory
> fix this as well - but I haven't tested this yet.

Hmmm... vhost_poll_flush() becomes synchronize_srcu_expedited().  Can
you please explain how it works?  synchronize_srcu_expedited() is an
extremely heavy operation involving scheduling the cpu_stop task on
all cpus.  I'm not quite sure whether doing it from every flush is a
good idea.  Is flush supposed to be a very rare operation?

Having custom implementation is fine too but let's try to implement
something generic if at all possible.

Thanks.
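
To expand on the question above with what the patch actually does: the
worker brackets every handler invocation with srcu_read_lock() and
srcu_read_unlock() on dev->worker_srcu, so once a writer has cleared a
virtqueue's private pointer, waiting out one SRCU grace period
guarantees that no handler can still be looking at the old value. The
user-space sketch below models that guarantee; the pthread rwlock is an
invented stand-in for SRCU (far heavier, but it gives the same
"wait for in-flight readers" behaviour), and all names are illustrative.

/* Hypothetical user-space model of flush-as-grace-period. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

static pthread_rwlock_t worker_srcu = PTHREAD_RWLOCK_INITIALIZER; /* SRCU stand-in */
static void *_Atomic private_data;                                /* e.g. the socket */

/* Worker side: the whole handler is one read-side critical section. */
static void run_handler(void)
{
	pthread_rwlock_rdlock(&worker_srcu);
	void *sock = atomic_load(&private_data);   /* vhost_vq_data() in the patch */
	if (sock)
		printf("handler using %p\n", sock);
	else
		printf("handler found nothing to do\n");
	pthread_rwlock_unlock(&worker_srcu);
}

/* Writer side: clear the pointer, wait out in-flight handlers, then free. */
static void stop_and_free(void)
{
	void *old = atomic_exchange(&private_data, NULL);
	/* The "flush": a write acquisition cannot succeed while any handler
	 * is inside its read-side section, mirroring synchronize_srcu(). */
	pthread_rwlock_wrlock(&worker_srcu);
	pthread_rwlock_unlock(&worker_srcu);
	free(old);                                 /* no handler can still see it */
}

int main(void)
{
	atomic_store(&private_data, malloc(16));
	run_handler();     /* sees the pointer */
	stop_and_free();   /* old object safely released after the "flush" */
	run_handler();     /* sees NULL */
	return 0;
}

Whether the kernel side should pay for synchronize_srcu_expedited() or
settle for plain synchronize_srcu() is exactly the cost trade-off being
discussed in this thread.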
Michael S. Tsirkin Aug. 1, 2010, 8:41 a.m. UTC | #2
On Fri, Jul 30, 2010 at 04:49:54PM +0200, Tejun Heo wrote:
> Hello,
> 
> On 07/29/2010 02:23 PM, Michael S. Tsirkin wrote:
> > I saw WARN_ON(!list_empty(&dev->work_list)) trigger
> > so our custom flush is not as airtight as need be.
> 
> Could be, but it's also possible that something queued more work
> after the last flush.
> Is the problem reproducible?

Well, we do requeue from the job itself, so we need to be careful with
what we do with the indexes here. The bug seemed to happen every time
qemu was killed under stress, but now I can't reproduce it anymore :(
Will try again later.

> > This patch switches to a simple atomic counter + srcu instead of
> > the custom locked queue + flush implementation.
> > 
> > This will slow down the setup ioctls, which should not matter -
> > it's slow path anyway. We use the expedited flush to at least
> > make sure it has a sane time bound.
> > 
> > Works fine for me. I got reports that with many guests,
> > work lock is highly contended, and this patch should in theory
> > fix this as well - but I haven't tested this yet.
> 
> Hmmm... vhost_poll_flush() becomes synchronize_srcu_expedited().  Can
> you please explain how it works?  synchronize_srcu_expedited() is an
> extremely heavy operation involving scheduling the cpu_stop task on
> all cpus.  I'm not quite sure whether doing it from every flush is a
> good idea.  Is flush supposed to be a very rare operation?

It is rare - happens on guest reboot typically. I guess I will
switch to regular synchronize_srcu.

> Having custom implementation is fine too but let's try to implement
> something generic if at all possible.
> 
> Thanks.

Sure. It does seem that avoiding the list lock would be pretty hard
in generic code, though.
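
On the requeue point above: the new worker loop stores its snapshot of
queue_seq into done_seq before it calls the handler, so a kick issued
from inside the handler leaves the two counters unequal and forces
another pass rather than being lost. A minimal single-threaded sketch of
that ordering (names invented for illustration):

/* Demonstrates that a kick issued from inside the handler is not lost
 * when the worker snapshots queue_seq before running it. */
#include <stdatomic.h>
#include <stdio.h>

static atomic_int queue_seq;
static int done_seq;
static int runs;

static void kick(void) { atomic_fetch_add(&queue_seq, 1); }

static void handler(void)
{
	runs++;
	if (runs < 3)
		kick();            /* the job re-queues itself, as vhost jobs may */
}

int main(void)
{
	kick();
	for (;;) {
		int seq = atomic_load(&queue_seq);
		if (seq == done_seq)
			break;     /* nothing pending */
		done_seq = seq;    /* snapshot BEFORE running the handler */
		handler();         /* a kick in here makes seq != done_seq again */
	}
	printf("handler ran %d times\n", runs);   /* prints 3 */
	return 0;
}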


Patch

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index f13e56b..ee69c51 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -111,8 +111,9 @@  static void tx_poll_start(struct vhost_net *net, struct socket *sock)
 
 /* Expects to be always run from workqueue - which acts as
  * read-size critical section for our kind of RCU. */
-static void handle_tx(struct vhost_net *net)
+static void handle_tx(struct vhost_dev *dev)
 {
+	struct vhost_net *net = container_of(dev, struct vhost_net, dev);
 	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_TX];
 	unsigned out, in, s;
 	int head;
@@ -127,7 +128,7 @@  static void handle_tx(struct vhost_net *net)
 	size_t len, total_len = 0;
 	int err, wmem;
 	size_t hdr_size;
-	struct socket *sock = rcu_dereference(vq->private_data);
+	struct socket *sock = vhost_vq_data(vq, &net->dev);
 	if (!sock)
 		return;
 
@@ -305,7 +306,7 @@  static void handle_rx_big(struct vhost_net *net)
 	size_t len, total_len = 0;
 	int err;
 	size_t hdr_size;
-	struct socket *sock = rcu_dereference(vq->private_data);
+	struct socket *sock = vhost_vq_data(vq, &net->dev);
 	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
 		return;
 
@@ -416,7 +417,7 @@  static void handle_rx_mergeable(struct vhost_net *net)
 	int err, headcount;
 	size_t vhost_hlen, sock_hlen;
 	size_t vhost_len, sock_len;
-	struct socket *sock = rcu_dereference(vq->private_data);
+	struct socket *sock = vhost_vq_data(vq, &net->dev);
 	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
 		return;
 
@@ -500,46 +501,15 @@  static void handle_rx_mergeable(struct vhost_net *net)
 	unuse_mm(net->dev.mm);
 }
 
-static void handle_rx(struct vhost_net *net)
+static void handle_rx(struct vhost_dev *dev)
 {
+	struct vhost_net *net = container_of(dev, struct vhost_net, dev);
 	if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF))
 		handle_rx_mergeable(net);
 	else
 		handle_rx_big(net);
 }
 
-static void handle_tx_kick(struct vhost_work *work)
-{
-	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
-						  poll.work);
-	struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev);
-
-	handle_tx(net);
-}
-
-static void handle_rx_kick(struct vhost_work *work)
-{
-	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
-						  poll.work);
-	struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev);
-
-	handle_rx(net);
-}
-
-static void handle_tx_net(struct vhost_work *work)
-{
-	struct vhost_net *net = container_of(work, struct vhost_net,
-					     poll[VHOST_NET_VQ_TX].work);
-	handle_tx(net);
-}
-
-static void handle_rx_net(struct vhost_work *work)
-{
-	struct vhost_net *net = container_of(work, struct vhost_net,
-					     poll[VHOST_NET_VQ_RX].work);
-	handle_rx(net);
-}
-
 static int vhost_net_open(struct inode *inode, struct file *f)
 {
 	struct vhost_net *n = kmalloc(sizeof *n, GFP_KERNEL);
@@ -550,16 +520,18 @@  static int vhost_net_open(struct inode *inode, struct file *f)
 		return -ENOMEM;
 
 	dev = &n->dev;
-	n->vqs[VHOST_NET_VQ_TX].handle_kick = handle_tx_kick;
-	n->vqs[VHOST_NET_VQ_RX].handle_kick = handle_rx_kick;
+	vhost_work_set_fn(&n->vqs[VHOST_NET_VQ_TX].work, handle_tx);
+	vhost_work_set_fn(&n->vqs[VHOST_NET_VQ_RX].work, handle_rx);
 	r = vhost_dev_init(dev, n->vqs, VHOST_NET_VQ_MAX);
 	if (r < 0) {
 		kfree(n);
 		return r;
 	}
 
-	vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT, dev);
-	vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN, dev);
+	vhost_poll_init(n->poll + VHOST_NET_VQ_TX,
+			&n->vqs[VHOST_NET_VQ_TX].work, POLLOUT, dev);
+	vhost_poll_init(n->poll + VHOST_NET_VQ_RX,
+			&n->vqs[VHOST_NET_VQ_RX].work, POLLIN, dev);
 	n->tx_poll_state = VHOST_NET_POLL_DISABLED;
 
 	f->private_data = n;
@@ -640,6 +612,7 @@  static int vhost_net_release(struct inode *inode, struct file *f)
 	/* We do an extra flush before freeing memory,
 	 * since jobs can re-queue themselves. */
 	vhost_net_flush(n);
+	vhost_dev_free(&n->dev);
 	kfree(n);
 	return 0;
 }
diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
index e05557d..daa95c8 100644
--- a/drivers/vhost/vhost.c
+++ b/drivers/vhost/vhost.c
@@ -60,22 +60,27 @@  static int vhost_poll_wakeup(wait_queue_t *wait, unsigned mode, int sync,
 	return 0;
 }
 
+/* Must be called for each vq before vhost_dev_init. */
+void vhost_work_set_fn(struct vhost_work *work, vhost_work_fn_t fn)
+{
+	work->fn = fn;
+}
+
+static void vhost_work_init(struct vhost_work *work)
+{
+	atomic_set(&work->queue_seq, 0);
+	work->done_seq = 0;
+}
+
 /* Init poll structure */
-void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn,
+void vhost_poll_init(struct vhost_poll *poll, struct vhost_work *work,
 		     unsigned long mask, struct vhost_dev *dev)
 {
-	struct vhost_work *work = &poll->work;
-
+	poll->work = work;
 	init_waitqueue_func_entry(&poll->wait, vhost_poll_wakeup);
 	init_poll_funcptr(&poll->table, vhost_poll_func);
 	poll->mask = mask;
 	poll->dev = dev;
-
-	INIT_LIST_HEAD(&work->node);
-	work->fn = fn;
-	init_waitqueue_head(&work->done);
-	work->flushing = 0;
-	work->queue_seq = work->done_seq = 0;
 }
 
 /* Start polling a file. We add ourselves to file's wait queue. The caller must
@@ -99,40 +104,16 @@  void vhost_poll_stop(struct vhost_poll *poll)
  * locks that are also used by the callback. */
 void vhost_poll_flush(struct vhost_poll *poll)
 {
-	struct vhost_work *work = &poll->work;
-	unsigned seq;
-	int left;
-	int flushing;
-
-	spin_lock_irq(&poll->dev->work_lock);
-	seq = work->queue_seq;
-	work->flushing++;
-	spin_unlock_irq(&poll->dev->work_lock);
-	wait_event(work->done, ({
-		   spin_lock_irq(&poll->dev->work_lock);
-		   left = seq - work->done_seq <= 0;
-		   spin_unlock_irq(&poll->dev->work_lock);
-		   left;
-	}));
-	spin_lock_irq(&poll->dev->work_lock);
-	flushing = --work->flushing;
-	spin_unlock_irq(&poll->dev->work_lock);
-	BUG_ON(flushing < 0);
+	synchronize_srcu_expedited(&poll->dev->worker_srcu);
 }
 
 void vhost_poll_queue(struct vhost_poll *poll)
 {
 	struct vhost_dev *dev = poll->dev;
-	struct vhost_work *work = &poll->work;
-	unsigned long flags;
-
-	spin_lock_irqsave(&dev->work_lock, flags);
-	if (list_empty(&work->node)) {
-		list_add_tail(&work->node, &dev->work_list);
-		work->queue_seq++;
-		wake_up_process(dev->worker);
-	}
-	spin_unlock_irqrestore(&dev->work_lock, flags);
+	struct vhost_work *work = poll->work;
+
+	atomic_inc(&work->queue_seq);
+	wake_up_process(dev->worker);
 }
 
 static void vhost_vq_reset(struct vhost_dev *dev,
@@ -164,41 +145,39 @@  static void vhost_vq_reset(struct vhost_dev *dev,
 static int vhost_worker(void *data)
 {
 	struct vhost_dev *dev = data;
-	struct vhost_work *work = NULL;
-	unsigned uninitialized_var(seq);
+	struct vhost_work *uninitialized_var(work);
+	unsigned n, i, vq = 0;
+	int seq;
 
-	for (;;) {
-		/* mb paired w/ kthread_stop */
-		set_current_state(TASK_INTERRUPTIBLE);
+	n = dev->nvqs;
+repeat:
+	set_current_state(TASK_INTERRUPTIBLE);	/* mb paired w/ kthread_stop */
 
-		spin_lock_irq(&dev->work_lock);
-		if (work) {
-			work->done_seq = seq;
-			if (work->flushing)
-				wake_up_all(&work->done);
-		}
+	if (kthread_should_stop()) {
+		__set_current_state(TASK_RUNNING);
+		return 0;
+	}
 
-		if (kthread_should_stop()) {
-			spin_unlock_irq(&dev->work_lock);
-			__set_current_state(TASK_RUNNING);
-			return 0;
+	for (i = 0; i < n; ++i) {
+		work = &dev->vqs[(vq + i) % n].work;
+		seq = atomic_read(&work->queue_seq);
+		if (seq != work->done_seq) {
+			work->done_seq = seq;
+			break;
 		}
-		if (!list_empty(&dev->work_list)) {
-			work = list_first_entry(&dev->work_list,
-						struct vhost_work, node);
-			list_del_init(&work->node);
-			seq = work->queue_seq;
-		} else
-			work = NULL;
-		spin_unlock_irq(&dev->work_lock);
+		work = NULL;
+	}
 
-		if (work) {
-			__set_current_state(TASK_RUNNING);
-			work->fn(work);
-		} else
-			schedule();
+	if (work) {
+		int idx;
+		__set_current_state(TASK_RUNNING);
+		idx = srcu_read_lock(&dev->worker_srcu);
+		work->fn(dev);
+		srcu_read_unlock(&dev->worker_srcu, idx);
+	} else
+		schedule();
 
-	}
+	goto repeat;
 }
 
 long vhost_dev_init(struct vhost_dev *dev,
@@ -213,20 +192,22 @@  long vhost_dev_init(struct vhost_dev *dev,
 	dev->log_file = NULL;
 	dev->memory = NULL;
 	dev->mm = NULL;
-	spin_lock_init(&dev->work_lock);
-	INIT_LIST_HEAD(&dev->work_list);
 	dev->worker = NULL;
 
 	for (i = 0; i < dev->nvqs; ++i) {
 		dev->vqs[i].dev = dev;
 		mutex_init(&dev->vqs[i].mutex);
 		vhost_vq_reset(dev, dev->vqs + i);
-		if (dev->vqs[i].handle_kick)
+		if (dev->vqs[i].work.fn)
 			vhost_poll_init(&dev->vqs[i].poll,
-					dev->vqs[i].handle_kick, POLLIN, dev);
+					&dev->vqs[i].work, POLLIN, dev);
 	}
+	return init_srcu_struct(&dev->worker_srcu);
+}
 
-	return 0;
+void vhost_dev_free(struct vhost_dev *dev)
+{
+	cleanup_srcu_struct(&dev->worker_srcu);
 }
 
 /* Caller should have device mutex */
@@ -240,7 +221,7 @@  long vhost_dev_check_owner(struct vhost_dev *dev)
 static long vhost_dev_set_owner(struct vhost_dev *dev)
 {
 	struct task_struct *worker;
-	int err;
+	int i, err;
 	/* Is there an owner already? */
 	if (dev->mm) {
 		err = -EBUSY;
@@ -258,6 +239,10 @@  static long vhost_dev_set_owner(struct vhost_dev *dev)
 	err = cgroup_attach_task_current_cg(worker);
 	if (err)
 		goto err_cgroup;
+
+	for (i = 0; i < dev->nvqs; ++i) {
+		vhost_work_init(&dev->vqs[i].work);
+	}
 	wake_up_process(worker);	/* avoid contributing to loadavg */
 
 	return 0;
@@ -293,7 +278,7 @@  void vhost_dev_cleanup(struct vhost_dev *dev)
 {
 	int i;
 	for (i = 0; i < dev->nvqs; ++i) {
-		if (dev->vqs[i].kick && dev->vqs[i].handle_kick) {
+		if (dev->vqs[i].kick && dev->vqs[i].work.fn) {
 			vhost_poll_stop(&dev->vqs[i].poll);
 			vhost_poll_flush(&dev->vqs[i].poll);
 		}
@@ -322,7 +307,6 @@  void vhost_dev_cleanup(struct vhost_dev *dev)
 		mmput(dev->mm);
 	dev->mm = NULL;
 
-	WARN_ON(!list_empty(&dev->work_list));
 	kthread_stop(dev->worker);
 }
 
@@ -644,7 +628,7 @@  static long vhost_set_vring(struct vhost_dev *d, int ioctl, void __user *argp)
 		r = -ENOIOCTLCMD;
 	}
 
-	if (pollstop && vq->handle_kick)
+	if (pollstop && vq->work.fn)
 		vhost_poll_stop(&vq->poll);
 
 	if (ctx)
@@ -652,12 +636,12 @@  static long vhost_set_vring(struct vhost_dev *d, int ioctl, void __user *argp)
 	if (filep)
 		fput(filep);
 
-	if (pollstart && vq->handle_kick)
+	if (pollstart && vq->work.fn)
 		vhost_poll_start(&vq->poll, vq->kick);
 
 	mutex_unlock(&vq->mutex);
 
-	if (pollstop && vq->handle_kick)
+	if (pollstop && vq->work.fn)
 		vhost_poll_flush(&vq->poll);
 	return r;
 }
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index afd7729..9c990ea 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -11,9 +11,10 @@ 
 #include <linux/uio.h>
 #include <linux/virtio_config.h>
 #include <linux/virtio_ring.h>
+#include <linux/srcu.h>
 #include <asm/atomic.h>
 
-struct vhost_device;
+struct vhost_dev;
 
 enum {
 	/* Enough place for all fragments, head, and virtio net header. */
@@ -21,29 +22,33 @@  enum {
 };
 
 struct vhost_work;
-typedef void (*vhost_work_fn_t)(struct vhost_work *work);
+typedef void (*vhost_work_fn_t)(struct vhost_dev *dev);
 
 struct vhost_work {
-	struct list_head	  node;
+	/* Callback function to execute. */
 	vhost_work_fn_t		  fn;
-	wait_queue_head_t	  done;
-	int			  flushing;
-	unsigned		  queue_seq;
-	unsigned		  done_seq;
+	/* Incremented to request callback execution.
+	 * Atomic to allow multiple writers. */
+	atomic_t		  queue_seq;
+	/* Used by worker to track execution requests.
+	 * Used from a single thread so no locking. */
+	int			  done_seq;
 };
 
+void vhost_work_set_fn(struct vhost_work *work, vhost_work_fn_t fn);
+
 /* Poll a file (eventfd or socket) */
 /* Note: there's nothing vhost specific about this structure. */
 struct vhost_poll {
 	poll_table                table;
 	wait_queue_head_t        *wqh;
 	wait_queue_t              wait;
-	struct vhost_work	  work;
+	struct vhost_work	 *work;
 	unsigned long		  mask;
 	struct vhost_dev	 *dev;
 };
 
-void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn,
+void vhost_poll_init(struct vhost_poll *poll, struct vhost_work* work,
 		     unsigned long mask, struct vhost_dev *dev);
 void vhost_poll_start(struct vhost_poll *poll, struct file *file);
 void vhost_poll_stop(struct vhost_poll *poll);
@@ -72,11 +77,12 @@  struct vhost_virtqueue {
 	struct eventfd_ctx *error_ctx;
 	struct eventfd_ctx *log_ctx;
 
+	/* The work to execute when the Guest kicks us,
+	 * on Host activity, or timeout. */
+	struct vhost_work work;
+	/* Poll Guest for kicks */
 	struct vhost_poll poll;
 
-	/* The routine to call when the Guest pings us, or timeout. */
-	vhost_work_fn_t handle_kick;
-
 	/* Last available index we saw. */
 	u16 last_avail_idx;
 
@@ -99,12 +105,7 @@  struct vhost_virtqueue {
 	size_t vhost_hlen;
 	size_t sock_hlen;
 	struct vring_used_elem heads[VHOST_NET_MAX_SG];
-	/* We use a kind of RCU to access private pointer.
-	 * All readers access it from worker, which makes it possible to
-	 * flush the vhost_work instead of synchronize_rcu. Therefore readers do
-	 * not need to call rcu_read_lock/rcu_read_unlock: the beginning of
-	 * vhost_work execution acts instead of rcu_read_lock() and the end of
-	 * vhost_work execution acts instead of rcu_read_lock().
+	/* Readers use worker_srcu in device to access private pointer.
 	 * Writers use virtqueue mutex. */
 	void *private_data;
 	/* Log write descriptors */
@@ -112,6 +113,12 @@  struct vhost_virtqueue {
 	struct vhost_log log[VHOST_NET_MAX_SG];
 };
 
+static inline void *vhost_vq_data(struct vhost_virtqueue *vq,
+				  struct vhost_dev *dev)
+{
+	return srcu_dereference(vq->private_data, &dev->worker_srcu);
+}
+
 struct vhost_dev {
 	/* Readers use RCU to access memory table pointer
 	 * log base pointer and features.
@@ -124,12 +131,12 @@  struct vhost_dev {
 	int nvqs;
 	struct file *log_file;
 	struct eventfd_ctx *log_ctx;
-	spinlock_t work_lock;
-	struct list_head work_list;
 	struct task_struct *worker;
+	struct srcu_struct worker_srcu;
 };
 
 long vhost_dev_init(struct vhost_dev *, struct vhost_virtqueue *vqs, int nvqs);
+void vhost_dev_free(struct vhost_dev *);
 long vhost_dev_check_owner(struct vhost_dev *);
 long vhost_dev_reset_owner(struct vhost_dev *);
 void vhost_dev_cleanup(struct vhost_dev *);