diff mbox series

[SRU,B:linux-azure-4.15,31/40] xsk: add Rx receive functions and poll support

Message ID 20201102124856.4659-32-william.gray@canonical.com
State New
Headers show
Series hv_netvsc: Add XDP support | expand

Commit Message

William Breathitt Gray Nov. 2, 2020, 12:48 p.m. UTC
From: Björn Töpel <bjorn.topel@intel.com>

BugLink: https://bugs.launchpad.net/bugs/1877654

Here the actual receive functions of AF_XDP are implemented, that in a
later commit, will be called from the XDP layers.

There's one set of functions for the XDP_DRV side and another for
XDP_SKB (generic).

A new XDP API, xdp_return_buff, is also introduced.

Adding xdp_return_buff, which is analogous to xdp_return_frame, but
acts upon an struct xdp_buff. The API will be used by AF_XDP in future
commits.

Support for the poll syscall is also implemented.

v2: xskq_validate_id did not update cons_tail.
    The entries variable was calculated twice in xskq_nb_avail.
    Squashed xdp_return_buff commit.

Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
(cherry picked from commit c497176cb2e478f0a5713b0e05f242276e3194b5)
Signed-off-by: William Breathitt Gray <william.gray@canonical.com>
---
 include/net/xdp.h      |   1 +
 include/net/xdp_sock.h |  22 ++++++++
 net/core/xdp.c         |  15 ++++--
 net/xdp/xdp_umem.h     |  18 +++++++
 net/xdp/xsk.c          |  73 +++++++++++++++++++++++++-
 net/xdp/xsk_queue.h    | 114 ++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 238 insertions(+), 5 deletions(-)
diff mbox series

Patch

diff --git a/include/net/xdp.h b/include/net/xdp.h
index 137ad5f9f40f..0b689cf561c7 100644
--- a/include/net/xdp.h
+++ b/include/net/xdp.h
@@ -104,6 +104,7 @@  struct xdp_frame *convert_to_xdp_frame(struct xdp_buff *xdp)
 }
 
 void xdp_return_frame(struct xdp_frame *xdpf);
+void xdp_return_buff(struct xdp_buff *xdp);
 
 int xdp_rxq_info_reg(struct xdp_rxq_info *xdp_rxq,
 		     struct net_device *dev, u32 queue_index);
diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index 85d02512f59b..a0342dff6a4d 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -31,6 +31,28 @@  struct xdp_sock {
 	u16 queue_id;
 	/* Protects multiple processes in the control path */
 	struct mutex mutex;
+	u64 rx_dropped;
 };
 
+struct xdp_buff;
+#ifdef CONFIG_XDP_SOCKETS
+int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
+int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp);
+void xsk_flush(struct xdp_sock *xs);
+#else
+static inline int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	return -ENOTSUPP;
+}
+
+static inline int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	return -ENOTSUPP;
+}
+
+static inline void xsk_flush(struct xdp_sock *xs)
+{
+}
+#endif /* CONFIG_XDP_SOCKETS */
+
 #endif /* _LINUX_XDP_SOCK_H */
diff --git a/net/core/xdp.c b/net/core/xdp.c
index 0c86b53a3a63..bf6758f74339 100644
--- a/net/core/xdp.c
+++ b/net/core/xdp.c
@@ -308,11 +308,9 @@  int xdp_rxq_info_reg_mem_model(struct xdp_rxq_info *xdp_rxq,
 }
 EXPORT_SYMBOL_GPL(xdp_rxq_info_reg_mem_model);
 
-void xdp_return_frame(struct xdp_frame *xdpf)
+static void xdp_return(void *data, struct xdp_mem_info *mem)
 {
-	struct xdp_mem_info *mem = &xdpf->mem;
 	struct xdp_mem_allocator *xa;
-	void *data = xdpf->data;
 	struct page *page;
 
 	switch (mem->type) {
@@ -339,4 +337,15 @@  void xdp_return_frame(struct xdp_frame *xdpf)
 		break;
 	}
 }
+
+void xdp_return_frame(struct xdp_frame *xdpf)
+{
+	xdp_return(xdpf->data, &xdpf->mem);
+}
 EXPORT_SYMBOL_GPL(xdp_return_frame);
+
+void xdp_return_buff(struct xdp_buff *xdp)
+{
+	xdp_return(xdp->data, &xdp->rxq->mem);
+}
+EXPORT_SYMBOL_GPL(xdp_return_buff);
diff --git a/net/xdp/xdp_umem.h b/net/xdp/xdp_umem.h
index b13133e9c501..c7378a11721f 100644
--- a/net/xdp/xdp_umem.h
+++ b/net/xdp/xdp_umem.h
@@ -39,6 +39,24 @@  struct xdp_umem {
 	struct work_struct work;
 };
 
+static inline char *xdp_umem_get_data(struct xdp_umem *umem, u32 idx)
+{
+	u64 pg, off;
+	char *data;
+
+	pg = idx >> umem->nfpplog2;
+	off = (idx & umem->nfpp_mask) << umem->frame_size_log2;
+
+	data = page_address(umem->pgs[pg]);
+	return data + off;
+}
+
+static inline char *xdp_umem_get_data_with_headroom(struct xdp_umem *umem,
+						    u32 idx)
+{
+	return xdp_umem_get_data(umem, idx) + umem->frame_headroom;
+}
+
 bool xdp_umem_validate_queues(struct xdp_umem *umem);
 int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr);
 void xdp_get_umem(struct xdp_umem *umem);
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index bf2c97b87992..4e1e6c581e1d 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -41,6 +41,74 @@  static struct xdp_sock *xdp_sk(struct sock *sk)
 	return (struct xdp_sock *)sk;
 }
 
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	u32 *id, len = xdp->data_end - xdp->data;
+	void *buffer;
+	int err = 0;
+
+	if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
+		return -EINVAL;
+
+	id = xskq_peek_id(xs->umem->fq);
+	if (!id)
+		return -ENOSPC;
+
+	buffer = xdp_umem_get_data_with_headroom(xs->umem, *id);
+	memcpy(buffer, xdp->data, len);
+	err = xskq_produce_batch_desc(xs->rx, *id, len,
+				      xs->umem->frame_headroom);
+	if (!err)
+		xskq_discard_id(xs->umem->fq);
+
+	return err;
+}
+
+int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	int err;
+
+	err = __xsk_rcv(xs, xdp);
+	if (likely(!err))
+		xdp_return_buff(xdp);
+	else
+		xs->rx_dropped++;
+
+	return err;
+}
+
+void xsk_flush(struct xdp_sock *xs)
+{
+	xskq_produce_flush_desc(xs->rx);
+	xs->sk.sk_data_ready(&xs->sk);
+}
+
+int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+{
+	int err;
+
+	err = __xsk_rcv(xs, xdp);
+	if (!err)
+		xsk_flush(xs);
+	else
+		xs->rx_dropped++;
+
+	return err;
+}
+
+static unsigned int xsk_poll(struct file *file, struct socket *sock,
+			     struct poll_table_struct *wait)
+{
+	unsigned int mask = datagram_poll(file, sock, wait);
+	struct sock *sk = sock->sk;
+	struct xdp_sock *xs = xdp_sk(sk);
+
+	if (xs->rx && !xskq_empty_desc(xs->rx))
+		mask |= POLLIN | POLLRDNORM;
+
+	return mask;
+}
+
 static int xsk_init_queue(u32 entries, struct xsk_queue **queue,
 			  bool umem_queue)
 {
@@ -179,6 +247,9 @@  static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 	} else if (!xs->umem || !xdp_umem_validate_queues(xs->umem)) {
 		err = -EINVAL;
 		goto out_unlock;
+	} else {
+		/* This xsk has its own umem. */
+		xskq_set_umem(xs->umem->fq, &xs->umem->props);
 	}
 
 	/* Rebind? */
@@ -330,7 +401,7 @@  static const struct proto_ops xsk_proto_ops = {
 	.socketpair =	sock_no_socketpair,
 	.accept =	sock_no_accept,
 	.getname =	sock_no_getname,
-	.poll =		sock_no_poll,
+	.poll =		xsk_poll,
 	.ioctl =	sock_no_ioctl,
 	.listen =	sock_no_listen,
 	.shutdown =	sock_no_shutdown,
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 9ddd2ee07a84..0a9b92b4f93a 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -20,6 +20,8 @@ 
 
 #include "xdp_umem_props.h"
 
+#define RX_BATCH_SIZE 16
+
 struct xsk_queue {
 	struct xdp_umem_props umem_props;
 	u32 ring_mask;
@@ -32,8 +34,118 @@  struct xsk_queue {
 	u64 invalid_descs;
 };
 
+/* Common functions operating for both RXTX and umem queues */
+
+static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
+{
+	u32 entries = q->prod_tail - q->cons_tail;
+
+	if (entries == 0) {
+		/* Refresh the local pointer */
+		q->prod_tail = READ_ONCE(q->ring->producer);
+		entries = q->prod_tail - q->cons_tail;
+	}
+
+	return (entries > dcnt) ? dcnt : entries;
+}
+
+static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
+{
+	u32 free_entries = q->nentries - (producer - q->cons_tail);
+
+	if (free_entries >= dcnt)
+		return free_entries;
+
+	/* Refresh the local tail pointer */
+	q->cons_tail = READ_ONCE(q->ring->consumer);
+	return q->nentries - (producer - q->cons_tail);
+}
+
+/* UMEM queue */
+
+static inline bool xskq_is_valid_id(struct xsk_queue *q, u32 idx)
+{
+	if (unlikely(idx >= q->umem_props.nframes)) {
+		q->invalid_descs++;
+		return false;
+	}
+	return true;
+}
+
+static inline u32 *xskq_validate_id(struct xsk_queue *q)
+{
+	while (q->cons_tail != q->cons_head) {
+		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+		unsigned int idx = q->cons_tail & q->ring_mask;
+
+		if (xskq_is_valid_id(q, ring->desc[idx]))
+			return &ring->desc[idx];
+
+		q->cons_tail++;
+	}
+
+	return NULL;
+}
+
+static inline u32 *xskq_peek_id(struct xsk_queue *q)
+{
+	struct xdp_umem_ring *ring;
+
+	if (q->cons_tail == q->cons_head) {
+		WRITE_ONCE(q->ring->consumer, q->cons_tail);
+		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
+
+		/* Order consumer and data */
+		smp_rmb();
+
+		return xskq_validate_id(q);
+	}
+
+	ring = (struct xdp_umem_ring *)q->ring;
+	return &ring->desc[q->cons_tail & q->ring_mask];
+}
+
+static inline void xskq_discard_id(struct xsk_queue *q)
+{
+	q->cons_tail++;
+	(void)xskq_validate_id(q);
+}
+
+/* Rx queue */
+
+static inline int xskq_produce_batch_desc(struct xsk_queue *q,
+					  u32 id, u32 len, u16 offset)
+{
+	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+	unsigned int idx;
+
+	if (xskq_nb_free(q, q->prod_head, 1) == 0)
+		return -ENOSPC;
+
+	idx = (q->prod_head++) & q->ring_mask;
+	ring->desc[idx].idx = id;
+	ring->desc[idx].len = len;
+	ring->desc[idx].offset = offset;
+
+	return 0;
+}
+
+static inline void xskq_produce_flush_desc(struct xsk_queue *q)
+{
+	/* Order producer and data */
+	smp_wmb();
+
+	q->prod_tail = q->prod_head,
+	WRITE_ONCE(q->ring->producer, q->prod_tail);
+}
+
+static inline bool xskq_empty_desc(struct xsk_queue *q)
+{
+	return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries);
+}
+
 void xskq_set_umem(struct xsk_queue *q, struct xdp_umem_props *umem_props);
 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
-void xskq_destroy(struct xsk_queue *q);
+void xskq_destroy(struct xsk_queue *q_ops);
 
 #endif /* _LINUX_XSK_QUEUE_H */