[RFC,03/24] xsk: added XDP_{R,T}X_RING sockopt and supporting structures

Message ID 20180131135356.19134-4-bjorn.topel@gmail.com
State RFC, archived
Delegated to: David Miller
Series Introducing AF_XDP support

Commit Message

Björn Töpel Jan. 31, 2018, 1:53 p.m. UTC
From: Björn Töpel <bjorn.topel@intel.com>

This commit contains setup code for the shared user/kernel rings. The
rings are used for passing ownership of frame data buffers via
descriptors between the kernel and the user-space process.

We're also introducing the following supporting structures:

 * xsk_packet_array: A batching/caching wrapper on top of the
                     descriptor ring.
 * xsk_buff: An entry in the user-registered frame data area; can be
             seen as a decorated descriptor entry.
 * xsk_buff_info: Container of xsk_buffs.
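
A minimal user-space setup sketch, for illustration only (it assumes
SOL_XDP as the sockopt level and a umem registered via XDP_MEM_REG as
introduced earlier in this series; umem_fd is a placeholder for that
socket's descriptor, and error handling is elided):

	struct xdp_ring_req req = {
		.mr_fd = umem_fd,	/* socket with the umem registered */
		.desc_nr = 1024,	/* number of ring descriptors */
	};
	int fd = socket(AF_XDP, SOCK_RAW, 0);

	setsockopt(fd, SOL_XDP, XDP_RX_RING, &req, sizeof(req));
	setsockopt(fd, SOL_XDP, XDP_TX_RING, &req, sizeof(req));

Note that desc_nr is assumed to be a power of two here: beyond
rejecting zero, xskq_create() derives ring_mask from it without
further validation.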

Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
---
 include/uapi/linux/if_xdp.h |  33 ++++
 net/xdp/Makefile            |   2 +-
 net/xdp/xsk.c               | 127 +++++++++++++-
 net/xdp/xsk_buff.h          | 161 ++++++++++++++++++
 net/xdp/xsk_packet_array.c  |  62 +++++++
 net/xdp/xsk_packet_array.h  | 394 ++++++++++++++++++++++++++++++++++++++++++++
 net/xdp/xsk_ring.c          |  60 +++++++
 net/xdp/xsk_ring.h          | 307 ++++++++++++++++++++++++++++++++++
 net/xdp/xsk_user_queue.h    |  24 +++
 9 files changed, 1168 insertions(+), 2 deletions(-)
 create mode 100644 net/xdp/xsk_buff.h
 create mode 100644 net/xdp/xsk_packet_array.c
 create mode 100644 net/xdp/xsk_packet_array.h
 create mode 100644 net/xdp/xsk_ring.c
 create mode 100644 net/xdp/xsk_ring.h
 create mode 100644 net/xdp/xsk_user_queue.h

Patch

diff --git a/include/uapi/linux/if_xdp.h b/include/uapi/linux/if_xdp.h
index 3f8c90c708b4..3a10df302a1e 100644
--- a/include/uapi/linux/if_xdp.h
+++ b/include/uapi/linux/if_xdp.h
@@ -36,4 +36,37 @@  struct xdp_mr_req {
 	__u32	data_headroom;  /* Frame head room */
 };
 
+struct xdp_ring_req {
+	__u32   mr_fd;      /* FD of packet buffer area registered
+			     * with XDP_MEM_REG
+			     */
+	__u32   desc_nr;    /* Number of descriptors in ring */
+};
+
+/* Pgoff for mmapping the rings */
+#define XDP_PGOFF_RX_RING 0
+#define XDP_PGOFF_TX_RING 0x80000000
+
+/* XDP user space ring structure */
+#define XDP_DESC_KERNEL 0x0080 /* The descriptor is owned by the kernel */
+#define XDP_PKT_CONT    1      /* The packet continues in the next descriptor */
+
+struct xdp_desc {
+	__u32 idx;
+	__u32 len;
+	__u16 offset;
+	__u8  error; /* an errno */
+	__u8  flags;
+	__u8  padding[4];
+};
+
+struct xdp_queue {
+	struct xdp_desc *ring;
+
+	__u32 avail_idx;
+	__u32 last_used_idx;
+	__u32 num_free;
+	__u32 ring_mask;
+};
+
 #endif /* _LINUX_IF_XDP_H */
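
The XDP_DESC_KERNEL flag carries descriptor ownership: user space sets
it when handing a descriptor to the kernel, and the kernel clears it
when handing the descriptor back (see xskq_nb_avail() and
xskq_enqueue() below). A sketch of the user-space side of that
handshake, with hypothetical ring and slot variables:

	/* Produce: hand the descriptor in slot i to the kernel. */
	ring[i].idx = frame_id;
	ring[i].len = len;		/* ignored for Rx descriptors */
	ring[i].offset = offset;
	__sync_synchronize();		/* order data before flags */
	ring[i].flags = XDP_DESC_KERNEL;

	/* Consume: wait for the kernel to return ownership. */
	while (ring[i].flags & XDP_DESC_KERNEL)
		;			/* or block/poll instead */
	__sync_synchronize();		/* order flags before data */
	/* ring[i] now holds a completed descriptor */
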
diff --git a/net/xdp/Makefile b/net/xdp/Makefile
index 0c7631f21586..b9d5d6b8823c 100644
--- a/net/xdp/Makefile
+++ b/net/xdp/Makefile
@@ -1 +1 @@ 
-obj-$(CONFIG_XDP_SOCKETS) += xsk.o
+obj-$(CONFIG_XDP_SOCKETS) += xsk.o xsk_ring.o xsk_packet_array.o
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 333ce1450cc7..34294ac2f75f 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -23,15 +23,30 @@ 
 #include <linux/sched/signal.h>
 #include <linux/sched/task.h>
 #include <linux/socket.h>
+#include <linux/file.h>
+#include <linux/uaccess.h>
+#include <linux/net.h>
+#include <linux/netdevice.h>
 #include <net/sock.h>
 
 #include "xsk.h"
+#include "xsk_buff.h"
+#include "xsk_ring.h"
 
 #define XSK_UMEM_MIN_FRAME_SIZE 2048
 
+struct xsk_info {
+	struct xsk_queue *q;
+	struct xsk_umem *umem;
+	struct socket *mrsock;
+	struct xsk_buff_info *buff_info;
+};
+
 struct xdp_sock {
 	/* struct sock must be the first member of struct xdp_sock */
 	struct sock sk;
+	struct xsk_info rx;
+	struct xsk_info tx;
 	struct xsk_umem *umem;
 };
 
@@ -225,6 +240,81 @@  static struct xsk_umem *xsk_mem_reg(u64 addr, u64 size, u32 frame_size,
 	return ret < 0 ? ERR_PTR(ret) : umem;
 }
 
+static struct socket *xsk_umem_sock_get(int fd)
+{
+	struct socket *sock;
+	int err;
+
+	sock = sockfd_lookup(fd, &err);
+	if (!sock)
+		return ERR_PTR(err);
+
+	/* Parameter checking */
+	if (sock->sk->sk_family != PF_XDP) {
+		err = -ESOCKTNOSUPPORT;
+		goto out;
+	}
+
+	if (!xdp_sk(sock->sk)->umem) {
+		err = -ESOCKTNOSUPPORT;
+		goto out;
+	}
+
+	return sock;
+out:
+	sockfd_put(sock);
+	return ERR_PTR(err);
+}
+
+static int xsk_init_ring(struct sock *sk, int mr_fd, u32 desc_nr,
+			 struct xsk_info *info)
+{
+	struct xsk_umem *umem;
+	struct socket *mrsock;
+
+	if (desc_nr == 0)
+		return -EINVAL;
+
+	mrsock = xsk_umem_sock_get(mr_fd);
+	if (IS_ERR(mrsock))
+		return PTR_ERR(mrsock);
+	umem = xdp_sk(mrsock->sk)->umem;
+
+	/* If the umem is from this socket, drop the reference just
+	 * taken to avoid a circular reference.
+	 */
+	lock_sock(sk);
+	if (sk->sk_socket == mrsock)
+		sockfd_put(mrsock);
+
+	info->q = xskq_create(desc_nr);
+	if (!info->q)
+		goto out_queue;
+
+	info->umem = umem;
+	info->mrsock = mrsock;
+	release_sock(sk);
+	return 0;
+
+out_queue:
+	release_sock(sk);
+	return -ENOMEM;
+}
+
+static int xsk_init_rx_ring(struct sock *sk, int mr_fd, u32 desc_nr)
+{
+	struct xdp_sock *xs = xdp_sk(sk);
+
+	return xsk_init_ring(sk, mr_fd, desc_nr, &xs->rx);
+}
+
+static int xsk_init_tx_ring(struct sock *sk, int mr_fd, u32 desc_nr)
+{
+	struct xdp_sock *xs = xdp_sk(sk);
+
+	return xsk_init_ring(sk, mr_fd, desc_nr, &xs->tx);
+}
+
 static int xsk_release(struct socket *sock)
 {
 	struct sock *sk = sock->sk;
@@ -241,6 +331,8 @@  static int xsk_release(struct socket *sock)
 	local_bh_enable();
 
 	xsk_umem_destroy(xs->umem);
+	xskq_destroy(xs->rx.q);
+	xskq_destroy(xs->tx.q);
 
 	sock_orphan(sk);
 	sock->sk = NULL;
@@ -298,6 +390,21 @@  static int xsk_setsockopt(struct socket *sock, int level, int optname,
 
 		return 0;
 	}
+	case XDP_RX_RING:
+	case XDP_TX_RING:
+	{
+		struct xdp_ring_req req;
+
+		if (optlen < sizeof(req))
+			return -EINVAL;
+		if (copy_from_user(&req, optval, sizeof(req)))
+			return -EFAULT;
+
+		if (optname == XDP_TX_RING)
+			return xsk_init_tx_ring(sk, req.mr_fd, req.desc_nr);
+
+		return xsk_init_rx_ring(sk, req.mr_fd, req.desc_nr);
+	}
 	default:
 		break;
 	}
@@ -319,7 +426,25 @@  static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
 static int xsk_mmap(struct file *file, struct socket *sock,
 		    struct vm_area_struct *vma)
 {
-	return -EOPNOTSUPP;
+	unsigned long size = vma->vm_end - vma->vm_start;
+	struct sock *sk = sock->sk;
+	struct xdp_sock *xs = xdp_sk(sk);
+	struct xsk_queue *q;
+	unsigned long pfn;
+
+	if (vma->vm_pgoff == XDP_PGOFF_RX_RING)
+		q = xs->rx.q;
+	else if (vma->vm_pgoff == XDP_PGOFF_TX_RING >> PAGE_SHIFT)
+		q = xs->tx.q;
+	else
+		return -EINVAL;
+
+	if (size != xskq_get_ring_size(q))
+		return -EFBIG;
+
+	pfn = virt_to_phys(xskq_get_ring_address(q)) >> PAGE_SHIFT;
+	return remap_pfn_range(vma, vma->vm_start, pfn,
+			       size, vma->vm_page_prot);
 }
 
 static struct proto xsk_proto = {
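
xsk_mmap() selects the ring by page offset and requires the mapping
length to match the kernel-side ring, so the mmap() call mirrors the
setsockopt() parameters. A sketch (desc_nr as passed in struct
xdp_ring_req; error handling elided):

	size_t sz = desc_nr * sizeof(struct xdp_desc);
	struct xdp_desc *rx_ring, *tx_ring;

	rx_ring = mmap(NULL, sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		       fd, XDP_PGOFF_RX_RING);
	tx_ring = mmap(NULL, sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		       fd, XDP_PGOFF_TX_RING);

Since the vma length is page-rounded before the comparison, the size
check as written appears to succeed only when desc_nr *
sizeof(struct xdp_desc) is a multiple of the page size.
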
diff --git a/net/xdp/xsk_buff.h b/net/xdp/xsk_buff.h
new file mode 100644
index 000000000000..18ead1bc4482
--- /dev/null
+++ b/net/xdp/xsk_buff.h
@@ -0,0 +1,161 @@ 
+#ifndef XSK_BUFF_H_
+#define XSK_BUFF_H_
+
+#include <linux/types.h> /* dma_addr_t */
+#include <linux/vmalloc.h>
+#include <linux/dma-mapping.h>
+
+#include "xsk.h"
+
+struct xsk_buff {
+	void *data;
+	dma_addr_t dma;
+	unsigned int len; /* XXX really needed? */
+	unsigned int id;
+	unsigned int offset;
+	struct xsk_buff *next;
+};
+
+/* Rx: data + umem->data_headroom + XDP_PACKET_HEADROOM */
+/* Tx: data + desc->offset */
+
+struct xsk_buff_info {
+	struct xsk_umem *umem;
+	struct device *dev;
+	enum dma_data_direction dir;
+	unsigned long attrs;
+	unsigned int rx_headroom;
+	unsigned int buff_len;
+	unsigned int nbuffs;
+	struct xsk_buff buffs[0];
+};
+
+static inline int xsk_buff_dma_map(struct xsk_buff_info *info,
+				   struct device *dev,
+				   enum dma_data_direction dir,
+				   unsigned long attrs)
+{
+	struct xsk_buff *b;
+	unsigned int i, j;
+	dma_addr_t dma;
+
+	if (info->dev)
+		return -1; /* Already mapped */
+
+	for (i = 0; i < info->nbuffs; i++) {
+		b = &info->buffs[i];
+		dma = dma_map_single_attrs(dev, b->data, b->len, dir, attrs);
+		if (dma_mapping_error(dev, dma))
+			goto out_unmap;
+
+		b->dma = dma;
+	}
+
+	info->dev = dev;
+	info->dir = dir;
+	info->attrs = attrs;
+
+	return 0;
+
+out_unmap:
+	for (j = 0; j < i; j++) {
+		/* Unmap with the caller-supplied parameters; info->dev
+		 * and friends are only set once all mappings succeed.
+		 */
+		b = &info->buffs[j];
+		dma_unmap_single_attrs(dev, b->dma, b->len, dir, attrs);
+		b->dma = 0;
+	}
+
+	return -1;
+}
+
+static inline void xsk_buff_dma_unmap(struct xsk_buff_info *info)
+{
+	struct xsk_buff *b;
+	unsigned int i;
+
+	if (!info->dev)
+		return; /* Nothing mapped! */
+
+	for (i = 0; i < info->nbuffs; i++) {
+		b = &info->buffs[i];
+		dma_unmap_single_attrs(info->dev, b->dma, b->len,
+				       info->dir, info->attrs);
+		b->dma = 0;
+	}
+
+	info->dev = NULL;
+	info->dir = DMA_NONE;
+	info->attrs = 0;
+}
+
+/* --- */
+
+static inline struct xsk_buff *xsk_buff_info_get_buff(
+	struct xsk_buff_info *info,
+	u32 id)
+{
+	/* XXX remove */
+	if (id >= info->nbuffs) {
+		WARN(1, "%s bad id\n", __func__);
+		return NULL;
+	}
+
+	return &info->buffs[id];
+}
+
+static inline unsigned int xsk_buff_info_get_rx_headroom(
+	struct xsk_buff_info *info)
+{
+	return info->rx_headroom;
+}
+
+static inline unsigned int xsk_buff_info_get_buff_len(
+	struct xsk_buff_info *info)
+{
+	return info->buff_len;
+}
+
+static inline struct xsk_buff_info *xsk_buff_info_create(struct xsk_umem *umem)
+{
+	struct xsk_buff_info *buff_info;
+	unsigned int id = 0;
+	void *data, *end;
+	u32 i;
+
+	buff_info = vzalloc(sizeof(*buff_info) +
+			    sizeof(struct xsk_buff) * umem->nframes);
+	if (!buff_info)
+		return NULL;
+
+	buff_info->umem = umem;
+	buff_info->rx_headroom = umem->data_headroom;
+	buff_info->buff_len = umem->frame_size;
+	buff_info->nbuffs = umem->nframes;
+
+	for (i = 0; i < umem->npgs; i++) {
+		data = page_address(umem->pgs[i]);
+		end = data + PAGE_SIZE;
+		while (data < end) {
+			struct xsk_buff *buff = &buff_info->buffs[id];
+
+			buff->data = data;
+			buff->len = buff_info->buff_len;
+			buff->id = id;
+			buff->offset = buff_info->rx_headroom;
+
+			data += buff_info->buff_len;
+			id++;
+		}
+	}
+
+	return buff_info;
+}
+
+static inline void xsk_buff_info_destroy(struct xsk_buff_info *info)
+{
+	xsk_buff_dma_unmap(info);
+	vfree(info);
+}
+
+#endif /* XSK_BUFF_H_ */
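
For illustration, xsk_buff_info_create() above carves every umem page
into frame_size chunks and assigns ids in page order, so a buffer id
maps back to its address arithmetically. An equivalent computation,
assuming frame_size is a power of two no larger than PAGE_SIZE (which
the umem registration is expected to enforce):

	/* Illustrative only: recomputes what buff_info caches per id. */
	static inline void *xsk_buff_id_to_data(struct xsk_umem *umem, u32 id)
	{
		u32 per_page = PAGE_SIZE / umem->frame_size;

		return page_address(umem->pgs[id / per_page]) +
		       (id % per_page) * umem->frame_size;
	}

With 4 KiB pages and 2 KiB frames, for example, ids 2n and 2n + 1 both
live in page n.
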
diff --git a/net/xdp/xsk_packet_array.c b/net/xdp/xsk_packet_array.c
new file mode 100644
index 000000000000..f1c3fad1e61b
--- /dev/null
+++ b/net/xdp/xsk_packet_array.c
@@ -0,0 +1,62 @@ 
+/*
+ *  XDP packet arrays
+ *  Copyright(c) 2017 Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU General Public License,
+ * version 2, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ */
+
+#include <linux/slab.h>
+
+#include "xsk_packet_array.h"
+
+/**
+ * xskpa_create - Create new packet array
+ * @q_ops: opaque reference to queue associated with this packet array
+ * @buff_info: buffer info
+ * @elems: number of elements
+ *
+ * Returns a reference to the new packet array or NULL on failure
+ **/
+struct xsk_packet_array *xskpa_create(struct xsk_user_queue *q_ops,
+				      struct xsk_buff_info *buff_info,
+				      size_t elems)
+{
+	struct xsk_packet_array *arr;
+
+	if (!is_power_of_2(elems))
+		return NULL;
+
+	arr = kzalloc(sizeof(*arr) + elems * sizeof(struct xdp_desc),
+		      GFP_KERNEL);
+	if (!arr)
+		return NULL;
+
+	arr->q_ops = q_ops;
+	arr->buff_info = buff_info;
+	arr->mask = elems - 1;
+	return arr;
+}
+
+void xskpa_destroy(struct xsk_packet_array *a)
+{
+	struct xsk_frame_set f;
+
+	if (a) {
+		/* Flush all outstanding requests. */
+		if (xskpa_get_flushable_frame_set(a, &f)) {
+			do {
+				xskf_set_frame(&f, 0, 0, true);
+			} while (xskf_next_frame(&f));
+		}
+
+		WARN_ON_ONCE(xskpa_flush(a));
+		kfree(a);
+	}
+}
diff --git a/net/xdp/xsk_packet_array.h b/net/xdp/xsk_packet_array.h
new file mode 100644
index 000000000000..1f7544dee443
--- /dev/null
+++ b/net/xdp/xsk_packet_array.h
@@ -0,0 +1,394 @@ 
+/*
+ *  XDP packet arrays
+ *  Copyright(c) 2017 Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU General Public License,
+ * version 2, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ */
+
+#ifndef _LINUX_XDP_PACKET_ARRAY_H
+#define _LINUX_XDP_PACKET_ARRAY_H
+
+#include <linux/dma-direction.h>
+#include <linux/if_xdp.h>
+#include <linux/types.h>
+#include <linux/mm.h>
+
+#include "xsk.h"
+#include "xsk_buff.h"
+#include "xsk_user_queue.h"
+
+/**
+ * struct xsk_packet_array - An array of packets/frames
+ *
+ * @q_ops:
+ * @buff_info:
+ * @start: the first packet that has not been processed
+ * @curr: the packet that is currently being processed
+ * @end: the last packet in the array
+ * @mask: convenience variable for internal operations on the array
+ * @items: the actual descriptors to frames/packets that are in the array
+ **/
+struct xsk_packet_array {
+	struct xsk_user_queue *q_ops;
+	struct xsk_buff_info *buff_info;
+	u32 start;
+	u32 curr;
+	u32 end;
+	u32 mask;
+	struct xdp_desc items[0];
+};
+
+/**
+ * struct xsk_frame_set - A view of a packet array consisting of
+ *			  one or more frames
+ *
+ * @pkt_arr: the packet array this frame set is located in
+ * @start: the first frame that has not been processed
+ * @curr: the frame that is currently being processed
+ * @end: the last frame in the frame set
+ *
+ * This frame set can either be one or more frames or a single packet
+ * consisting of one or more frames. xskf_ functions with packet in the
+ * name return a frame set representing a packet, while the other
+ * xskf_ functions return one or more frames not taking into account if
+ * they consitute a packet or not.
+ **/
+struct xsk_frame_set {
+	struct xsk_packet_array *pkt_arr;
+	u32 start;
+	u32 curr;
+	u32 end;
+};
+
+static inline struct xsk_user_queue *xsk_user_queue(struct xsk_packet_array *a)
+{
+	return a->q_ops;
+}
+
+static inline struct xdp_desc *xskf_get_desc(struct xsk_frame_set *p)
+{
+	return &p->pkt_arr->items[p->curr & p->pkt_arr->mask];
+}
+
+/**
+ * xskf_reset - Start to traverse the frames in the set from the beginning
+ * @p: pointer to frame set
+ **/
+static inline void xskf_reset(struct xsk_frame_set *p)
+{
+	p->curr = p->start;
+}
+
+static inline u32 xskf_get_frame_id(struct xsk_frame_set *p)
+{
+	return p->pkt_arr->items[p->curr & p->pkt_arr->mask].idx;
+}
+
+static inline void xskf_set_error(struct xsk_frame_set *p, int errno)
+{
+	p->pkt_arr->items[p->curr & p->pkt_arr->mask].error = errno;
+}
+
+static inline u32 xskf_get_frame_len(struct xsk_frame_set *p)
+{
+	return p->pkt_arr->items[p->curr & p->pkt_arr->mask].len;
+}
+
+/**
+ * xskf_set_frame - Sets the properties of a frame
+ * @p: pointer to frame
+ * @len: the length in bytes of the data in the frame
+ * @offset: offset to start of data in frame
+ * @is_eop: Set if this is the last frame of the packet
+ **/
+static inline void xskf_set_frame(struct xsk_frame_set *p, u32 len, u16 offset,
+				  bool is_eop)
+{
+	struct xdp_desc *d =
+		&p->pkt_arr->items[p->curr & p->pkt_arr->mask];
+
+	d->len = len;
+	d->offset = offset;
+	if (!is_eop)
+		d->flags |= XDP_PKT_CONT;
+}
+
+static inline void xskf_set_frame_no_offset(struct xsk_frame_set *p,
+					    u32 len, bool is_eop)
+{
+	struct xdp_desc *d =
+		&p->pkt_arr->items[p->curr & p->pkt_arr->mask];
+
+	d->len = len;
+	if (!is_eop)
+		d->flags |= XDP_PKT_CONT;
+}
+
+/**
+ * xskf_get_data - Gets a pointer to the start of the packet
+ *
+ * @p: Pointer to the frame set
+ *
+ * Returns a pointer to the start of the data that the current
+ * descriptor points to
+ **/
+static inline void *xskf_get_data(struct xsk_frame_set *p)
+{
+	struct xdp_desc *desc = xskf_get_desc(p);
+	struct xsk_buff *buff;
+
+	buff = xsk_buff_info_get_buff(p->pkt_arr->buff_info, desc->idx);
+
+	return buff->data + desc->offset;
+}
+
+static inline u32 xskf_get_data_offset(struct xsk_frame_set *p)
+{
+	return p->pkt_arr->items[p->curr & p->pkt_arr->mask].offset;
+}
+
+/**
+ * xskf_next_frame - Go to next frame in frame set
+ * @p: pointer to frame set
+ *
+ * Returns true if there is another frame in the frame set.
+ * Advances curr pointer.
+ **/
+static inline bool xskf_next_frame(struct xsk_frame_set *p)
+{
+	if (p->curr + 1 == p->end)
+		return false;
+
+	p->curr++;
+	return true;
+}
+
+/**
+ * xskf_get_packet_len - Length of packet
+ * @p: pointer to packet
+ *
+ * Returns the length of the packet in bytes.
+ * Resets curr pointer of packet.
+ **/
+static inline u32 xskf_get_packet_len(struct xsk_frame_set *p)
+{
+	u32 len = 0;
+
+	xskf_reset(p);
+
+	do {
+		len += xskf_get_frame_len(p);
+	} while (xskf_next_frame(p));
+
+	return len;
+}
+
+/**
+ * xskf_packet_completed - Mark packet as completed
+ * @p: pointer to packet
+ *
+ * Resets curr pointer of packet.
+ **/
+static inline void xskf_packet_completed(struct xsk_frame_set *p)
+{
+	xskf_reset(p);
+
+	do {
+		p->pkt_arr->items[p->curr & p->pkt_arr->mask].flags |=
+			XSK_FRAME_COMPLETED;
+	} while (xskf_next_frame(p));
+}
+
+/**
+ * xskpa_flush_completed - Flushes only frames marked as completed
+ * @a: pointer to packet array
+ *
+ * Returns 0 for success and -1 for failure
+ **/
+static inline int xskpa_flush_completed(struct xsk_packet_array *a)
+{
+	u32 avail = a->curr - a->start;
+	int ret;
+
+	if (avail == 0)
+		return 0; /* nothing to flush */
+
+	ret = xsk_user_queue(a)->enqueue_completed(a, avail);
+	if (ret < 0)
+		return -1;
+
+	a->start += ret;
+	return 0;
+}
+
+/**
+ * xskpa_next_packet - Get next packet in array and advance curr pointer
+ * @a: pointer to packet array
+ * @p: supplied pointer to packet structure that is filled in by function
+ *
+ * Returns true if there is a packet, false otherwise. Packet returned in *p.
+ **/
+static inline bool xskpa_next_packet(struct xsk_packet_array *a,
+				     struct xsk_frame_set *p)
+{
+	u32 avail = a->end - a->curr;
+
+	if (avail == 0)
+		return false; /* empty */
+
+	p->pkt_arr = a;
+	p->start = a->curr;
+	p->curr = a->curr;
+	p->end = a->curr;
+
+	/* XXX Sanity check for too-many-frames packets? */
+	while (a->items[p->end++ & a->mask].flags & XDP_PKT_CONT) {
+		avail--;
+		if (avail == 0)
+			return false;
+	}
+
+	a->curr += (p->end - p->start);
+	return true;
+}
+
+/**
+ * xskpa_populate - Populate an array with packets from associated queue
+ * @a: pointer to packet array
+ **/
+static inline void xskpa_populate(struct xsk_packet_array *a)
+{
+	u32 cnt, free = a->mask + 1 - (a->end - a->start);
+
+	if (free == 0)
+		return; /* no space! */
+
+	cnt = xsk_user_queue(a)->dequeue(a, free);
+	a->end += cnt;
+}
+
+/**
+ * xskpa_next_frame - Get next frame in array and advance curr pointer
+ * @a: pointer to packet array
+ * @p: supplied pointer to packet structure that is filled in by function
+ *
+ * Returns true if there is a frame, false otherwise. Frame returned in *p.
+ **/
+static inline bool xskpa_next_frame(struct xsk_packet_array *a,
+				    struct xsk_frame_set *p)
+{
+	u32 avail = a->end - a->curr;
+
+	if (avail == 0)
+		return false; /* empty */
+
+	p->pkt_arr = a;
+	p->start = a->curr;
+	p->curr = a->curr;
+	p->end = ++a->curr;
+
+	return true;
+}
+
+/**
+ * xskpa_next_frame_populate - Get next frame and populate array if empty
+ * @a: pointer to packet array
+ * @p: supplied pointer to packet structure that is filled in by function
+ *
+ * Returns true if there is a frame, false otherwise. Frame returned in *p.
+ **/
+static inline bool xskpa_next_frame_populate(struct xsk_packet_array *a,
+					     struct xsk_frame_set *p)
+{
+	bool more_frames;
+
+	more_frames = xskpa_next_frame(a, p);
+	if (!more_frames) {
+		xskpa_populate(a);
+		more_frames = xskpa_next_frame(a, p);
+	}
+
+	return more_frames;
+}
+
+/**
+ * xskpa_get_flushable_frame_set - Create a frame set of the flushable region
+ * @a: pointer to packet array
+ * @p: frame set
+ *
+ * Returns true for success and false for failure
+ **/
+static inline bool xskpa_get_flushable_frame_set(struct xsk_packet_array *a,
+						 struct xsk_frame_set *p)
+{
+	u32 curr = READ_ONCE(a->curr);
+	u32 avail = curr - a->start;
+
+	if (avail == 0)
+		return false; /* empty */
+
+	p->pkt_arr = a;
+	p->start = a->start;
+	p->curr = a->start;
+	p->end = curr;
+
+	return true;
+}
+
+static inline int __xskpa_flush(struct xsk_packet_array *a, u32 npackets)
+{
+	int ret;
+
+	if (npackets == 0)
+		return 0; /* nothing to flush */
+
+	ret = xsk_user_queue(a)->enqueue(a, npackets);
+	if (ret < 0)
+		return ret;
+
+	a->start += npackets;
+	return 0;
+}
+
+/**
+ * xskpa_flush - Flush processed packets to associated queue
+ * @a: pointer to packet array
+ *
+ * Returns 0 for success and -errno for failure
+ **/
+static inline int xskpa_flush(struct xsk_packet_array *a)
+{
+	u32 curr = READ_ONCE(a->curr);
+	u32 avail = curr - a->start;
+
+	return __xskpa_flush(a, avail);
+}
+
+/**
+ * xskpa_flush_n - Flush N processed packets to associated queue
+ * @a: pointer to packet array
+ * @npackets: number of packets to flush
+ *
+ * Returns 0 for success and -errno for failure
+ **/
+static inline int xskpa_flush_n(struct xsk_packet_array *a, u32 npackets)
+{
+	if (npackets > a->curr - a->start)
+		return -ENOSPC;
+
+	return __xskpa_flush(a, npackets);
+}
+
+struct xsk_packet_array *xskpa_create(struct xsk_user_queue *q_ops,
+				      struct xsk_buff_info *buff_info,
+				      size_t elems);
+void xskpa_destroy(struct xsk_packet_array *a);
+
+#endif /* _LINUX_XDP_PACKET_ARRAY_H */
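
Putting the packet-array API together, a driver-side Tx consumer would
be expected to look roughly like the sketch below; arr is a previously
created packet array and xmit_frame() is a stand-in for real driver
transmit logic (a real driver would mark completion from its Tx
completion path rather than inline):

	struct xsk_frame_set p;

	xskpa_populate(arr);			/* pull from the ring */
	while (xskpa_next_packet(arr, &p)) {
		do {
			xmit_frame(xskf_get_data(&p),
				   xskf_get_frame_len(&p));
		} while (xskf_next_frame(&p));
		xskf_packet_completed(&p);	/* mark all frames done */
	}
	WARN_ON_ONCE(xskpa_flush_completed(arr)); /* hand back to user */
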
diff --git a/net/xdp/xsk_ring.c b/net/xdp/xsk_ring.c
new file mode 100644
index 000000000000..11b590506ddf
--- /dev/null
+++ b/net/xdp/xsk_ring.c
@@ -0,0 +1,60 @@ 
+/*
+ *  XDP user-space ring structure
+ *  Copyright(c) 2017 Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU General Public License,
+ * version 2, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ */
+
+#include <linux/slab.h>
+
+#include "xsk_ring.h"
+
+/**
+ * xskq_create - Creates an XDP queue
+ *
+ * @nentries: Number of descriptor entries in the queue
+ *
+ * Returns a reference to the newly created queue or NULL on failure
+ **/
+struct xsk_queue *xskq_create(u32 nentries)
+{
+	struct xsk_queue *q;
+
+	q = kzalloc(sizeof(*q), GFP_KERNEL);
+	if (!q)
+		return NULL;
+
+	q->ring = kcalloc(nentries, sizeof(*q->ring), GFP_KERNEL);
+	if (!q->ring) {
+		kfree(q);
+		return NULL;
+	}
+
+	q->queue_ops.enqueue = xskq_enqueue_from_array;
+	q->queue_ops.enqueue_completed = xskq_enqueue_completed_from_array;
+	q->queue_ops.dequeue = xskq_dequeue_to_array;
+	q->used_idx = 0;
+	q->last_avail_idx = 0;
+	q->ring_mask = nentries - 1;
+	q->num_free = 0;
+	q->nentries = nentries;
+
+	return q;
+}
+
+void xskq_destroy(struct xsk_queue *q)
+{
+	if (!q)
+		return;
+
+	kfree(q->ring);
+	kfree(q);
+}
diff --git a/net/xdp/xsk_ring.h b/net/xdp/xsk_ring.h
new file mode 100644
index 000000000000..c9d61195ab2d
--- /dev/null
+++ b/net/xdp/xsk_ring.h
@@ -0,0 +1,307 @@ 
+/*
+ *  XDP user-space ring structure
+ *  Copyright(c) 2017 Intel Corporation.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms and conditions of the GNU General Public License,
+ * version 2, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
+ * more details.
+ */
+
+#ifndef _LINUX_XDP_RING_H
+#define _LINUX_XDP_RING_H
+
+#include <linux/types.h>
+#include <linux/if_xdp.h>
+
+#include "xsk.h"
+#include "xsk_buff.h"
+#include "xsk_packet_array.h"
+
+struct xsk_queue {
+	/* struct xsk_user_queue has to be first */
+	struct xsk_user_queue queue_ops;
+	struct xdp_desc *ring;
+
+	u32 used_idx;
+	u32 last_avail_idx;
+	u32 ring_mask;
+	u32 num_free;
+
+	u32 nentries;
+	struct xsk_buff_info *buff_info;
+	enum xsk_validation validation;
+};
+
+static inline unsigned int xsk_get_data_headroom(struct xsk_umem *umem)
+{
+	return umem->data_headroom + XDP_KERNEL_HEADROOM;
+}
+
+/**
+ * xskq_is_valid_entry - Is the entry valid?
+ *
+ * @q: Pointer to the queue the descriptor resides in
+ * @d: Pointer to the descriptor to examine
+ *
+ * Returns true if the entry is valid, otherwise false
+ **/
+static inline bool xskq_is_valid_entry(struct xsk_queue *q,
+				       struct xdp_desc *d)
+{
+	unsigned int buff_len;
+
+	if (q->validation == XSK_VALIDATION_NONE)
+		return true;
+
+	if (unlikely(d->idx >= q->buff_info->nbuffs)) {
+		d->error = EBADF;
+		return false;
+	}
+
+	if (q->validation == XSK_VALIDATION_RX) {
+		d->offset = xsk_buff_info_get_rx_headroom(q->buff_info);
+		return true;
+	}
+
+	buff_len = xsk_buff_info_get_buff_len(q->buff_info);
+	/* XSK_VALIDATION_TX */
+	if (unlikely(d->len > buff_len || d->len == 0 || d->offset > buff_len ||
+		     d->offset + d->len > buff_len)) {
+		d->error = EBADF;
+		return false;
+	}
+
+	return true;
+}
+
+/**
+ * xskq_nb_avail - Returns the number of available entries
+ *
+ * @q: Pointer to the queue to examine
+ * @dcnt: Max number of entries to check
+ *
+ * Returns the number of entries available in the queue, up to dcnt
+ **/
+static inline int xskq_nb_avail(struct xsk_queue *q, int dcnt)
+{
+	unsigned int idx, last_avail_idx = q->last_avail_idx;
+	int i, entries = 0;
+
+	for (i = 0; i < dcnt; i++) {
+		idx = (last_avail_idx++) & q->ring_mask;
+		if (!(q->ring[idx].flags & XDP_DESC_KERNEL))
+			break;
+		entries++;
+	}
+
+	return entries;
+}
+
+/**
+ * xskq_enqueue - Enqueue entries to the queue
+ *
+ * @q: Pointer to the queue to enqueue to
+ * @d: Pointer to the descriptors to enqueue
+ * @dcnt: Number of entries to enqueue
+ *
+ * Returns 0 on success or a negative errno on failure
+ **/
+static inline int xskq_enqueue(struct xsk_queue *q,
+			       const struct xdp_desc *d, int dcnt)
+{
+	unsigned int used_idx = q->used_idx;
+	int i;
+
+	if (q->num_free < dcnt)
+		return -ENOSPC;
+
+	q->num_free -= dcnt;
+
+	for (i = 0; i < dcnt; i++) {
+		unsigned int idx = (used_idx++) & q->ring_mask;
+
+		q->ring[idx].idx = d[i].idx;
+		q->ring[idx].len = d[i].len;
+		q->ring[idx].offset = d[i].offset;
+		q->ring[idx].error = d[i].error;
+	}
+
+	/* Order flags and data */
+	smp_wmb();
+
+	for (i = dcnt - 1; i >= 0; i--) {
+		unsigned int idx = (q->used_idx + i) & q->ring_mask;
+
+		q->ring[idx].flags = d[i].flags & ~XDP_DESC_KERNEL;
+	}
+	q->used_idx += dcnt;
+
+	return 0;
+}
+
+/**
+ * xskq_enqueue_from_array - Enqueue entries from packet array to the queue
+ *
+ * @a: Pointer to the packet array to enqueue from
+ * @dcnt: Number of entries to enqueue
+ *
+ * Returns 0 on success or a negative errno on failure
+ **/
+static inline int xskq_enqueue_from_array(struct xsk_packet_array *a,
+					  u32 dcnt)
+{
+	struct xsk_queue *q = (struct xsk_queue *)a->q_ops;
+	unsigned int used_idx = q->used_idx;
+	struct xdp_desc *d = a->items;
+	int i;
+
+	if (q->num_free < dcnt)
+		return -ENOSPC;
+
+	q->num_free -= dcnt;
+
+	for (i = 0; i < dcnt; i++) {
+		unsigned int idx = (used_idx++) & q->ring_mask;
+		unsigned int didx = (a->start + i) & a->mask;
+
+		q->ring[idx].idx = d[didx].idx;
+		q->ring[idx].len = d[didx].len;
+		q->ring[idx].offset = d[didx].offset;
+		q->ring[idx].error = d[didx].error;
+	}
+
+	/* Order flags and data */
+	smp_wmb();
+
+	for (i = dcnt - 1; i >= 0; i--) {
+		unsigned int idx = (q->used_idx + i) & q->ring_mask;
+		unsigned int didx = (a->start + i) & a->mask;
+
+		q->ring[idx].flags = d[didx].flags & ~XDP_DESC_KERNEL;
+	}
+	q->used_idx += dcnt;
+
+	return 0;
+}
+
+/**
+ * xskq_enqueue_completed_from_array - Enqueue only completed entries
+ *				       from packet array
+ *
+ * @a: Pointer to the packet array to enqueue from
+ * @dcnt: Max number of entries to enqueue
+ *
+ * Returns the number of entries successfully enqueued or a negative errno
+ * on failure.
+ **/
+static inline int xskq_enqueue_completed_from_array(struct xsk_packet_array *a,
+						    u32 dcnt)
+{
+	struct xsk_queue *q = (struct xsk_queue *)a->q_ops;
+	unsigned int used_idx = q->used_idx;
+	struct xdp_desc *d = a->items;
+	int i, j;
+
+	if (q->num_free < dcnt)
+		return -ENOSPC;
+
+	for (i = 0; i < dcnt; i++) {
+		unsigned int didx = (a->start + i) & a->mask;
+
+		if (d[didx].flags & XSK_FRAME_COMPLETED) {
+			unsigned int idx = (used_idx++) & q->ring_mask;
+
+			q->ring[idx].idx = d[didx].idx;
+			q->ring[idx].len = d[didx].len;
+			q->ring[idx].offset = d[didx].offset;
+			q->ring[idx].error = d[didx].error;
+		} else {
+			break;
+		}
+	}
+
+	if (i == 0)
+		return 0;
+
+	/* Order flags and data */
+	smp_wmb();
+
+	for (j = i - 1; j >= 0; j--) {
+		unsigned int idx = (q->used_idx + j) & q->ring_mask;
+		unsigned int didx = (a->start + j) & a->mask;
+
+		q->ring[idx].flags = d[didx].flags & ~XDP_DESC_KERNEL;
+	}
+	q->num_free -= i;
+	q->used_idx += i;
+
+	return i;
+}
+
+/**
+ * xskq_dequeue_to_array - Dequeue entries from the queue to a packet array
+ *
+ * @a: Pointer to the packet array to dequeue into
+ * @dcnt: Max number of entries to dequeue
+ *
+ * Returns the number of entries dequeued. Invalid entries are
+ * discarded.
+ **/
+static inline int xskq_dequeue_to_array(struct xsk_packet_array *a, u32 dcnt)
+{
+	struct xdp_desc *d = a->items;
+	int i, entries, valid_entries = 0;
+	struct xsk_queue *q = (struct xsk_queue *)a->q_ops;
+	u32 start = a->end;
+
+	entries = xskq_nb_avail(q, dcnt);
+	q->num_free += entries;
+
+	/* Order flags and data */
+	smp_rmb();
+
+	for (i = 0; i < entries; i++) {
+		unsigned int d_idx = start & a->mask;
+		unsigned int idx;
+
+		idx = (q->last_avail_idx++) & q->ring_mask;
+		d[d_idx] = q->ring[idx];
+		if (!xskq_is_valid_entry(q, &d[d_idx])) {
+			WARN_ON_ONCE(xskq_enqueue(q, &d[d_idx], 1));
+			continue;
+		}
+
+		start++;
+		valid_entries++;
+	}
+	return valid_entries;
+}
+
+static inline u32 xskq_get_ring_size(struct xsk_queue *q)
+{
+	return q->nentries * sizeof(*q->ring);
+}
+
+static inline char *xskq_get_ring_address(struct xsk_queue *q)
+{
+	return (char *)q->ring;
+}
+
+static inline void xskq_set_buff_info(struct xsk_queue *q,
+				      struct xsk_buff_info *buff_info,
+				      enum xsk_validation validation)
+{
+	q->buff_info = buff_info;
+	q->validation = validation;
+}
+
+struct xsk_queue *xskq_create(u32 nentries);
+void xskq_destroy(struct xsk_queue *q_ops);
+
+#endif /* _LINUX_XDP_RING_H */
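
Note that start/curr/end and used_idx/last_avail_idx are free-running
u32 counters that are only masked on access, so differences such as
a->end - a->curr remain correct across wraparound as long as ring
sizes are powers of two. For example:

	u32 start = 0xfffffffe, end = 0x2;	/* counters have wrapped */
	u32 avail = end - start;		/* == 4, as intended */
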
diff --git a/net/xdp/xsk_user_queue.h b/net/xdp/xsk_user_queue.h
new file mode 100644
index 000000000000..c072f854d693
--- /dev/null
+++ b/net/xdp/xsk_user_queue.h
@@ -0,0 +1,24 @@ 
+#ifndef XSK_USER_QUEUE_H_
+#define XSK_USER_QUEUE_H_
+
+#define XDP_KERNEL_HEADROOM 256 /* Headroom for XDP */
+
+#define XSK_FRAME_COMPLETED XDP_DESC_KERNEL
+
+enum xsk_validation {
+	XSK_VALIDATION_NONE,	  /* No validation is performed */
+	XSK_VALIDATION_RX,	  /* Only address to packet buffer validated */
+	XSK_VALIDATION_TX	  /* Full descriptor is validated */
+};
+
+struct xsk_packet_array;
+
+struct xsk_user_queue {
+	int (*enqueue)(struct xsk_packet_array *pa, u32 cnt);
+	int (*enqueue_completed)(struct xsk_packet_array *pa, u32 cnt);
+	int (*dequeue)(struct xsk_packet_array *pa, u32 cnt);
+	u32 (*get_ring_size)(struct xsk_user_queue *q);
+	char *(*get_ring_address)(struct xsk_user_queue *q);
+};
+
+#endif /* XSK_USER_QUEUE_H_ */
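
Because struct xsk_user_queue must be the first member of struct
xsk_queue, the q_ops pointer stored in a packet array can be cast
straight back to the containing queue, which is what
xskq_enqueue_from_array() and friends rely on. The equivalent, more
explicit form would be a container_of() helper:

	static inline struct xsk_queue *
	xskq_from_ops(struct xsk_user_queue *ops)
	{
		/* Valid because queue_ops is the first member. */
		return container_of(ops, struct xsk_queue, queue_ops);
	}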