[RFC,4/9] veth: Use NAPI for XDP

Message ID 20180424143923.26519-5-toshiaki.makita1@gmail.com
State RFC, archived
Delegated to: David Miller
Series veth: Driver XDP

Commit Message

Toshiaki Makita April 24, 2018, 2:39 p.m. UTC
From: Toshiaki Makita <makita.toshiaki@lab.ntt.co.jp>

To avoid stack inflation caused by recursive XDP program calls from
ndo_xdp_xmit, this change introduces NAPI in veth.

Add veth's own NAPI handler, used when XDP is enabled. A ptr_ring
emulates the NIC ring: the Tx function enqueues packets to the ring and
the peer's NAPI handler drains it.

This approach also keeps the REDIRECT bulk interface simple. When
ndo_xdp_xmit is implemented later, ndo_xdp_flush will schedule NAPI on
the peer veth device, and that NAPI handler will process the xdp frames
enqueued by preceding ndo_xdp_xmit calls. This is quite similar to a
physical NIC's Tx function using DMA ring descriptors and an MMIO
doorbell.

Currently only one ring is allocated per veth device, so this does not
scale in multiqueue environments. That can be resolved in the future by
allocating rings on a per-queue basis.

Note that when XDP is not loaded, netif_rx is used instead of NAPI, so
the default behaviour is unchanged.

Signed-off-by: Toshiaki Makita <makita.toshiaki@lab.ntt.co.jp>
---
 drivers/net/veth.c | 197 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 164 insertions(+), 33 deletions(-)

Comments

Jesper Dangaard Brouer May 1, 2018, 7:50 a.m. UTC | #1
On Tue, 24 Apr 2018 23:39:18 +0900
Toshiaki Makita <toshiaki.makita1@gmail.com> wrote:

> +static int veth_xdp_enqueue(struct veth_priv *priv, void *ptr)
> +{
> +	if (unlikely(ptr_ring_produce(&priv->xdp_ring, ptr)))
> +		return -ENOSPC;
> +
> +	return 0;
> +}

Here we have a lock per (enqueued) packet.  I'm working on changing the
ndo_xdp_xmit API to allow bulking.  And the tun driver has the exact
same issue/need.
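
To make the cost concrete: ptr_ring_produce() takes the producer
spinlock on every call, while a bulked producer could take it once per
batch. A minimal sketch using the existing ptr_ring primitives (n and
frames[] are illustrative):

	/* per-packet: one producer_lock acquisition per frame */
	for (i = 0; i < n; i++)
		ptr_ring_produce(&priv->xdp_ring, frames[i]);

	/* bulked: one producer_lock acquisition per batch */
	spin_lock(&priv->xdp_ring.producer_lock);
	for (i = 0; i < n; i++) {
		if (__ptr_ring_produce(&priv->xdp_ring, frames[i]))
			break;	/* ring full, drop the rest */
	}
	spin_unlock(&priv->xdp_ring.producer_lock);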
Toshiaki Makita May 1, 2018, 8:02 a.m. UTC | #2
On 2018/05/01 16:50, Jesper Dangaard Brouer wrote:
> On Tue, 24 Apr 2018 23:39:18 +0900
> Toshiaki Makita <toshiaki.makita1@gmail.com> wrote:
> 
>> +static int veth_xdp_enqueue(struct veth_priv *priv, void *ptr)
>> +{
>> +	if (unlikely(ptr_ring_produce(&priv->xdp_ring, ptr)))
>> +		return -ENOSPC;
>> +
>> +	return 0;
>> +}
> 
> Here we have a lock per (enqueued) packet.  I'm working on changing the
> ndo_xdp_xmit API to allow bulking.  And the tun driver has the exact
> same issue/need.

Probably I should have noted this in the commit log, but the per-packet
lock is removed in patch 9.
I'm curious whether any change will be needed for your new API.
Jesper Dangaard Brouer May 1, 2018, 8:43 a.m. UTC | #3
On Tue, 1 May 2018 17:02:34 +0900
Toshiaki Makita <makita.toshiaki@lab.ntt.co.jp> wrote:

> On 2018/05/01 16:50, Jesper Dangaard Brouer wrote:
> > On Tue, 24 Apr 2018 23:39:18 +0900
> > Toshiaki Makita <toshiaki.makita1@gmail.com> wrote:
> >   
> >> +static int veth_xdp_enqueue(struct veth_priv *priv, void *ptr)
> >> +{
> >> +	if (unlikely(ptr_ring_produce(&priv->xdp_ring, ptr)))
> >> +		return -ENOSPC;
> >> +
> >> +	return 0;
> >> +}  
> > 
> > Here we have a lock per (enqueued) packet.  I'm working on changing the
> > ndo_xdp_xmit API to allow bulking.  And the tun driver has the exact
> > same issue/need.
> 
> Probably I should have noted this in the commit log, but the per-packet
> lock is removed in patch 9.
> I'm curious whether any change will be needed for your new API.

Again, I'm just moving this into the generic code, to avoid having to
implement it in every driver.

Plus, with CONFIG_RETPOLINE we get the advantage of making the indirect
function pointer call to ndo_xdp_xmit only once per bulk.
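
To sketch the direction (illustrative only, not a merged API): a bulked
ndo_xdp_xmit would take an array of frames, so that one indirect call,
and one producer-lock acquisition, covers the whole batch:

	/* illustrative bulk signature: one indirect call per batch */
	int (*ndo_xdp_xmit)(struct net_device *dev, int n,
			    struct xdp_frame **frames);

	/* caller side, roughly (bq is a hypothetical per-CPU bulk queue) */
	sent = dev->netdev_ops->ndo_xdp_xmit(dev, bq->count, bq->q);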
Toshiaki Makita May 2, 2018, 3:37 a.m. UTC | #4
On 18/05/01 (Tue) 17:43, Jesper Dangaard Brouer wrote:
> On Tue, 1 May 2018 17:02:34 +0900
> Toshiaki Makita <makita.toshiaki@lab.ntt.co.jp> wrote:
> 
>> On 2018/05/01 16:50, Jesper Dangaard Brouer wrote:
>>> On Tue, 24 Apr 2018 23:39:18 +0900
>>> Toshiaki Makita <toshiaki.makita1@gmail.com> wrote:
>>>    
>>>> +static int veth_xdp_enqueue(struct veth_priv *priv, void *ptr)
>>>> +{
>>>> +	if (unlikely(ptr_ring_produce(&priv->xdp_ring, ptr)))
>>>> +		return -ENOSPC;
>>>> +
>>>> +	return 0;
>>>> +}
>>>
>>> Here we have a lock per (enqueued) packet.  I'm working on changing the
>>> ndo_xdp_xmit API to allow bulking.  And the tun driver has the exact
>>> same issue/need.
>>
>> Probably I should have noted this in the commit log, but the per-packet
>> lock is removed in patch 9.
>> I'm curious whether any change will be needed for your new API.
> 
> Again, I'm just moving this into the generic code, to avoid having to
> implement it in every driver.
> 
> Plus, with CONFIG_RETPOLINE we get the advantage of making the indirect
> function pointer call to ndo_xdp_xmit only once per bulk.

Nice. Once it becomes available I'll use it instead.

Toshiaki Makita

Patch

diff --git a/drivers/net/veth.c b/drivers/net/veth.c
index 7271d9582b4a..452771f31c30 100644
--- a/drivers/net/veth.c
+++ b/drivers/net/veth.c
@@ -21,11 +21,13 @@ 
 #include <linux/module.h>
 #include <linux/bpf.h>
 #include <linux/filter.h>
+#include <linux/ptr_ring.h>
 #include <linux/bpf_trace.h>
 
 #define DRV_NAME	"veth"
 #define DRV_VERSION	"1.0"
 
+#define VETH_RING_SIZE		256
 #define VETH_XDP_HEADROOM	(XDP_PACKET_HEADROOM + NET_IP_ALIGN)
 
 struct pcpu_vstats {
@@ -35,10 +37,14 @@  struct pcpu_vstats {
 };
 
 struct veth_priv {
+	struct napi_struct	xdp_napi;
+	struct net_device	*dev;
 	struct bpf_prog __rcu	*xdp_prog;
 	struct net_device __rcu	*peer;
 	atomic64_t		dropped;
 	unsigned		requested_headroom;
+	bool			rx_notify_masked;
+	struct ptr_ring		xdp_ring;
 	struct xdp_rxq_info	xdp_rxq;
 };
 
@@ -107,28 +113,56 @@  static const struct ethtool_ops veth_ethtool_ops = {
 
 /* general routines */
 
-static struct sk_buff *veth_xdp_rcv_skb(struct net_device *dev,
-					struct sk_buff *skb);
+static void veth_ptr_free(void *ptr)
+{
+	if (!ptr)
+		return;
+	dev_kfree_skb_any(ptr);
+}
 
-static int veth_xdp_rx(struct net_device *dev, struct sk_buff *skb)
+static void veth_xdp_flush(struct veth_priv *priv)
 {
-	skb = veth_xdp_rcv_skb(dev, skb);
-	if (!skb)
+	/* Write ptr_ring before reading rx_notify_masked */
+	smp_mb();
+	if (!priv->rx_notify_masked) {
+		priv->rx_notify_masked = true;
+		napi_schedule(&priv->xdp_napi);
+	}
+}
+
+static int veth_xdp_enqueue(struct veth_priv *priv, void *ptr)
+{
+	if (unlikely(ptr_ring_produce(&priv->xdp_ring, ptr)))
+		return -ENOSPC;
+
+	return 0;
+}
+
+static int veth_xdp_rx(struct veth_priv *priv, struct sk_buff *skb)
+{
+	if (unlikely(veth_xdp_enqueue(priv, skb))) {
+		dev_kfree_skb_any(skb);
 		return NET_RX_DROP;
+	}
 
-	return netif_rx(skb);
+	return NET_RX_SUCCESS;
 }
 
-static int veth_forward_skb(struct net_device *dev, struct sk_buff *skb)
+static int veth_forward_skb(struct net_device *dev, struct sk_buff *skb, bool xdp)
 {
-	return __dev_forward_skb(dev, skb) ?: veth_xdp_rx(dev, skb);
+	struct veth_priv *priv = netdev_priv(dev);
+
+	return __dev_forward_skb(dev, skb) ?: xdp ?
+		veth_xdp_rx(priv, skb) :
+		netif_rx(skb);
 }
 
 static netdev_tx_t veth_xmit(struct sk_buff *skb, struct net_device *dev)
 {
-	struct veth_priv *priv = netdev_priv(dev);
+	struct veth_priv *rcv_priv, *priv = netdev_priv(dev);
 	struct net_device *rcv;
 	int length = skb->len;
+	bool rcv_xdp = false;
 
 	rcu_read_lock();
 	rcv = rcu_dereference(priv->peer);
@@ -137,7 +171,10 @@  static netdev_tx_t veth_xmit(struct sk_buff *skb, struct net_device *dev)
 		goto drop;
 	}
 
-	if (likely(veth_forward_skb(rcv, skb) == NET_RX_SUCCESS)) {
+	rcv_priv = netdev_priv(rcv);
+	rcv_xdp = rcu_access_pointer(rcv_priv->xdp_prog);
+
+	if (likely(veth_forward_skb(rcv, skb, rcv_xdp) == NET_RX_SUCCESS)) {
 		struct pcpu_vstats *stats = this_cpu_ptr(dev->vstats);
 
 		u64_stats_update_begin(&stats->syncp);
@@ -148,7 +185,13 @@  static netdev_tx_t veth_xmit(struct sk_buff *skb, struct net_device *dev)
 drop:
 		atomic64_inc(&priv->dropped);
 	}
+
+	/* TODO: check xmit_more and tx_stopped */
+	if (rcv_xdp)
+		veth_xdp_flush(rcv_priv);
+
 	rcu_read_unlock();
+
 	return NETDEV_TX_OK;
 }
 
@@ -220,10 +263,9 @@  static struct sk_buff *veth_build_skb(void *head, int headroom, int len,
 	return skb;
 }
 
-static struct sk_buff *veth_xdp_rcv_skb(struct net_device *dev,
+static struct sk_buff *veth_xdp_rcv_skb(struct veth_priv *priv,
 					struct sk_buff *skb)
 {
-	struct veth_priv *priv = netdev_priv(dev);
 	u32 pktlen, headroom, act, metalen;
 	int size, mac_len, delta, off;
 	struct bpf_prog *xdp_prog;
@@ -293,7 +335,7 @@  static struct sk_buff *veth_xdp_rcv_skb(struct net_device *dev,
 	default:
 		bpf_warn_invalid_xdp_action(act);
 	case XDP_ABORTED:
-		trace_xdp_exception(dev, xdp_prog, act);
+		trace_xdp_exception(priv->dev, xdp_prog, act);
 	case XDP_DROP:
 		goto drop;
 	}
@@ -306,7 +348,7 @@  static struct sk_buff *veth_xdp_rcv_skb(struct net_device *dev,
 	else if (off < 0)
 		__skb_pull(skb, -off);
 	skb->mac_header -= delta;
-	skb->protocol = eth_type_trans(skb, dev);
+	skb->protocol = eth_type_trans(skb, priv->dev);
 
 	metalen = xdp.data - xdp.data_meta;
 	if (metalen)
@@ -319,6 +361,72 @@  static struct sk_buff *veth_xdp_rcv_skb(struct net_device *dev,
 	return NULL;
 }
 
+static int veth_xdp_rcv(struct veth_priv *priv, int budget)
+{
+	int i, done = 0;
+
+	for (i = 0; i < budget; i++) {
+		void *ptr = ptr_ring_consume(&priv->xdp_ring);
+		struct sk_buff *skb;
+
+		if (!ptr)
+			break;
+
+		skb = veth_xdp_rcv_skb(priv, ptr);
+
+		if (skb)
+			napi_gro_receive(&priv->xdp_napi, skb);
+
+		done++;
+	}
+
+	return done;
+}
+
+static int veth_poll(struct napi_struct *napi, int budget)
+{
+	struct veth_priv *priv =
+		container_of(napi, struct veth_priv, xdp_napi);
+	int done;
+
+	done = veth_xdp_rcv(priv, budget);
+
+	if (done < budget && napi_complete_done(napi, done)) {
+		/* Write rx_notify_masked before reading ptr_ring */
+		smp_store_mb(priv->rx_notify_masked, false);
+		if (unlikely(!ptr_ring_empty(&priv->xdp_ring))) {
+			priv->rx_notify_masked = true;
+			napi_schedule(&priv->xdp_napi);
+		}
+	}
+
+	return done;
+}
+
+static int veth_napi_add(struct net_device *dev)
+{
+	struct veth_priv *priv = netdev_priv(dev);
+	int err;
+
+	err = ptr_ring_init(&priv->xdp_ring, VETH_RING_SIZE, GFP_KERNEL);
+	if (err)
+		return err;
+
+	netif_napi_add(dev, &priv->xdp_napi, veth_poll, NAPI_POLL_WEIGHT);
+	napi_enable(&priv->xdp_napi);
+
+	return 0;
+}
+
+static void veth_napi_del(struct net_device *dev)
+{
+	struct veth_priv *priv = netdev_priv(dev);
+
+	napi_disable(&priv->xdp_napi);
+	netif_napi_del(&priv->xdp_napi);
+	ptr_ring_cleanup(&priv->xdp_ring, veth_ptr_free);
+}
+
 static int veth_open(struct net_device *dev)
 {
 	struct veth_priv *priv = netdev_priv(dev);
@@ -337,6 +445,12 @@  static int veth_open(struct net_device *dev)
 	if (err < 0)
 		goto err_reg_mem;
 
+	if (rtnl_dereference(priv->xdp_prog)) {
+		err = veth_napi_add(dev);
+		if (err)
+			goto err_reg_mem;
+	}
+
 	if (peer->flags & IFF_UP) {
 		netif_carrier_on(dev);
 		netif_carrier_on(peer);
@@ -358,6 +472,9 @@  static int veth_close(struct net_device *dev)
 	if (peer)
 		netif_carrier_off(peer);
 
+	if (rtnl_dereference(priv->xdp_prog))
+		veth_napi_del(dev);
+
 	xdp_rxq_info_unreg(&priv->xdp_rxq);
 
 	return 0;
@@ -384,15 +501,12 @@  static void veth_dev_free(struct net_device *dev)
 #ifdef CONFIG_NET_POLL_CONTROLLER
 static void veth_poll_controller(struct net_device *dev)
 {
-	/* veth only receives frames when its peer sends one
-	 * Since it's a synchronous operation, we are guaranteed
-	 * never to have pending data when we poll for it so
-	 * there is nothing to do here.
-	 *
-	 * We need this though so netpoll recognizes us as an interface that
-	 * supports polling, which enables bridge devices in virt setups to
-	 * still use netconsole
-	 */
+	struct veth_priv *priv = netdev_priv(dev);
+
+	rcu_read_lock();
+	if (rcu_access_pointer(priv->xdp_prog))
+		veth_xdp_flush(priv);
+	rcu_read_unlock();
 }
 #endif	/* CONFIG_NET_POLL_CONTROLLER */
 
@@ -456,26 +570,40 @@  static int veth_xdp_set(struct net_device *dev, struct bpf_prog *prog,
 	struct veth_priv *priv = netdev_priv(dev);
 	struct bpf_prog *old_prog;
 	struct net_device *peer;
+	int err;
 
 	old_prog = rtnl_dereference(priv->xdp_prog);
 	peer = rtnl_dereference(priv->peer);
 
-	if (!old_prog && prog && peer) {
-		peer->hw_features &= ~NETIF_F_GSO_SOFTWARE;
-		peer->max_mtu = PAGE_SIZE - VETH_XDP_HEADROOM -
-			peer->hard_header_len -
-			SKB_DATA_ALIGN(sizeof(struct skb_shared_info));
-		if (peer->mtu > peer->max_mtu)
-			dev_set_mtu(peer, peer->max_mtu);
+	if (!old_prog && prog) {
+		if (dev->flags & IFF_UP) {
+			err = veth_napi_add(dev);
+			if (err)
+				return err;
+		}
+
+		if (peer) {
+			peer->hw_features &= ~NETIF_F_GSO_SOFTWARE;
+			peer->max_mtu = PAGE_SIZE - VETH_XDP_HEADROOM -
+				peer->hard_header_len -
+				SKB_DATA_ALIGN(sizeof(struct skb_shared_info));
+			if (peer->mtu > peer->max_mtu)
+				dev_set_mtu(peer, peer->max_mtu);
+		}
 	}
 
 	rcu_assign_pointer(priv->xdp_prog, prog);
 
 	if (old_prog) {
 		bpf_prog_put(old_prog);
-		if (!prog && peer) {
-			peer->hw_features |= NETIF_F_GSO_SOFTWARE;
-			peer->max_mtu = ETH_MAX_MTU;
+		if (!prog) {
+			if (dev->flags & IFF_UP)
+				veth_napi_del(dev);
+
+			if (peer) {
+				peer->hw_features |= NETIF_F_GSO_SOFTWARE;
+				peer->max_mtu = ETH_MAX_MTU;
+			}
 		}
 	}
 
@@ -688,10 +816,13 @@  static int veth_newlink(struct net *src_net, struct net_device *dev,
 	 */
 
 	priv = netdev_priv(dev);
+	priv->dev = dev;
 	rcu_assign_pointer(priv->peer, peer);
 
 	priv = netdev_priv(peer);
+	priv->dev = peer;
 	rcu_assign_pointer(priv->peer, dev);
+
 	return 0;
 
 err_register_dev: