
[net-next,V2] tun: introduce tx skb ring

Message ID 1465979897-4445-1-git-send-email-jasowang@redhat.com
State Changes Requested, archived
Delegated to: David Miller

Commit Message

Jason Wang June 15, 2016, 8:38 a.m. UTC
We used to queue tx packets in sk_receive_queue. This is less
efficient since it requires spinlocks to synchronize between producer
and consumer.

This patch tries to address this by:

- introduce a new mode which is enabled only when IFF_TX_ARRAY is set,
  and switch from sk_receive_queue to a fixed-size skb array with 256
  entries in this mode.
- introduce a new proto_ops, peek_len, which is used to peek at the
  length of the head skb.
- implement a tun version of peek_len for vhost_net to use, and convert
  vhost_net to use peek_len when possible.

Pktgen tests show about an 18% improvement in guest receive pps for
small buffers:

Before: ~1220000pps
After : ~1440000pps
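The scheme skb_array builds on can be sketched in plain userspace C: a fixed-size ring where a NULL slot means empty, so the producer and the consumer each touch only their own index. The following is a simplified single-threaded analogue for illustration, not the kernel code; the real ptr_ring adds the spinlocks and memory barriers needed for concurrent use, and `ring_sketch` and its functions are invented names.

```c
#include <assert.h>
#include <stddef.h>

#define RING_SIZE 256	/* matches TUN_RING_SIZE in the patch */

/* Simplified analogue of ptr_ring: a slot holds NULL when empty, so
 * the producer only touches r->producer and the consumer only touches
 * r->consumer. */
struct ring_sketch {
	void *queue[RING_SIZE];
	int producer;	/* next slot to fill */
	int consumer;	/* next slot to drain */
};

/* Returns 0 on success, -1 when the ring is full. */
static int ring_produce(struct ring_sketch *r, void *ptr)
{
	if (r->queue[r->producer])
		return -1;	/* slot not yet drained: ring is full */
	r->queue[r->producer] = ptr;
	r->producer = (r->producer + 1) % RING_SIZE;
	return 0;
}

/* Returns the oldest entry, or NULL when the ring is empty. */
static void *ring_consume(struct ring_sketch *r)
{
	void *ptr = r->queue[r->consumer];

	if (ptr) {
		r->queue[r->consumer] = NULL;
		r->consumer = (r->consumer + 1) % RING_SIZE;
	}
	return ptr;
}
```

On overflow the patch drops the skb (the `goto drop` path in tun_net_xmit) rather than blocking, mirroring the -1 return here.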

The reasons I stick to a new mode are:

- though skb array supports resizing, in multiqueue mode it's not easy
  to recover from a partially successful queue resize.
- tx_queue_len is a user-visible feature.

Signed-off-by: Jason Wang <jasowang@redhat.com>
---
- The patch is based on [PATCH v8 0/5] skb_array: array based FIFO for skbs

Changes from V1:
- switch to an skb array instead of a customized circular buffer
- add non-blocking support
- rename .peek to .peek_len
- drop lockless peeking since tests show only a very minor improvement
---
 drivers/net/tun.c           | 138 ++++++++++++++++++++++++++++++++++++++++----
 drivers/vhost/net.c         |  16 ++++-
 include/linux/net.h         |   1 +
 include/uapi/linux/if_tun.h |   1 +
 4 files changed, 143 insertions(+), 13 deletions(-)

Comments

kernel test robot June 15, 2016, 10:34 a.m. UTC | #1
Hi,

[auto build test ERROR on net-next/master]

url:    https://github.com/0day-ci/linux/commits/Jason-Wang/tun-introduce-tx-skb-ring/20160615-164041
config: x86_64-randconfig-s2-06151732 (attached as .config)
compiler: gcc-6 (Debian 6.1.1-1) 6.1.1 20160430
reproduce:
        # save the attached .config to linux build tree
        make ARCH=x86_64 

All errors (new ones prefixed by >>):

>> drivers/net/tun.c:74:29: fatal error: linux/skb_array.h: No such file or directory
    #include <linux/skb_array.h>
                                ^
   compilation terminated.

vim +74 drivers/net/tun.c

    68	#include <net/net_namespace.h>
    69	#include <net/netns/generic.h>
    70	#include <net/rtnetlink.h>
    71	#include <net/sock.h>
    72	#include <linux/seq_file.h>
    73	#include <linux/uio.h>
  > 74	#include <linux/skb_array.h>
    75	
    76	#include <asm/uaccess.h>
    77	

---
0-DAY kernel test infrastructure                Open Source Technology Center
https://lists.01.org/pipermail/kbuild-all                   Intel Corporation
Jamal Hadi Salim June 15, 2016, 11:52 a.m. UTC | #2
On 16-06-15 04:38 AM, Jason Wang wrote:
> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.
> 
> This patch tries to address this by:
> 
> - introduce a new mode which will be only enabled with IFF_TX_ARRAY
>    set and switch from sk_receive_queue to a fixed size of skb
>    array with 256 entries in this mode.
> - introduce a new proto_ops peek_len which was used for peeking the
>    skb length.
> - implement a tun version of peek_len for vhost_net to use and convert
>    vhost_net to use peek_len if possible.
> 
> Pktgen test shows about 18% improvement on guest receiving pps for small
> buffers:
> 
> Before: ~1220000pps
> After : ~1440000pps
> 

So this is mostly exercising the skb array improvements. For tun
it would be useful to see general performance numbers on the
user/kernel crossing (i.e. tun read/write).
If you have the cycles, can you run such tests?

cheers,
jamal
Jamal Hadi Salim June 15, 2016, 11:55 a.m. UTC | #3
On 16-06-15 07:52 AM, Jamal Hadi Salim wrote:
> On 16-06-15 04:38 AM, Jason Wang wrote:
>> We used to queue tx packets in sk_receive_queue, this is less
>> efficient since it requires spinlocks to synchronize between producer

> 
> So this is more exercising the skb array improvements. For tun
> it would be useful to see general performance numbers on user/kernel
> crossing (i.e tun read/write).
> If you have the cycles can you run such tests?
> 

Ignore my message - you are running pktgen from a VM towards the host.
So the numbers you posted are what I was interested in.
Thanks for the good work.

cheers,
jamal
Jason Wang June 16, 2016, 7:08 a.m. UTC | #4
On 2016-06-15 19:55, Jamal Hadi Salim wrote:
> On 16-06-15 07:52 AM, Jamal Hadi Salim wrote:
>> On 16-06-15 04:38 AM, Jason Wang wrote:
>>> We used to queue tx packets in sk_receive_queue, this is less
>>> efficient since it requires spinlocks to synchronize between producer
>> So this is more exercising the skb array improvements. For tun
>> it would be useful to see general performance numbers on user/kernel
>> crossing (i.e tun read/write).
>> If you have the cycles can you run such tests?
>>
> Ignore my message - you are running pktgen from a VM towards the host.

Actually it's the reverse: tests were done from an external host to a VM.

Thanks

> So the numbers you posted are what i was interested in.
> Thanks for the good work.
>
> cheers,
> jamal
>
David Miller June 17, 2016, 12:01 a.m. UTC | #5
From: Jason Wang <jasowang@redhat.com>
Date: Wed, 15 Jun 2016 16:38:17 +0800

> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.
> 
> This patch tries to address this by:
> 
> - introduce a new mode which will be only enabled with IFF_TX_ARRAY
>   set and switch from sk_receive_queue to a fixed size of skb
>   array with 256 entries in this mode.
> - introduce a new proto_ops peek_len which was used for peeking the
>   skb length.
> - implement a tun version of peek_len for vhost_net to use and convert
>   vhost_net to use peek_len if possible.
> 
> Pktgen test shows about 18% improvement on guest receiving pps for small
> buffers:
> 
> Before: ~1220000pps
> After : ~1440000pps
> 
> The reason why I stick to new mode is because:
> 
> - though resize is supported by skb array, in multiqueue mode, it's
>   not easy to recover from a partial success of queue resizing.
> - tx_queue_len is a user visible feature.
> 
> Signed-off-by: Jason Wang <jasowang@redhat.com>

Michael, can you please review this, especially as this is the first
user of your new infrastructure :-)
Michael S. Tsirkin June 17, 2016, 12:41 a.m. UTC | #6
On Wed, Jun 15, 2016 at 04:38:17PM +0800, Jason Wang wrote:
> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.
> 
> This patch tries to address this by:
> 
> - introduce a new mode which will be only enabled with IFF_TX_ARRAY
>   set and switch from sk_receive_queue to a fixed size of skb
>   array with 256 entries in this mode.
> - introduce a new proto_ops peek_len which was used for peeking the
>   skb length.
> - implement a tun version of peek_len for vhost_net to use and convert
>   vhost_net to use peek_len if possible.
> 
> Pktgen test shows about 18% improvement on guest receiving pps for small
> buffers:
> 
> Before: ~1220000pps
> After : ~1440000pps
> 
> The reason why I stick to new mode is because:
> 
> - though resize is supported by skb array, in multiqueue mode, it's
>   not easy to recover from a partial success of queue resizing.
> - tx_queue_len is a user visible feature.
> 
> Signed-off-by: Jason Wang <jasowang@redhat.com>

I still think it's wrong to add a new feature for this.
For example, why 256 entries?
Queue length is user-visible, but it's there precisely for this
reason: so people can tune the queue for their workload.

Would it help to have a ptr_ring_resize that takes an array of
rings and resizes them all to the same length?
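The suggestion implies an all-or-nothing resize across several rings. One hypothetical way to get that property (names and API invented for illustration, entry migration omitted; this is not the kernel interface) is to allocate every new queue before swapping any of them, so a mid-way allocation failure leaves all rings untouched:

```c
#include <stdlib.h>

struct ring {
	void **queue;
	int size;
};

/* Hypothetical all-or-nothing resize of n rings: all allocations
 * happen before any ring is modified. */
static int ring_resize_multiple(struct ring **rings, int n, int new_size)
{
	void ***new_queues = calloc(n, sizeof(*new_queues));
	int i;

	if (!new_queues)
		return -1;
	for (i = 0; i < n; i++) {
		new_queues[i] = calloc(new_size, sizeof(void *));
		if (!new_queues[i]) {
			while (--i >= 0)
				free(new_queues[i]);
			free(new_queues);
			return -1;	/* no ring was modified */
		}
	}
	/* Commit phase: cannot fail, so the rings stay consistent. */
	for (i = 0; i < n; i++) {
		free(rings[i]->queue);
		rings[i]->queue = new_queues[i];
		rings[i]->size = new_size;
	}
	free(new_queues);
	return 0;
}
```

The commit phase cannot fail, which sidesteps the partial-success recovery problem the commit message mentions for multiqueue mode.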

> ---
> - The patch is based on [PATCH v8 0/5] skb_array: array based FIFO for skbs
> 
> Changes from V1:
> - switch to use skb array instead of a customized circular buffer
> - add non-blocking support
> - rename .peek to .peek_len
> - drop lockless peeking since test show very minor improvement
> ---
>  drivers/net/tun.c           | 138 ++++++++++++++++++++++++++++++++++++++++----
>  drivers/vhost/net.c         |  16 ++++-
>  include/linux/net.h         |   1 +
>  include/uapi/linux/if_tun.h |   1 +
>  4 files changed, 143 insertions(+), 13 deletions(-)
> 
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index e16487c..b22e475 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -71,6 +71,7 @@
>  #include <net/sock.h>
>  #include <linux/seq_file.h>
>  #include <linux/uio.h>
> +#include <linux/skb_array.h>
>  
>  #include <asm/uaccess.h>
>  
> @@ -130,6 +131,7 @@ struct tap_filter {
>  #define MAX_TAP_FLOWS  4096
>  
>  #define TUN_FLOW_EXPIRE (3 * HZ)
> +#define TUN_RING_SIZE 256
>  
>  struct tun_pcpu_stats {
>  	u64 rx_packets;
> @@ -167,6 +169,7 @@ struct tun_file {
>  	};
>  	struct list_head next;
>  	struct tun_struct *detached;
> +	struct skb_array tx_array;
>  };
>  
>  struct tun_flow_entry {
> @@ -513,8 +516,15 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>  	return tun;
>  }
>  
> -static void tun_queue_purge(struct tun_file *tfile)
> +static void tun_queue_purge(struct tun_struct *tun, struct tun_file *tfile)
>  {
> +	struct sk_buff *skb;
> +
> +	if (tun->flags & IFF_TX_ARRAY) {
> +		while ((skb = skb_array_consume(&tfile->tx_array)) != NULL)
> +			kfree_skb(skb);
> +	}
> +
>  	skb_queue_purge(&tfile->sk.sk_receive_queue);
>  	skb_queue_purge(&tfile->sk.sk_error_queue);
>  }
> @@ -545,7 +555,7 @@ static void __tun_detach(struct tun_file *tfile, bool clean)
>  		synchronize_net();
>  		tun_flow_delete_by_queue(tun, tun->numqueues + 1);
>  		/* Drop read queue */
> -		tun_queue_purge(tfile);
> +		tun_queue_purge(tun, tfile);
>  		tun_set_real_num_queues(tun);
>  	} else if (tfile->detached && clean) {
>  		tun = tun_enable_queue(tfile);
> @@ -560,6 +570,8 @@ static void __tun_detach(struct tun_file *tfile, bool clean)
>  			    tun->dev->reg_state == NETREG_REGISTERED)
>  				unregister_netdevice(tun->dev);
>  		}
> +		if (tun && tun->flags & IFF_TX_ARRAY)
> +			skb_array_cleanup(&tfile->tx_array);
>  		sock_put(&tfile->sk);
>  	}
>  }
> @@ -596,12 +608,12 @@ static void tun_detach_all(struct net_device *dev)
>  	for (i = 0; i < n; i++) {
>  		tfile = rtnl_dereference(tun->tfiles[i]);
>  		/* Drop read queue */
> -		tun_queue_purge(tfile);
> +		tun_queue_purge(tun, tfile);
>  		sock_put(&tfile->sk);
>  	}
>  	list_for_each_entry_safe(tfile, tmp, &tun->disabled, next) {
>  		tun_enable_queue(tfile);
> -		tun_queue_purge(tfile);
> +		tun_queue_purge(tun, tfile);
>  		sock_put(&tfile->sk);
>  	}
>  	BUG_ON(tun->numdisabled != 0);
> @@ -642,6 +654,13 @@ static int tun_attach(struct tun_struct *tun, struct file *file, bool skip_filte
>  		if (!err)
>  			goto out;
>  	}
> +
> +	if (!tfile->detached && tun->flags & IFF_TX_ARRAY &&
> +	    skb_array_init(&tfile->tx_array, TUN_RING_SIZE, GFP_KERNEL)) {
> +		err = -ENOMEM;
> +		goto out;
> +	}
> +
>  	tfile->queue_index = tun->numqueues;
>  	tfile->socket.sk->sk_shutdown &= ~RCV_SHUTDOWN;
>  	rcu_assign_pointer(tfile->tun, tun);
> @@ -891,8 +910,13 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  
>  	nf_reset(skb);
>  
> -	/* Enqueue packet */
> -	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> +	if (tun->flags & IFF_TX_ARRAY) {
> +		if (skb_array_produce(&tfile->tx_array, skb))
> +			goto drop;
> +	} else {
> +		/* Enqueue packet */
> +		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> +	}
>  
>  	/* Notify and wake up reader process */
>  	if (tfile->flags & TUN_FASYNC)
> @@ -1088,6 +1112,17 @@ static void tun_net_init(struct net_device *dev)
>  	}
>  }
>  
> +static int tun_queue_not_empty(struct tun_struct *tun,
> +			       struct tun_file *tfile)
> +{
> +	struct sock *sk = tfile->socket.sk;
> +
> +	if (tun->flags & IFF_TX_ARRAY)
> +		return !skb_array_empty(&tfile->tx_array);
> +	else
> +		return !skb_queue_empty(&sk->sk_receive_queue);
> +}
> +
>  /* Character device part */
>  
>  /* Poll */
> @@ -1107,7 +1142,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>  
>  	poll_wait(file, sk_sleep(sk), wait);
>  
> -	if (!skb_queue_empty(&sk->sk_receive_queue))
> +	if (tun_queue_not_empty(tun, tfile))
>  		mask |= POLLIN | POLLRDNORM;
>  
>  	if (sock_writeable(sk) ||
> @@ -1481,6 +1516,46 @@ done:
>  	return total;
>  }
>  
> +static struct sk_buff *tun_ring_recv(struct tun_file *tfile, int noblock,
> +				     int *err)
> +{
> +	DECLARE_WAITQUEUE(wait, current);
> +	struct sk_buff *skb = NULL;
> +
> +	skb = skb_array_consume(&tfile->tx_array);
> +	if (skb)
> +		goto out;
> +	if (noblock) {
> +		*err = -EAGAIN;
> +		goto out;
> +	}
> +
> +	add_wait_queue(&tfile->wq.wait, &wait);
> +	current->state = TASK_INTERRUPTIBLE;
> +
> +	while (1) {
> +		skb = skb_array_consume(&tfile->tx_array);
> +		if (skb)
> +			break;
> +		if (signal_pending(current)) {
> +			*err = -ERESTARTSYS;
> +			break;
> +		}
> +		if (tfile->socket.sk->sk_shutdown & RCV_SHUTDOWN) {
> +			*err = -EFAULT;
> +			break;
> +		}
> +
> +		schedule();
> +	}
> +
> +	current->state = TASK_RUNNING;
> +	remove_wait_queue(&tfile->wq.wait, &wait);
> +
> +out:
> +	return skb;
> +}
> +
>  static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>  			   struct iov_iter *to,
>  			   int noblock)
> @@ -1494,9 +1569,13 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>  	if (!iov_iter_count(to))
>  		return 0;
>  
> -	/* Read frames from queue */
> -	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
> -				  &peeked, &off, &err);
> +	if (tun->flags & IFF_TX_ARRAY)
> +		skb = tun_ring_recv(tfile, noblock, &err);
> +	else
> +		/* Read frames from queue */
> +		skb = __skb_recv_datagram(tfile->socket.sk,
> +					  noblock ? MSG_DONTWAIT : 0,
> +					  &peeked, &off, &err);
>  	if (!skb)
>  		return err;
>  
> @@ -1629,8 +1708,39 @@ out:
>  	return ret;
>  }
>  
> +static int tun_peek_len(struct socket *sock)
> +{
> +	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
> +	struct sock *sk = sock->sk;
> +	struct tun_struct *tun;
> +	int ret = 0;
> +
> +	tun = __tun_get(tfile);
> +	if (!tun)
> +		return 0;
> +
> +	if (tun->flags & IFF_TX_ARRAY) {
> +		ret = skb_array_peek_len(&tfile->tx_array);
> +	} else {
> +		struct sk_buff *head;
> +
> +		spin_lock_bh(&sk->sk_receive_queue.lock);
> +		head = skb_peek(&sk->sk_receive_queue);
> +		if (likely(head)) {
> +			ret = head->len;
> +			if (skb_vlan_tag_present(head))
> +				ret += VLAN_HLEN;
> +		}
> +		spin_unlock_bh(&sk->sk_receive_queue.lock);
> +	}
> +
> +	tun_put(tun);
> +	return ret;
> +}
> +
>  /* Ops structure to mimic raw sockets with tun */
>  static const struct proto_ops tun_socket_ops = {
> +	.peek_len = tun_peek_len,
>  	.sendmsg = tun_sendmsg,
>  	.recvmsg = tun_recvmsg,
>  };
> @@ -1643,7 +1753,8 @@ static struct proto tun_proto = {
>  
>  static int tun_flags(struct tun_struct *tun)
>  {
> -	return tun->flags & (TUN_FEATURES | IFF_PERSIST | IFF_TUN | IFF_TAP);
> +	return tun->flags & (TUN_FEATURES | IFF_PERSIST | IFF_TUN |
> +			     IFF_TAP | IFF_TX_ARRAY);
>  }
>  
>  static ssize_t tun_show_flags(struct device *dev, struct device_attribute *attr,
> @@ -1755,6 +1866,9 @@ static int tun_set_iff(struct net *net, struct file *file, struct ifreq *ifr)
>  		} else
>  			return -EINVAL;
>  
> +		if (ifr->ifr_flags & IFF_TX_ARRAY)
> +			flags |= IFF_TX_ARRAY;
> +
>  		if (*ifr->ifr_name)
>  			name = ifr->ifr_name;
>  
> @@ -1995,7 +2109,7 @@ static long __tun_chr_ioctl(struct file *file, unsigned int cmd,
>  		 * This is needed because we never checked for invalid flags on
>  		 * TUNSETIFF.
>  		 */
> -		return put_user(IFF_TUN | IFF_TAP | TUN_FEATURES,
> +		return put_user(IFF_TUN | IFF_TAP | IFF_TX_ARRAY | TUN_FEATURES,
>  				(unsigned int __user*)argp);
>  	} else if (cmd == TUNSETQUEUE)
>  		return tun_set_queue(file, &ifr);
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index f744eeb..236ba52 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -455,10 +455,14 @@ out:
>  
>  static int peek_head_len(struct sock *sk)
>  {
> +	struct socket *sock = sk->sk_socket;
>  	struct sk_buff *head;
>  	int len = 0;
>  	unsigned long flags;
>  
> +	if (sock->ops->peek_len)
> +		return sock->ops->peek_len(sock);
> +
>  	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>  	head = skb_peek(&sk->sk_receive_queue);
>  	if (likely(head)) {
> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
>  	return len;
>  }
>  
> +static int sk_has_rx_data(struct sock *sk)
> +{
> +	struct socket *sock = sk->sk_socket;
> +
> +	if (sock->ops->peek_len)
> +		return sock->ops->peek_len(sock);
> +
> +	return !skb_queue_empty(&sk->sk_receive_queue);
> +}
> +
>  static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  {
>  	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  		endtime = busy_clock() + vq->busyloop_timeout;
>  
>  		while (vhost_can_busy_poll(&net->dev, endtime) &&
> -		       skb_queue_empty(&sk->sk_receive_queue) &&
> +		       !sk_has_rx_data(sk) &&
>  		       vhost_vq_avail_empty(&net->dev, vq))
>  			cpu_relax_lowlatency();
>  
> diff --git a/include/linux/net.h b/include/linux/net.h
> index 9aa49a0..b6b3843 100644
> --- a/include/linux/net.h
> +++ b/include/linux/net.h
> @@ -185,6 +185,7 @@ struct proto_ops {
>  	ssize_t 	(*splice_read)(struct socket *sock,  loff_t *ppos,
>  				       struct pipe_inode_info *pipe, size_t len, unsigned int flags);
>  	int		(*set_peek_off)(struct sock *sk, int val);
> +	int		(*peek_len)(struct socket *sock);
>  };
>  
>  #define DECLARE_SOCKADDR(type, dst, src)	\
> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
> index 3cb5e1d..080003c 100644
> --- a/include/uapi/linux/if_tun.h
> +++ b/include/uapi/linux/if_tun.h
> @@ -61,6 +61,7 @@
>  #define IFF_TUN		0x0001
>  #define IFF_TAP		0x0002
>  #define IFF_NO_PI	0x1000
> +#define IFF_TX_ARRAY	0x0010
>  /* This flag has no real effect */
>  #define IFF_ONE_QUEUE	0x2000
>  #define IFF_VNET_HDR	0x4000
> -- 
> 2.7.4
Jason Wang June 17, 2016, 7:22 a.m. UTC | #7
On 2016-06-17 08:41, Michael S. Tsirkin wrote:
> On Wed, Jun 15, 2016 at 04:38:17PM +0800, Jason Wang wrote:
>> >We used to queue tx packets in sk_receive_queue, this is less
>> >efficient since it requires spinlocks to synchronize between producer
>> >and consumer.
>> >
>> >This patch tries to address this by:
>> >
>> >- introduce a new mode which will be only enabled with IFF_TX_ARRAY
>> >   set and switch from sk_receive_queue to a fixed size of skb
>> >   array with 256 entries in this mode.
>> >- introduce a new proto_ops peek_len which was used for peeking the
>> >   skb length.
>> >- implement a tun version of peek_len for vhost_net to use and convert
>> >   vhost_net to use peek_len if possible.
>> >
>> >Pktgen test shows about 18% improvement on guest receiving pps for small
>> >buffers:
>> >
>> >Before: ~1220000pps
>> >After : ~1440000pps
>> >
>> >The reason why I stick to new mode is because:
>> >
>> >- though resize is supported by skb array, in multiqueue mode, it's
>> >   not easy to recover from a partial success of queue resizing.
>> >- tx_queue_len is a user visible feature.
>> >
>> >Signed-off-by: Jason Wang <jasowang@redhat.com>
> I still think it's wrong to add a new feature for this.
> For example, why 256 entries?

It's the virtqueue size that qemu supports.

> Queue len is user visible but it's there precisely for this
> reason so people can tune queue for workload.

Right.

>
> Would it help to have ptr_ring_resize that gets an array of
> rings and resizes them both to same length?

Yes, that would be very helpful.
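The peek_len hook under discussion follows a common optional-ops pattern: call the socket-specific implementation when the ops table provides one, otherwise fall back to the generic peek of sk_receive_queue. A minimal userspace sketch with invented stand-in types (the real kernel fallback takes the receive-queue lock and accounts for the VLAN header):

```c
#include <stddef.h>

struct mock_sock;

struct mock_proto_ops {
	int (*peek_len)(struct mock_sock *sk);	/* optional hook */
};

struct mock_sock {
	const struct mock_proto_ops *ops;
	int head_skb_len;	/* stands in for the head skb's length */
};

/* Prefer the socket-specific peek_len; fall back to a generic peek. */
static int peek_head_len(struct mock_sock *sk)
{
	if (sk->ops && sk->ops->peek_len)
		return sk->ops->peek_len(sk);
	return sk->head_skb_len;	/* generic fallback path */
}

/* Stand-in for tun_peek_len()/skb_array_peek_len(). */
static int ring_peek_len(struct mock_sock *sk)
{
	(void)sk;
	return 42;
}
```

In the patch, vhost_net uses this same hook both to fetch the head length and, via sk_has_rx_data(), as a cheap has-data test for busy polling.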

Patch

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index e16487c..b22e475 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -71,6 +71,7 @@ 
 #include <net/sock.h>
 #include <linux/seq_file.h>
 #include <linux/uio.h>
+#include <linux/skb_array.h>
 
 #include <asm/uaccess.h>
 
@@ -130,6 +131,7 @@  struct tap_filter {
 #define MAX_TAP_FLOWS  4096
 
 #define TUN_FLOW_EXPIRE (3 * HZ)
+#define TUN_RING_SIZE 256
 
 struct tun_pcpu_stats {
 	u64 rx_packets;
@@ -167,6 +169,7 @@  struct tun_file {
 	};
 	struct list_head next;
 	struct tun_struct *detached;
+	struct skb_array tx_array;
 };
 
 struct tun_flow_entry {
@@ -513,8 +516,15 @@  static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
 	return tun;
 }
 
-static void tun_queue_purge(struct tun_file *tfile)
+static void tun_queue_purge(struct tun_struct *tun, struct tun_file *tfile)
 {
+	struct sk_buff *skb;
+
+	if (tun->flags & IFF_TX_ARRAY) {
+		while ((skb = skb_array_consume(&tfile->tx_array)) != NULL)
+			kfree_skb(skb);
+	}
+
 	skb_queue_purge(&tfile->sk.sk_receive_queue);
 	skb_queue_purge(&tfile->sk.sk_error_queue);
 }
@@ -545,7 +555,7 @@  static void __tun_detach(struct tun_file *tfile, bool clean)
 		synchronize_net();
 		tun_flow_delete_by_queue(tun, tun->numqueues + 1);
 		/* Drop read queue */
-		tun_queue_purge(tfile);
+		tun_queue_purge(tun, tfile);
 		tun_set_real_num_queues(tun);
 	} else if (tfile->detached && clean) {
 		tun = tun_enable_queue(tfile);
@@ -560,6 +570,8 @@  static void __tun_detach(struct tun_file *tfile, bool clean)
 			    tun->dev->reg_state == NETREG_REGISTERED)
 				unregister_netdevice(tun->dev);
 		}
+		if (tun && tun->flags & IFF_TX_ARRAY)
+			skb_array_cleanup(&tfile->tx_array);
 		sock_put(&tfile->sk);
 	}
 }
@@ -596,12 +608,12 @@  static void tun_detach_all(struct net_device *dev)
 	for (i = 0; i < n; i++) {
 		tfile = rtnl_dereference(tun->tfiles[i]);
 		/* Drop read queue */
-		tun_queue_purge(tfile);
+		tun_queue_purge(tun, tfile);
 		sock_put(&tfile->sk);
 	}
 	list_for_each_entry_safe(tfile, tmp, &tun->disabled, next) {
 		tun_enable_queue(tfile);
-		tun_queue_purge(tfile);
+		tun_queue_purge(tun, tfile);
 		sock_put(&tfile->sk);
 	}
 	BUG_ON(tun->numdisabled != 0);
@@ -642,6 +654,13 @@  static int tun_attach(struct tun_struct *tun, struct file *file, bool skip_filte
 		if (!err)
 			goto out;
 	}
+
+	if (!tfile->detached && tun->flags & IFF_TX_ARRAY &&
+	    skb_array_init(&tfile->tx_array, TUN_RING_SIZE, GFP_KERNEL)) {
+		err = -ENOMEM;
+		goto out;
+	}
+
 	tfile->queue_index = tun->numqueues;
 	tfile->socket.sk->sk_shutdown &= ~RCV_SHUTDOWN;
 	rcu_assign_pointer(tfile->tun, tun);
@@ -891,8 +910,13 @@  static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 
 	nf_reset(skb);
 
-	/* Enqueue packet */
-	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
+	if (tun->flags & IFF_TX_ARRAY) {
+		if (skb_array_produce(&tfile->tx_array, skb))
+			goto drop;
+	} else {
+		/* Enqueue packet */
+		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
+	}
 
 	/* Notify and wake up reader process */
 	if (tfile->flags & TUN_FASYNC)
@@ -1088,6 +1112,17 @@  static void tun_net_init(struct net_device *dev)
 	}
 }
 
+static int tun_queue_not_empty(struct tun_struct *tun,
+			       struct tun_file *tfile)
+{
+	struct sock *sk = tfile->socket.sk;
+
+	if (tun->flags & IFF_TX_ARRAY)
+		return !skb_array_empty(&tfile->tx_array);
+	else
+		return !skb_queue_empty(&sk->sk_receive_queue);
+}
+
 /* Character device part */
 
 /* Poll */
@@ -1107,7 +1142,7 @@  static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
 
 	poll_wait(file, sk_sleep(sk), wait);
 
-	if (!skb_queue_empty(&sk->sk_receive_queue))
+	if (tun_queue_not_empty(tun, tfile))
 		mask |= POLLIN | POLLRDNORM;
 
 	if (sock_writeable(sk) ||
@@ -1481,6 +1516,46 @@  done:
 	return total;
 }
 
+static struct sk_buff *tun_ring_recv(struct tun_file *tfile, int noblock,
+				     int *err)
+{
+	DECLARE_WAITQUEUE(wait, current);
+	struct sk_buff *skb = NULL;
+
+	skb = skb_array_consume(&tfile->tx_array);
+	if (skb)
+		goto out;
+	if (noblock) {
+		*err = -EAGAIN;
+		goto out;
+	}
+
+	add_wait_queue(&tfile->wq.wait, &wait);
+	current->state = TASK_INTERRUPTIBLE;
+
+	while (1) {
+		skb = skb_array_consume(&tfile->tx_array);
+		if (skb)
+			break;
+		if (signal_pending(current)) {
+			*err = -ERESTARTSYS;
+			break;
+		}
+		if (tfile->socket.sk->sk_shutdown & RCV_SHUTDOWN) {
+			*err = -EFAULT;
+			break;
+		}
+
+		schedule();
+	}
+
+	current->state = TASK_RUNNING;
+	remove_wait_queue(&tfile->wq.wait, &wait);
+
+out:
+	return skb;
+}
+
 static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
 			   struct iov_iter *to,
 			   int noblock)
@@ -1494,9 +1569,13 @@  static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
 	if (!iov_iter_count(to))
 		return 0;
 
-	/* Read frames from queue */
-	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
-				  &peeked, &off, &err);
+	if (tun->flags & IFF_TX_ARRAY)
+		skb = tun_ring_recv(tfile, noblock, &err);
+	else
+		/* Read frames from queue */
+		skb = __skb_recv_datagram(tfile->socket.sk,
+					  noblock ? MSG_DONTWAIT : 0,
+					  &peeked, &off, &err);
 	if (!skb)
 		return err;
 
@@ -1629,8 +1708,39 @@  out:
 	return ret;
 }
 
+static int tun_peek_len(struct socket *sock)
+{
+	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
+	struct sock *sk = sock->sk;
+	struct tun_struct *tun;
+	int ret = 0;
+
+	tun = __tun_get(tfile);
+	if (!tun)
+		return 0;
+
+	if (tun->flags & IFF_TX_ARRAY) {
+		ret = skb_array_peek_len(&tfile->tx_array);
+	} else {
+		struct sk_buff *head;
+
+		spin_lock_bh(&sk->sk_receive_queue.lock);
+		head = skb_peek(&sk->sk_receive_queue);
+		if (likely(head)) {
+			ret = head->len;
+			if (skb_vlan_tag_present(head))
+				ret += VLAN_HLEN;
+		}
+		spin_unlock_bh(&sk->sk_receive_queue.lock);
+	}
+
+	tun_put(tun);
+	return ret;
+}
+
 /* Ops structure to mimic raw sockets with tun */
 static const struct proto_ops tun_socket_ops = {
+	.peek_len = tun_peek_len,
 	.sendmsg = tun_sendmsg,
 	.recvmsg = tun_recvmsg,
 };
@@ -1643,7 +1753,8 @@  static struct proto tun_proto = {
 
 static int tun_flags(struct tun_struct *tun)
 {
-	return tun->flags & (TUN_FEATURES | IFF_PERSIST | IFF_TUN | IFF_TAP);
+	return tun->flags & (TUN_FEATURES | IFF_PERSIST | IFF_TUN |
+			     IFF_TAP | IFF_TX_ARRAY);
 }
 
 static ssize_t tun_show_flags(struct device *dev, struct device_attribute *attr,
@@ -1755,6 +1866,9 @@  static int tun_set_iff(struct net *net, struct file *file, struct ifreq *ifr)
 		} else
 			return -EINVAL;
 
+		if (ifr->ifr_flags & IFF_TX_ARRAY)
+			flags |= IFF_TX_ARRAY;
+
 		if (*ifr->ifr_name)
 			name = ifr->ifr_name;
 
@@ -1995,7 +2109,7 @@  static long __tun_chr_ioctl(struct file *file, unsigned int cmd,
 		 * This is needed because we never checked for invalid flags on
 		 * TUNSETIFF.
 		 */
-		return put_user(IFF_TUN | IFF_TAP | TUN_FEATURES,
+		return put_user(IFF_TUN | IFF_TAP | IFF_TX_ARRAY | TUN_FEATURES,
 				(unsigned int __user*)argp);
 	} else if (cmd == TUNSETQUEUE)
 		return tun_set_queue(file, &ifr);
diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index f744eeb..236ba52 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -455,10 +455,14 @@  out:
 
 static int peek_head_len(struct sock *sk)
 {
+	struct socket *sock = sk->sk_socket;
 	struct sk_buff *head;
 	int len = 0;
 	unsigned long flags;
 
+	if (sock->ops->peek_len)
+		return sock->ops->peek_len(sock);
+
 	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
 	head = skb_peek(&sk->sk_receive_queue);
 	if (likely(head)) {
@@ -471,6 +475,16 @@  static int peek_head_len(struct sock *sk)
 	return len;
 }
 
+static int sk_has_rx_data(struct sock *sk)
+{
+	struct socket *sock = sk->sk_socket;
+
+	if (sock->ops->peek_len)
+		return sock->ops->peek_len(sock);
+
+	return !skb_queue_empty(&sk->sk_receive_queue);
+}
+
 static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
 {
 	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
@@ -487,7 +501,7 @@  static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
 		endtime = busy_clock() + vq->busyloop_timeout;
 
 		while (vhost_can_busy_poll(&net->dev, endtime) &&
-		       skb_queue_empty(&sk->sk_receive_queue) &&
+		       !sk_has_rx_data(sk) &&
 		       vhost_vq_avail_empty(&net->dev, vq))
 			cpu_relax_lowlatency();
 
diff --git a/include/linux/net.h b/include/linux/net.h
index 9aa49a0..b6b3843 100644
--- a/include/linux/net.h
+++ b/include/linux/net.h
@@ -185,6 +185,7 @@  struct proto_ops {
 	ssize_t 	(*splice_read)(struct socket *sock,  loff_t *ppos,
 				       struct pipe_inode_info *pipe, size_t len, unsigned int flags);
 	int		(*set_peek_off)(struct sock *sk, int val);
+	int		(*peek_len)(struct socket *sock);
 };
 
 #define DECLARE_SOCKADDR(type, dst, src)	\
diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
index 3cb5e1d..080003c 100644
--- a/include/uapi/linux/if_tun.h
+++ b/include/uapi/linux/if_tun.h
@@ -61,6 +61,7 @@ 
 #define IFF_TUN		0x0001
 #define IFF_TAP		0x0002
 #define IFF_NO_PI	0x1000
+#define IFF_TX_ARRAY	0x0010
 /* This flag has no real effect */
 #define IFF_ONE_QUEUE	0x2000
 #define IFF_VNET_HDR	0x4000