diff mbox series

[bpf-next,V5,6/8] xdp: change ndo_xdp_xmit API to support bulking

Message ID 152717317221.4777.9863933878293579864.stgit@firesoul
State Accepted, archived
Delegated to: BPF Maintainers
Headers show
Series xdp: introduce bulking for ndo_xdp_xmit API | expand

Commit Message

Jesper Dangaard Brouer May 24, 2018, 2:46 p.m. UTC
This patch change the API for ndo_xdp_xmit to support bulking
xdp_frames.

When kernel is compiled with CONFIG_RETPOLINE, XDP sees a huge slowdown.
Most of the slowdown is caused by DMA API indirect function calls, but
also the net_device->ndo_xdp_xmit() call.

Benchmarked patch with CONFIG_RETPOLINE, using xdp_redirect_map with
single flow/core test (CPU E5-1650 v4 @ 3.60GHz), showed
performance improved:
 for driver ixgbe: 6,042,682 pps -> 6,853,768 pps = +811,086 pps
 for driver i40e : 6,187,169 pps -> 6,724,519 pps = +537,350 pps

With frames avail as a bulk inside the driver ndo_xdp_xmit call,
further optimizations are possible, like bulk DMA-mapping for TX.

Testing without CONFIG_RETPOLINE show the same performance for
physical NIC drivers.

The virtual NIC driver tun sees a huge performance boost, as it can
avoid doing per frame producer locking, but instead amortize the
locking cost over the bulk.

V2: Fix compile errors reported by kbuild test robot <lkp@intel.com>
V4: Isolated ndo, driver changes and callers.

Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
---
 drivers/net/ethernet/intel/i40e/i40e_txrx.c   |   26 +++++++---
 drivers/net/ethernet/intel/i40e/i40e_txrx.h   |    2 -
 drivers/net/ethernet/intel/ixgbe/ixgbe_main.c |   21 ++++++--
 drivers/net/tun.c                             |   37 +++++++++-----
 drivers/net/virtio_net.c                      |   66 +++++++++++++++++++------
 include/linux/netdevice.h                     |   14 +++--
 kernel/bpf/devmap.c                           |   29 +++++++----
 net/core/filter.c                             |    8 ++-
 8 files changed, 139 insertions(+), 64 deletions(-)
diff mbox series

Patch

diff --git a/drivers/net/ethernet/intel/i40e/i40e_txrx.c b/drivers/net/ethernet/intel/i40e/i40e_txrx.c
index 5efa68de935b..9b698c5acd05 100644
--- a/drivers/net/ethernet/intel/i40e/i40e_txrx.c
+++ b/drivers/net/ethernet/intel/i40e/i40e_txrx.c
@@ -3664,14 +3664,19 @@  netdev_tx_t i40e_lan_xmit_frame(struct sk_buff *skb, struct net_device *netdev)
  * @dev: netdev
  * @xdp: XDP buffer
  *
- * Returns Zero if sent, else an error code
+ * Returns number of frames successfully sent. Frames that fail are
+ * free'ed via XDP return API.
+ *
+ * For error cases, a negative errno code is returned and no-frames
+ * are transmitted (caller must handle freeing frames).
  **/
-int i40e_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
+int i40e_xdp_xmit(struct net_device *dev, int n, struct xdp_frame **frames)
 {
 	struct i40e_netdev_priv *np = netdev_priv(dev);
 	unsigned int queue_index = smp_processor_id();
 	struct i40e_vsi *vsi = np->vsi;
-	int err;
+	int drops = 0;
+	int i;
 
 	if (test_bit(__I40E_VSI_DOWN, vsi->state))
 		return -ENETDOWN;
@@ -3679,11 +3684,18 @@  int i40e_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
 	if (!i40e_enabled_xdp_vsi(vsi) || queue_index >= vsi->num_queue_pairs)
 		return -ENXIO;
 
-	err = i40e_xmit_xdp_ring(xdpf, vsi->xdp_rings[queue_index]);
-	if (err != I40E_XDP_TX)
-		return -ENOSPC;
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdpf = frames[i];
+		int err;
 
-	return 0;
+		err = i40e_xmit_xdp_ring(xdpf, vsi->xdp_rings[queue_index]);
+		if (err != I40E_XDP_TX) {
+			xdp_return_frame_rx_napi(xdpf);
+			drops++;
+		}
+	}
+
+	return n - drops;
 }
 
 /**
diff --git a/drivers/net/ethernet/intel/i40e/i40e_txrx.h b/drivers/net/ethernet/intel/i40e/i40e_txrx.h
index fdd2c55f03a6..eb8804b3d7b6 100644
--- a/drivers/net/ethernet/intel/i40e/i40e_txrx.h
+++ b/drivers/net/ethernet/intel/i40e/i40e_txrx.h
@@ -487,7 +487,7 @@  u32 i40e_get_tx_pending(struct i40e_ring *ring, bool in_sw);
 void i40e_detect_recover_hung(struct i40e_vsi *vsi);
 int __i40e_maybe_stop_tx(struct i40e_ring *tx_ring, int size);
 bool __i40e_chk_linearize(struct sk_buff *skb);
-int i40e_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf);
+int i40e_xdp_xmit(struct net_device *dev, int n, struct xdp_frame **frames);
 void i40e_xdp_flush(struct net_device *dev);
 
 /**
diff --git a/drivers/net/ethernet/intel/ixgbe/ixgbe_main.c b/drivers/net/ethernet/intel/ixgbe/ixgbe_main.c
index 6652b201df5b..9645619f7729 100644
--- a/drivers/net/ethernet/intel/ixgbe/ixgbe_main.c
+++ b/drivers/net/ethernet/intel/ixgbe/ixgbe_main.c
@@ -10017,11 +10017,13 @@  static int ixgbe_xdp(struct net_device *dev, struct netdev_bpf *xdp)
 	}
 }
 
-static int ixgbe_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
+static int ixgbe_xdp_xmit(struct net_device *dev, int n,
+			  struct xdp_frame **frames)
 {
 	struct ixgbe_adapter *adapter = netdev_priv(dev);
 	struct ixgbe_ring *ring;
-	int err;
+	int drops = 0;
+	int i;
 
 	if (unlikely(test_bit(__IXGBE_DOWN, &adapter->state)))
 		return -ENETDOWN;
@@ -10033,11 +10035,18 @@  static int ixgbe_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
 	if (unlikely(!ring))
 		return -ENXIO;
 
-	err = ixgbe_xmit_xdp_ring(adapter, xdpf);
-	if (err != IXGBE_XDP_TX)
-		return -ENOSPC;
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdpf = frames[i];
+		int err;
 
-	return 0;
+		err = ixgbe_xmit_xdp_ring(adapter, xdpf);
+		if (err != IXGBE_XDP_TX) {
+			xdp_return_frame_rx_napi(xdpf);
+			drops++;
+		}
+	}
+
+	return n - drops;
 }
 
 static void ixgbe_xdp_flush(struct net_device *dev)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 44d4f3d25350..d3dcfcb1c4b3 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -70,6 +70,7 @@ 
 #include <net/netns/generic.h>
 #include <net/rtnetlink.h>
 #include <net/sock.h>
+#include <net/xdp.h>
 #include <linux/seq_file.h>
 #include <linux/uio.h>
 #include <linux/skb_array.h>
@@ -1290,34 +1291,44 @@  static const struct net_device_ops tun_netdev_ops = {
 	.ndo_get_stats64	= tun_net_get_stats64,
 };
 
-static int tun_xdp_xmit(struct net_device *dev, struct xdp_frame *frame)
+static int tun_xdp_xmit(struct net_device *dev, int n, struct xdp_frame **frames)
 {
 	struct tun_struct *tun = netdev_priv(dev);
 	struct tun_file *tfile;
 	u32 numqueues;
-	int ret = 0;
+	int drops = 0;
+	int cnt = n;
+	int i;
 
 	rcu_read_lock();
 
 	numqueues = READ_ONCE(tun->numqueues);
 	if (!numqueues) {
-		ret = -ENOSPC;
-		goto out;
+		rcu_read_unlock();
+		return -ENXIO; /* Caller will free/return all frames */
 	}
 
 	tfile = rcu_dereference(tun->tfiles[smp_processor_id() %
 					    numqueues]);
-	/* Encode the XDP flag into lowest bit for consumer to differ
-	 * XDP buffer from sk_buff.
-	 */
-	if (ptr_ring_produce(&tfile->tx_ring, tun_xdp_to_ptr(frame))) {
-		this_cpu_inc(tun->pcpu_stats->tx_dropped);
-		ret = -ENOSPC;
+
+	spin_lock(&tfile->tx_ring.producer_lock);
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdp = frames[i];
+		/* Encode the XDP flag into lowest bit for consumer to differ
+		 * XDP buffer from sk_buff.
+		 */
+		void *frame = tun_xdp_to_ptr(xdp);
+
+		if (__ptr_ring_produce(&tfile->tx_ring, frame)) {
+			this_cpu_inc(tun->pcpu_stats->tx_dropped);
+			xdp_return_frame_rx_napi(xdp);
+			drops++;
+		}
 	}
+	spin_unlock(&tfile->tx_ring.producer_lock);
 
-out:
 	rcu_read_unlock();
-	return ret;
+	return cnt - drops;
 }
 
 static int tun_xdp_tx(struct net_device *dev, struct xdp_buff *xdp)
@@ -1327,7 +1338,7 @@  static int tun_xdp_tx(struct net_device *dev, struct xdp_buff *xdp)
 	if (unlikely(!frame))
 		return -EOVERFLOW;
 
-	return tun_xdp_xmit(dev, frame);
+	return tun_xdp_xmit(dev, 1, &frame);
 }
 
 static void tun_xdp_flush(struct net_device *dev)
diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
index f34794a76c4d..39a0783d1cde 100644
--- a/drivers/net/virtio_net.c
+++ b/drivers/net/virtio_net.c
@@ -419,23 +419,13 @@  static void virtnet_xdp_flush(struct net_device *dev)
 	virtqueue_kick(sq->vq);
 }
 
-static int __virtnet_xdp_xmit(struct virtnet_info *vi,
-			       struct xdp_frame *xdpf)
+static int __virtnet_xdp_xmit_one(struct virtnet_info *vi,
+				   struct send_queue *sq,
+				   struct xdp_frame *xdpf)
 {
 	struct virtio_net_hdr_mrg_rxbuf *hdr;
-	struct xdp_frame *xdpf_sent;
-	struct send_queue *sq;
-	unsigned int len;
-	unsigned int qp;
 	int err;
 
-	qp = vi->curr_queue_pairs - vi->xdp_queue_pairs + smp_processor_id();
-	sq = &vi->sq[qp];
-
-	/* Free up any pending old buffers before queueing new ones. */
-	while ((xdpf_sent = virtqueue_get_buf(sq->vq, &len)) != NULL)
-		xdp_return_frame(xdpf_sent);
-
 	/* virtqueue want to use data area in-front of packet */
 	if (unlikely(xdpf->metasize > 0))
 		return -EOPNOTSUPP;
@@ -459,11 +449,40 @@  static int __virtnet_xdp_xmit(struct virtnet_info *vi,
 	return 0;
 }
 
-static int virtnet_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
+static int __virtnet_xdp_tx_xmit(struct virtnet_info *vi,
+				   struct xdp_frame *xdpf)
+{
+	struct xdp_frame *xdpf_sent;
+	struct send_queue *sq;
+	unsigned int len;
+	unsigned int qp;
+
+	qp = vi->curr_queue_pairs - vi->xdp_queue_pairs + smp_processor_id();
+	sq = &vi->sq[qp];
+
+	/* Free up any pending old buffers before queueing new ones. */
+	while ((xdpf_sent = virtqueue_get_buf(sq->vq, &len)) != NULL)
+		xdp_return_frame(xdpf_sent);
+
+	return __virtnet_xdp_xmit_one(vi, sq, xdpf);
+}
+
+static int virtnet_xdp_xmit(struct net_device *dev,
+			    int n, struct xdp_frame **frames)
 {
 	struct virtnet_info *vi = netdev_priv(dev);
 	struct receive_queue *rq = vi->rq;
+	struct xdp_frame *xdpf_sent;
 	struct bpf_prog *xdp_prog;
+	struct send_queue *sq;
+	unsigned int len;
+	unsigned int qp;
+	int drops = 0;
+	int err;
+	int i;
+
+	qp = vi->curr_queue_pairs - vi->xdp_queue_pairs + smp_processor_id();
+	sq = &vi->sq[qp];
 
 	/* Only allow ndo_xdp_xmit if XDP is loaded on dev, as this
 	 * indicate XDP resources have been successfully allocated.
@@ -472,7 +491,20 @@  static int virtnet_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
 	if (!xdp_prog)
 		return -ENXIO;
 
-	return __virtnet_xdp_xmit(vi, xdpf);
+	/* Free up any pending old buffers before queueing new ones. */
+	while ((xdpf_sent = virtqueue_get_buf(sq->vq, &len)) != NULL)
+		xdp_return_frame(xdpf_sent);
+
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdpf = frames[i];
+
+		err = __virtnet_xdp_xmit_one(vi, sq, xdpf);
+		if (err) {
+			xdp_return_frame_rx_napi(xdpf);
+			drops++;
+		}
+	}
+	return n - drops;
 }
 
 static unsigned int virtnet_get_headroom(struct virtnet_info *vi)
@@ -616,7 +648,7 @@  static struct sk_buff *receive_small(struct net_device *dev,
 			xdpf = convert_to_xdp_frame(&xdp);
 			if (unlikely(!xdpf))
 				goto err_xdp;
-			err = __virtnet_xdp_xmit(vi, xdpf);
+			err = __virtnet_xdp_tx_xmit(vi, xdpf);
 			if (unlikely(err)) {
 				trace_xdp_exception(vi->dev, xdp_prog, act);
 				goto err_xdp;
@@ -779,7 +811,7 @@  static struct sk_buff *receive_mergeable(struct net_device *dev,
 			xdpf = convert_to_xdp_frame(&xdp);
 			if (unlikely(!xdpf))
 				goto err_xdp;
-			err = __virtnet_xdp_xmit(vi, xdpf);
+			err = __virtnet_xdp_tx_xmit(vi, xdpf);
 			if (unlikely(err)) {
 				trace_xdp_exception(vi->dev, xdp_prog, act);
 				if (unlikely(xdp_page != page))
diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
index 03ed492c4e14..debdb6286170 100644
--- a/include/linux/netdevice.h
+++ b/include/linux/netdevice.h
@@ -1185,9 +1185,13 @@  struct dev_ifalias {
  *	This function is used to set or query state related to XDP on the
  *	netdevice and manage BPF offload. See definition of
  *	enum bpf_netdev_command for details.
- * int (*ndo_xdp_xmit)(struct net_device *dev, struct xdp_frame *xdp);
- *	This function is used to submit a XDP packet for transmit on a
- *	netdevice.
+ * int (*ndo_xdp_xmit)(struct net_device *dev, int n, struct xdp_frame **xdp);
+ *	This function is used to submit @n XDP packets for transmit on a
+ *	netdevice. Returns number of frames successfully transmitted, frames
+ *	that got dropped are freed/returned via xdp_return_frame().
+ *	Returns negative number, means general error invoking ndo, meaning
+ *	no frames were xmit'ed and core-caller will free all frames.
+ *	TODO: Consider add flag to allow sending flush operation.
  * void (*ndo_xdp_flush)(struct net_device *dev);
  *	This function is used to inform the driver to flush a particular
  *	xdp tx queue. Must be called on same CPU as xdp_xmit.
@@ -1375,8 +1379,8 @@  struct net_device_ops {
 						       int needed_headroom);
 	int			(*ndo_bpf)(struct net_device *dev,
 					   struct netdev_bpf *bpf);
-	int			(*ndo_xdp_xmit)(struct net_device *dev,
-						struct xdp_frame *xdp);
+	int			(*ndo_xdp_xmit)(struct net_device *dev, int n,
+						struct xdp_frame **xdp);
 	void			(*ndo_xdp_flush)(struct net_device *dev);
 };
 
diff --git a/kernel/bpf/devmap.c b/kernel/bpf/devmap.c
index a9cd5c93dd2b..77908311ec98 100644
--- a/kernel/bpf/devmap.c
+++ b/kernel/bpf/devmap.c
@@ -232,24 +232,31 @@  static int bq_xmit_all(struct bpf_dtab_netdev *obj,
 		prefetch(xdpf);
 	}
 
-	for (i = 0; i < bq->count; i++) {
-		struct xdp_frame *xdpf = bq->q[i];
-		int err;
-
-		err = dev->netdev_ops->ndo_xdp_xmit(dev, xdpf);
-		if (err) {
-			drops++;
-			xdp_return_frame_rx_napi(xdpf);
-		} else {
-			sent++;
-		}
+	sent = dev->netdev_ops->ndo_xdp_xmit(dev, bq->count, bq->q);
+	if (sent < 0) {
+		sent = 0;
+		goto error;
 	}
+	drops = bq->count - sent;
+out:
 	bq->count = 0;
 
 	trace_xdp_devmap_xmit(&obj->dtab->map, obj->bit,
 			      sent, drops, bq->dev_rx, dev);
 	bq->dev_rx = NULL;
 	return 0;
+error:
+	/* If ndo_xdp_xmit fails with an errno, no frames have been
+	 * xmit'ed and it's our responsibility to them free all.
+	 */
+	for (i = 0; i < bq->count; i++) {
+		struct xdp_frame *xdpf = bq->q[i];
+
+		/* RX path under NAPI protection, can return frames faster */
+		xdp_return_frame_rx_napi(xdpf);
+		drops++;
+	}
+	goto out;
 }
 
 /* __dev_map_flush is called from xdp_do_flush_map() which _must_ be signaled
diff --git a/net/core/filter.c b/net/core/filter.c
index 36cf2f87d742..1d75f9322275 100644
--- a/net/core/filter.c
+++ b/net/core/filter.c
@@ -3039,7 +3039,7 @@  static int __bpf_tx_xdp(struct net_device *dev,
 			u32 index)
 {
 	struct xdp_frame *xdpf;
-	int err;
+	int sent;
 
 	if (!dev->netdev_ops->ndo_xdp_xmit) {
 		return -EOPNOTSUPP;
@@ -3049,9 +3049,9 @@  static int __bpf_tx_xdp(struct net_device *dev,
 	if (unlikely(!xdpf))
 		return -EOVERFLOW;
 
-	err = dev->netdev_ops->ndo_xdp_xmit(dev, xdpf);
-	if (err)
-		return err;
+	sent = dev->netdev_ops->ndo_xdp_xmit(dev, 1, &xdpf);
+	if (sent <= 0)
+		return sent;
 	dev->netdev_ops->ndo_xdp_flush(dev);
 	return 0;
 }