
[net-next,V2,3/3] tun: rx batching

Message ID 1482912571-3157-4-git-send-email-jasowang@redhat.com
State Changes Requested, archived
Delegated to: David Miller

Commit Message

Jason Wang Dec. 28, 2016, 8:09 a.m. UTC
We can only process one packet at a time during sendmsg(). This often
leads to poor cache utilization under heavy load. So this patch tries
to do some batching during rx before submitting packets to the host
network stack. This is done by accepting MSG_MORE as a hint from the
sendmsg() caller: if it is set, the packet is queued temporarily in a
linked list, and the whole batch is submitted once MSG_MORE is cleared.
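
As an illustrative aside (not part of the patch): only in-kernel users
of the tap socket, such as vhost-net, reach tun_sendmsg() and can pass
MSG_MORE. A minimal sketch of such a caller, assuming a hypothetical
send_burst() helper and an array of already-prepared msghdrs, would set
the flag on every packet of a burst except the last, so the receive
side keeps batching until the burst ends:

#include <linux/net.h>
#include <linux/socket.h>
#include <linux/uio.h>

/* Hypothetical helper: push a burst of prepared msghdrs into the tap
 * socket, hinting with MSG_MORE that more packets will follow. */
static int send_burst(struct socket *tap_sock, struct msghdr *pkts, int n)
{
	int i, err;

	for (i = 0; i < n; i++) {
		/* The last packet of the burst clears MSG_MORE, which
		 * makes the receive path flush its pending queue. */
		pkts[i].msg_flags = MSG_DONTWAIT |
				    (i + 1 < n ? MSG_MORE : 0);

		err = tap_sock->ops->sendmsg(tap_sock, &pkts[i],
					     iov_iter_count(&pkts[i].msg_iter));
		if (err < 0)
			return err;
	}
	return 0;
}

tun_chr_write_iter() always passes more = false, so plain writes to
/dev/net/tun are unaffected by the batching path.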

Tests were done with pktgen (burst=128) in the guest, over mlx4 (noqueue) on the host:

                  Mpps  +%
    rx_batched=0  0.90  +0%
    rx_batched=4  0.97  +7.8%
    rx_batched=8  0.97  +7.8%
    rx_batched=16 0.98  +8.9%
    rx_batched=32 1.03  +14.4%
    rx_batched=48 1.09  +21.1%
    rx_batched=64 1.02  +13.3%

The maximum number of batched packets is specified through a module
parameter.

Signed-off-by: Jason Wang <jasowang@redhat.com>
---
 drivers/net/tun.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 58 insertions(+), 8 deletions(-)

Comments

David Miller Dec. 29, 2016, 4:35 p.m. UTC | #1
From: Jason Wang <jasowang@redhat.com>
Date: Wed, 28 Dec 2016 16:09:31 +0800

> +	spin_lock(&queue->lock);
> +	qlen = skb_queue_len(queue);
> +	if (qlen > rx_batched)
> +		goto drop;
> +	__skb_queue_tail(queue, skb);
> +	if (!more || qlen + 1 > rx_batched) {
> +		__skb_queue_head_init(&process_queue);
> +		skb_queue_splice_tail_init(queue, &process_queue);
> +		rcv = true;
> +	}
> +	spin_unlock(&queue->lock);

Since you always clear the 'queue' when you insert the skb that hits
the limit, I don't see how the "goto drop" path can possibly be taken.
Jason Wang Dec. 30, 2016, 5:14 a.m. UTC | #2
On Dec. 30, 2016 at 00:35, David Miller wrote:
> From: Jason Wang <jasowang@redhat.com>
> Date: Wed, 28 Dec 2016 16:09:31 +0800
>
>> +	spin_lock(&queue->lock);
>> +	qlen = skb_queue_len(queue);
>> +	if (qlen > rx_batched)
>> +		goto drop;
>> +	__skb_queue_tail(queue, skb);
>> +	if (!more || qlen + 1 > rx_batched) {
>> +		__skb_queue_head_init(&process_queue);
>> +		skb_queue_splice_tail_init(queue, &process_queue);
>> +		rcv = true;
>> +	}
>> +	spin_unlock(&queue->lock);
> Since you always clear the 'queue' when you insert the skb that hits
> the limit, I don't see how the "goto drop" path can possibly be taken.

True, will fix this.

Thanks
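
To spell out why the branch is unreachable: the queue is only allowed
to grow while MSG_MORE is set and qlen + 1 <= rx_batched, and it is
spliced out (emptied) as soon as that bound is hit, so skb_queue_len()
can never exceed rx_batched on entry. One possible restructuring that
removes the dead branch and flushes when the limit is reached instead
(a sketch reusing the locals of tun_rx_batched() from this patch, not
the actual follow-up revision):

	spin_lock(&queue->lock);
	if (!more || skb_queue_len(queue) == rx_batched) {
		/* Burst ended or batch is full: splice everything queued
		 * so far, add the current skb, and hand it all to the
		 * stack outside the lock. */
		__skb_queue_head_init(&process_queue);
		skb_queue_splice_tail_init(queue, &process_queue);
		__skb_queue_tail(&process_queue, skb);
		rcv = true;
	} else {
		/* Burst still in progress and below the limit: just queue. */
		__skb_queue_tail(queue, skb);
	}
	spin_unlock(&queue->lock);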
Stefan Hajnoczi Jan. 3, 2017, 1:33 p.m. UTC | #3
On Wed, Dec 28, 2016 at 04:09:31PM +0800, Jason Wang wrote:
> +static int tun_rx_batched(struct tun_file *tfile, struct sk_buff *skb,
> +			  int more)
> +{
> +	struct sk_buff_head *queue = &tfile->sk.sk_write_queue;
> +	struct sk_buff_head process_queue;
> +	int qlen;
> +	bool rcv = false;
> +
> +	spin_lock(&queue->lock);

Should this be spin_lock_bh()?  Below and in tun_get_user() there are
explicit local_bh_disable() calls so I guess BHs can interrupt us here
and this would deadlock.
Jason Wang Jan. 4, 2017, 3:03 a.m. UTC | #4
On Jan. 3, 2017 at 21:33, Stefan Hajnoczi wrote:
> On Wed, Dec 28, 2016 at 04:09:31PM +0800, Jason Wang wrote:
>> +static int tun_rx_batched(struct tun_file *tfile, struct sk_buff *skb,
>> +			  int more)
>> +{
>> +	struct sk_buff_head *queue = &tfile->sk.sk_write_queue;
>> +	struct sk_buff_head process_queue;
>> +	int qlen;
>> +	bool rcv = false;
>> +
>> +	spin_lock(&queue->lock);
> Should this be spin_lock_bh()?  Below and in tun_get_user() there are
> explicit local_bh_disable() calls so I guess BHs can interrupt us here
> and this would deadlock.

sk_write_queue is accessed only in this function, which runs in
process context, so there is no need for spin_lock_bh() here.
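
For context on the rule being applied here: spin_lock_bh() is only
needed when the same lock can also be taken from bottom-half context,
since a softirq preempting the process-context holder on the same CPU
would otherwise spin on the lock forever. A generic illustration of
that pattern (not code from this patch):

#include <linux/spinlock.h>

/* Purely illustrative: a lock shared between process context and a BH
 * handler must be taken with spin_lock_bh() in process context, or a
 * softirq interrupting the holder on the same CPU would deadlock.
 * sk_write_queue.lock in tun_rx_batched() is never taken from BH
 * context, so plain spin_lock() is sufficient there. */
static DEFINE_SPINLOCK(shared_lock);

static void process_context_user(void)
{
	spin_lock_bh(&shared_lock);	/* disables BHs on this CPU */
	/* ... touch data that the BH handler also uses ... */
	spin_unlock_bh(&shared_lock);
}

In tun_rx_batched() the local_bh_disable()/local_bh_enable() pair is
only entered after the queue lock has been released, so the two never
nest.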
Stefan Hajnoczi Jan. 5, 2017, 9:27 a.m. UTC | #5
On Wed, Jan 04, 2017 at 11:03:32AM +0800, Jason Wang wrote:
> On Jan. 3, 2017 at 21:33, Stefan Hajnoczi wrote:
> > On Wed, Dec 28, 2016 at 04:09:31PM +0800, Jason Wang wrote:
> > > +static int tun_rx_batched(struct tun_file *tfile, struct sk_buff *skb,
> > > +			  int more)
> > > +{
> > > +	struct sk_buff_head *queue = &tfile->sk.sk_write_queue;
> > > +	struct sk_buff_head process_queue;
> > > +	int qlen;
> > > +	bool rcv = false;
> > > +
> > > +	spin_lock(&queue->lock);
> > Should this be spin_lock_bh()?  Below and in tun_get_user() there are
> > explicit local_bh_disable() calls so I guess BHs can interrupt us here
> > and this would deadlock.
> 
> sk_write_queue is accessed only in this function, which runs in process
> context, so there is no need for spin_lock_bh() here.

I see, thanks!

Stefan

Patch

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index cd8e02c..6ea5d6d 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -75,6 +75,10 @@ 
 
 #include <linux/uaccess.h>
 
+static int rx_batched;
+module_param(rx_batched, int, 0444);
+MODULE_PARM_DESC(rx_batched, "Number of packets batched in rx");
+
 /* Uncomment to enable debugging */
 /* #define TUN_DEBUG 1 */
 
@@ -522,6 +526,7 @@  static void tun_queue_purge(struct tun_file *tfile)
 	while ((skb = skb_array_consume(&tfile->tx_array)) != NULL)
 		kfree_skb(skb);
 
+	skb_queue_purge(&tfile->sk.sk_write_queue);
 	skb_queue_purge(&tfile->sk.sk_error_queue);
 }
 
@@ -1140,10 +1145,44 @@  static struct sk_buff *tun_alloc_skb(struct tun_file *tfile,
 	return skb;
 }
 
+static int tun_rx_batched(struct tun_file *tfile, struct sk_buff *skb,
+			  int more)
+{
+	struct sk_buff_head *queue = &tfile->sk.sk_write_queue;
+	struct sk_buff_head process_queue;
+	int qlen;
+	bool rcv = false;
+
+	spin_lock(&queue->lock);
+	qlen = skb_queue_len(queue);
+	if (qlen > rx_batched)
+		goto drop;
+	__skb_queue_tail(queue, skb);
+	if (!more || qlen + 1 > rx_batched) {
+		__skb_queue_head_init(&process_queue);
+		skb_queue_splice_tail_init(queue, &process_queue);
+		rcv = true;
+	}
+	spin_unlock(&queue->lock);
+
+	if (rcv) {
+		local_bh_disable();
+		while ((skb = __skb_dequeue(&process_queue)))
+			netif_receive_skb(skb);
+		local_bh_enable();
+	}
+
+	return 0;
+drop:
+	spin_unlock(&queue->lock);
+	kfree_skb(skb);
+	return -EFAULT;
+}
+
 /* Get packet from user space buffer */
 static ssize_t tun_get_user(struct tun_struct *tun, struct tun_file *tfile,
 			    void *msg_control, struct iov_iter *from,
-			    int noblock)
+			    int noblock, bool more)
 {
 	struct tun_pi pi = { 0, cpu_to_be16(ETH_P_IP) };
 	struct sk_buff *skb;
@@ -1283,18 +1322,27 @@  static ssize_t tun_get_user(struct tun_struct *tun, struct tun_file *tfile,
 	skb_probe_transport_header(skb, 0);
 
 	rxhash = skb_get_hash(skb);
+
 #ifndef CONFIG_4KSTACKS
-	local_bh_disable();
-	netif_receive_skb(skb);
-	local_bh_enable();
+	if (!rx_batched) {
+		local_bh_disable();
+		netif_receive_skb(skb);
+		local_bh_enable();
+	} else {
+		err = tun_rx_batched(tfile, skb, more);
+	}
 #else
 	netif_rx_ni(skb);
 #endif
 
 	stats = get_cpu_ptr(tun->pcpu_stats);
 	u64_stats_update_begin(&stats->syncp);
-	stats->rx_packets++;
-	stats->rx_bytes += len;
+	if (err) {
+		stats->rx_dropped++;
+	} else {
+		stats->rx_packets++;
+		stats->rx_bytes += len;
+	}
 	u64_stats_update_end(&stats->syncp);
 	put_cpu_ptr(stats);
 
@@ -1312,7 +1360,8 @@  static ssize_t tun_chr_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	if (!tun)
 		return -EBADFD;
 
-	result = tun_get_user(tun, tfile, NULL, from, file->f_flags & O_NONBLOCK);
+	result = tun_get_user(tun, tfile, NULL, from,
+			      file->f_flags & O_NONBLOCK, false);
 
 	tun_put(tun);
 	return result;
@@ -1570,7 +1619,8 @@  static int tun_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
 		return -EBADFD;
 
 	ret = tun_get_user(tun, tfile, m->msg_control, &m->msg_iter,
-			   m->msg_flags & MSG_DONTWAIT);
+			   m->msg_flags & MSG_DONTWAIT,
+			   m->msg_flags & MSG_MORE);
 	tun_put(tun);
 	return ret;
 }