diff mbox

[v3] Add Mergeable receive buffer support to vhost_net

Message ID 1270585973.28407.3.camel@lab1.dls
State Not Applicable, archived
Delegated to: David Miller
Headers show

Commit Message

David Stevens April 6, 2010, 8:32 p.m. UTC
This patch adds support for the Mergeable Receive Buffers feature to
vhost_net.

						+-DLS

Changes from previous revision:
1) renamed:
	vhost_discard_vq_desc -> vhost_discard_desc
	vhost_get_heads -> vhost_get_desc_n
	vhost_get_vq_desc -> vhost_get_desc
2) added heads as argument to vhost_get_desc_n
3) changed "vq->heads" from iovec to vring_used_elem, removed casts
4) changed vhost_add_used to do multiple elements in a single
copy_to_user,
	or two when we wrap the ring.
5) removed rxmaxheadcount and available buffer checks in favor of
running until
	an allocation failure, but making sure we break the loop if we get
	two in a row, indicating we have at least 1 buffer, but not enough
	for the current receive packet
6) restore non-vnet header handling

Signed-Off-By: David L Stevens <dlstevens@us.ibm.com>

diff -ruNp net-next-p0/drivers/vhost/vhost.c
net-next-v3/drivers/vhost/vhost.c
--- net-next-p0/drivers/vhost/vhost.c	2010-03-22 12:04:38.000000000
-0700
+++ net-next-v3/drivers/vhost/vhost.c	2010-04-06 12:57:51.000000000
-0700
@@ -856,6 +856,47 @@ static unsigned get_indirect(struct vhos
 	return 0;
 }
 
+/* This is a multi-buffer version of vhost_get_vq_desc
+ * @vq		- the relevant virtqueue
+ * datalen	- data length we'll be reading
+ * @iovcount	- returned count of io vectors we fill
+ * @log		- vhost log
+ * @log_num	- log offset
+ *	returns number of buffer heads allocated, 0 on error
+ */
+int vhost_get_desc_n(struct vhost_virtqueue *vq, struct vring_used_elem
*heads,
+		     int datalen, int *iovcount, struct vhost_log *log,
+		     unsigned int *log_num)
+{
+	int out, in;
+	int seg = 0;		/* iov index */
+	int hc = 0;		/* head count */
+
+	while (datalen > 0) {
+		if (hc >= VHOST_NET_MAX_SG)
+			goto err;
+		heads[hc].id = vhost_get_desc(vq->dev, vq, vq->iov+seg,
+					      ARRAY_SIZE(vq->iov)-seg, &out,
+					      &in, log, log_num);
+		if (heads[hc].id == vq->num)
+			goto err;
+		if (out || in <= 0) {
+			vq_err(vq, "unexpected descriptor format for RX: "
+				"out %d, in %d\n", out, in);
+			goto err;
+		}
+		heads[hc].len = iov_length(vq->iov+seg, in);
+		datalen -= heads[hc].len;
+		hc++;
+		seg += in;
+	}
+	*iovcount = seg;
+	return hc;
+err:
+	vhost_discard_desc(vq, hc);
+	return 0;
+}
+
 /* This looks in the virtqueue and for the first available buffer, and
converts
  * it to an iovec for convenient access.  Since descriptors consist of
some
  * number of output then some number of input descriptors, it's
actually two
@@ -863,7 +904,7 @@ static unsigned get_indirect(struct vhos
  *
  * This function returns the descriptor number found, or vq->num (which
  * is never a valid descriptor number) if none was found. */
-unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct
vhost_virtqueue *vq,
+unsigned vhost_get_desc(struct vhost_dev *dev, struct vhost_virtqueue
*vq,
 			   struct iovec iov[], unsigned int iov_size,
 			   unsigned int *out_num, unsigned int *in_num,
 			   struct vhost_log *log, unsigned int *log_num)
@@ -981,31 +1022,42 @@ unsigned vhost_get_vq_desc(struct vhost_
 }
 
 /* Reverse the effect of vhost_get_vq_desc. Useful for error handling.
*/
-void vhost_discard_vq_desc(struct vhost_virtqueue *vq)
+void vhost_discard_desc(struct vhost_virtqueue *vq, int n)
 {
-	vq->last_avail_idx--;
+	vq->last_avail_idx -= n;
 }
 
 /* After we've used one of their buffers, we tell them about it.  We'll
then
  * want to notify the guest, using eventfd. */
-int vhost_add_used(struct vhost_virtqueue *vq, unsigned int head, int
len)
+int vhost_add_used(struct vhost_virtqueue *vq, struct vring_used_elem
*heads,
+		   int count)
 {
 	struct vring_used_elem *used;
+	int start, n;
+
+	if (count <= 0)
+		return -EINVAL;
 
-	/* The virtqueue contains a ring of used buffers.  Get a pointer to
the
-	 * next entry in that used ring. */
-	used = &vq->used->ring[vq->last_used_idx % vq->num];
-	if (put_user(head, &used->id)) {
-		vq_err(vq, "Failed to write used id");
+	start = vq->last_used_idx % vq->num;
+	if (vq->num - start < count)
+		n = vq->num - start;
+	else
+		n = count;
+	used = vq->used->ring + start;
+	if (copy_to_user(used, heads, sizeof(heads[0])*n)) {
+		vq_err(vq, "Failed to write used");
 		return -EFAULT;
 	}
-	if (put_user(len, &used->len)) {
-		vq_err(vq, "Failed to write used len");
-		return -EFAULT;
+	if (n < count) {	/* wrapped the ring */
+		used = vq->used->ring;
+		if (copy_to_user(used, heads+n, sizeof(heads[0])*(count-n))) {
+			vq_err(vq, "Failed to write used");
+			return -EFAULT;
+		}
 	}
 	/* Make sure buffer is written before we update index. */
 	smp_wmb();
-	if (put_user(vq->last_used_idx + 1, &vq->used->idx)) {
+	if (put_user(vq->last_used_idx+count, &vq->used->idx)) {
 		vq_err(vq, "Failed to increment used idx");
 		return -EFAULT;
 	}
@@ -1023,7 +1075,7 @@ int vhost_add_used(struct vhost_virtqueu
 		if (vq->log_ctx)
 			eventfd_signal(vq->log_ctx, 1);
 	}
-	vq->last_used_idx++;
+	vq->last_used_idx += count;
 	return 0;
 }
 
@@ -1049,10 +1101,23 @@ void vhost_signal(struct vhost_dev *dev,
 
 /* And here's the combo meal deal.  Supersize me! */
 void vhost_add_used_and_signal(struct vhost_dev *dev,
-			       struct vhost_virtqueue *vq,
-			       unsigned int head, int len)
+			       struct vhost_virtqueue *vq, unsigned int id,
+			       int len)
+{
+	struct vring_used_elem head;
+
+	head.id = id;
+	head.len = len;
+	vhost_add_used(vq, &head, 1);
+	vhost_signal(dev, vq);
+}
+
+/* multi-buffer version of vhost_add_used_and_signal */
+void vhost_add_used_and_signal_n(struct vhost_dev *dev,
+				 struct vhost_virtqueue *vq,
+				 struct vring_used_elem *heads, int count)
 {
-	vhost_add_used(vq, head, len);
+	vhost_add_used(vq, heads, count);
 	vhost_signal(dev, vq);
 }
 
diff -ruNp net-next-p0/drivers/vhost/vhost.h
net-next-v3/drivers/vhost/vhost.h
--- net-next-p0/drivers/vhost/vhost.h	2010-03-22 12:04:38.000000000
-0700
+++ net-next-v3/drivers/vhost/vhost.h	2010-04-05 20:33:57.000000000
-0700
@@ -85,6 +85,7 @@ struct vhost_virtqueue {
 	struct iovec iov[VHOST_NET_MAX_SG];
 	struct iovec hdr[VHOST_NET_MAX_SG];
 	size_t hdr_size;
+	struct vring_used_elem heads[VHOST_NET_MAX_SG];
 	/* We use a kind of RCU to access private pointer.
 	 * All readers access it from workqueue, which makes it possible to
 	 * flush the workqueue instead of synchronize_rcu. Therefore readers
do
@@ -120,16 +121,22 @@ long vhost_dev_ioctl(struct vhost_dev *,
 int vhost_vq_access_ok(struct vhost_virtqueue *vq);
 int vhost_log_access_ok(struct vhost_dev *);
 
-unsigned vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue
*,
+int vhost_get_desc_n(struct vhost_virtqueue *, struct vring_used_elem
*heads,
+		     int datalen, int *iovcount, struct vhost_log *log,
+		     unsigned int *log_num);
+unsigned vhost_get_desc(struct vhost_dev *, struct vhost_virtqueue *,
 			   struct iovec iov[], unsigned int iov_count,
 			   unsigned int *out_num, unsigned int *in_num,
 			   struct vhost_log *log, unsigned int *log_num);
-void vhost_discard_vq_desc(struct vhost_virtqueue *);
+void vhost_discard_desc(struct vhost_virtqueue *, int);
 
-int vhost_add_used(struct vhost_virtqueue *, unsigned int head, int
len);
-void vhost_signal(struct vhost_dev *, struct vhost_virtqueue *);
+int vhost_add_used(struct vhost_virtqueue *, struct vring_used_elem
*heads,
+		    int count);
 void vhost_add_used_and_signal(struct vhost_dev *, struct
vhost_virtqueue *,
-			       unsigned int head, int len);
+			       unsigned int id, int len);
+void vhost_add_used_and_signal_n(struct vhost_dev *, struct
vhost_virtqueue *,
+			       struct vring_used_elem *heads, int count);
+void vhost_signal(struct vhost_dev *, struct vhost_virtqueue *);
 void vhost_disable_notify(struct vhost_virtqueue *);
 bool vhost_enable_notify(struct vhost_virtqueue *);
 
@@ -149,7 +156,8 @@ enum {
 	VHOST_FEATURES = (1 << VIRTIO_F_NOTIFY_ON_EMPTY) |
 			 (1 << VIRTIO_RING_F_INDIRECT_DESC) |
 			 (1 << VHOST_F_LOG_ALL) |
-			 (1 << VHOST_NET_F_VIRTIO_NET_HDR),
+			 (1 << VHOST_NET_F_VIRTIO_NET_HDR) |
+			 (1 << VIRTIO_NET_F_MRG_RXBUF),
 };
 
 static inline int vhost_has_feature(struct vhost_dev *dev, int bit)


--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Comments

Michael S. Tsirkin April 7, 2010, 10:59 a.m. UTC | #1
On Tue, Apr 06, 2010 at 01:32:53PM -0700, David L Stevens wrote:
> 
> This patch adds support for the Mergeable Receive Buffers feature to
> vhost_net.
> 
> 						+-DLS
> 
> Changes from previous revision:
> 1) renamed:
> 	vhost_discard_vq_desc -> vhost_discard_desc
> 	vhost_get_heads -> vhost_get_desc_n
> 	vhost_get_vq_desc -> vhost_get_desc
> 2) added heads as argument to vhost_get_desc_n
> 3) changed "vq->heads" from iovec to vring_used_elem, removed casts
> 4) changed vhost_add_used to do multiple elements in a single
> copy_to_user,
> 	or two when we wrap the ring.
> 5) removed rxmaxheadcount and available buffer checks in favor of
> running until
> 	an allocation failure, but making sure we break the loop if we get
> 	two in a row, indicating we have at least 1 buffer, but not enough
> 	for the current receive packet
> 6) restore non-vnet header handling
> 
> Signed-Off-By: David L Stevens <dlstevens@us.ibm.com>

Thanks!
There's some whitespace damage, are you sending with your new
sendmail setup? It seems to have worked for qemu patches ...

> diff -ruNp net-next-p0/drivers/vhost/net.c
> net-next-v3/drivers/vhost/net.c
> --- net-next-p0/drivers/vhost/net.c	2010-03-22 12:04:38.000000000 -0700
> +++ net-next-v3/drivers/vhost/net.c	2010-04-06 12:54:56.000000000 -0700
> @@ -130,9 +130,8 @@ static void handle_tx(struct vhost_net *
>  	hdr_size = vq->hdr_size;
>  
>  	for (;;) {
> -		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> -					 ARRAY_SIZE(vq->iov),
> -					 &out, &in,
> +		head = vhost_get_desc(&net->dev, vq, vq->iov,
> +					 ARRAY_SIZE(vq->iov), &out, &in,
>  					 NULL, NULL);
>  		/* Nothing new?  Wait for eventfd to tell us they refilled. */
>  		if (head == vq->num) {
> @@ -167,8 +166,15 @@ static void handle_tx(struct vhost_net *
>  		/* TODO: Check specific error and bomb out unless ENOBUFS? */
>  		err = sock->ops->sendmsg(NULL, sock, &msg, len);
>  		if (unlikely(err < 0)) {
> -			vhost_discard_vq_desc(vq);
> -			tx_poll_start(net, sock);
> +			if (err == -EAGAIN) {
> +				vhost_discard_desc(vq, 1);
> +				tx_poll_start(net, sock);
> +			} else {
> +				vq_err(vq, "sendmsg: errno %d\n", -err);
> +				/* drop packet; do not discard/resend */
> +				vhost_add_used_and_signal(&net->dev, vq, head,
> +							  0);

vhost does not currently have a consistent error handling strategy:
if we drop packets, need to think which other errors should cause
packet drops.  I prefer to just call vq_err for now,
and have us look at handling segfaults etc in a consistent way
separately.

> +			}
>  			break;
>  		}
>  		if (err != len)
> @@ -186,12 +192,25 @@ static void handle_tx(struct vhost_net *
>  	unuse_mm(net->dev.mm);
>  }
>  
> +static int vhost_head_len(struct sock *sk)
> +{
> +	struct sk_buff *head;
> +	int len = 0;
> +
> +	lock_sock(sk);
> +	head = skb_peek(&sk->sk_receive_queue);
> +	if (head)
> +		len = head->len;
> +	release_sock(sk);
> +	return len;
> +}
> +

I wonder whether it makes sense to check
skb_queue_empty(&sk->sk_receive_queue)
outside the lock, to reduce the cost of this call
on an empty queue (we know that it happens at least once
each time we exit the loop on rx)?

>  /* Expects to be always run from workqueue - which acts as
>   * read-size critical section for our kind of RCU. */
>  static void handle_rx(struct vhost_net *net)
>  {
>  	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX];
> -	unsigned head, out, in, log, s;
> +	unsigned in, log, s;
>  	struct vhost_log *vq_log;
>  	struct msghdr msg = {
>  		.msg_name = NULL,
> @@ -202,13 +221,14 @@ static void handle_rx(struct vhost_net *
>  		.msg_flags = MSG_DONTWAIT,
>  	};
>  
> -	struct virtio_net_hdr hdr = {
> -		.flags = 0,
> -		.gso_type = VIRTIO_NET_HDR_GSO_NONE
> +	struct virtio_net_hdr_mrg_rxbuf hdr = {
> +		.hdr.flags = 0,
> +		.hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE
>  	};
>  
> +	int retries = 0;
>  	size_t len, total_len = 0;
> -	int err;
> +	int err, headcount, datalen;
>  	size_t hdr_size;
>  	struct socket *sock = rcu_dereference(vq->private_data);
>  	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> @@ -222,31 +242,25 @@ static void handle_rx(struct vhost_net *
>  	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
>  		vq->log : NULL;
>  
> -	for (;;) {
> -		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> -					 ARRAY_SIZE(vq->iov),
> -					 &out, &in,
> -					 vq_log, &log);
> +	while ((datalen = vhost_head_len(sock->sk))) {
> +		headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
> +					     vq_log, &log);

This looks like a bug, I think we need to pass
datalen + header size to vhost_get_desc_n.
Not sure how we know the header size that backend will use though.
Maybe just look at our features.

>  		/* OK, now we need to know about added descriptors. */
> -		if (head == vq->num) {
> -			if (unlikely(vhost_enable_notify(vq))) {
> +		if (!headcount) {
> +			if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
>  				/* They have slipped one in as we were
>  				 * doing that: check again. */
>  				vhost_disable_notify(vq);
> +				retries++;
>  				continue;
>  			}

Hmm. The reason we have the code at all, as the comment says, is because
guest could have added more buffers between the time we read last index
and the time we enabled notification. So if we just break like this
the race still exists. We could remember the
last head value we observed, and have vhost_enable_notify check
against this value?

Need to think about it.

Another concern here is that on retries vhost_get_desc_n
is doing extra work, rescanning the same descriptor
again and again. Not sure how common this is, might be
worthwhile to add a TODO to consider this at least.

> +			retries = 0;
>  			/* Nothing new?  Wait for eventfd to tell us
>  			 * they refilled. */
>  			break;
>  		}
>  		/* We don't need to be notified again. */
> -		if (out) {
> -			vq_err(vq, "Unexpected descriptor format for RX: "
> -			       "out %d, int %d\n",
> -			       out, in);
> -			break;
> -		}
> -		/* Skip header. TODO: support TSO/mergeable rx buffers. */
> +		/* Skip header. TODO: support TSO. */
>  		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
>  		msg.msg_iovlen = in;
>  		len = iov_length(vq->iov, in);
> @@ -261,14 +275,33 @@ static void handle_rx(struct vhost_net *
>  					 len, MSG_DONTWAIT | MSG_TRUNC);
>  		/* TODO: Check specific error and bomb out unless EAGAIN? */
>  		if (err < 0) {

I think we need to compare err and datalen and drop packet on mismatch as well.
The check err > len won't be needed then.

> -			vhost_discard_vq_desc(vq);
> +			vhost_discard_desc(vq, headcount);
>  			break;
>  		}
>  		/* TODO: Should check and handle checksum. */
> +		if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
> +			struct virtio_net_hdr_mrg_rxbuf *vhdr =
> +				(struct virtio_net_hdr_mrg_rxbuf *)
> +				vq->iov[0].iov_base;
> +			/* add num_buffers */
> +			if (vhost_has_feature(&net->dev,
> +					      VHOST_NET_F_VIRTIO_NET_HDR))
> +				hdr.num_buffers = headcount;

Why is the above necessary?

> +			else if (vq->iov[0].iov_len < sizeof(*vhdr)) {

I think length check is already done when we copy the header. No?

> +				vq_err(vq, "tiny buffers < %d unsupported",
> +					vq->iov[0].iov_len);
> +				vhost_discard_desc(vq, headcount);
> +				break;

Problem here is that recvmsg might modify iov.
This is why I think we need to use vq->hdr here.

> +			} else if (put_user(headcount, &vhdr->num_buffers)) {

The above put_user writes out a 32 bit value, right?
This seems wrong.

How about using
	memcpy_toiovecend(vq->hdr, &headcount,
			  offsetof(struct virtio_net_hdr_mrg_rxbuf, num_buffers),
			  sizeof headcount);

this way we also do not make any assumptions about layout.

> +				vq_err(vq, "Failed num_buffers write");
> +				vhost_discard_desc(vq, headcount);
> +				break;
> +			}
> +		}
>  		if (err > len) {
>  			pr_err("Discarded truncated rx packet: "
>  			       " len %d > %zd\n", err, len);
> -			vhost_discard_vq_desc(vq);
> +			vhost_discard_desc(vq, headcount);
>  			continue;
>  		}
>  		len = err;
> @@ -279,7 +312,7 @@ static void handle_rx(struct vhost_net *
>  			break;
>  		}
>  		len += hdr_size;
> -		vhost_add_used_and_signal(&net->dev, vq, head, len);
> +		vhost_add_used_and_signal_n(&net->dev, vq, vq->heads, headcount);
>  		if (unlikely(vq_log))
>  			vhost_log_write(vq, vq_log, log, len);
>  		total_len += len;
> @@ -560,9 +593,14 @@ done:
>  
>  static int vhost_net_set_features(struct vhost_net *n, u64 features)
>  {
> -	size_t hdr_size = features & (1 << VHOST_NET_F_VIRTIO_NET_HDR) ?
> -		sizeof(struct virtio_net_hdr) : 0;
> +	size_t hdr_size = 0;
>  	int i;
> +
> +	if (features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)) {
> +		hdr_size = sizeof(struct virtio_net_hdr);
> +		if (features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> +			hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);

My personal style for this would be:
  	if (!(features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)))
		hdr_size = 0
	else if (!(features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
		hdr_size = sizeof(virtio_net_hdr);
	else
		hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);

which results in more symmetry and less nesting.

>  	mutex_lock(&n->dev.mutex);
>  	if ((features & (1 << VHOST_F_LOG_ALL)) &&
>  	    !vhost_log_access_ok(&n->dev)) {
> diff -ruNp net-next-p0/drivers/vhost/vhost.c
> net-next-v3/drivers/vhost/vhost.c
> --- net-next-p0/drivers/vhost/vhost.c	2010-03-22 12:04:38.000000000
> -0700
> +++ net-next-v3/drivers/vhost/vhost.c	2010-04-06 12:57:51.000000000
> -0700
> @@ -856,6 +856,47 @@ static unsigned get_indirect(struct vhos
>  	return 0;
>  }
>  
> +/* This is a multi-buffer version of vhost_get_vq_desc
> + * @vq		- the relevant virtqueue
> + * datalen	- data length we'll be reading
> + * @iovcount	- returned count of io vectors we fill
> + * @log		- vhost log
> + * @log_num	- log offset
> + *	returns number of buffer heads allocated, 0 on error

This is unusual. Let's return a negative error code on error.

> + */
> +int vhost_get_desc_n(struct vhost_virtqueue *vq, struct vring_used_elem
> *heads,
> +		     int datalen, int *iovcount, struct vhost_log *log,
> +		     unsigned int *log_num)
> +{
> +	int out, in;
> +	int seg = 0;		/* iov index */
> +	int hc = 0;		/* head count */
> +
> +	while (datalen > 0) {
> +		if (hc >= VHOST_NET_MAX_SG)
> +			goto err;
> +		heads[hc].id = vhost_get_desc(vq->dev, vq, vq->iov+seg,
> +					      ARRAY_SIZE(vq->iov)-seg, &out,
> +					      &in, log, log_num);
> +		if (heads[hc].id == vq->num)
> +			goto err;
> +		if (out || in <= 0) {
> +			vq_err(vq, "unexpected descriptor format for RX: "
> +				"out %d, in %d\n", out, in);
> +			goto err;
> +		}
> +		heads[hc].len = iov_length(vq->iov+seg, in);
> +		datalen -= heads[hc].len;

This signed/unsigned mix makes me nervous.
Let's make datalen unsigned, add unsigned total_len, and
while (datalen < total_len).

> +		hc++;
> +		seg += in;
> +	}
> +	*iovcount = seg;
> +	return hc;
> +err:
> +	vhost_discard_desc(vq, hc);
> +	return 0;
> +}
> +
>  /* This looks in the virtqueue and for the first available buffer, and
> converts
>   * it to an iovec for convenient access.  Since descriptors consist of
> some
>   * number of output then some number of input descriptors, it's
> actually two
> @@ -863,7 +904,7 @@ static unsigned get_indirect(struct vhos
>   *
>   * This function returns the descriptor number found, or vq->num (which
>   * is never a valid descriptor number) if none was found. */
> -unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct
> vhost_virtqueue *vq,
> +unsigned vhost_get_desc(struct vhost_dev *dev, struct vhost_virtqueue
> *vq,
>  			   struct iovec iov[], unsigned int iov_size,
>  			   unsigned int *out_num, unsigned int *in_num,
>  			   struct vhost_log *log, unsigned int *log_num)
> @@ -981,31 +1022,42 @@ unsigned vhost_get_vq_desc(struct vhost_
>  }
>  
>  /* Reverse the effect of vhost_get_vq_desc. Useful for error handling.
> */
> -void vhost_discard_vq_desc(struct vhost_virtqueue *vq)
> +void vhost_discard_desc(struct vhost_virtqueue *vq, int n)
>  {
> -	vq->last_avail_idx--;
> +	vq->last_avail_idx -= n;
>  }
>  
>  /* After we've used one of their buffers, we tell them about it.  We'll
> then
>   * want to notify the guest, using eventfd. */
> -int vhost_add_used(struct vhost_virtqueue *vq, unsigned int head, int
> len)
> +int vhost_add_used(struct vhost_virtqueue *vq, struct vring_used_elem
> *heads,
> +		   int count)

I think we are better off with vhost_add_used and vhost_add_used_n:
the version with _n has a lot of extra complexity, and tx always
adds them 1 by one.

>  {
>  	struct vring_used_elem *used;
> +	int start, n;
> +
> +	if (count <= 0)
> +		return -EINVAL;
>  
> -	/* The virtqueue contains a ring of used buffers.  Get a pointer to
> the
> -	 * next entry in that used ring. */
> -	used = &vq->used->ring[vq->last_used_idx % vq->num];
> -	if (put_user(head, &used->id)) {
> -		vq_err(vq, "Failed to write used id");
> +	start = vq->last_used_idx % vq->num;
> +	if (vq->num - start < count)
> +		n = vq->num - start;
> +	else
> +		n = count;

use min?

> +	used = vq->used->ring + start;
> +	if (copy_to_user(used, heads, sizeof(heads[0])*n)) {
> +		vq_err(vq, "Failed to write used");
>  		return -EFAULT;
>  	}
> -	if (put_user(len, &used->len)) {
> -		vq_err(vq, "Failed to write used len");
> -		return -EFAULT;
> +	if (n < count) {	/* wrapped the ring */
> +		used = vq->used->ring;
> +		if (copy_to_user(used, heads+n, sizeof(heads[0])*(count-n))) {
> +			vq_err(vq, "Failed to write used");
> +			return -EFAULT;
> +		}
>  	}
>  	/* Make sure buffer is written before we update index. */
>  	smp_wmb();
> -	if (put_user(vq->last_used_idx + 1, &vq->used->idx)) {
> +	if (put_user(vq->last_used_idx+count, &vq->used->idx)) {

I am a bit confused ... will this write a 32 or 16 bit value?
count is 32 bit ... Maybe we are better off with
  u16 idx = vq->last_used_idx + count
  put_user(idx, &vq->used->idx)
  vq->last_used_idx = idx

>  		vq_err(vq, "Failed to increment used idx");
>  		return -EFAULT;
>  	}
> @@ -1023,7 +1075,7 @@ int vhost_add_used(struct vhost_virtqueu
>  		if (vq->log_ctx)
>  			eventfd_signal(vq->log_ctx, 1);
>  	}
> -	vq->last_used_idx++;
> +	vq->last_used_idx += count;
>  	return 0;
>  }
>  
> @@ -1049,10 +1101,23 @@ void vhost_signal(struct vhost_dev *dev,
>  
>  /* And here's the combo meal deal.  Supersize me! */
>  void vhost_add_used_and_signal(struct vhost_dev *dev,
> -			       struct vhost_virtqueue *vq,
> -			       unsigned int head, int len)
> +			       struct vhost_virtqueue *vq, unsigned int id,
> +			       int len)
> +{
> +	struct vring_used_elem head;
> +
> +	head.id = id;
> +	head.len = len;
> +	vhost_add_used(vq, &head, 1);
> +	vhost_signal(dev, vq);
> +}
> +
> +/* multi-buffer version of vhost_add_used_and_signal */
> +void vhost_add_used_and_signal_n(struct vhost_dev *dev,
> +				 struct vhost_virtqueue *vq,
> +				 struct vring_used_elem *heads, int count)
>  {
> -	vhost_add_used(vq, head, len);
> +	vhost_add_used(vq, heads, count);
>  	vhost_signal(dev, vq);
>  }
>  
> diff -ruNp net-next-p0/drivers/vhost/vhost.h
> net-next-v3/drivers/vhost/vhost.h
> --- net-next-p0/drivers/vhost/vhost.h	2010-03-22 12:04:38.000000000
> -0700
> +++ net-next-v3/drivers/vhost/vhost.h	2010-04-05 20:33:57.000000000
> -0700
> @@ -85,6 +85,7 @@ struct vhost_virtqueue {
>  	struct iovec iov[VHOST_NET_MAX_SG];
>  	struct iovec hdr[VHOST_NET_MAX_SG];
>  	size_t hdr_size;
> +	struct vring_used_elem heads[VHOST_NET_MAX_SG];
>  	/* We use a kind of RCU to access private pointer.
>  	 * All readers access it from workqueue, which makes it possible to
>  	 * flush the workqueue instead of synchronize_rcu. Therefore readers
> do
> @@ -120,16 +121,22 @@ long vhost_dev_ioctl(struct vhost_dev *,
>  int vhost_vq_access_ok(struct vhost_virtqueue *vq);
>  int vhost_log_access_ok(struct vhost_dev *);
>  
> -unsigned vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue
> *,
> +int vhost_get_desc_n(struct vhost_virtqueue *, struct vring_used_elem
> *heads,
> +		     int datalen, int *iovcount, struct vhost_log *log,
> +		     unsigned int *log_num);
> +unsigned vhost_get_desc(struct vhost_dev *, struct vhost_virtqueue *,
>  			   struct iovec iov[], unsigned int iov_count,
>  			   unsigned int *out_num, unsigned int *in_num,
>  			   struct vhost_log *log, unsigned int *log_num);
> -void vhost_discard_vq_desc(struct vhost_virtqueue *);
> +void vhost_discard_desc(struct vhost_virtqueue *, int);
>  
> -int vhost_add_used(struct vhost_virtqueue *, unsigned int head, int
> len);
> -void vhost_signal(struct vhost_dev *, struct vhost_virtqueue *);
> +int vhost_add_used(struct vhost_virtqueue *, struct vring_used_elem
> *heads,
> +		    int count);
>  void vhost_add_used_and_signal(struct vhost_dev *, struct
> vhost_virtqueue *,
> -			       unsigned int head, int len);
> +			       unsigned int id, int len);
> +void vhost_add_used_and_signal_n(struct vhost_dev *, struct
> vhost_virtqueue *,
> +			       struct vring_used_elem *heads, int count);
> +void vhost_signal(struct vhost_dev *, struct vhost_virtqueue *);
>  void vhost_disable_notify(struct vhost_virtqueue *);
>  bool vhost_enable_notify(struct vhost_virtqueue *);
>  
> @@ -149,7 +156,8 @@ enum {
>  	VHOST_FEATURES = (1 << VIRTIO_F_NOTIFY_ON_EMPTY) |
>  			 (1 << VIRTIO_RING_F_INDIRECT_DESC) |
>  			 (1 << VHOST_F_LOG_ALL) |
> -			 (1 << VHOST_NET_F_VIRTIO_NET_HDR),
> +			 (1 << VHOST_NET_F_VIRTIO_NET_HDR) |
> +			 (1 << VIRTIO_NET_F_MRG_RXBUF),
>  };
>  
>  static inline int vhost_has_feature(struct vhost_dev *dev, int bit)
> 
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Michael S. Tsirkin April 7, 2010, 11:35 a.m. UTC | #2
Some corrections:

On Wed, Apr 07, 2010 at 01:59:10PM +0300, Michael S. Tsirkin wrote:
> On Tue, Apr 06, 2010 at 01:32:53PM -0700, David L Stevens wrote:
> > 
> > This patch adds support for the Mergeable Receive Buffers feature to
> > vhost_net.
> > 
> > 						+-DLS
> > 
> > Changes from previous revision:
> > 1) renamed:
> > 	vhost_discard_vq_desc -> vhost_discard_desc
> > 	vhost_get_heads -> vhost_get_desc_n
> > 	vhost_get_vq_desc -> vhost_get_desc
> > 2) added heads as argument to vhost_get_desc_n
> > 3) changed "vq->heads" from iovec to vring_used_elem, removed casts
> > 4) changed vhost_add_used to do multiple elements in a single
> > copy_to_user,
> > 	or two when we wrap the ring.
> > 5) removed rxmaxheadcount and available buffer checks in favor of
> > running until
> > 	an allocation failure, but making sure we break the loop if we get
> > 	two in a row, indicating we have at least 1 buffer, but not enough
> > 	for the current receive packet
> > 6) restore non-vnet header handling
> > 
> > Signed-Off-By: David L Stevens <dlstevens@us.ibm.com>
> 
> Thanks!
> There's some whitespace damage, are you sending with your new
> sendmail setup? It seems to have worked for qemu patches ...
> 
> > diff -ruNp net-next-p0/drivers/vhost/net.c
> > net-next-v3/drivers/vhost/net.c
> > --- net-next-p0/drivers/vhost/net.c	2010-03-22 12:04:38.000000000 -0700
> > +++ net-next-v3/drivers/vhost/net.c	2010-04-06 12:54:56.000000000 -0700
> > @@ -130,9 +130,8 @@ static void handle_tx(struct vhost_net *
> >  	hdr_size = vq->hdr_size;
> >  
> >  	for (;;) {
> > -		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> > -					 ARRAY_SIZE(vq->iov),
> > -					 &out, &in,
> > +		head = vhost_get_desc(&net->dev, vq, vq->iov,
> > +					 ARRAY_SIZE(vq->iov), &out, &in,
> >  					 NULL, NULL);
> >  		/* Nothing new?  Wait for eventfd to tell us they refilled. */
> >  		if (head == vq->num) {
> > @@ -167,8 +166,15 @@ static void handle_tx(struct vhost_net *
> >  		/* TODO: Check specific error and bomb out unless ENOBUFS? */
> >  		err = sock->ops->sendmsg(NULL, sock, &msg, len);
> >  		if (unlikely(err < 0)) {
> > -			vhost_discard_vq_desc(vq);
> > -			tx_poll_start(net, sock);
> > +			if (err == -EAGAIN) {
> > +				vhost_discard_desc(vq, 1);
> > +				tx_poll_start(net, sock);
> > +			} else {
> > +				vq_err(vq, "sendmsg: errno %d\n", -err);
> > +				/* drop packet; do not discard/resend */
> > +				vhost_add_used_and_signal(&net->dev, vq, head,
> > +							  0);
> 
> vhost does not currently have a consistent error handling strategy:
> if we drop packets, need to think which other errors should cause
> packet drops.  I prefer to just call vq_err for now,
> and have us look at handling segfaults etc in a consistent way
> separately.
> 
> > +			}
> >  			break;
> >  		}
> >  		if (err != len)
> > @@ -186,12 +192,25 @@ static void handle_tx(struct vhost_net *
> >  	unuse_mm(net->dev.mm);
> >  }
> >  
> > +static int vhost_head_len(struct sock *sk)
> > +{
> > +	struct sk_buff *head;
> > +	int len = 0;
> > +
> > +	lock_sock(sk);
> > +	head = skb_peek(&sk->sk_receive_queue);
> > +	if (head)
> > +		len = head->len;
> > +	release_sock(sk);
> > +	return len;
> > +}
> > +
> 
> I wonder whether it makes sense to check
> skb_queue_empty(&sk->sk_receive_queue)
> outside the lock, to reduce the cost of this call
> on an empty queue (we know that it happens at least once
> each time we exit the loop on rx)?
> 
> >  /* Expects to be always run from workqueue - which acts as
> >   * read-size critical section for our kind of RCU. */
> >  static void handle_rx(struct vhost_net *net)
> >  {
> >  	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX];
> > -	unsigned head, out, in, log, s;
> > +	unsigned in, log, s;
> >  	struct vhost_log *vq_log;
> >  	struct msghdr msg = {
> >  		.msg_name = NULL,
> > @@ -202,13 +221,14 @@ static void handle_rx(struct vhost_net *
> >  		.msg_flags = MSG_DONTWAIT,
> >  	};
> >  
> > -	struct virtio_net_hdr hdr = {
> > -		.flags = 0,
> > -		.gso_type = VIRTIO_NET_HDR_GSO_NONE
> > +	struct virtio_net_hdr_mrg_rxbuf hdr = {
> > +		.hdr.flags = 0,
> > +		.hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE
> >  	};
> >  
> > +	int retries = 0;
> >  	size_t len, total_len = 0;
> > -	int err;
> > +	int err, headcount, datalen;
> >  	size_t hdr_size;
> >  	struct socket *sock = rcu_dereference(vq->private_data);
> >  	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> > @@ -222,31 +242,25 @@ static void handle_rx(struct vhost_net *
> >  	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
> >  		vq->log : NULL;
> >  
> > -	for (;;) {
> > -		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> > -					 ARRAY_SIZE(vq->iov),
> > -					 &out, &in,
> > -					 vq_log, &log);
> > +	while ((datalen = vhost_head_len(sock->sk))) {
> > +		headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
> > +					     vq_log, &log);
> 
> This looks like a bug, I think we need to pass
> datalen + header size to vhost_get_desc_n.
> Not sure how we know the header size that backend will use though.
> Maybe just look at our features.
> 
> >  		/* OK, now we need to know about added descriptors. */
> > -		if (head == vq->num) {
> > -			if (unlikely(vhost_enable_notify(vq))) {
> > +		if (!headcount) {
> > +			if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
> >  				/* They have slipped one in as we were
> >  				 * doing that: check again. */
> >  				vhost_disable_notify(vq);
> > +				retries++;
> >  				continue;
> >  			}
> 
> Hmm. The reason we have the code at all, as the comment says, is because
> guest could have added more buffers between the time we read last index
> and the time we enabled notification. So if we just break like this
> the race still exists. We could remember the
> last head value we observed, and have vhost_enable_notify check
> against this value?
> 
> Need to think about it.
> 
> Another concern here is that on retries vhost_get_desc_n
> is doing extra work, rescanning the same descriptor
> again and again. Not sure how common this is, might be
> worthwhile to add a TODO to consider this at least.
> 
> > +			retries = 0;
> >  			/* Nothing new?  Wait for eventfd to tell us
> >  			 * they refilled. */
> >  			break;
> >  		}
> >  		/* We don't need to be notified again. */
> > -		if (out) {
> > -			vq_err(vq, "Unexpected descriptor format for RX: "
> > -			       "out %d, int %d\n",
> > -			       out, in);
> > -			break;
> > -		}
> > -		/* Skip header. TODO: support TSO/mergeable rx buffers. */
> > +		/* Skip header. TODO: support TSO. */
> >  		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
> >  		msg.msg_iovlen = in;
> >  		len = iov_length(vq->iov, in);
> > @@ -261,14 +275,33 @@ static void handle_rx(struct vhost_net *
> >  					 len, MSG_DONTWAIT | MSG_TRUNC);
> >  		/* TODO: Check specific error and bomb out unless EAGAIN? */
> >  		if (err < 0) {
> 
> I think we need to compare err and datalen and drop packet on mismatch as well.
> The check err > len won't be needed then.
> 
> > -			vhost_discard_vq_desc(vq);
> > +			vhost_discard_desc(vq, headcount);
> >  			break;
> >  		}
> >  		/* TODO: Should check and handle checksum. */
> > +		if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
> > +			struct virtio_net_hdr_mrg_rxbuf *vhdr =
> > +				(struct virtio_net_hdr_mrg_rxbuf *)
> > +				vq->iov[0].iov_base;
> > +			/* add num_buffers */
> > +			if (vhost_has_feature(&net->dev,
> > +					      VHOST_NET_F_VIRTIO_NET_HDR))
> > +				hdr.num_buffers = headcount;
> 
> Why is the above necessary?
> 
> > +			else if (vq->iov[0].iov_len < sizeof(*vhdr)) {
> 
> I think length check is already done when we copy the header. No?
> 
> > +				vq_err(vq, "tiny buffers < %d unsupported",
> > +					vq->iov[0].iov_len);
> > +				vhost_discard_desc(vq, headcount);
> > +				break;
> 
> Problem here is that recvmsg might modify iov.
> This is why I think we need to use vq->hdr here.
> 
> > +			} else if (put_user(headcount, &vhdr->num_buffers)) {
> 
> The above put_user writes out a 32 bit value, right?
> This seems wrong.

Sorry, put_user looks at the pointer type, so that's ok.
I still suggest memcpy_toiovecend to remove layout assumptions.

> How about using
> 	memcpy_toiovecend(vq->hdr, &headcount,
> 			  offsetof(struct virtio_net_hdr_mrg_rxbuf, num_buffers),
> 			  sizeof headcount);
> 
> this way we also do not make any assumptions about layout.
> 
> > +				vq_err(vq, "Failed num_buffers write");
> > +				vhost_discard_desc(vq, headcount);
> > +				break;
> > +			}
> > +		}
> >  		if (err > len) {
> >  			pr_err("Discarded truncated rx packet: "
> >  			       " len %d > %zd\n", err, len);
> > -			vhost_discard_vq_desc(vq);
> > +			vhost_discard_desc(vq, headcount);
> >  			continue;
> >  		}
> >  		len = err;
> > @@ -279,7 +312,7 @@ static void handle_rx(struct vhost_net *
> >  			break;
> >  		}
> >  		len += hdr_size;
> > -		vhost_add_used_and_signal(&net->dev, vq, head, len);
> > +		vhost_add_used_and_signal_n(&net->dev, vq, vq->heads, headcount);
> >  		if (unlikely(vq_log))
> >  			vhost_log_write(vq, vq_log, log, len);
> >  		total_len += len;
> > @@ -560,9 +593,14 @@ done:
> >  
> >  static int vhost_net_set_features(struct vhost_net *n, u64 features)
> >  {
> > -	size_t hdr_size = features & (1 << VHOST_NET_F_VIRTIO_NET_HDR) ?
> > -		sizeof(struct virtio_net_hdr) : 0;
> > +	size_t hdr_size = 0;
> >  	int i;
> > +
> > +	if (features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)) {
> > +		hdr_size = sizeof(struct virtio_net_hdr);
> > +		if (features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> > +			hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
> 
> My personal style for this would be:
>   	if (!(features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)))
> 		hdr_size = 0
> 	else if (!(features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
> 		hdr_size = sizeof(virtio_net_hdr);
> 	else
> 		hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
> 
> which results in more symmetry and less nesting.
> 
> >  	mutex_lock(&n->dev.mutex);
> >  	if ((features & (1 << VHOST_F_LOG_ALL)) &&
> >  	    !vhost_log_access_ok(&n->dev)) {
> > diff -ruNp net-next-p0/drivers/vhost/vhost.c
> > net-next-v3/drivers/vhost/vhost.c
> > --- net-next-p0/drivers/vhost/vhost.c	2010-03-22 12:04:38.000000000
> > -0700
> > +++ net-next-v3/drivers/vhost/vhost.c	2010-04-06 12:57:51.000000000
> > -0700
> > @@ -856,6 +856,47 @@ static unsigned get_indirect(struct vhos
> >  	return 0;
> >  }
> >  
> > +/* This is a multi-buffer version of vhost_get_vq_desc
> > + * @vq		- the relevant virtqueue
> > + * datalen	- data length we'll be reading
> > + * @iovcount	- returned count of io vectors we fill
> > + * @log		- vhost log
> > + * @log_num	- log offset
> > + *	returns number of buffer heads allocated, 0 on error
> 
> This is unusual. Let's return a negative error code on error.
> 
> > + */
> > +int vhost_get_desc_n(struct vhost_virtqueue *vq, struct vring_used_elem
> > *heads,
> > +		     int datalen, int *iovcount, struct vhost_log *log,
> > +		     unsigned int *log_num)
> > +{
> > +	int out, in;
> > +	int seg = 0;		/* iov index */
> > +	int hc = 0;		/* head count */
> > +
> > +	while (datalen > 0) {
> > +		if (hc >= VHOST_NET_MAX_SG)
> > +			goto err;
> > +		heads[hc].id = vhost_get_desc(vq->dev, vq, vq->iov+seg,
> > +					      ARRAY_SIZE(vq->iov)-seg, &out,
> > +					      &in, log, log_num);
> > +		if (heads[hc].id == vq->num)
> > +			goto err;
> > +		if (out || in <= 0) {
> > +			vq_err(vq, "unexpected descriptor format for RX: "
> > +				"out %d, in %d\n", out, in);
> > +			goto err;
> > +		}
> > +		heads[hc].len = iov_length(vq->iov+seg, in);
> > +		datalen -= heads[hc].len;
> 
> This signed/unsigned mix makes me nervous.
> Let's make datalen unsigned, add unsigned total_len, and
> while (datalen < total_len).
> 
> > +		hc++;
> > +		seg += in;
> > +	}
> > +	*iovcount = seg;
> > +	return hc;
> > +err:
> > +	vhost_discard_desc(vq, hc);
> > +	return 0;
> > +}
> > +
> >  /* This looks in the virtqueue and for the first available buffer, and
> > converts
> >   * it to an iovec for convenient access.  Since descriptors consist of
> > some
> >   * number of output then some number of input descriptors, it's
> > actually two
> > @@ -863,7 +904,7 @@ static unsigned get_indirect(struct vhos
> >   *
> >   * This function returns the descriptor number found, or vq->num (which
> >   * is never a valid descriptor number) if none was found. */
> > -unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct
> > vhost_virtqueue *vq,
> > +unsigned vhost_get_desc(struct vhost_dev *dev, struct vhost_virtqueue
> > *vq,
> >  			   struct iovec iov[], unsigned int iov_size,
> >  			   unsigned int *out_num, unsigned int *in_num,
> >  			   struct vhost_log *log, unsigned int *log_num)
> > @@ -981,31 +1022,42 @@ unsigned vhost_get_vq_desc(struct vhost_
> >  }
> >  
> >  /* Reverse the effect of vhost_get_vq_desc. Useful for error handling.
> > */
> > -void vhost_discard_vq_desc(struct vhost_virtqueue *vq)
> > +void vhost_discard_desc(struct vhost_virtqueue *vq, int n)
> >  {
> > -	vq->last_avail_idx--;
> > +	vq->last_avail_idx -= n;
> >  }
> >  
> >  /* After we've used one of their buffers, we tell them about it.  We'll
> > then
> >   * want to notify the guest, using eventfd. */
> > -int vhost_add_used(struct vhost_virtqueue *vq, unsigned int head, int
> > len)
> > +int vhost_add_used(struct vhost_virtqueue *vq, struct vring_used_elem
> > *heads,
> > +		   int count)
> 
> I think we are better off with vhost_add_used and vhost_add_used_n:
> the version with _n has a lot of extra complexity, and tx always
> adds them 1 by one.
> 
> >  {
> >  	struct vring_used_elem *used;
> > +	int start, n;
> > +
> > +	if (count <= 0)
> > +		return -EINVAL;
> >  
> > -	/* The virtqueue contains a ring of used buffers.  Get a pointer to
> > the
> > -	 * next entry in that used ring. */
> > -	used = &vq->used->ring[vq->last_used_idx % vq->num];
> > -	if (put_user(head, &used->id)) {
> > -		vq_err(vq, "Failed to write used id");
> > +	start = vq->last_used_idx % vq->num;
> > +	if (vq->num - start < count)
> > +		n = vq->num - start;
> > +	else
> > +		n = count;
> 
> use min?
> 
> > +	used = vq->used->ring + start;
> > +	if (copy_to_user(used, heads, sizeof(heads[0])*n)) {
> > +		vq_err(vq, "Failed to write used");
> >  		return -EFAULT;
> >  	}
> > -	if (put_user(len, &used->len)) {
> > -		vq_err(vq, "Failed to write used len");
> > -		return -EFAULT;
> > +	if (n < count) {	/* wrapped the ring */
> > +		used = vq->used->ring;
> > +		if (copy_to_user(used, heads+n, sizeof(heads[0])*(count-n))) {
> > +			vq_err(vq, "Failed to write used");
> > +			return -EFAULT;
> > +		}
> >  	}
> >  	/* Make sure buffer is written before we update index. */
> >  	smp_wmb();
> > -	if (put_user(vq->last_used_idx + 1, &vq->used->idx)) {
> > +	if (put_user(vq->last_used_idx+count, &vq->used->idx)) {
> 
> I am a bit confused ... will this write a 32 or 16 bit value?
> count is 32 bit ... Maybe we are better off with
>   u16 idx = vq->last_used_idx + count
>   put_user(idx, &vq->used->idx)
>   vq->last_used_idx = idx

The above's not necessary, put_user gets type from the pointer.

> >  		vq_err(vq, "Failed to increment used idx");
> >  		return -EFAULT;
> >  	}
> > @@ -1023,7 +1075,7 @@ int vhost_add_used(struct vhost_virtqueu
> >  		if (vq->log_ctx)
> >  			eventfd_signal(vq->log_ctx, 1);
> >  	}
> > -	vq->last_used_idx++;
> > +	vq->last_used_idx += count;
> >  	return 0;
> >  }
> >  
> > @@ -1049,10 +1101,23 @@ void vhost_signal(struct vhost_dev *dev,
> >  
> >  /* And here's the combo meal deal.  Supersize me! */
> >  void vhost_add_used_and_signal(struct vhost_dev *dev,
> > -			       struct vhost_virtqueue *vq,
> > -			       unsigned int head, int len)
> > +			       struct vhost_virtqueue *vq, unsigned int id,
> > +			       int len)
> > +{
> > +	struct vring_used_elem head;
> > +
> > +	head.id = id;
> > +	head.len = len;
> > +	vhost_add_used(vq, &head, 1);
> > +	vhost_signal(dev, vq);
> > +}
> > +
> > +/* multi-buffer version of vhost_add_used_and_signal */
> > +void vhost_add_used_and_signal_n(struct vhost_dev *dev,
> > +				 struct vhost_virtqueue *vq,
> > +				 struct vring_used_elem *heads, int count)
> >  {
> > -	vhost_add_used(vq, head, len);
> > +	vhost_add_used(vq, heads, count);
> >  	vhost_signal(dev, vq);
> >  }
> >  
> > diff -ruNp net-next-p0/drivers/vhost/vhost.h
> > net-next-v3/drivers/vhost/vhost.h
> > --- net-next-p0/drivers/vhost/vhost.h	2010-03-22 12:04:38.000000000
> > -0700
> > +++ net-next-v3/drivers/vhost/vhost.h	2010-04-05 20:33:57.000000000
> > -0700
> > @@ -85,6 +85,7 @@ struct vhost_virtqueue {
> >  	struct iovec iov[VHOST_NET_MAX_SG];
> >  	struct iovec hdr[VHOST_NET_MAX_SG];
> >  	size_t hdr_size;
> > +	struct vring_used_elem heads[VHOST_NET_MAX_SG];
> >  	/* We use a kind of RCU to access private pointer.
> >  	 * All readers access it from workqueue, which makes it possible to
> >  	 * flush the workqueue instead of synchronize_rcu. Therefore readers
> > do
> > @@ -120,16 +121,22 @@ long vhost_dev_ioctl(struct vhost_dev *,
> >  int vhost_vq_access_ok(struct vhost_virtqueue *vq);
> >  int vhost_log_access_ok(struct vhost_dev *);
> >  
> > -unsigned vhost_get_vq_desc(struct vhost_dev *, struct vhost_virtqueue
> > *,
> > +int vhost_get_desc_n(struct vhost_virtqueue *, struct vring_used_elem
> > *heads,
> > +		     int datalen, int *iovcount, struct vhost_log *log,
> > +		     unsigned int *log_num);
> > +unsigned vhost_get_desc(struct vhost_dev *, struct vhost_virtqueue *,
> >  			   struct iovec iov[], unsigned int iov_count,
> >  			   unsigned int *out_num, unsigned int *in_num,
> >  			   struct vhost_log *log, unsigned int *log_num);
> > -void vhost_discard_vq_desc(struct vhost_virtqueue *);
> > +void vhost_discard_desc(struct vhost_virtqueue *, int);
> >  
> > -int vhost_add_used(struct vhost_virtqueue *, unsigned int head, int
> > len);
> > -void vhost_signal(struct vhost_dev *, struct vhost_virtqueue *);
> > +int vhost_add_used(struct vhost_virtqueue *, struct vring_used_elem
> > *heads,
> > +		    int count);
> >  void vhost_add_used_and_signal(struct vhost_dev *, struct
> > vhost_virtqueue *,
> > -			       unsigned int head, int len);
> > +			       unsigned int id, int len);
> > +void vhost_add_used_and_signal_n(struct vhost_dev *, struct
> > vhost_virtqueue *,
> > +			       struct vring_used_elem *heads, int count);
> > +void vhost_signal(struct vhost_dev *, struct vhost_virtqueue *);
> >  void vhost_disable_notify(struct vhost_virtqueue *);
> >  bool vhost_enable_notify(struct vhost_virtqueue *);
> >  
> > @@ -149,7 +156,8 @@ enum {
> >  	VHOST_FEATURES = (1 << VIRTIO_F_NOTIFY_ON_EMPTY) |
> >  			 (1 << VIRTIO_RING_F_INDIRECT_DESC) |
> >  			 (1 << VHOST_F_LOG_ALL) |
> > -			 (1 << VHOST_NET_F_VIRTIO_NET_HDR),
> > +			 (1 << VHOST_NET_F_VIRTIO_NET_HDR) |
> > +			 (1 << VIRTIO_NET_F_MRG_RXBUF),
> >  };
> >  
> >  static inline int vhost_has_feature(struct vhost_dev *dev, int bit)
> > 
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
David Stevens April 7, 2010, 5:37 p.m. UTC | #3
> 
> Thanks!
> There's some whitespace damage, are you sending with your new
> sendmail setup? It seems to have worked for qemu patches ...

        Yes, I saw some line wraps in what I received, but I checked
the original draft to be sure and they weren't there. Possibly from
the relay; Sigh.


> > @@ -167,8 +166,15 @@ static void handle_tx(struct vhost_net *
> >        /* TODO: Check specific error and bomb out unless ENOBUFS? */
> >        err = sock->ops->sendmsg(NULL, sock, &msg, len);
> >        if (unlikely(err < 0)) {
> > -         vhost_discard_vq_desc(vq);
> > -         tx_poll_start(net, sock);
> > +         if (err == -EAGAIN) {
> > +            vhost_discard_desc(vq, 1);
> > +            tx_poll_start(net, sock);
> > +         } else {
> > +            vq_err(vq, "sendmsg: errno %d\n", -err);
> > +            /* drop packet; do not discard/resend */
> > +            vhost_add_used_and_signal(&net->dev, vq, head,
> > +                       0);
> 
> vhost does not currently has a consistent error handling strategy:
> if we drop packets, need to think which other errors should cause
> packet drops.  I prefer to just call vq_err for now,
> and have us look at handling segfaults etc in a consistent way
> separately.

I had to add this to avoid an infinite loop when I wrote a bad
packet on the socket. I agree error handling needs a better look,
but retrying a bad packet continuously while dumping in the log
is what it was doing when I hit an error before this code. Isn't
this better, at least until a second look?


> > +}
> > +
> 
> I wonder whether it makes sense to check
> skb_queue_empty(&sk->sk_receive_queue)
> outside the lock, to reduce the cost of this call
> on an empty queue (we know that it happens at least once
> each time we exit the loop on rx)?

        I was looking at alternatives to adding the lock in the
first place, but I found I couldn't measure a difference in the
cost with and without the lock.


> > +   int retries = 0;
> >     size_t len, total_len = 0;
> > -   int err;
> > +   int err, headcount, datalen;
> >     size_t hdr_size;
> >     struct socket *sock = rcu_dereference(vq->private_data);
> >     if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> > @@ -222,31 +242,25 @@ static void handle_rx(struct vhost_net *
> >     vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
> >        vq->log : NULL;
> > 
> > -   for (;;) {
> > -      head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> > -                ARRAY_SIZE(vq->iov),
> > -                &out, &in,
> > -                vq_log, &log);
> > +   while ((datalen = vhost_head_len(sock->sk))) {
> > +      headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
> > +                    vq_log, &log);
> 
> This looks like a bug, I think we need to pass
> datalen + header size to vhost_get_desc_n.
> Not sure how we know the header size that backend will use though.
> Maybe just look at our features.

        Yes; we have hdr_size, so I can add it here. It'll be 0 for
the cases where the backend and guest both have vnet header (either
the regular or larger mergeable buffers one), but should be added
in for future raw socket support.

> 
> >        /* OK, now we need to know about added descriptors. */
> > -      if (head == vq->num) {
> > -         if (unlikely(vhost_enable_notify(vq))) {
> > +      if (!headcount) {
> > +         if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
> >              /* They have slipped one in as we were
> >               * doing that: check again. */
> >              vhost_disable_notify(vq);
> > +            retries++;
> >              continue;
> >           }
> 
> Hmm. The reason we have the code at all, as the comment says, is because
> guest could have added more buffers between the time we read last index
> and the time we enabled notification. So if we just break like this
> the race still exists. We could remember the
> last head value we observed, and have vhost_enable_notify check
> against this value?

        This is to prevent a spin loop in the case where we have some
buffers available, but not enough for the current packet (ie, this
is the replacement code for the "rxmaxheadcount" business). If they
actually added something new, retrying once should see it, but what
vhost_enable_notify() returns non-zero on is not "new buffers" but
rather "not empty". In the case mentioned, we aren't empty, so
vhost_enable_notify() returns nonzero every time, but the guest hasn't
given us enough buffers to proceed, so we continuously retry; this
code breaks the spin loop until we've really got new buffers from
the guest.

> 
> Need to think about it.
> 
> Another concern here is that on retries vhost_get_desc_n
> is doing extra work, rescanning the same descriptor
> again and again. Not sure how common this is, might be
> worthwhile to add a TODO to consider this at least.

        I had a printk in there to test the code and with the
retries counter, it happens when we fill the ring (once,
because of the retries checks), and then proceeds as
desired when the guest gives us more buffers. Without the
check, it spews until we hear from the guest.

> > @@ -261,14 +275,33 @@ static void handle_rx(struct vhost_net *
> >                  len, MSG_DONTWAIT | MSG_TRUNC);
> >        /* TODO: Check specific error and bomb out unless EAGAIN? */
> >        if (err < 0) {
> 
> I think we need to compare err and datalen and drop packet on mismatch 
as well.
> The check err > len won't be needed then.

        OK, but this is the original code, unchanged by my patch. :-)
> 
> > -         vhost_discard_vq_desc(vq);
> > +         vhost_discard_desc(vq, headcount);
> >           break;
> >        }
> >        /* TODO: Should check and handle checksum. */
> > +      if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
> > +         struct virtio_net_hdr_mrg_rxbuf *vhdr =
> > +            (struct virtio_net_hdr_mrg_rxbuf *)
> > +            vq->iov[0].iov_base;
> > +         /* add num_buffers */
> > +         if (vhost_has_feature(&net->dev,
> > +                     VHOST_NET_F_VIRTIO_NET_HDR))
> > +            hdr.num_buffers = headcount;
> 
> Why is the above necessary?

        This adds mergeable buffer support for the raw case.
> 
> > +         else if (vq->iov[0].iov_len < sizeof(*vhdr)) {
> 
> I think length check is already done when we copy the header. No?

        This sets num_buffers for the tap case (where hdr_size is 0).
We don't copy any headers at all for this case, but we need to
add num_buffers at offset 10 in the user buffer already read by
recvmsg().

> 
> > +            vq_err(vq, "tiny buffers < %d unsupported",
> > +               vq->iov[0].iov_len);
> > +            vhost_discard_desc(vq, headcount);
> > +            break;
> 
> Problem here is that recvmsg might modify iov.
> This is why I think we need to use vq->hdr here.

        recvmsg() can never modify the iovec; it's the
standard socket call. To use vq->hdr, we'd have to copy
it in from user space, modify it, and then copy it back
in to user space, on every packet. In the normal tap case,
hdr_size is 0 and we read it directly from the socket to
user space. It is already correct for mergeable buffers,
except for the num_buffers value added here.

> 
> > +         } else if (put_user(headcount, &vhdr->num_buffers)) {
> 
> The above put_user writes out a 32 bit value, right?
> This seems wrong.

        I'll recheck this -- I'll make the type of "headcount" be
the type of "num_buffers" in the MRXB vnet header, if it isn't
already.

> 
> How about using
>    memcpy_toiovecend(vq->hdr, &headcount,
>            offsetof(struct virtio_net_hdr_mrg_rxbuf, num_buffers),
>            sizeof headcount);
> 
> this way we also do not make any assumptions about layout.

        Because this overwrites the valid vnet header we got from
the tap socket with a local copy that isn't correct. For this to
work, we first need to copy_from_user() the vnet header from the
socket into vq->hdr (on every packet).
        It doesn't assume anything here-- it requires (and checks)
that the user didn't give us a <12 byte buffer, which I think is
reasonable. That's about the size of the descriptor for the buffer,
and I'd call that a broken guest. The cost of supporting those
tiny buffers in the general case would be a copy_from_user() and
copy_to_user() of the vnet_hdr on every packet, which I think is
not worth it (and especially since I don't expect any guest is
ever going to give us a <12 byte buffer in the first place).


> > @@ -560,9 +593,14 @@ done:
> > 
> >  static int vhost_net_set_features(struct vhost_net *n, u64 features)
> >  {
> > -   size_t hdr_size = features & (1 << VHOST_NET_F_VIRTIO_NET_HDR) ?
> > -      sizeof(struct virtio_net_hdr) : 0;
> > +   size_t hdr_size = 0;
> >     int i;
> > +
> > +   if (features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)) {
> > +      hdr_size = sizeof(struct virtio_net_hdr);
> > +      if (features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> > +         hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
> 
> My personal style for this would be:
>      if (!(features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)))
>       hdr_size = 0
>    else if (!(features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
>       hdr_size = sizeof(virtio_net_hdr);
>    else
>       hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
> 
> which results in more symmetry and less nesting.

OK.

> 
> >     mutex_lock(&n->dev.mutex);
> >     if ((features & (1 << VHOST_F_LOG_ALL)) &&
> >         !vhost_log_access_ok(&n->dev)) {
> > diff -ruNp net-next-p0/drivers/vhost/vhost.c
> > net-next-v3/drivers/vhost/vhost.c
> > --- net-next-p0/drivers/vhost/vhost.c   2010-03-22 12:04:38.000000000
> > -0700
> > +++ net-next-v3/drivers/vhost/vhost.c   2010-04-06 12:57:51.000000000
> > -0700
> > @@ -856,6 +856,47 @@ static unsigned get_indirect(struct vhos
> >     return 0;
> >  }
> > 
> > +/* This is a multi-buffer version of vhost_get_vq_desc
> > + * @vq      - the relevant virtqueue
> > + * datalen   - data length we'll be reading
> > + * @iovcount   - returned count of io vectors we fill
> > + * @log      - vhost log
> > + * @log_num   - log offset
> > + *   returns number of buffer heads allocated, 0 on error
> 
> This is unusual. Let's return a negative error code on error.

        This is like malloc -- "0" is never a valid return value, and
we don't have actual error values to return and handle; they generate
vq_err() messages within the code itself. In all cases,
it is the count of buffers we allocated (which is 0 when we fail to
allocate). So, I don't agree with this, but can do it if you insist.

> 
> > + */
> > +int vhost_get_desc_n(struct vhost_virtqueue *vq, struct 
vring_used_elem
> > *heads,
> > +           int datalen, int *iovcount, struct vhost_log *log,
> > +           unsigned int *log_num)
> > +{
> > +   int out, in;
> > +   int seg = 0;      /* iov index */
> > +   int hc = 0;      /* head count */
> > +
> > +   while (datalen > 0) {
> > +      if (hc >= VHOST_NET_MAX_SG)
> > +         goto err;
> > +      heads[hc].id = vhost_get_desc(vq->dev, vq, vq->iov+seg,
> > +                     ARRAY_SIZE(vq->iov)-seg, &out,
> > +                     &in, log, log_num);
> > +      if (heads[hc].id == vq->num)
> > +         goto err;
> > +      if (out || in <= 0) {
> > +         vq_err(vq, "unexpected descriptor format for RX: "
> > +            "out %d, in %d\n", out, in);
> > +         goto err;
> > +      }
> > +      heads[hc].len = iov_length(vq->iov+seg, in);
> > +      datalen -= heads[hc].len;
> 
> This signed/unsigned mix makes me nervous.
> Let's make datalen unsigned, add unsigned total_len, and
> while (datalen < total_len).
> 
> > +      hc++;
> > +      seg += in;
> > +   }
> > +   *iovcount = seg;
> > +   return hc;
> > +err:
> > +   vhost_discard_desc(vq, hc);
> > +   return 0;
> > +}
> > +
> >  /* This looks in the virtqueue and for the first available buffer, 
and
> > converts
> >   * it to an iovec for convenient access.  Since descriptors consist 
of
> > some
> >   * number of output then some number of input descriptors, it's
> > actually two
> > @@ -863,7 +904,7 @@ static unsigned get_indirect(struct vhos
> >   *
> >   * This function returns the descriptor number found, or vq->num 
(which
> >   * is never a valid descriptor number) if none was found. */
> > -unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct
> > vhost_virtqueue *vq,
> > +unsigned vhost_get_desc(struct vhost_dev *dev, struct vhost_virtqueue
> > *vq,
> >              struct iovec iov[], unsigned int iov_size,
> >              unsigned int *out_num, unsigned int *in_num,
> >              struct vhost_log *log, unsigned int *log_num)
> > @@ -981,31 +1022,42 @@ unsigned vhost_get_vq_desc(struct vhost_
> >  }
> > 
> >  /* Reverse the effect of vhost_get_vq_desc. Useful for error 
handling.
> > */
> > -void vhost_discard_vq_desc(struct vhost_virtqueue *vq)
> > +void vhost_discard_desc(struct vhost_virtqueue *vq, int n)
> >  {
> > -   vq->last_avail_idx--;
> > +   vq->last_avail_idx -= n;
> >  }
> > 
> >  /* After we've used one of their buffers, we tell them about it. 
We'll
> > then
> >   * want to notify the guest, using eventfd. */
> > -int vhost_add_used(struct vhost_virtqueue *vq, unsigned int head, int
> > len)
> > +int vhost_add_used(struct vhost_virtqueue *vq, struct vring_used_elem
> > *heads,
> > +         int count)
> 
> I think we are better off with vhost_add_used and vhost_add_used_n:
> the version with _n has a lot of extra complexity, and tx always
> adds them 1 by one.

        The external code only calls vhost_add_used_and_signal(), so I
split that. I can add a single variant of vhost_add_used() too, but
I was trying to avoid adding duplicate code and external interface.
I don't actually agree with splitting these; isn't it better to have
them going through the same code path whether it's single or multiple?
A bug in one would show up as an intermittent failure, and a change
to one requires changing both. I don't think the multiple should just
do the single repeatedly, either, since the multiple variant now can
do the work in one or two copy_to_user()'s, without a loop; so splitting
these just makes a special case for single to carry in code maintenance
with no benefit, I think.

> 
> >  {
> >     struct vring_used_elem *used;
> > +   int start, n;
> > +
> > +   if (count <= 0)
> > +      return -EINVAL;
> > 
> > -   /* The virtqueue contains a ring of used buffers.  Get a pointer 
to
> > the
> > -    * next entry in that used ring. */
> > -   used = &vq->used->ring[vq->last_used_idx % vq->num];
> > -   if (put_user(head, &used->id)) {
> > -      vq_err(vq, "Failed to write used id");
> > +   start = vq->last_used_idx % vq->num;
> > +   if (vq->num - start < count)
> > +      n = vq->num - start;
> > +   else
> > +      n = count;
> 
> use min?

        Sure.

> 
> > +   used = vq->used->ring + start;
> > +   if (copy_to_user(used, heads, sizeof(heads[0])*n)) {
> > +      vq_err(vq, "Failed to write used");
> >        return -EFAULT;
> >     }
> > -   if (put_user(len, &used->len)) {
> > -      vq_err(vq, "Failed to write used len");
> > -      return -EFAULT;
> > +   if (n < count) {   /* wrapped the ring */
> > +      used = vq->used->ring;
> > +      if (copy_to_user(used, heads+n, sizeof(heads[0])*(count-n))) {
> > +         vq_err(vq, "Failed to write used");
> > +         return -EFAULT;
> > +      }
> >     }
> >     /* Make sure buffer is written before we update index. */
> >     smp_wmb();
> > -   if (put_user(vq->last_used_idx + 1, &vq->used->idx)) {
> > +   if (put_user(vq->last_used_idx+count, &vq->used->idx)) {
> 
> I am a bit confused ... will this write a 32 or 16 bit value?
> count is 32 bit ... Maybe we are better off with
>   u16 idx = vq->last_used_idx + count
>   put_user(idx, &vq->used->idx)
>   vq->last_used_idx = idx

        How about a cast?:

        if (put_user((u16)(vq->last_used_idx+count), &vq->used->idx)) {


                                        +-DLS

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Michael S. Tsirkin April 7, 2010, 6:09 p.m. UTC | #4
On Wed, Apr 07, 2010 at 10:37:17AM -0700, David Stevens wrote:
> > 
> > Thanks!
> > There's some whitespace damage, are you sending with your new
> > sendmail setup? It seems to have worked for qemu patches ...
> 
>         Yes, I saw some line wraps in what I received, but I checked
> the original draft to be sure and they weren't there. Possibly from
> the relay; Sigh.
> 
> 
> > > @@ -167,8 +166,15 @@ static void handle_tx(struct vhost_net *
> > >        /* TODO: Check specific error and bomb out unless ENOBUFS? */
> > >        err = sock->ops->sendmsg(NULL, sock, &msg, len);
> > >        if (unlikely(err < 0)) {
> > > -         vhost_discard_vq_desc(vq);
> > > -         tx_poll_start(net, sock);
> > > +         if (err == -EAGAIN) {
> > > +            vhost_discard_desc(vq, 1);
> > > +            tx_poll_start(net, sock);
> > > +         } else {
> > > +            vq_err(vq, "sendmsg: errno %d\n", -err);
> > > +            /* drop packet; do not discard/resend */
> > > +            vhost_add_used_and_signal(&net->dev, vq, head,
> > > +                       0);
> > 
> > vhost does not currently has a consistent error handling strategy:
> > if we drop packets, need to think which other errors should cause
> > packet drops.  I prefer to just call vq_err for now,
> > and have us look at handling segfaults etc in a consistent way
> > separately.
> 
> I had to add this to avoid an infinite loop when I wrote a bad
> packet on the socket. I agree error handling needs a better look,
> but retrying a bad packet continuously while dumping in the log
> is what it was doing when I hit an error before this code. Isn't
> this better, at least until a second look?
> 

Hmm, what do you mean 'continuously'? Don't we only try again
on next kick?

> > > +}
> > > +
> > 
> > I wonder whether it makes sense to check
> > skb_queue_empty(&sk->sk_receive_queue)
> > outside the lock, to reduce the cost of this call
> > on an empty queue (we know that it happens at least once
> > each time we exit the loop on rx)?
> 
>         I was looking at alternatives to adding the lock in the
> first place, but I found I couldn't measure a difference in the
> cost with and without the lock.
> 
> 
> > > +   int retries = 0;
> > >     size_t len, total_len = 0;
> > > -   int err;
> > > +   int err, headcount, datalen;
> > >     size_t hdr_size;
> > >     struct socket *sock = rcu_dereference(vq->private_data);
> > >     if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> > > @@ -222,31 +242,25 @@ static void handle_rx(struct vhost_net *
> > >     vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
> > >        vq->log : NULL;
> > > 
> > > -   for (;;) {
> > > -      head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> > > -                ARRAY_SIZE(vq->iov),
> > > -                &out, &in,
> > > -                vq_log, &log);
> > > +   while ((datalen = vhost_head_len(sock->sk))) {
> > > +      headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
> > > +                    vq_log, &log);
> > 
> > This looks like a bug, I think we need to pass
> > datalen + header size to vhost_get_desc_n.
> > Not sure how we know the header size that backend will use though.
> > Maybe just look at our features.
> 
>         Yes; we have hdr_size, so I can add it here. It'll be 0 for
> the cases where the backend and guest both have vnet header (either
> the regular or larger mergeable buffers one), but should be added
> in for future raw socket support.

So hdr_size is the wrong thing to add then.
We need to add non-zero value for tap now.

> > 
> > >        /* OK, now we need to know about added descriptors. */
> > > -      if (head == vq->num) {
> > > -         if (unlikely(vhost_enable_notify(vq))) {
> > > +      if (!headcount) {
> > > +         if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
> > >              /* They have slipped one in as we were
> > >               * doing that: check again. */
> > >              vhost_disable_notify(vq);
> > > +            retries++;
> > >              continue;
> > >           }
> > 
> > Hmm. The reason we have the code at all, as the comment says, is because
> > guest could have added more buffers between the time we read last index
> > and the time we enabled notification. So if we just break like this
> > the race still exists. We could remember the
> > last head value we observed, and have vhost_enable_notify check
> > against this value?
> 
>         This is to prevent a spin loop in the case where we have some
> buffers available, but not enough for the current packet (ie, this
> is the replacement code for the "rxmaxheadcount" business). If they
> actually added something new, retrying once should see it, but what
> vhost_enable_notify() returns non-zero on is not "new buffers" but
> rather "not empty". In the case mentioned, we aren't empty, so
> vhost_enable_notify() returns nonzero every time, but the guest hasn't
> given us enough buffers to proceed, so we continuously retry; this
> code breaks the spin loop until we've really got new buffers from
> the guest.

What about the race I point out above? It seems to potentially
cause a deadlock.

> > 
> > Need to think about it.
> > 
> > Another concern here is that on retries vhost_get_desc_n
> > is doing extra work, rescanning the same descriptor
> > again and again. Not sure how common this is, might be
> > worthwhile to add a TODO to consider this at least.
> 
>         I had a printk in there to test the code and with the
> retries counter, it happens when we fill the ring (once,
> because of the retries checks), and then proceeds as
> desired when the guest gives us more buffers. Without the
> check, it spews until we hear from the guest.

I don't understand whether what you write above means 'yes this happens
and is worth optimizing for' or 'this happens very rarely
and is not worth optimizing for'.

> > > @@ -261,14 +275,33 @@ static void handle_rx(struct vhost_net *
> > >                  len, MSG_DONTWAIT | MSG_TRUNC);
> > >        /* TODO: Check specific error and bomb out unless EAGAIN? */
> > >        if (err < 0) {
> > 
> > I think we need to compare err and datalen and drop packet on mismatch 
> as well.
> > The check err > len won't be needed then.
> 
>         OK, but this is the original code, unchanged by my patch. :-)

Right. original code does not know the datalen.

> > > -         vhost_discard_vq_desc(vq);
> > > +         vhost_discard_desc(vq, headcount);
> > >           break;
> > >        }
> > >        /* TODO: Should check and handle checksum. */
> > > +      if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
> > > +         struct virtio_net_hdr_mrg_rxbuf *vhdr =
> > > +            (struct virtio_net_hdr_mrg_rxbuf *)
> > > +            vq->iov[0].iov_base;
> > > +         /* add num_buffers */
> > > +         if (vhost_has_feature(&net->dev,
> > > +                     VHOST_NET_F_VIRTIO_NET_HDR))
> > > +            hdr.num_buffers = headcount;
> > 
> > Why is the above necessary?
> 
>         This adds mergeable buffer support for the raw case.

So my suggestion is to have a copy of the header
and then same code will fill in the field for
both raw and tap.

> > 
> > > +         else if (vq->iov[0].iov_len < sizeof(*vhdr)) {
> > 
> > I think length check is already done when we copy the header. No?
> 
>         This sets num_buffers for the tap case (where hdr_size is 0).

Ah, right. So let's just check the total length is > sizeof(*vhdr).

> We don't copy any headers at all for this case, but we need to
> add num_buffers at offset 10 in the user buffer already read by
> recvmsg().
> 
> > 
> > > +            vq_err(vq, "tiny buffers < %d unsupported",
> > > +               vq->iov[0].iov_len);
> > > +            vhost_discard_desc(vq, headcount);
> > > +            break;
> > 
> > Problem here is that recvmsg might modify iov.
> > This is why I think we need to use vq->hdr here.
> 
>         recvmsg() can never modify the iovec;


Look at af_packet code for an example where it does.
The pointer is non-const, it's OK to modify.
tap used to modify iovec as well, the fact it doesn't
now is a side-effect of reusing same code for
recvmsg and aio read.

> it's the standard socket call.

It's an internal API that is used to implement the call.
iovec it gets is a kernel side copy of what user passed in.

> To use vq->hdr, we'd have to copy
> it in from user space, modify it, and then copy it back
> in to user space, on every packet. In the normal tap case,
> hdr_size is 0 and we read it directly from the socket to
> user space. It is already correct for mergeable buffers,
> except for the num_buffers value added here.


I don't understand what you write above, sorry.
We have iov, all we need is store the first address+length
in the hdr field.
This should be close to free and does not involve any copies
from/to userspace. All the branching and special-casing
is possibly more expensive.


> > 
> > > +         } else if (put_user(headcount, &vhdr->num_buffers)) {
> > 
> > The above put_user writes out a 32 bit value, right?
> > This seems wrong.
> 
>         I'll recheck this -- I'll make the type of "headcount" be
> the type of "num_buffers" in the MRXB vnet header, if it isn't
> already.

No, I was confused. Sorry about the noise.

> > 
> > How about using
> >    memcpy_toiovecend(vq->hdr, &headcount,
> >            offsetof(struct virtio_net_hdr_mrg_rxbuf, num_buffers),
> >            sizeof headcount);
> > 
> > this way we also do not make any assumptions about layout.
> 
>         Because this overwrites the valid vnet header we got from
> the tap socket with a local copy that isn't correct.

It does? I think that this only writes out 2 bytes at an offset.

> For this to
> work, we first need to copy_from_user() the vnet header from the
> socket into vq->hdr (on every packet).

copy_from_user from vnet header to vq->hdr does not
seem to make any sense and is not what I suggested.

>         It doesn't assume anything here-- it requires (and checks)
> that the user didn't give us a <12 byte buffer, which I think is
> reasonable.
> That's about the size of the descriptor for the buffer,
> and I'd call that a broken guest.

I think it's better not to assume this. virtio spec does
mentions layout assumptions as legacy limitations.
Guest can post descriptor consisting of 2 buffers:
1. 10 bytes. 2. 2 bytes. and I don't see a reason
not to support this unless this makes code simpler.
In this case code is more complex.


> The cost of supporting those
> tiny buffers in the general case would be a copy_from_user() and
> copy_to_user() of the vnet_hdr on every packet, which I think is
> not worth it (and especially since I don't expect any guest is
> ever going to give us a <12 byte buffer in the first place).

You keep saying this, but I do not see any need for extra
copy_to_user. Just use memcpy_toiovecend instead of put_user.


> > > @@ -560,9 +593,14 @@ done:
> > > 
> > >  static int vhost_net_set_features(struct vhost_net *n, u64 features)
> > >  {
> > > -   size_t hdr_size = features & (1 << VHOST_NET_F_VIRTIO_NET_HDR) ?
> > > -      sizeof(struct virtio_net_hdr) : 0;
> > > +   size_t hdr_size = 0;
> > >     int i;
> > > +
> > > +   if (features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)) {
> > > +      hdr_size = sizeof(struct virtio_net_hdr);
> > > +      if (features & (1 << VIRTIO_NET_F_MRG_RXBUF))
> > > +         hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
> > 
> > My personal style for this would be:
> >      if (!(features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)))
> >       hdr_size = 0
> >    else if (!(features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
> >       hdr_size = sizeof(virtio_net_hdr);
> >    else
> >       hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
> > 
> > which results in more symmetry and less nesting.
> 
> OK.
> 
> > 
> > >     mutex_lock(&n->dev.mutex);
> > >     if ((features & (1 << VHOST_F_LOG_ALL)) &&
> > >         !vhost_log_access_ok(&n->dev)) {
> > > diff -ruNp net-next-p0/drivers/vhost/vhost.c
> > > net-next-v3/drivers/vhost/vhost.c
> > > --- net-next-p0/drivers/vhost/vhost.c   2010-03-22 12:04:38.000000000
> > > -0700
> > > +++ net-next-v3/drivers/vhost/vhost.c   2010-04-06 12:57:51.000000000
> > > -0700
> > > @@ -856,6 +856,47 @@ static unsigned get_indirect(struct vhos
> > >     return 0;
> > >  }
> > > 
> > > +/* This is a multi-buffer version of vhost_get_vq_desc
> > > + * @vq      - the relevant virtqueue
> > > + * datalen   - data length we'll be reading
> > > + * @iovcount   - returned count of io vectors we fill
> > > + * @log      - vhost log
> > > + * @log_num   - log offset
> > > + *   returns number of buffer heads allocated, 0 on error
> > 
> > This is unusual. Let's return a negative error code on error.
> 
>         This is like malloc -- "0" is never a valid return value, and
> we don't have actual error values to return and handle; they generate
> vq_err() messages within the code itself. In all cases,
> it is the count of buffers we allocated (which is 0 when we fail to
> allocate). So, I don't agree with this, but can do it if you insist.

malloc returns NULL, not zero.
standard error handling approaches are:
- NULL on error
- ERR_PTR on error
- >= 0 for success, -errno for error
- true for success false for error

And of course
- some custom strategy so you need to read documentation to figure out
  how to use a function
which is where this one gets classified.
So I am not saying 0 won't work, just that standard stuff is better.

> > 
> > > + */
> > > +int vhost_get_desc_n(struct vhost_virtqueue *vq, struct 
> vring_used_elem
> > > *heads,
> > > +           int datalen, int *iovcount, struct vhost_log *log,
> > > +           unsigned int *log_num)
> > > +{
> > > +   int out, in;
> > > +   int seg = 0;      /* iov index */
> > > +   int hc = 0;      /* head count */
> > > +
> > > +   while (datalen > 0) {
> > > +      if (hc >= VHOST_NET_MAX_SG)
> > > +         goto err;
> > > +      heads[hc].id = vhost_get_desc(vq->dev, vq, vq->iov+seg,
> > > +                     ARRAY_SIZE(vq->iov)-seg, &out,
> > > +                     &in, log, log_num);
> > > +      if (heads[hc].id == vq->num)
> > > +         goto err;
> > > +      if (out || in <= 0) {
> > > +         vq_err(vq, "unexpected descriptor format for RX: "
> > > +            "out %d, in %d\n", out, in);
> > > +         goto err;
> > > +      }
> > > +      heads[hc].len = iov_length(vq->iov+seg, in);
> > > +      datalen -= heads[hc].len;
> > 
> > This signed/unsigned mix makes me nervuous.
> > Let's make datalen unsigned, add unsigned total_len, and
> > while (datalen < total_len).
> > 
> > > +      hc++;
> > > +      seg += in;
> > > +   }
> > > +   *iovcount = seg;
> > > +   return hc;
> > > +err:
> > > +   vhost_discard_desc(vq, hc);
> > > +   return 0;
> > > +}
> > > +
> > >  /* This looks in the virtqueue and for the first available buffer, 
> and
> > > converts
> > >   * it to an iovec for convenient access.  Since descriptors consist 
> of
> > > some
> > >   * number of output then some number of input descriptors, it's
> > > actually two
> > > @@ -863,7 +904,7 @@ static unsigned get_indirect(struct vhos
> > >   *
> > >   * This function returns the descriptor number found, or vq->num 
> (which
> > >   * is never a valid descriptor number) if none was found. */
> > > -unsigned vhost_get_vq_desc(struct vhost_dev *dev, struct
> > > vhost_virtqueue *vq,
> > > +unsigned vhost_get_desc(struct vhost_dev *dev, struct vhost_virtqueue
> > > *vq,
> > >              struct iovec iov[], unsigned int iov_size,
> > >              unsigned int *out_num, unsigned int *in_num,
> > >              struct vhost_log *log, unsigned int *log_num)
> > > @@ -981,31 +1022,42 @@ unsigned vhost_get_vq_desc(struct vhost_
> > >  }
> > > 
> > >  /* Reverse the effect of vhost_get_vq_desc. Useful for error 
> handling.
> > > */
> > > -void vhost_discard_vq_desc(struct vhost_virtqueue *vq)
> > > +void vhost_discard_desc(struct vhost_virtqueue *vq, int n)
> > >  {
> > > -   vq->last_avail_idx--;
> > > +   vq->last_avail_idx -= n;
> > >  }
> > > 
> > >  /* After we've used one of their buffers, we tell them about it. 
> We'll
> > > then
> > >   * want to notify the guest, using eventfd. */
> > > -int vhost_add_used(struct vhost_virtqueue *vq, unsigned int head, int
> > > len)
> > > +int vhost_add_used(struct vhost_virtqueue *vq, struct vring_used_elem
> > > *heads,
> > > +         int count)
> > 
> > I think we are better off with vhost_add_used and vhost_add_used_n:
> > the version with _n has a lot of extra complexity, and tx always
> > adds them 1 by one.
> 
>         The external code only calls vhost_add_used_and_signal(), so I
> split that. I can add a single variant of vhost_add_used() too, but
> I was trying to avoid adding duplicate code and external interface.
> I don't actually agree with splitting these; isn't it better to have
> them going through the same code path whether it's single or multiple?
> A bug in one would show up as an intermittent failure, and a change
> to one requires changing both.

It's a small function so I am not really worried about maintenance.

> I don't think the multiple should just
> do the single repeatedly, either, since the multiple variant now can
> do the work in one or two copy_to_user()'s, without a loop;

Yes.

> so splitting
> these just makes a special case for single to carry in code maintenance
> with no benefit, I think.

I don't know, I have a feeling all these loops and memory accesses
on data path won't be free, and for tx this just adds overhead.

> > 
> > >  {
> > >     struct vring_used_elem *used;
> > > +   int start, n;
> > > +
> > > +   if (count <= 0)
> > > +      return -EINVAL;
> > > 
> > > -   /* The virtqueue contains a ring of used buffers.  Get a pointer 
> to
> > > the
> > > -    * next entry in that used ring. */
> > > -   used = &vq->used->ring[vq->last_used_idx % vq->num];
> > > -   if (put_user(head, &used->id)) {
> > > -      vq_err(vq, "Failed to write used id");
> > > +   start = vq->last_used_idx % vq->num;
> > > +   if (vq->num - start < count)
> > > +      n = vq->num - start;
> > > +   else
> > > +      n = count;
> > 
> > use min?
> 
>         Sure.
> 
> > 
> > > +   used = vq->used->ring + start;
> > > +   if (copy_to_user(used, heads, sizeof(heads[0])*n)) {
> > > +      vq_err(vq, "Failed to write used");
> > >        return -EFAULT;
> > >     }
> > > -   if (put_user(len, &used->len)) {
> > > -      vq_err(vq, "Failed to write used len");
> > > -      return -EFAULT;
> > > +   if (n < count) {   /* wrapped the ring */
> > > +      used = vq->used->ring;
> > > +      if (copy_to_user(used, heads+n, sizeof(heads[0])*(count-n))) {
> > > +         vq_err(vq, "Failed to write used");
> > > +         return -EFAULT;
> > > +      }
> > >     }
> > >     /* Make sure buffer is written before we update index. */
> > >     smp_wmb();
> > > -   if (put_user(vq->last_used_idx + 1, &vq->used->idx)) {
> > > +   if (put_user(vq->last_used_idx+count, &vq->used->idx)) {
> > 
> > I am a bit confused ... will this write a 32 or 16 bit value?
> > count is 32 bit ... Maybe we are better off with
> >   u16 idx = vq->last_used_idx + count
> >   put_user(idx, &vq->used->idx)
> >   vq->last_used_idx = idx
> 
>         How about a cast?:
> 
>         if (put_user((u16)(vq->last_used_idx+count), &vq->used->idx)) {
> 
> 
>                                         +-DLS

No, I was confused. Code is ok (just whitespace damaged).

--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
David Stevens April 7, 2010, 9:07 p.m. UTC | #5
kvm-owner@vger.kernel.org wrote on 04/07/2010 11:09:30 AM:

> On Wed, Apr 07, 2010 at 10:37:17AM -0700, David Stevens wrote:
> > > 
> > > Thanks!
> > > There's some whitespace damage, are you sending with your new
> > > sendmail setup? It seems to have worked for qemu patches ...
> > 
> >         Yes, I saw some line wraps in what I received, but I checked
> > the original draft to be sure and they weren't there. Possibly from
> > the relay; Sigh.
> > 
> > 
> > > > @@ -167,8 +166,15 @@ static void handle_tx(struct vhost_net *
> > > >        /* TODO: Check specific error and bomb out unless ENOBUFS? 
*/
> > > >        err = sock->ops->sendmsg(NULL, sock, &msg, len);
> > > >        if (unlikely(err < 0)) {
> > > > -         vhost_discard_vq_desc(vq);
> > > > -         tx_poll_start(net, sock);
> > > > +         if (err == -EAGAIN) {
> > > > +            vhost_discard_desc(vq, 1);
> > > > +            tx_poll_start(net, sock);
> > > > +         } else {
> > > > +            vq_err(vq, "sendmsg: errno %d\n", -err);
> > > > +            /* drop packet; do not discard/resend */
> > > > +            vhost_add_used_and_signal(&net->dev, vq, head,
> > > > +                       0);
> > > 
> > > > vhost does not currently have a consistent error handling strategy:
> > > if we drop packets, need to think which other errors should cause
> > > packet drops.  I prefer to just call vq_err for now,
> > > and have us look at handling segfaults etc in a consistent way
> > > separately.
> > 
> > I had to add this to avoid an infinite loop when I wrote a bad
> > packet on the socket. I agree error handling needs a better look,
> > but retrying a bad packet continuously while dumping in the log
> > is what it was doing when I hit an error before this code. Isn't
> > this better, at least until a second look?
> > 
> 
> Hmm, what do you mean 'continuously'? Don't we only try again
> on next kick?

        If the packet is corrupt (in my case, a missing vnet header
during testing), every send will fail and we never make progress.
I had thousands of error messages in the log (for the same packet)
before I added this code. If the problem is with the packet,
retrying the same one as the original code does will never recover.
        This isn't required for mergeable rx buffer support, so I
can certainly remove it from this patch, but I think the original
error handling doesn't handle a single corrupted packet very
gracefully.


> > > > @@ -222,31 +242,25 @@ static void handle_rx(struct vhost_net *
> > > >     vq_log = unlikely(vhost_has_feature(&net->dev, 
VHOST_F_LOG_ALL)) ?
> > > >        vq->log : NULL;
> > > > 
> > > > -   for (;;) {
> > > > -      head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> > > > -                ARRAY_SIZE(vq->iov),
> > > > -                &out, &in,
> > > > -                vq_log, &log);
> > > > +   while ((datalen = vhost_head_len(sock->sk))) {
> > > > +      headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
> > > > +                    vq_log, &log);
> > > 
> > > This looks like a bug, I think we need to pass
> > > datalen + header size to vhost_get_desc_n.
> > > Not sure how we know the header size that backend will use though.
> > > Maybe just look at our features.
> > 
> >         Yes; we have hdr_size, so I can add it here. It'll be 0 for
> > the cases where the backend and guest both have vnet header (either
> > the regular or larger mergeable buffers one), but should be added
> > in for future raw socket support.
> 
> So hdr_size is the wrong thing to add then.
> We need to add non-zero value for tap now.

        datalen includes the vnet_hdr in the tap case, so we don't need
a non-zero hdr_size. The socket data has the entire packet and vnet_hdr
and that length is what we're getting from vhost_head_len().

> 
> > > 
> > > >        /* OK, now we need to know about added descriptors. */
> > > > -      if (head == vq->num) {
> > > > -         if (unlikely(vhost_enable_notify(vq))) {
> > > > +      if (!headcount) {
> > > > +         if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
> > > >              /* They have slipped one in as we were
> > > >               * doing that: check again. */
> > > >              vhost_disable_notify(vq);
> > > > +            retries++;
> > > >              continue;
> > > >           }
> > > 
> > > Hmm. The reason we have the code at all, as the comment says, is 
because
> > > guest could have added more buffers between the time we read last 
index
> > > and the time we enabled notification. So if we just break like this
> > > the race still exists. We could remember the
> > > last head value we observed, and have vhost_enable_notify check
> > > against this value?
> > 
> >         This is to prevent a spin loop in the case where we have some
> > buffers available, but not enough for the current packet (ie, this
> > is the replacement code for the "rxmaxheadcount" business). If they
> > actually added something new, retrying once should see it, but what
> > vhost_enable_notify() returns non-zero on is not "new buffers" but
> > rather "not empty". In the case mentioned, we aren't empty, so
> > vhost_enable_notify() returns nonzero every time, but the guest hasn't
> > given us enough buffers to proceed, so we continuously retry; this
> > code breaks the spin loop until we've really got new buffers from
> > the guest.
> 
> What about the race I point out above? It seems to potentially
> cause a deadlock.

        It certainly handles a single race, since the code is identical
when retries==0. I was assuming that a real update would always get us
enough buffers, so could not happen twice in a row. The false case of
having 1 buffer when we need 3, say, would return nonzero every time,
so this code would detect that and break that loop; never hang if a
real guest update gives us a full ring.
        If we think we've exhausted the ring and a guest update gives us
a couple buffers, but not the complete ring (which is what it does in
practice), then we'd still have a race. In that case, we should be
comparing avail_idx with itself across multiple calls, rather than
last_avail_idx (which is only updated when we post a new packet).
        I'll look at this some more. With the guest I have, this code
will always work, but I don't know that the guest is required to fill
the ring, which is what this assumes.

> 
> > > 
> > > Need to think about it.
> > > 
> > > Another concern here is that on retries vhost_get_desc_n
> > > is doing extra work, rescanning the same descriptor
> > > again and again. Not sure how common this is, might be
> > > worthwhile to add a TODO to consider this at least.
> > 
> >         I had a printk in there to test the code and with the
> > retries counter, it happens when we fill the ring (once,
> > because of the retries checks), and then proceeds as
> > desired when the guest gives us more buffers. Without the
> > check, it spews until we hear from the guest.
> 
> I don't understand whether what you write above means 'yes this happens
> and is worth optimizing for' or 'this happens very rarely
> and is not worth optimizing for'.

        I'm saying "no", even with high load, we don't hit the
retries==1 very often with the new code. It only happens when the ring
is completely full. In that case, with the old code, we would spin until 
we
hear from the guest (tens of thousands of times); with the new code,
we hit it once when the ring is full and wait for the guest to give
us more buffers; then we proceed.
        With the new code in a test run of tens of millions of packets,
I got maybe 5 or 6 times where we exhausted the ring and waited, but
with the old code, we did enable/disable tens of thousands of times
because the ring wasn't completely empty and we were spinning until
we got new buffers from the guest.


> > > > -         vhost_discard_vq_desc(vq);
> > > > +         vhost_discard_desc(vq, headcount);
> > > >           break;
> > > >        }
> > > >        /* TODO: Should check and handle checksum. */
> > > > +      if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
> > > > +         struct virtio_net_hdr_mrg_rxbuf *vhdr =
> > > > +            (struct virtio_net_hdr_mrg_rxbuf *)
> > > > +            vq->iov[0].iov_base;
> > > > +         /* add num_buffers */
> > > > +         if (vhost_has_feature(&net->dev,
> > > > +                     VHOST_NET_F_VIRTIO_NET_HDR))
> > > > +            hdr.num_buffers = headcount;
> > > 
> > > Why is the above necessary?
> > 
> >         This adds mergeable buffer support for the raw case.
> 
> So my suggestion is to have a copy of the header
> and then same code will fill in the field for
> both raw and tap.

        The recvmsg() has written the vnethdr into the user
buffer in the tap case. We don't have an in-kernel copy of it at
all (hdr_size == 0) in vhost. In the raw case, recvmsg isn't writing it
to the user buffer and then we do have the local copy in hdr.
        So the code updates num_buffer (only) in user space in
the tap case, and does the necessary copy of the entire header
in the raw case. I don't think a double copy of the entire vnet_hdr
on every packet is a good idea in the tap case, and the simple
assignment with the existing copy_to_user() of the header is
cheaper than double-copying num_buffers in the raw case. So, I
do think this code is best. It could use hdr_size instead of
the (in-line) feature-bit check, but there are no unnecessary
copies as-is, and I think there would be if we don't use the
two separate ways of updating num_buffers.

> > 
> > > 
> > > > +            vq_err(vq, "tiny buffers < %d unsupported",
> > > > +               vq->iov[0].iov_len);
> > > > +            vhost_discard_desc(vq, headcount);
> > > > +            break;
> > > 
> > > Problem here is that recvmsg might modify iov.
> > > This is why I think we need to use vq->hdr here.
> > 
> >         recvmsg() can never modify the iovec;
> 
> 
> Look at af_packet code for an example where it does.
> The pointer is non-const, it's OK to modify.
> tap used to modify iovec as well, the fact it doesn't
> now is a side-effect of reusing same code for
> recvmsg and aio read.

        OK, even assuming it can, the check is done after
the recvmsg -- it can't change between the length check
and the put_user() of num_buffer, so I'm not sure what
your concern is here. Are you saying that vq->iov may
be trashed so that it no longer points to the ring buffer?
        What I need is to write the num_buffer value at
offset 10 in the userspace ring buffer after the recvmsg().
If the iovec has been modified, I'm using the modified.
If you're saying that it may be garbage, then your suggestion
is that I save the pointer to offset 10 of the ring buffer
before the call to recvmsg so I can update it after?

> > To use vq->hdr, we'd have to copy
> > it in from user space, modify it, and then copy it back
> > in to user space, on every packet. In the normal tap case,
> > hdr_size is 0 and we read it directly from the socket to
> > user space. It is already correct for mergeable buffers,
> > except for the num_buffers value added here.
> 
> 
> I don't understand what you write above, sorry.
> We have iov, all we need is store the first address+length
> in the hdr field.

Sorry, in that discussion, I was confusing "hdr" with vq->hdr.
In the tap case, hdr_size is 0 and we have nothing in vq->hdr.
As discussed before, if you mean updating a local copy of the
header, we don't have a local copy of the header -- recvmsg()
has written it directly to the user buffer. To get a local
copy, we'd need to either copy_from_user() out of the ring or
get it from recvmsg() by changing the iovec and then later
do an extra copy of it to get it in the user buffer where we
need it.

> 
> > > 
> > > How about using
> > >    memcpy_toiovecend(vq->hdr, &headcount,
> > >            offsetof(struct virtio_net_hdr_mrg_rxbuf, num_buffers),
> > >            sizeof headcount);
> > > 
> > > this way we also do not make any assumptions about layout.
> > 
> >         Because this overwrites the valid vnet header we got from
> > the tap socket with a local copy that isn't correct.
> 
> It does? I think that this only writes out 2 bytes at an offset.

        Oh, sorry, I misread. vq->hdr doesn't have anything in it in
the tap case, but something like this may eliminate the need to check
iov_len > sizeof(*vhdr); will investigate.


                                                        +-DLS


--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Michael S. Tsirkin April 8, 2010, 8:35 a.m. UTC | #6
On Wed, Apr 07, 2010 at 02:07:18PM -0700, David Stevens wrote:
> kvm-owner@vger.kernel.org wrote on 04/07/2010 11:09:30 AM:
> 
> > On Wed, Apr 07, 2010 at 10:37:17AM -0700, David Stevens wrote:
> > > > 
> > > > Thanks!
> > > > There's some whitespace damage, are you sending with your new
> > > > sendmail setup? It seems to have worked for qemu patches ...
> > > 
> > >         Yes, I saw some line wraps in what I received, but I checked
> > > the original draft to be sure and they weren't there. Possibly from
> > > the relay; Sigh.
> > > 
> > > 
> > > > > @@ -167,8 +166,15 @@ static void handle_tx(struct vhost_net *
> > > > >        /* TODO: Check specific error and bomb out unless ENOBUFS? 
> */
> > > > >        err = sock->ops->sendmsg(NULL, sock, &msg, len);
> > > > >        if (unlikely(err < 0)) {
> > > > > -         vhost_discard_vq_desc(vq);
> > > > > -         tx_poll_start(net, sock);
> > > > > +         if (err == -EAGAIN) {
> > > > > +            vhost_discard_desc(vq, 1);
> > > > > +            tx_poll_start(net, sock);
> > > > > +         } else {
> > > > > +            vq_err(vq, "sendmsg: errno %d\n", -err);
> > > > > +            /* drop packet; do not discard/resend */
> > > > > +            vhost_add_used_and_signal(&net->dev, vq, head,
> > > > > +                       0);
> > > > 
> > > > > vhost does not currently have a consistent error handling strategy:
> > > > if we drop packets, need to think which other errors should cause
> > > > packet drops.  I prefer to just call vq_err for now,
> > > > and have us look at handling segfaults etc in a consistent way
> > > > separately.
> > > 
> > > I had to add this to avoid an infinite loop when I wrote a bad
> > > packet on the socket. I agree error handling needs a better look,
> > > but retrying a bad packet continuously while dumping in the log
> > > is what it was doing when I hit an error before this code. Isn't
> > > this better, at least until a second look?
> > > 
> > 
> > Hmm, what do you mean 'continuously'? Don't we only try again
> > on next kick?
> 
>         If the packet is corrupt (in my case, a missing vnet header
> during testing), every send will fail and we never make progress.
> I had thousands of error messages in the log (for the same packet)
> before I added this code.

Hmm, we do not want a buggy guest to be able to fill
host logs. This is only if debugging is enabled though, right?
We can also rate-limit the errors.

> If the problem is with the packet,
> retrying the same one as the original code does will never recover.
>         This isn't required for mergeable rx buffer support, so I
> can certainly remove it from this patch, but I think the original
> error handling doesn't handle a single corrupted packet very
> gracefully.
> 

An approach I considered was to have qemu poll vq_err fd
and stop the device when an error is seen. My concern with
dropping a tx packet is that it would make debugging
very difficult.


> > > > > @@ -222,31 +242,25 @@ static void handle_rx(struct vhost_net *
> > > > >     vq_log = unlikely(vhost_has_feature(&net->dev, 
> VHOST_F_LOG_ALL)) ?
> > > > >        vq->log : NULL;
> > > > > 
> > > > > -   for (;;) {
> > > > > -      head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> > > > > -                ARRAY_SIZE(vq->iov),
> > > > > -                &out, &in,
> > > > > -                vq_log, &log);
> > > > > +   while ((datalen = vhost_head_len(sock->sk))) {
> > > > > +      headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
> > > > > +                    vq_log, &log);
> > > > 
> > > > This looks like a bug, I think we need to pass
> > > > datalen + header size to vhost_get_desc_n.
> > > > Not sure how we know the header size that backend will use though.
> > > > Maybe just look at our features.
> > > 
> > >         Yes; we have hdr_size, so I can add it here. It'll be 0 for
> > > the cases where the backend and guest both have vnet header (either
> > > the regular or larger mergeable buffers one), but should be added
> > > in for future raw socket support.
> > 
> > So hdr_size is the wrong thing to add then.
> > We need to add non-zero value for tap now.
> 
>         datalen includes the vnet_hdr in the tap case, so we don't need
> a non-zero hdr_size. The socket data has the entire packet and vnet_hdr
> and that length is what we're getting from vhost_head_len().

I only see vhost_head_len returning skb->len. You are sure skb->len
includes vnet_hdr for tap rx?

> > 
> > > > 
> > > > >        /* OK, now we need to know about added descriptors. */
> > > > > -      if (head == vq->num) {
> > > > > -         if (unlikely(vhost_enable_notify(vq))) {
> > > > > +      if (!headcount) {
> > > > > +         if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
> > > > >              /* They have slipped one in as we were
> > > > >               * doing that: check again. */
> > > > >              vhost_disable_notify(vq);
> > > > > +            retries++;
> > > > >              continue;
> > > > >           }
> > > > 
> > > > Hmm. The reason we have the code at all, as the comment says, is 
> because
> > > > guest could have added more buffers between the time we read last 
> index
> > > > and the time we enabled notification. So if we just break like this
> > > > the race still exists. We could remember the
> > > > last head value we observed, and have vhost_enable_notify check
> > > > against this value?
> > > 
> > >         This is to prevent a spin loop in the case where we have some
> > > buffers available, but not enough for the current packet (ie, this
> > > is the replacement code for the "rxmaxheadcount" business). If they
> > > actually added something new, retrying once should see it, but what
> > > vhost_enable_notify() returns non-zero on is not "new buffers" but
> > > rather "not empty". In the case mentioned, we aren't empty, so
> > > vhost_enable_notify() returns nonzero every time, but the guest hasn't
> > > given us enough buffers to proceed, so we continuously retry; this
> > > code breaks the spin loop until we've really got new buffers from
> > > the guest.
> > 
> > What about the race I point out above? It seems to potentially
> > cause a deadlock.
> 
>         It certainly handles a single race, since the code is identical
> when retries==0. I was assuming that a real update would always get us
> enough buffers, so could not happen twice in a row. The false case of
> having 1 buffer when we need 3, say, would return nonzero every time,
> so this code would detect that and break that loop; never hang if a
> real guest update gives us a full ring.
>         If we think we've exhausted the ring and a guest update gives us
> a couple buffers, but not the complete ring (which is what it does in
> practice), then we'd still have a race. In that case, we should be
> comparing avail_idx with itself across multiple calls, rather than
> last_avail_idx (which is only updated when we post a new packet).
>         I'll look at this some more. With the guest I have, this code
> will always work, but I don't know that the guest is required to fill
> the ring, which is what this assumes.

I think it is legal for a guest to add buffers one by one.
My suggestion for handling this is to cache the last value of the available
index we have seen so that vhost_enable_notify returns
true if it changed meanwhile.

Care needs to be taken when index is changed with an ioctl.

> > 
> > > > 
> > > > Need to think about it.
> > > > 
> > > > Another concern here is that on retries vhost_get_desc_n
> > > > is doing extra work, rescanning the same descriptor
> > > > again and again. Not sure how common this is, might be
> > > > worthwhile to add a TODO to consider this at least.
> > > 
> > >         I had a printk in there to test the code and with the
> > > retries counter, it happens when we fill the ring (once,
> > > because of the retries checks), and then proceeds as
> > > desired when the guest gives us more buffers. Without the
> > > check, it spews until we hear from the guest.
> > 
> > I don't understand whether what you write above means 'yes this happens
> > and is worth optimizing for' or 'this happens very rarely
> > and is not worth optimizing for'.
> 
>         I'm saying "no",

OK then.

> even with high load, we don't hit the
> retries==1 very often with the new code. It only happens when the ring
> is completely full. In that case, with the old code, we would spin until 
> we
> hear from the guest (tens of thousands of times); with the new code,
> we hit it once when the ring is full and wait for the guest to give
> us more buffers; then we proceed.
>         With the new code in a test run of tens of millions of packets,
> I got maybe 5 or 6 times where we exhausted the ring and waited, but
> with the old code, we did enable/disable tens of thousands of times
> because the ring wasn't completely empty and we were spinning until
> we got new buffers from the guest.
> 

I presume under old code you mean some previous version of
merg. buffers patch? I don't see where current upstream would spin
waiting for the guest (because a single descriptor is always
enough to fit a packet in); if upstream does have this, it is a must-fix ASAP.

> > > > > -         vhost_discard_vq_desc(vq);
> > > > > +         vhost_discard_desc(vq, headcount);
> > > > >           break;
> > > > >        }
> > > > >        /* TODO: Should check and handle checksum. */
> > > > > +      if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
> > > > > +         struct virtio_net_hdr_mrg_rxbuf *vhdr =
> > > > > +            (struct virtio_net_hdr_mrg_rxbuf *)
> > > > > +            vq->iov[0].iov_base;
> > > > > +         /* add num_buffers */
> > > > > +         if (vhost_has_feature(&net->dev,
> > > > > +                     VHOST_NET_F_VIRTIO_NET_HDR))
> > > > > +            hdr.num_buffers = headcount;
> > > > 
> > > > Why is the above necessary?
> > > 
> > >         This adds mergeable buffer support for the raw case.
> > 
> > So my suggestion is to have a copy of the header
> > and then same code will fill in the field for
> > both raw and tap.
> 
>         The recvmsg() has written the vnethdr into the user
> buffer in the tap case. We don't have an in-kernel copy of it at
> all (hdr_size == 0) in vhost. In the raw case, recvmsg isn't writing it
> to the user buffer and then we do have the local copy in hdr.
>         So the code updates num_buffer (only) in user space in
> the tap case, and does the necessary copy of the entire header
> in the raw case. I don't think a double copy of the entire vnet_hdr
> on every packet is a good idea in the tap case,

Right. But what I suggested only copies the num_buffer.

> and the simple
> assignment with the existing copy_to_user() of the header is
> cheaper than double-copying num_buffers in the raw case.

Here I disagree. I think a single write to a buffer that
we know is in cache will be cheaper than a branch.
It's also less codelines :)

> So, I
> do think this code is best. It could use hdr_size instead of
> the (in-line) feature-bit check, but there are no unnecessary
> copies as-is, and I think there would be if we don't use the
> two separate ways of updating num_buffers.

A 2 byte copy is almost free.

> > > 
> > > > 
> > > > > +            vq_err(vq, "tiny buffers < %d unsupported",
> > > > > +               vq->iov[0].iov_len);
> > > > > +            vhost_discard_desc(vq, headcount);
> > > > > +            break;
> > > > 
> > > > Problem here is that recvmsg might modify iov.
> > > > This is why I think we need to use vq->hdr here.
> > > 
> > >         rcvmsg() can never modify the iovec;
> > 
> > 
> > Look at af_packet code for an example where it does.
> > The pointer is non-const, it's OK to modify.
> > tap used to modify iovec as well, the fact it doesn't
> > now is a side-effect of reusing same code for
> > recvmsg and aio read.
> 
>         OK, even assuming it can, the check is done after
> the recvmsg -- it can't change between the length check
> and the put_user() of num_buffer, so I'm not sure what
> your concern is here. Are you saying that vq->iov may
> be trashed so that it no longer points to the ring buffer?

recvmsg updates at least the length.

>         What I need is to write the num_buffer value at
> offset 10 in the userspace ring buffer after the recvmsg().
> If the iovec has been modified, I'm using the modified.
> If you're saying that it may be garbage, then your suggestion
> is that I save the pointer to offset 10 of the ring buffer
> before the call to recvmsg so I can update it after?

Essentially, yes. Upstream code (move_iovec_hdr) assumes that header
size we might need to save is same as size we hide from backend,
but here it is different.  If we generalize move_iovec_hdr to something like
the below (warning: completely untested, likely has integer overflow
or other problems), then I think it will do what we want,
so that memcpy_toiovecend will work:

/* Copy the first len bytes of iovec coverage from @from into @to, and
 * pop (consume) only the first pop_len bytes from @from.
 * Arguments must satisfy pop_len <= len, and @from must cover at least
 * len bytes across at most iov_count segments.
 * Return number of segments written to @to. */
static int move_iovec_hdr(struct iovec *from, struct iovec *to,
                          size_t len, size_t pop_len, int iov_count)
{
        int seg = 0;
        size_t size, pop;

        while (len && seg < iov_count) {
                size = from->iov_len < len ? from->iov_len : len;
                /* Record the segment in @to before @from is adjusted. */
                to->iov_base = from->iov_base;
                to->iov_len = size;
                /* Pop at most pop_len bytes from this segment.  The
                 * original popped 'size' bytes unconditionally, which
                 * over-pops when pop_len < size and wraps the unsigned
                 * pop_len below zero, so popping never stopped. */
                pop = pop_len < size ? pop_len : size;
                if (pop > 0) {
                        from->iov_len -= pop;
                        /* void * arithmetic is a GCC extension; cast
                         * through char * for well-defined stepping. */
                        from->iov_base = (char *)from->iov_base + pop;
                        pop_len -= pop;
                }
                len -= size;
                ++from;
                ++to;
                ++seg;
        }
        return seg;
}


> > > To use vq->hdr, we'd have to copy
> > > it in from user space, modify it, and then copy it back
> > > in to user space, on every packet. In the normal tap case,
> > > hdr_size is 0 and we read it directly from the socket to
> > > user space. It is already correct for mergeable buffers,
> > > except for the num_buffers value added here.
> > 
> > 
> > I don't understand what you write above, sorry.
> > We have iov; all we need is to store the first address+length
> > in the hdr field.
> 
> Sorry, in that discussion, I was confusing "hdr" with vq->hdr.
> In the tap case, hdr_size is 0 and we have nothing in vq->hdr.
> As discussed before, if you mean updating a local copy of the
> header, we don't have a local copy of the header -- recvmsg()
> has written it directly to the user buffer. To get a local
> copy, we'd need to either copy_from_user() out of the ring or
> get it from recvmsg() by changing the iovec and then later
> do an extra copy of it to get it in the user buffer where we
> need it.

Yes but we only need to update 2 bytes in userspace memory.
Don't touch anything else.

> > 
> > > > 
> > > > How about using
> > > >    memcpy_toiovecend(vq->hdr, &headcount,
> > > >            offsetof(struct virtio_net_hdr_mrg_rxbuf, num_buffers),
> > > >            sizeof headcount);
> > > > 
> > > > this way we also do not make any assumptions about layout.
> > > 
> > >         Because this overwrites the valid vnet header we got from
> > > the tap socket with a local copy that isn't correct.
> > 
> > It does? I think that this only writes out 2 bytes at an offset.
> 
>         Oh, sorry, I misread. vq->hdr doesn't have anything in it in
> the tap case, but something like this may eliminate the need to check
> iov_len > sizeof(*vhdr); will investigate.
> 
> 
>                                                         +-DLS
> 
--
To unsubscribe from this list: send the line "unsubscribe netdev" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff -ruNp net-next-p0/drivers/vhost/net.c
net-next-v3/drivers/vhost/net.c
--- net-next-p0/drivers/vhost/net.c	2010-03-22 12:04:38.000000000 -0700
+++ net-next-v3/drivers/vhost/net.c	2010-04-06 12:54:56.000000000 -0700
@@ -130,9 +130,8 @@  static void handle_tx(struct vhost_net *
 	hdr_size = vq->hdr_size;
 
 	for (;;) {
-		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
-					 ARRAY_SIZE(vq->iov),
-					 &out, &in,
+		head = vhost_get_desc(&net->dev, vq, vq->iov,
+					 ARRAY_SIZE(vq->iov), &out, &in,
 					 NULL, NULL);
 		/* Nothing new?  Wait for eventfd to tell us they refilled. */
 		if (head == vq->num) {
@@ -167,8 +166,15 @@  static void handle_tx(struct vhost_net *
 		/* TODO: Check specific error and bomb out unless ENOBUFS? */
 		err = sock->ops->sendmsg(NULL, sock, &msg, len);
 		if (unlikely(err < 0)) {
-			vhost_discard_vq_desc(vq);
-			tx_poll_start(net, sock);
+			if (err == -EAGAIN) {
+				vhost_discard_desc(vq, 1);
+				tx_poll_start(net, sock);
+			} else {
+				vq_err(vq, "sendmsg: errno %d\n", -err);
+				/* drop packet; do not discard/resend */
+				vhost_add_used_and_signal(&net->dev, vq, head,
+							  0);
+			}
 			break;
 		}
 		if (err != len)
@@ -186,12 +192,25 @@  static void handle_tx(struct vhost_net *
 	unuse_mm(net->dev.mm);
 }
 
+static int vhost_head_len(struct sock *sk)
+{
+	struct sk_buff *head;
+	int len = 0;
+
+	lock_sock(sk);
+	head = skb_peek(&sk->sk_receive_queue);
+	if (head)
+		len = head->len;
+	release_sock(sk);
+	return len;
+}
+
 /* Expects to be always run from workqueue - which acts as
  * read-size critical section for our kind of RCU. */
 static void handle_rx(struct vhost_net *net)
 {
 	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX];
-	unsigned head, out, in, log, s;
+	unsigned in, log, s;
 	struct vhost_log *vq_log;
 	struct msghdr msg = {
 		.msg_name = NULL,
@@ -202,13 +221,14 @@  static void handle_rx(struct vhost_net *
 		.msg_flags = MSG_DONTWAIT,
 	};
 
-	struct virtio_net_hdr hdr = {
-		.flags = 0,
-		.gso_type = VIRTIO_NET_HDR_GSO_NONE
+	struct virtio_net_hdr_mrg_rxbuf hdr = {
+		.hdr.flags = 0,
+		.hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE
 	};
 
+	int retries = 0;
 	size_t len, total_len = 0;
-	int err;
+	int err, headcount, datalen;
 	size_t hdr_size;
 	struct socket *sock = rcu_dereference(vq->private_data);
 	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
@@ -222,31 +242,25 @@  static void handle_rx(struct vhost_net *
 	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
 		vq->log : NULL;
 
-	for (;;) {
-		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
-					 ARRAY_SIZE(vq->iov),
-					 &out, &in,
-					 vq_log, &log);
+	while ((datalen = vhost_head_len(sock->sk))) {
+		headcount = vhost_get_desc_n(vq, vq->heads, datalen, &in,
+					     vq_log, &log);
 		/* OK, now we need to know about added descriptors. */
-		if (head == vq->num) {
-			if (unlikely(vhost_enable_notify(vq))) {
+		if (!headcount) {
+			if (retries == 0 && unlikely(vhost_enable_notify(vq))) {
 				/* They have slipped one in as we were
 				 * doing that: check again. */
 				vhost_disable_notify(vq);
+				retries++;
 				continue;
 			}
+			retries = 0;
 			/* Nothing new?  Wait for eventfd to tell us
 			 * they refilled. */
 			break;
 		}
 		/* We don't need to be notified again. */
-		if (out) {
-			vq_err(vq, "Unexpected descriptor format for RX: "
-			       "out %d, int %d\n",
-			       out, in);
-			break;
-		}
-		/* Skip header. TODO: support TSO/mergeable rx buffers. */
+		/* Skip header. TODO: support TSO. */
 		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
 		msg.msg_iovlen = in;
 		len = iov_length(vq->iov, in);
@@ -261,14 +275,33 @@  static void handle_rx(struct vhost_net *
 					 len, MSG_DONTWAIT | MSG_TRUNC);
 		/* TODO: Check specific error and bomb out unless EAGAIN? */
 		if (err < 0) {
-			vhost_discard_vq_desc(vq);
+			vhost_discard_desc(vq, headcount);
 			break;
 		}
 		/* TODO: Should check and handle checksum. */
+		if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF)) {
+			struct virtio_net_hdr_mrg_rxbuf *vhdr =
+				(struct virtio_net_hdr_mrg_rxbuf *)
+				vq->iov[0].iov_base;
+			/* add num_buffers */
+			if (vhost_has_feature(&net->dev,
+					      VHOST_NET_F_VIRTIO_NET_HDR))
+				hdr.num_buffers = headcount;
+			else if (vq->iov[0].iov_len < sizeof(*vhdr)) {
+				vq_err(vq, "tiny buffers < %d unsupported",
+					vq->iov[0].iov_len);
+				vhost_discard_desc(vq, headcount);
+				break;
+			} else if (put_user(headcount, &vhdr->num_buffers)) {
+				vq_err(vq, "Failed num_buffers write");
+				vhost_discard_desc(vq, headcount);
+				break;
+			}
+		}
 		if (err > len) {
 			pr_err("Discarded truncated rx packet: "
 			       " len %d > %zd\n", err, len);
-			vhost_discard_vq_desc(vq);
+			vhost_discard_desc(vq, headcount);
 			continue;
 		}
 		len = err;
@@ -279,7 +312,7 @@  static void handle_rx(struct vhost_net *
 			break;
 		}
 		len += hdr_size;
-		vhost_add_used_and_signal(&net->dev, vq, head, len);
+		vhost_add_used_and_signal_n(&net->dev, vq, vq->heads, headcount);
 		if (unlikely(vq_log))
 			vhost_log_write(vq, vq_log, log, len);
 		total_len += len;
@@ -560,9 +593,14 @@  done:
 
 static int vhost_net_set_features(struct vhost_net *n, u64 features)
 {
-	size_t hdr_size = features & (1 << VHOST_NET_F_VIRTIO_NET_HDR) ?
-		sizeof(struct virtio_net_hdr) : 0;
+	size_t hdr_size = 0;
 	int i;
+
+	if (features & (1 << VHOST_NET_F_VIRTIO_NET_HDR)) {
+		hdr_size = sizeof(struct virtio_net_hdr);
+		if (features & (1 << VIRTIO_NET_F_MRG_RXBUF))
+			hdr_size = sizeof(struct virtio_net_hdr_mrg_rxbuf);
+	}
 	mutex_lock(&n->dev.mutex);
 	if ((features & (1 << VHOST_F_LOG_ALL)) &&
 	    !vhost_log_access_ok(&n->dev)) {