[RFC,04/24] xsk: add bind support and introduce Rx functionality

Message ID: 20180131135356.19134-5-bjorn.topel@gmail.com
State: RFC, archived
Delegated to: David Miller
Series: Introducing AF_XDP support

Commit Message

Björn Töpel Jan. 31, 2018, 1:53 p.m. UTC
From: Magnus Karlsson <magnus.karlsson@intel.com>

Here the bind syscall is implemented. Also, two frame receive
functions are introduced: xsk_rcv and xsk_generic_rcv. The latter is
used for the XDP_SKB path, and the former for the XDP_DRV path.

Later commits will wire up the receive functions.
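
As an illustration of that wiring (not part of this patch), a driver whose
XDP program redirects frames to an AF_XDP socket could call the new receive
functions roughly as in the sketch below; my_rx_ring, my_driver_build_xdp_buff()
and my_driver_recycle_buff() are hypothetical placeholders, not functions from
this series.

static int my_driver_napi_rx(struct my_rx_ring *ring, int budget)
{
	struct xdp_sock *flush_xsk = NULL;
	struct xdp_buff xdp;
	int done = 0;

	while (done < budget && my_driver_build_xdp_buff(ring, &xdp)) {
		struct xdp_sock *xsk;

		/* xsk_rcv() looks up the socket bound to this rx queue,
		 * copies the frame into its user-space Rx ring and calls
		 * page_frag_free() on xdp->data when the copy succeeds.
		 */
		xsk = xsk_rcv(NULL, &xdp);
		if (IS_ERR(xsk))
			my_driver_recycle_buff(ring, &xdp); /* drop */
		else
			flush_xsk = xsk;
		done++;
	}

	if (flush_xsk)
		xsk_flush(flush_xsk); /* publish the new frames to user space */

	return done;
}

The XDP_SKB path uses xsk_generic_rcv() instead, which both copies and
flushes under rx.pa_lock, so no separate flush call is needed there.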

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
---
 include/linux/netdevice.h |   3 +
 net/xdp/xsk.c             | 211 +++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 210 insertions(+), 4 deletions(-)
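
For context, the bind path below expects a sockaddr_xdp carrying the address
family, ifindex and queue id, and it refuses sockets that have not yet set up
both the Rx and Tx rings. A minimal user-space sketch follows; the SOCK_RAW
socket type and the <linux/if_xdp.h> header name are assumptions based on
earlier patches in this series rather than something this patch defines.

#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <linux/if_xdp.h>	/* assumed uapi header: struct sockaddr_xdp, AF_XDP */

static int bind_xsk_to_queue(int ifindex, unsigned int queue_id)
{
	struct sockaddr_xdp sxdp = {};
	int fd;

	fd = socket(AF_XDP, SOCK_RAW, 0);	/* socket type is an assumption */
	if (fd < 0)
		return -1;

	/* The umem registration and Rx/Tx ring setsockopts from earlier
	 * patches in the series go here; xsk_bind() rejects sockets that
	 * lack either ring with -EINVAL.
	 */

	sxdp.sxdp_family = AF_XDP;
	sxdp.sxdp_ifindex = ifindex;
	sxdp.sxdp_queue_id = queue_id;

	/* addr_len must cover the whole sockaddr_xdp and the queue id must
	 * be valid for the device, or bind() fails with -EINVAL.
	 */
	if (bind(fd, (struct sockaddr *)&sxdp, sizeof(sxdp))) {
		close(fd);
		return -1;
	}

	return fd;	/* socket is now bound to one rx queue of the device */
}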

Patch

diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
index 4c77f39ebd65..36cc7e92bd8e 100644
--- a/include/linux/netdevice.h
+++ b/include/linux/netdevice.h
@@ -688,6 +688,9 @@  struct netdev_rx_queue {
 	struct kobject			kobj;
 	struct net_device		*dev;
 	struct xdp_rxq_info		xdp_rxq;
+#ifdef CONFIG_XDP_SOCKETS
+	struct xdp_sock __rcu           *xs;
+#endif
 } ____cacheline_aligned_in_smp;
 
 /*
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 34294ac2f75f..db918e31079b 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -34,8 +34,11 @@ 
 #include "xsk_ring.h"
 
 #define XSK_UMEM_MIN_FRAME_SIZE 2048
+#define XSK_ARRAY_SIZE 512
 
 struct xsk_info {
+	struct xsk_packet_array *pa;
+	spinlock_t pa_lock;
 	struct xsk_queue *q;
 	struct xsk_umem *umem;
 	struct socket *mrsock;
@@ -47,7 +50,10 @@  struct xdp_sock {
 	struct sock sk;
 	struct xsk_info rx;
 	struct xsk_info tx;
+	struct net_device *dev;
 	struct xsk_umem *umem;
+	u32 ifindex;
+	u16 queue_id;
 };
 
 static struct xdp_sock *xdp_sk(struct sock *sk)
@@ -330,9 +336,21 @@  static int xsk_release(struct socket *sock)
 	sock_prot_inuse_add(net, sk->sk_prot, -1);
 	local_bh_enable();
 
-	xsk_umem_destroy(xs->umem);
-	xskq_destroy(xs->rx.q);
-	xskq_destroy(xs->tx.q);
+	if (xs->dev) {
+		struct xdp_sock *xs_prev;
+
+		xs_prev = xs->dev->_rx[xs->queue_id].xs;
+		rcu_assign_pointer(xs->dev->_rx[xs->queue_id].xs, NULL);
+
+		/* Wait for driver to stop using the xdp socket. */
+		synchronize_net();
+
+		xskpa_destroy(xs->rx.pa);
+		xsk_umem_destroy(xs_prev->umem);
+		xskq_destroy(xs_prev->rx.q);
+		kobject_put(&xs_prev->dev->_rx[xs->queue_id].kobj);
+		dev_put(xs_prev->dev);
+	}
 
 	sock_orphan(sk);
 	sock->sk = NULL;
@@ -345,8 +363,193 @@  static int xsk_release(struct socket *sock)
 
 static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
-	return -EOPNOTSUPP;
+	struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
+	struct sock *sk = sock->sk;
+	struct xdp_sock *xs = xdp_sk(sk);
+	struct net_device *dev_curr;
+	struct net_device *dev;
+	int err = 0;
+
+	if (addr_len < sizeof(struct sockaddr_xdp))
+		return -EINVAL;
+	if (sxdp->sxdp_family != AF_XDP)
+		return -EINVAL;
+
+	lock_sock(sk);
+	dev_curr = xs->dev;
+	dev = dev_get_by_index_rcu(sock_net(sk), sxdp->sxdp_ifindex);
+	if (!dev) {
+		err = -ENODEV;
+		goto out_unlock;
+	}
+	dev_hold(dev);
+
+	if (dev_curr && dev_curr != dev) {
+		/* XXX Needs rebind code here */
+		err = -EBUSY;
+		goto out_unlock;
+	}
+
+	if (!xs->rx.q || !xs->tx.q) {
+		/* XXX For now require Tx and Rx */
+		err = -EINVAL;
+		goto out_unlock;
+	}
+
+	if (sxdp->sxdp_queue_id > dev->num_rx_queues) {
+		err = -EINVAL;
+		goto out_unlock;
+	}
+	kobject_get(&dev->_rx[sxdp->sxdp_queue_id].kobj);
+
+	xs->dev = dev;
+	xs->ifindex = sxdp->sxdp_ifindex;
+	xs->queue_id = sxdp->sxdp_queue_id;
+	spin_lock_init(&xs->rx.pa_lock);
+
+	/* Rx */
+	xs->rx.buff_info = xsk_buff_info_create(xs->rx.umem);
+	if (!xs->rx.buff_info) {
+		err = -ENOMEM;
+		goto out_unlock;
+	}
+	xskq_set_buff_info(xs->rx.q, xs->rx.buff_info, XSK_VALIDATION_RX);
+
+	/* Rx packet array is used for copy semantics... */
+	xs->rx.pa = xskpa_create((struct xsk_user_queue *)xs->rx.q,
+				 xs->rx.buff_info, XSK_ARRAY_SIZE);
+	if (!xs->rx.pa) {
+		err = -ENOMEM;
+		goto out_rx_pa;
+	}
+
+	rcu_assign_pointer(dev->_rx[sxdp->sxdp_queue_id].xs, xs);
+
+	goto out_unlock;
+
+out_rx_pa:
+	xsk_buff_info_destroy(xs->rx.buff_info);
+	xs->rx.buff_info = NULL;
+out_unlock:
+	if (err)
+		dev_put(dev);
+	release_sock(sk);
+	if (dev_curr)
+		dev_put(dev_curr);
+	return err;
+}
+
+static inline struct xdp_sock *lookup_xsk(struct net_device *dev,
+					  unsigned int queue_id)
+{
+	if (unlikely(queue_id > dev->num_rx_queues))
+		return NULL;
+
+	return rcu_dereference(dev->_rx[queue_id].xs);
+}
+
+int xsk_generic_rcv(struct xdp_buff *xdp)
+{
+	u32 len = xdp->data_end - xdp->data;
+	struct xsk_frame_set p;
+	struct xdp_sock *xsk;
+	bool ok;
+
+	rcu_read_lock();
+	xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
+	if (unlikely(!xsk)) {
+		rcu_read_unlock();
+		return -EINVAL;
+	}
+
+	spin_lock(&xsk->rx.pa_lock);
+	ok = xskpa_next_frame_populate(xsk->rx.pa, &p);
+	spin_unlock(&xsk->rx.pa_lock);
+
+	if (!ok) {
+		rcu_read_unlock();
+		return -ENOSPC;
+	}
+
+	memcpy(xskf_get_data(&p), xdp->data, len);
+	xskf_set_frame_no_offset(&p, len, true);
+	spin_lock(&xsk->rx.pa_lock);
+	xskpa_flush(xsk->rx.pa);
+	spin_unlock(&xsk->rx.pa_lock);
+	rcu_read_unlock();
+
+	return 0;
+}
+EXPORT_SYMBOL_GPL(xsk_generic_rcv);
+
+struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+{
+	u32 len = xdp->data_end - xdp->data;
+	struct xsk_frame_set p;
+
+	rcu_read_lock();
+	if (!xsk)
+		xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
+	if (unlikely(!xsk)) {
+		rcu_read_unlock();
+		return ERR_PTR(-EINVAL);
+	}
+
+	if (!xskpa_next_frame_populate(xsk->rx.pa, &p)) {
+		rcu_read_unlock();
+		return ERR_PTR(-ENOSPC);
+	}
+
+	memcpy(xskf_get_data(&p), xdp->data, len);
+	xskf_set_frame_no_offset(&p, len, true);
+	rcu_read_unlock();
+
+	/* We assume that the semantic of xdp_do_redirect is such that
+	 * ndo_xdp_xmit will decrease the refcount of the page when it
+	 * is done with the page. Thus, if we want to guarantee the
+	 * existence of the page in the calling driver, we need to
+	 * bump the refcount. Unclear what the correct semantic is
+	 * supposed to be.
+	 */
+	page_frag_free(xdp->data);
+
+	return xsk;
+}
+EXPORT_SYMBOL_GPL(xsk_rcv);
+
+int xsk_zc_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+{
+	u32 offset = xdp->data - xdp->data_hard_start;
+	u32 len = xdp->data_end - xdp->data;
+	struct xsk_frame_set p;
+
+	/* We do not need any locking here since we are guaranteed
+	 * a single producer and a single consumer.
+	 */
+	if (xskpa_next_frame_populate(xsk->rx.pa, &p)) {
+		xskf_set_frame(&p, len, offset, true);
+		return 0;
+	}
+
+	/* No user-space buffer to put the packet in. */
+	return -ENOSPC;
+}
+EXPORT_SYMBOL_GPL(xsk_zc_rcv);
+
+void xsk_flush(struct xdp_sock *xsk)
+{
+	rcu_read_lock();
+	if (!xsk)
+		xsk = lookup_xsk(xsk->dev, xsk->queue_id);
+	if (unlikely(!xsk)) {
+		rcu_read_unlock();
+		return;
+	}
+
+	WARN_ON_ONCE(xskpa_flush(xsk->rx.pa));
+	rcu_read_unlock();
 }
+EXPORT_SYMBOL_GPL(xsk_flush);
 
 static unsigned int xsk_poll(struct file *file, struct socket *sock,
 			     struct poll_table_struct *wait)