
[RFC,19/24] xsk: add support for zero copy Rx

Message ID 20180131135356.19134-20-bjorn.topel@gmail.com
State RFC, archived
Delegated to: David Miller
Series Introducing AF_XDP support

Commit Message

Björn Töpel Jan. 31, 2018, 1:53 p.m. UTC
From: Björn Töpel <bjorn.topel@intel.com>

In this commit we start making use of the new ndo_bpf sub-commands
and try to enable zero copy Rx, if the driver supports it.

Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
---
 net/xdp/xsk.c | 185 +++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 145 insertions(+), 40 deletions(-)
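
Note: xsk_try_enable_zc() below probes the device with the new
XDP_REGISTER_XSK sub-command, and xsk_disable_zc() undoes it with
XDP_UNREGISTER_XSK. A driver opting in would dispatch on the two
commands roughly as in the following minimal sketch; the mydrv_*
names are hypothetical, only the netdev_bpf layout comes from this
series, and a real driver also keeps handling the existing
XDP_SETUP_* commands.

static int mydrv_ndo_bpf(struct net_device *dev, struct netdev_bpf *bpf)
{
	struct mydrv_priv *priv = netdev_priv(dev);

	switch (bpf->command) {
	case XDP_REGISTER_XSK:
		/* Adopt the socket's buffer pool for this Rx queue,
		 * DMA map it through the callback supplied in
		 * rx_parms, and restart the ring in zero copy mode.
		 * Returning an error here simply leaves the socket in
		 * copy mode.
		 */
		return mydrv_enable_zc(priv, bpf->xsk.queue_id,
				       bpf->xsk.rx_parms);
	case XDP_UNREGISTER_XSK:
		/* Detach the pool and return to driver-owned buffers. */
		return mydrv_disable_zc(priv, bpf->xsk.queue_id);
	default:
		return -EINVAL;
	}
}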

Patch

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index f372c3288301..f05ab825d157 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -29,15 +29,21 @@ 
 #include <linux/netdevice.h>
 #include <net/sock.h>
 
+#include <net/xdp_sock.h>
+#include <linux/buff_pool.h>
+
 #include "xsk.h"
 #include "xsk_buff.h"
 #include "xsk_ring.h"
+#include "xsk_buff_pool.h"
+#include "xsk_packet_array.h"
 
 #define XSK_UMEM_MIN_FRAME_SIZE 2048
 #define XSK_ARRAY_SIZE 512
 
 struct xsk_info {
 	struct xsk_packet_array *pa;
+	struct buff_pool *bp;
 	spinlock_t pa_lock;
 	struct xsk_queue *q;
 	struct xsk_umem *umem;
@@ -56,8 +62,24 @@  struct xdp_sock {
 	struct mutex tx_mutex;
 	u32 ifindex;
 	u16 queue_id;
+	bool zc_mode;
 };
 
+static inline bool xsk_is_zc_cap(struct xdp_sock *xs)
+{
+	return xs->zc_mode;
+}
+
+static void xsk_set_zc_cap(struct xdp_sock *xs)
+{
+	xs->zc_mode = true;
+}
+
+static void xsk_clear_zc_cap(struct xdp_sock *xs)
+{
+	xs->zc_mode = false;
+}
+
 static struct xdp_sock *xdp_sk(struct sock *sk)
 {
 	return (struct xdp_sock *)sk;
@@ -323,6 +345,22 @@  static int xsk_init_tx_ring(struct sock *sk, int mr_fd, u32 desc_nr)
 	return xsk_init_ring(sk, mr_fd, desc_nr, &xs->tx);
 }
 
+static void xsk_disable_zc(struct xdp_sock *xs)
+{
+	struct netdev_bpf bpf = {};
+
+	if (!xsk_is_zc_cap(xs))
+		return;
+
+	bpf.command = XDP_UNREGISTER_XSK;
+	bpf.xsk.queue_id = xs->queue_id;
+
+	rtnl_lock();
+	(void)xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+	rtnl_unlock();
+	xsk_clear_zc_cap(xs);
+}
+
 static int xsk_release(struct socket *sock)
 {
 	struct sock *sk = sock->sk;
@@ -344,14 +382,22 @@  static int xsk_release(struct socket *sock)
 		xs_prev = xs->dev->_rx[xs->queue_id].xs;
 		rcu_assign_pointer(xs->dev->_rx[xs->queue_id].xs, NULL);
 
+		xsk_disable_zc(xs);
+
 		/* Wait for driver to stop using the xdp socket. */
 		synchronize_net();
 
 		xskpa_destroy(xs->rx.pa);
-		xskpa_destroy(xs->tx.pa);
-		xsk_umem_destroy(xs_prev->umem);
+		bpool_destroy(xs->rx.bp);
 		xskq_destroy(xs_prev->rx.q);
+		xsk_buff_info_destroy(xs->rx.buff_info);
+
+		xskpa_destroy(xs->tx.pa);
 		xskq_destroy(xs_prev->tx.q);
+		xsk_buff_info_destroy(xs->tx.buff_info);
+
+		xsk_umem_destroy(xs_prev->umem);
+
 		kobject_put(&xs_prev->dev->_rx[xs->queue_id].kobj);
 		dev_put(xs_prev->dev);
 	}
@@ -365,6 +411,45 @@  static int xsk_release(struct socket *sock)
 	return 0;
 }
 
+static int xsk_dma_map_pool_cb(struct buff_pool *pool, struct device *dev,
+			       enum dma_data_direction dir,
+			       unsigned long attrs)
+{
+	struct xsk_buff_pool *bp = (struct xsk_buff_pool *)pool->pool;
+
+	return xsk_buff_dma_map(bp->bi, dev, dir, attrs);
+}
+
+static void xsk_error_report(void *ctx, int err)
+{
+	struct xdp_sock *xs = (struct xdp_sock *)ctx;
+}
+
+static void xsk_try_enable_zc(struct xdp_sock *xs)
+{
+	struct xsk_rx_parms rx_parms = {};
+	struct netdev_bpf bpf = {};
+	int err;
+
+	if (!xs->dev->netdev_ops->ndo_bpf)
+		return;
+
+	rx_parms.buff_pool = xs->rx.bp;
+	rx_parms.dma_map = xsk_dma_map_pool_cb;
+	rx_parms.error_report_ctx = xs;
+	rx_parms.error_report = xsk_error_report;
+
+	bpf.command = XDP_REGISTER_XSK;
+	bpf.xsk.rx_parms = &rx_parms;
+	bpf.xsk.queue_id = xs->queue_id;
+
+	rtnl_lock();
+	err = xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+	rtnl_unlock();
+	if (!err)
+		xsk_set_zc_cap(xs);
+}
+
 static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
 	struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
@@ -429,6 +514,13 @@  static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 		goto out_rx_pa;
 	}
 
+	/* ...and Rx buffer pool is used for zerocopy. */
+	xs->rx.bp = xsk_buff_pool_create(xs->rx.buff_info, xs->rx.q);
+	if (!xs->rx.bp) {
+		err = -ENOMEM;
+		goto out_rx_bp;
+	}
+
 	/* Tx */
 	xs->tx.buff_info = xsk_buff_info_create(xs->tx.umem);
 	if (!xs->tx.buff_info) {
@@ -446,12 +538,17 @@  static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 
 	rcu_assign_pointer(dev->_rx[sxdp->sxdp_queue_id].xs, xs);
 
+	xsk_try_enable_zc(xs);
+
 	goto out_unlock;
 
 out_tx_pa:
 	xsk_buff_info_destroy(xs->tx.buff_info);
 	xs->tx.buff_info = NULL;
 out_tx_bi:
+	bpool_destroy(xs->rx.bp);
+	xs->rx.bp = NULL;
+out_rx_bp:
 	xskpa_destroy(xs->rx.pa);
 	xs->rx.pa = NULL;
 out_rx_pa:
@@ -509,27 +606,16 @@  int xsk_generic_rcv(struct xdp_buff *xdp)
 }
 EXPORT_SYMBOL_GPL(xsk_generic_rcv);
 
-struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
 	u32 len = xdp->data_end - xdp->data;
 	struct xsk_frame_set p;
 
-	rcu_read_lock();
-	if (!xsk)
-		xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
-	if (unlikely(!xsk)) {
-		rcu_read_unlock();
-		return ERR_PTR(-EINVAL);
-	}
-
-	if (!xskpa_next_frame_populate(xsk->rx.pa, &p)) {
-		rcu_read_unlock();
-		return ERR_PTR(-ENOSPC);
-	}
+	if (!xskpa_next_frame_populate(xs->rx.pa, &p))
+		return -ENOSPC;
 
 	memcpy(xskf_get_data(&p), xdp->data, len);
 	xskf_set_frame_no_offset(&p, len, true);
-	rcu_read_unlock();
 
 	/* We assume that the semantic of xdp_do_redirect is such that
 	 * ndo_xdp_xmit will decrease the refcount of the page when it
@@ -540,41 +626,60 @@  struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
 	 */
 	page_frag_free(xdp->data);
 
-	return xsk;
+	return 0;
 }
-EXPORT_SYMBOL_GPL(xsk_rcv);
 
-int xsk_zc_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static void __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp)
 {
-	u32 offset = xdp->data - xdp->data_hard_start;
-	u32 len = xdp->data_end - xdp->data;
-	struct xsk_frame_set p;
+	struct xsk_buff *b = (struct xsk_buff *)xdp->bp_handle;
 
-	/* We do not need any locking here since we are guaranteed
-	 * a single producer and a single consumer.
-	 */
-	if (xskpa_next_frame_populate(xsk->rx.pa, &p)) {
-		xskf_set_frame(&p, len, offset, true);
-		return 0;
-	}
-
-	/* No user-space buffer to put the packet in. */
-	return -ENOSPC;
+	xskq_enq_lazy(xs->rx.q, b->id, xdp->data_end - xdp->data,
+		      b->offset + (xdp->data - xdp->data_hard_start));
 }
-EXPORT_SYMBOL_GPL(xsk_zc_rcv);
 
-void xsk_flush(struct xdp_sock *xsk)
+struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
 {
+	int err = 0;
+
 	rcu_read_lock();
-	if (!xsk)
-		xsk = lookup_xsk(xsk->dev, xsk->queue_id);
-	if (unlikely(!xsk)) {
-		rcu_read_unlock();
-		return;
+
+	if (!xsk) {
+		xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
+		if (!xsk) {
+			err = -EINVAL;
+			goto out;
+		}
 	}
 
-	WARN_ON_ONCE(xskpa_flush(xsk->rx.pa));
+	/* XXX Ick, this is very hacky. Need a better solution */
+	if (xdp->rxq->bpool)
+		__xsk_rcv_zc(xsk, xdp);
+	else
+		err = __xsk_rcv(xsk, xdp);
+
+out:
 	rcu_read_unlock();
+
+	return err ? ERR_PTR(err) : xsk;
+}
+EXPORT_SYMBOL_GPL(xsk_rcv);
+
+static void __xsk_flush(struct xdp_sock *xs)
+{
+	WARN_ON_ONCE(xskpa_flush(xs->rx.pa));
+}
+
+static void __xsk_flush_zc(struct xdp_sock *xs)
+{
+	xskq_enq_flush(xs->rx.q);
+}
+
+void xsk_flush(struct xdp_sock *xsk)
+{
+	if (xsk_is_zc_cap(xsk))
+		__xsk_flush_zc(xsk);
+	else
+		__xsk_flush(xsk);
 }
 EXPORT_SYMBOL_GPL(xsk_flush);
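
On the receive side, the copy path (__xsk_rcv) and the zero copy path
(__xsk_rcv_zc) both enqueue lazily and only publish to user space on
xsk_flush(). For illustration, a caller on the Rx fast path could
drive the pair as in the sketch below; the mydrv_* ring helpers are
invented for the example, and the real call sites live elsewhere in
the series.

static int mydrv_poll_rx(struct mydrv_ring *ring, int budget)
{
	struct xdp_sock *last = NULL;
	int done = 0;

	while (done < budget) {
		struct xdp_sock *xsk;
		struct xdp_buff xdp;

		if (!mydrv_next_rx_frame(ring, &xdp))
			break;

		/* With a NULL socket, xsk_rcv() looks one up from
		 * xdp.rxq and returns it (or an ERR_PTR), so it can
		 * be cached for the flush below.
		 */
		xsk = xsk_rcv(NULL, &xdp);
		if (IS_ERR(xsk))
			mydrv_recycle_rx_frame(ring, &xdp); /* drop */
		else
			last = xsk;
		done++;
	}

	/* Make all descriptors enqueued above visible to user space. */
	if (last)
		xsk_flush(last);

	return done;
}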