diff mbox series

[ovs-dev,v2,2/5] Make ByteQ safe for simultaneous producer/consumer

Message ID 20200602072152.25918-3-anton.ivanov@cambridgegreys.com
State Superseded
Headers show
Series [ovs-dev,v2,1/5] Enable kernel probes and map stream probes onto them | expand

Commit Message

Anton Ivanov June 2, 2020, 7:21 a.m. UTC
From: Anton Ivanov <anton.ivanov@cambridgegreys.com>

A ByteQ with unlocked head and tail is unsafe for simultaneous
consume/produce.

If simultaneous use is desired, these either need to be locked
or there needs to be a third atomic or lock guarded variable
"used".

An atomic "used" allows the producer to enqueue safely because
it "owns" the head and even if the consumer changes the head
it will only increase the space available versus the value in
"used".

Once the data has been written and the enqueued should be
made visible it fenced and the used is updated.

Similar for "consumer" - it can safely consume now as it
"owns" tail and never reads beyond tail + used (wrapped
around as needed).

Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com>
---
 lib/byteq.c | 17 ++++++++++++++++-
 lib/byteq.h |  2 ++
 2 files changed, 18 insertions(+), 1 deletion(-)

Comments

Ben Pfaff June 4, 2020, 8:47 p.m. UTC | #1
On Tue, Jun 02, 2020 at 08:21:49AM +0100, anton.ivanov@cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
> 
> A ByteQ with unlocked head and tail is unsafe for simultaneous
> consume/produce.

It isn't meant for that use, or at least it wasn't.  If you're changing
the goal then you should document the new guarantees, as do the other
OVS data structures that have thread-safety guarantees.
Anton Ivanov June 5, 2020, 12:46 p.m. UTC | #2
On 04/06/2020 21:47, Ben Pfaff wrote:
> On Tue, Jun 02, 2020 at 08:21:49AM +0100, anton.ivanov@cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov@cambridgegreys.com>
>>
>> A ByteQ with unlocked head and tail is unsafe for simultaneous
>> consume/produce.
> It isn't meant for that use, or at least it wasn't.  If you're changing
> the goal then you should document the new guarantees, as do the other
> OVS data structures that have thread-safety guarantees.
>
Ack. I will do in the next patch revision.

It is guaranteed for parallel use by ONE producer and ONE consumer. More 
than one on either side will not work.
diff mbox series

Patch

diff --git a/lib/byteq.c b/lib/byteq.c
index 3f865cf9e..da40c2530 100644
--- a/lib/byteq.c
+++ b/lib/byteq.c
@@ -19,6 +19,7 @@ 
 #include <string.h>
 #include <unistd.h>
 #include "util.h"
+#include "ovs-atomic.h"
 
 /* Initializes 'q' as an empty byteq that uses the 'size' bytes of 'buffer' to
  * store data.  'size' must be a power of 2.
@@ -32,13 +33,16 @@  byteq_init(struct byteq *q, uint8_t *buffer, size_t size)
     q->buffer = buffer;
     q->size = size;
     q->head = q->tail = 0;
+    q->used = ATOMIC_VAR_INIT(0);
 }
 
 /* Returns the number of bytes current queued in 'q'. */
 int
 byteq_used(const struct byteq *q)
 {
-    return q->head - q->tail;
+    int retval;
+    atomic_read_relaxed(&q->used, &retval);
+    return retval;
 }
 
 /* Returns the number of bytes that can be added to 'q' without overflow. */
@@ -68,9 +72,11 @@  byteq_is_full(const struct byteq *q)
 void
 byteq_put(struct byteq *q, uint8_t c)
 {
+    int discard;
     ovs_assert(!byteq_is_full(q));
     *byteq_head(q) = c;
     q->head++;
+    atomic_add(&q->used, 1, &discard);
 }
 
 /* Adds the 'n' bytes in 'p' at the head of 'q', which must have at least 'n'
@@ -79,6 +85,7 @@  void
 byteq_putn(struct byteq *q, const void *p_, size_t n)
 {
     const uint8_t *p = p_;
+    int discard;
     ovs_assert(byteq_avail(q) >= n);
     while (n > 0) {
         size_t chunk = MIN(n, byteq_headroom(q));
@@ -86,6 +93,7 @@  byteq_putn(struct byteq *q, const void *p_, size_t n)
         byteq_advance_head(q, chunk);
         p += chunk;
         n -= chunk;
+        atomic_add(&q->used, chunk, &discard);
     }
 }
 
@@ -103,9 +111,11 @@  uint8_t
 byteq_get(struct byteq *q)
 {
     uint8_t c;
+    int discard;
     ovs_assert(!byteq_is_empty(q));
     c = *byteq_tail(q);
     q->tail++;
+    atomic_sub(&q->used, 1, &discard);
     return c;
 }
 
@@ -168,8 +178,10 @@  byteq_tail(const struct byteq *q)
 void
 byteq_advance_tail(struct byteq *q, unsigned int n)
 {
+    int discard;
     ovs_assert(byteq_tailroom(q) >= n);
     q->tail += n;
+    atomic_sub_relaxed(&q->used, n, &discard);
 }
 
 /* Returns the byte after the last in-use byte of 'q', the point at which new
@@ -195,6 +207,9 @@  byteq_headroom(const struct byteq *q)
 void
 byteq_advance_head(struct byteq *q, unsigned int n)
 {
+    int discard;
     ovs_assert(byteq_headroom(q) >= n);
     q->head += n;
+    atomic_thread_fence(memory_order_release);
+    atomic_add_relaxed(&q->used, n, &discard);
 }
diff --git a/lib/byteq.h b/lib/byteq.h
index d73e3684e..e829efab0 100644
--- a/lib/byteq.h
+++ b/lib/byteq.h
@@ -19,6 +19,7 @@ 
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
+#include "ovs-atomic.h"
 
 /* General-purpose circular queue of bytes. */
 struct byteq {
@@ -26,6 +27,7 @@  struct byteq {
     unsigned int size;          /* Number of bytes allocated for 'buffer'. */
     unsigned int head;          /* Head of queue. */
     unsigned int tail;          /* Chases the head. */
+    atomic_int used;
 };
 
 void byteq_init(struct byteq *, uint8_t *buffer, size_t size);