From patchwork Thu Jun 2 16:08:18 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Michael S. Tsirkin" X-Patchwork-Id: 629356 X-Patchwork-Delegate: davem@davemloft.net Return-Path: X-Original-To: patchwork-incoming@ozlabs.org Delivered-To: patchwork-incoming@ozlabs.org Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by ozlabs.org (Postfix) with ESMTP id 3rLC0c4Wdtz9t8g for ; Fri, 3 Jun 2016 02:12:00 +1000 (AEST) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1161445AbcFBQIY (ORCPT ); Thu, 2 Jun 2016 12:08:24 -0400 Received: from mx1.redhat.com ([209.132.183.28]:47922 "EHLO mx1.redhat.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1752670AbcFBQIW (ORCPT ); Thu, 2 Jun 2016 12:08:22 -0400 Received: from int-mx11.intmail.prod.int.phx2.redhat.com (int-mx11.intmail.prod.int.phx2.redhat.com [10.5.11.24]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by mx1.redhat.com (Postfix) with ESMTPS id 3902D46212; Thu, 2 Jun 2016 16:08:22 +0000 (UTC) Received: from redhat.com (vpn1-6-246.ams2.redhat.com [10.36.6.246]) by int-mx11.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with SMTP id u52G8IMC014283; Thu, 2 Jun 2016 12:08:19 -0400 Date: Thu, 2 Jun 2016 19:08:18 +0300 From: "Michael S. Tsirkin" To: linux-kernel@vger.kernel.org Cc: "Michael S. Tsirkin" , Jason Wang , Eric Dumazet , davem@davemloft.net, netdev@vger.kernel.org, Steven Rostedt , brouer@redhat.com, kvm@vger.kernel.org Subject: [PATCH RFC v7 1/5] ptr_ring: array based FIFO for pointers Message-ID: <1464883305-32368-2-git-send-email-mst@redhat.com> References: <1464883305-32368-1-git-send-email-mst@redhat.com> MIME-Version: 1.0 Content-Disposition: inline In-Reply-To: <1464883305-32368-1-git-send-email-mst@redhat.com> X-Mutt-Fcc: =sent X-Scanned-By: MIMEDefang 2.68 on 10.5.11.24 X-Greylist: Sender IP whitelisted, not delayed by milter-greylist-4.5.16 (mx1.redhat.com [10.5.110.29]); Thu, 02 Jun 2016 16:08:22 +0000 (UTC) Sender: netdev-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: netdev@vger.kernel.org A simple array based FIFO of pointers. Intended for net stack which commonly has a single consumer/producer. Signed-off-by: Michael S. Tsirkin --- include/linux/ptr_ring.h | 264 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 264 insertions(+) create mode 100644 include/linux/ptr_ring.h diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h new file mode 100644 index 0000000..d265d72 --- /dev/null +++ b/include/linux/ptr_ring.h @@ -0,0 +1,264 @@ +/* + * Definitions for the 'struct ptr_ring' datastructure. + * + * Author: + * Michael S. Tsirkin + * + * Copyright (C) 2016 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This is a limited-size FIFO maintaining pointers in FIFO order, with + * one CPU producing entries and another consuming entries from a FIFO. + * + * This implementation tries to minimize cache-contention when there is a + * single producer and a single consumer CPU. + */ + +#ifndef _LINUX_PTR_RING_H +#define _LINUX_PTR_RING_H 1 + +#ifdef __KERNEL__ +#include +#include +#include +#include +#include +#include +#include +#endif + +struct ptr_ring { + int producer ____cacheline_aligned_in_smp; + spinlock_t producer_lock; + int consumer ____cacheline_aligned_in_smp; + spinlock_t consumer_lock; + /* Shared consumer/producer data */ + /* Read-only by both the producer and the consumer */ + int size ____cacheline_aligned_in_smp; /* max entries in queue */ + void **queue; +}; + +/* Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). + * Callers don't need to take producer lock - if they don't + * the next call to __ptr_ring_produce may fail. + */ +static inline bool __ptr_ring_full(struct ptr_ring *r) +{ + return r->queue[r->producer]; +} + +static inline bool ptr_ring_full(struct ptr_ring *r) +{ + barrier(); + return __ptr_ring_full(r); +} + +/* Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). + */ +static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr) +{ + if (__ptr_ring_full(r)) + return -ENOSPC; + + r->queue[r->producer++] = ptr; + if (unlikely(r->producer >= r->size)) + r->producer = 0; + return 0; +} + +static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr) +{ + int ret; + + spin_lock(&r->producer_lock); + ret = __ptr_ring_produce(r, ptr); + spin_unlock(&r->producer_lock); + + return ret; +} + +static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr) +{ + int ret; + + spin_lock_irq(&r->producer_lock); + ret = __ptr_ring_produce(r, ptr); + spin_unlock_irq(&r->producer_lock); + + return ret; +} + +static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr) +{ + unsigned long flags; + int ret; + + spin_lock_irqsave(&r->producer_lock, flags); + ret = __ptr_ring_produce(r, ptr); + spin_unlock_irqrestore(&r->producer_lock, flags); + + return ret; +} + +static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr) +{ + int ret; + + spin_lock_bh(&r->producer_lock); + ret = __ptr_ring_produce(r, ptr); + spin_unlock_bh(&r->producer_lock); + + return ret; +} + +/* Note: callers invoking this in a loop must use a compiler barrier, + * for example cpu_relax(). Callers must take consumer_lock + * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL. + * There's no need for a lock if pointer is merely tested - see e.g. + * ptr_ring_empty. + */ +static inline void *__ptr_ring_peek(struct ptr_ring *r) +{ + return r->queue[r->consumer]; +} + +static inline bool ptr_ring_empty(struct ptr_ring *r) +{ + barrier(); + return !__ptr_ring_peek(r); +} + +/* Must only be called after __ptr_ring_peek returned !NULL */ +static inline void __ptr_ring_discard_one(struct ptr_ring *r) +{ + r->queue[r->consumer++] = NULL; + if (unlikely(r->consumer >= r->size)) + r->consumer = 0; +} + +static inline void *__ptr_ring_consume(struct ptr_ring *r) +{ + void *ptr; + + ptr = __ptr_ring_peek(r); + if (ptr) + __ptr_ring_discard_one(r); + + return ptr; +} + +static inline void *ptr_ring_consume(struct ptr_ring *r) +{ + void *ptr; + + spin_lock(&r->consumer_lock); + ptr = __ptr_ring_consume(r); + spin_unlock(&r->consumer_lock); + + return ptr; +} + +static inline void *ptr_ring_consume_irq(struct ptr_ring *r) +{ + void *ptr; + + spin_lock_irq(&r->consumer_lock); + ptr = __ptr_ring_consume(r); + spin_unlock_irq(&r->consumer_lock); + + return ptr; +} + +static inline void *ptr_ring_consume_any(struct ptr_ring *r) +{ + unsigned long flags; + void *ptr; + + spin_lock_irqsave(&r->consumer_lock, flags); + ptr = __ptr_ring_consume(r); + spin_unlock_irqrestore(&r->consumer_lock, flags); + + return ptr; +} + +static inline void *ptr_ring_consume_bh(struct ptr_ring *r) +{ + void *ptr; + + spin_lock(&r->consumer_lock); + ptr = __ptr_ring_consume(r); + spin_unlock(&r->consumer_lock); + + return ptr; +} + +/* Cast to structure type and call a function without discarding from FIFO. + * Function must return a value. + * Callers must take consumer_lock. + */ +#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r))) + +#define PTR_RING_PEEK_CALL(r, f) ({ \ + typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ + \ + spin_lock(&(r)->consumer_lock); \ + __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ + spin_unlock(&(r)->consumer_lock); \ + __PTR_RING_PEEK_CALL_v; \ +}) + +#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \ + typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ + \ + spin_lock_irq(&(r)->consumer_lock); \ + __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ + spin_unlock_irq(&(r)->consumer_lock); \ + __PTR_RING_PEEK_CALL_v; \ +}) + +#define PTR_RING_PEEK_CALL_BH(r, f) ({ \ + typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ + \ + spin_lock_bh(&(r)->consumer_lock); \ + __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ + spin_unlock_bh(&(r)->consumer_lock); \ + __PTR_RING_PEEK_CALL_v; \ +}) + +#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \ + typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ + unsigned long __PTR_RING_PEEK_CALL_f;\ + \ + spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ + __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ + spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ + __PTR_RING_PEEK_CALL_v; \ +}) + +static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) +{ + r->queue = kzalloc(ALIGN(size * sizeof *(r->queue), SMP_CACHE_BYTES), + gfp); + if (!r->queue) + return -ENOMEM; + + r->size = size; + r->producer = r->consumer = 0; + spin_lock_init(&r->producer_lock); + spin_lock_init(&r->consumer_lock); + + return 0; +} + +static inline void ptr_ring_cleanup(struct ptr_ring *r) +{ + kfree(r->queue); +} + +#endif /* _LINUX_PTR_RING_H */