diff mbox series

[v5,net-nex,2/5] net: page_pool: add bulk support for ptr_ring

Message ID 1229970bf6f36fd4689169a2e47fdcc664d28366.1605020963.git.lorenzo@kernel.org
State Superseded
Headers show
Series xdp: introduce bulking for page_pool tx return path | expand

Commit Message

Lorenzo Bianconi Nov. 10, 2020, 3:37 p.m. UTC
Introduce the capability to batch page_pool ptr_ring refill since it is
usually run inside the driver NAPI tx completion loop.

Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
---
 include/net/page_pool.h | 26 ++++++++++++++++
 net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
 net/core/xdp.c          |  9 ++----
 3 files changed, 87 insertions(+), 17 deletions(-)

Comments

John Fastabend Nov. 11, 2020, 9:29 a.m. UTC | #1
Lorenzo Bianconi wrote:
> Introduce the capability to batch page_pool ptr_ring refill since it is
> usually run inside the driver NAPI tx completion loop.
> 
> Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
> Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> ---
>  include/net/page_pool.h | 26 ++++++++++++++++
>  net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
>  net/core/xdp.c          |  9 ++----
>  3 files changed, 87 insertions(+), 17 deletions(-)

[...]

> +/* Caller must not use data area after call, as this function overwrites it */
> +void page_pool_put_page_bulk(struct page_pool *pool, void **data,
> +			     int count)
> +{
> +	int i, bulk_len = 0, pa_len = 0;
> +
> +	for (i = 0; i < count; i++) {
> +		struct page *page = virt_to_head_page(data[i]);
> +
> +		page = __page_pool_put_page(pool, page, -1, false);
> +		/* Approved for bulk recycling in ptr_ring cache */
> +		if (page)
> +			data[bulk_len++] = page;
> +	}
> +
> +	if (unlikely(!bulk_len))
> +		return;
> +
> +	/* Bulk producer into ptr_ring page_pool cache */
> +	page_pool_ring_lock(pool);
> +	for (i = 0; i < bulk_len; i++) {
> +		if (__ptr_ring_produce(&pool->ring, data[i]))
> +			data[pa_len++] = data[i];

How about bailing out on the first error? bulk_len should be less than
16 right, so should we really keep retying hoping ring->size changes?

> +	}
> +	page_pool_ring_unlock(pool);
> +
> +	if (likely(!pa_len))
> +		return;
> +
> +	/* ptr_ring cache full, free pages outside producer lock since
> +	 * put_page() with refcnt == 1 can be an expensive operation
> +	 */
> +	for (i = 0; i < pa_len; i++)
> +		page_pool_return_page(pool, data[i]);
> +}
> +EXPORT_SYMBOL(page_pool_put_page_bulk);
> +

Otherwise LGTM.
Lorenzo Bianconi Nov. 11, 2020, 10:43 a.m. UTC | #2
> Lorenzo Bianconi wrote:
> > Introduce the capability to batch page_pool ptr_ring refill since it is
> > usually run inside the driver NAPI tx completion loop.
> > 
> > Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> > ---
> >  include/net/page_pool.h | 26 ++++++++++++++++
> >  net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
> >  net/core/xdp.c          |  9 ++----
> >  3 files changed, 87 insertions(+), 17 deletions(-)
> 
> [...]
> 
> > +/* Caller must not use data area after call, as this function overwrites it */
> > +void page_pool_put_page_bulk(struct page_pool *pool, void **data,
> > +			     int count)
> > +{
> > +	int i, bulk_len = 0, pa_len = 0;
> > +
> > +	for (i = 0; i < count; i++) {
> > +		struct page *page = virt_to_head_page(data[i]);
> > +
> > +		page = __page_pool_put_page(pool, page, -1, false);
> > +		/* Approved for bulk recycling in ptr_ring cache */
> > +		if (page)
> > +			data[bulk_len++] = page;
> > +	}
> > +
> > +	if (unlikely(!bulk_len))
> > +		return;
> > +
> > +	/* Bulk producer into ptr_ring page_pool cache */
> > +	page_pool_ring_lock(pool);
> > +	for (i = 0; i < bulk_len; i++) {
> > +		if (__ptr_ring_produce(&pool->ring, data[i]))
> > +			data[pa_len++] = data[i];
> 
> How about bailing out on the first error? bulk_len should be less than
> 16 right, so should we really keep retying hoping ring->size changes?

do you mean doing something like:

	page_pool_ring_lock(pool);
	if (__ptr_ring_full(&pool->ring)) {
		pa_len = bulk_len;
		page_pool_ring_unlock(pool);
		goto out;
	}
	...
out:
	for (i = 0; i < pa_len; i++) {
		...
	}

I do not know if it is better or not since the consumer can run in parallel.
@Jesper/Ilias: any idea?

Regards,
Lorenzo

> 
> > +	}
> > +	page_pool_ring_unlock(pool);
> > +
> > +	if (likely(!pa_len))
> > +		return;
> > +
> > +	/* ptr_ring cache full, free pages outside producer lock since
> > +	 * put_page() with refcnt == 1 can be an expensive operation
> > +	 */
> > +	for (i = 0; i < pa_len; i++)
> > +		page_pool_return_page(pool, data[i]);
> > +}
> > +EXPORT_SYMBOL(page_pool_put_page_bulk);
> > +
> 
> Otherwise LGTM.
Jesper Dangaard Brouer Nov. 11, 2020, 12:59 p.m. UTC | #3
On Wed, 11 Nov 2020 11:43:31 +0100
Lorenzo Bianconi <lorenzo@kernel.org> wrote:

> > Lorenzo Bianconi wrote:  
> > > Introduce the capability to batch page_pool ptr_ring refill since it is
> > > usually run inside the driver NAPI tx completion loop.
> > > 
> > > Suggested-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > > Co-developed-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > > Signed-off-by: Jesper Dangaard Brouer <brouer@redhat.com>
> > > Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> > > ---
> > >  include/net/page_pool.h | 26 ++++++++++++++++
> > >  net/core/page_pool.c    | 69 +++++++++++++++++++++++++++++++++++------
> > >  net/core/xdp.c          |  9 ++----
> > >  3 files changed, 87 insertions(+), 17 deletions(-)  
> > 
> > [...]
> >   
> > > +/* Caller must not use data area after call, as this function overwrites it */
> > > +void page_pool_put_page_bulk(struct page_pool *pool, void **data,
> > > +			     int count)
> > > +{
> > > +	int i, bulk_len = 0, pa_len = 0;
> > > +
> > > +	for (i = 0; i < count; i++) {
> > > +		struct page *page = virt_to_head_page(data[i]);
> > > +
> > > +		page = __page_pool_put_page(pool, page, -1, false);
> > > +		/* Approved for bulk recycling in ptr_ring cache */
> > > +		if (page)
> > > +			data[bulk_len++] = page;
> > > +	}
> > > +
> > > +	if (unlikely(!bulk_len))
> > > +		return;
> > > +
> > > +	/* Bulk producer into ptr_ring page_pool cache */
> > > +	page_pool_ring_lock(pool);
> > > +	for (i = 0; i < bulk_len; i++) {
> > > +		if (__ptr_ring_produce(&pool->ring, data[i]))
> > > +			data[pa_len++] = data[i];  
> > 
> > How about bailing out on the first error? bulk_len should be less than
> > 16 right, so should we really keep retying hoping ring->size changes?  
> 
> do you mean doing something like:
> 
> 	page_pool_ring_lock(pool);
> 	if (__ptr_ring_full(&pool->ring)) {
> 		pa_len = bulk_len;
> 		page_pool_ring_unlock(pool);
> 		goto out;
> 	}
> 	...
> out:
> 	for (i = 0; i < pa_len; i++) {
> 		...
> 	}

I think this is the change John is looking for:

diff --git a/net/core/page_pool.c b/net/core/page_pool.c
index a06606f07df0..3093fe4e1cd7 100644
--- a/net/core/page_pool.c
+++ b/net/core/page_pool.c
@@ -424,7 +424,7 @@ EXPORT_SYMBOL(page_pool_put_page);
 void page_pool_put_page_bulk(struct page_pool *pool, void **data,
                             int count)
 {
-       int i, bulk_len = 0, pa_len = 0;
+       int i, bulk_len = 0;
        bool order0 = (pool->p.order == 0);
 
        for (i = 0; i < count; i++) {
@@ -448,17 +448,18 @@ void page_pool_put_page_bulk(struct page_pool *pool, void **data,
        page_pool_ring_lock(pool);
        for (i = 0; i < bulk_len; i++) {
                if (__ptr_ring_produce(&pool->ring, data[i]))
-                       data[pa_len++] = data[i];
+                       break; /* ring_full */
        }
        page_pool_ring_unlock(pool);
 
-       if (likely(!pa_len))
+       /* Hopefully all pages was return into ptr_ring */
+       if (likely(i == bulk_len))
                return;
 
-       /* ptr_ring cache full, free pages outside producer lock since
-        * put_page() with refcnt == 1 can be an expensive operation
+       /* ptr_ring cache full, free remaining pages outside producer lock
+        * since put_page() with refcnt == 1 can be an expensive operation
         */
-       for (i = 0; i < pa_len; i++)
+       for (; i < bulk_len; i++)
                page_pool_return_page(pool, data[i]);
 }
 EXPORT_SYMBOL(page_pool_put_page_bulk);


> I do not know if it is better or not since the consumer can run in
> parallel. @Jesper/Ilias: any idea?

Currently it is not very likely that the consumer runs in parallel, but
is it possible. (As you experienced on your testlab with mlx5, the
DMA-TX completion did run on another CPU, but I asked you to
reconfigure this to have it run on same CPU, as it is suboptimal).
When we (finally) support this memory type for SKBs it will be more
normal to happen.

But, John is right, for ptr_ring we should exit as soon the first
"produce" fails.  This is because I know how ptr_ring works internally.
The "consumer" will free slots in chunks of 16 slots, so it is not very
likely that a slot opens up.

> >   
> > > +	}
> > > +	page_pool_ring_unlock(pool);
> > > +
> > > +	if (likely(!pa_len))
> > > +		return;
> > > +
> > > +	/* ptr_ring cache full, free pages outside producer lock since
> > > +	 * put_page() with refcnt == 1 can be an expensive operation
> > > +	 */
> > > +	for (i = 0; i < pa_len; i++)
> > > +		page_pool_return_page(pool, data[i]);
> > > +}
> > > +EXPORT_SYMBOL(page_pool_put_page_bulk);
> > > +  
> > 
> > Otherwise LGTM.
diff mbox series

Patch

diff --git a/include/net/page_pool.h b/include/net/page_pool.h
index 81d7773f96cd..b5b195305346 100644
--- a/include/net/page_pool.h
+++ b/include/net/page_pool.h
@@ -152,6 +152,8 @@  struct page_pool *page_pool_create(const struct page_pool_params *params);
 void page_pool_destroy(struct page_pool *pool);
 void page_pool_use_xdp_mem(struct page_pool *pool, void (*disconnect)(void *));
 void page_pool_release_page(struct page_pool *pool, struct page *page);
+void page_pool_put_page_bulk(struct page_pool *pool, void **data,
+			     int count);
 #else
 static inline void page_pool_destroy(struct page_pool *pool)
 {
@@ -165,6 +167,11 @@  static inline void page_pool_release_page(struct page_pool *pool,
 					  struct page *page)
 {
 }
+
+static inline void page_pool_put_page_bulk(struct page_pool *pool, void **data,
+					   int count)
+{
+}
 #endif
 
 void page_pool_put_page(struct page_pool *pool, struct page *page,
@@ -215,4 +222,23 @@  static inline void page_pool_nid_changed(struct page_pool *pool, int new_nid)
 	if (unlikely(pool->p.nid != new_nid))
 		page_pool_update_nid(pool, new_nid);
 }
+
+static inline void page_pool_ring_lock(struct page_pool *pool)
+	__acquires(&pool->ring.producer_lock)
+{
+	if (in_serving_softirq())
+		spin_lock(&pool->ring.producer_lock);
+	else
+		spin_lock_bh(&pool->ring.producer_lock);
+}
+
+static inline void page_pool_ring_unlock(struct page_pool *pool)
+	__releases(&pool->ring.producer_lock)
+{
+	if (in_serving_softirq())
+		spin_unlock(&pool->ring.producer_lock);
+	else
+		spin_unlock_bh(&pool->ring.producer_lock);
+}
+
 #endif /* _NET_PAGE_POOL_H */
diff --git a/net/core/page_pool.c b/net/core/page_pool.c
index ef98372facf6..a3e6051ac978 100644
--- a/net/core/page_pool.c
+++ b/net/core/page_pool.c
@@ -11,6 +11,8 @@ 
 #include <linux/device.h>
 
 #include <net/page_pool.h>
+#include <net/xdp.h>
+
 #include <linux/dma-direction.h>
 #include <linux/dma-mapping.h>
 #include <linux/page-flags.h>
@@ -362,8 +364,9 @@  static bool pool_page_reusable(struct page_pool *pool, struct page *page)
  * If the page refcnt != 1, then the page will be returned to memory
  * subsystem.
  */
-void page_pool_put_page(struct page_pool *pool, struct page *page,
-			unsigned int dma_sync_size, bool allow_direct)
+static __always_inline struct page *
+__page_pool_put_page(struct page_pool *pool, struct page *page,
+		     unsigned int dma_sync_size, bool allow_direct)
 {
 	/* This allocator is optimized for the XDP mode that uses
 	 * one-frame-per-page, but have fallbacks that act like the
@@ -379,15 +382,12 @@  void page_pool_put_page(struct page_pool *pool, struct page *page,
 			page_pool_dma_sync_for_device(pool, page,
 						      dma_sync_size);
 
-		if (allow_direct && in_serving_softirq())
-			if (page_pool_recycle_in_cache(page, pool))
-				return;
+		if (allow_direct && in_serving_softirq() &&
+		    page_pool_recycle_in_cache(page, pool))
+			return NULL;
 
-		if (!page_pool_recycle_in_ring(pool, page)) {
-			/* Cache full, fallback to free pages */
-			page_pool_return_page(pool, page);
-		}
-		return;
+		/* Page found as candidate for recycling */
+		return page;
 	}
 	/* Fallback/non-XDP mode: API user have elevated refcnt.
 	 *
@@ -405,9 +405,58 @@  void page_pool_put_page(struct page_pool *pool, struct page *page,
 	/* Do not replace this with page_pool_return_page() */
 	page_pool_release_page(pool, page);
 	put_page(page);
+
+	return NULL;
+}
+
+void page_pool_put_page(struct page_pool *pool, struct page *page,
+			unsigned int dma_sync_size, bool allow_direct)
+{
+	page = __page_pool_put_page(pool, page, dma_sync_size, allow_direct);
+	if (page && !page_pool_recycle_in_ring(pool, page)) {
+		/* Cache full, fallback to free pages */
+		page_pool_return_page(pool, page);
+	}
 }
 EXPORT_SYMBOL(page_pool_put_page);
 
+/* Caller must not use data area after call, as this function overwrites it */
+void page_pool_put_page_bulk(struct page_pool *pool, void **data,
+			     int count)
+{
+	int i, bulk_len = 0, pa_len = 0;
+
+	for (i = 0; i < count; i++) {
+		struct page *page = virt_to_head_page(data[i]);
+
+		page = __page_pool_put_page(pool, page, -1, false);
+		/* Approved for bulk recycling in ptr_ring cache */
+		if (page)
+			data[bulk_len++] = page;
+	}
+
+	if (unlikely(!bulk_len))
+		return;
+
+	/* Bulk producer into ptr_ring page_pool cache */
+	page_pool_ring_lock(pool);
+	for (i = 0; i < bulk_len; i++) {
+		if (__ptr_ring_produce(&pool->ring, data[i]))
+			data[pa_len++] = data[i];
+	}
+	page_pool_ring_unlock(pool);
+
+	if (likely(!pa_len))
+		return;
+
+	/* ptr_ring cache full, free pages outside producer lock since
+	 * put_page() with refcnt == 1 can be an expensive operation
+	 */
+	for (i = 0; i < pa_len; i++)
+		page_pool_return_page(pool, data[i]);
+}
+EXPORT_SYMBOL(page_pool_put_page_bulk);
+
 static void page_pool_empty_ring(struct page_pool *pool)
 {
 	struct page *page;
diff --git a/net/core/xdp.c b/net/core/xdp.c
index bbaee7fdd44f..3d330ebda893 100644
--- a/net/core/xdp.c
+++ b/net/core/xdp.c
@@ -393,16 +393,11 @@  EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi);
 void xdp_flush_frame_bulk(struct xdp_frame_bulk *bq)
 {
 	struct xdp_mem_allocator *xa = bq->xa;
-	int i;
 
-	if (unlikely(!xa))
+	if (unlikely(!xa || !bq->count))
 		return;
 
-	for (i = 0; i < bq->count; i++) {
-		struct page *page = virt_to_head_page(bq->q[i]);
-
-		page_pool_put_full_page(xa->page_pool, page, false);
-	}
+	page_pool_put_page_bulk(xa->page_pool, bq->q, bq->count);
 	/* bq->xa is not cleared to save lookup, if mem.id same in next bulk */
 	bq->count = 0;
 }